Fork me on GitHub

2025年10月

深入 Dify 的应用运行器之内容审核

在上一篇文章中,我们学习了 Dify 应用运行器的提示词组装机制,了解了从用户输入到模型调用的完整转换流程。今天我们将继续深入 CompletionAppRunnerrun() 方法源码,详细讲解其中的内容审核相关逻辑,包括输入审核、托管审核和输出审核,以及审核的三种实现方式。

应用运行器流程回顾

让我们先回顾一下 CompletionAppRunnerrun() 方法的核心流程,在这个流程中,内容审核扮演着重要的安全守门员角色:

moderation-flow.png

可以看到,Dify 在输入处理时设置了两道内容安全防线:

  1. 输入审核:在第一次提示词组装后,根据已配置的审核策略,检查用户输入是否包含违规内容;
  2. 托管审核:在第二次提示词组装后,模型调用前,对完整的提示词进行合规性检查;托管审核是 Dify 提供的一个系统级别的额外安全层,它是在用户配置的常规审核机制之外,由 Dify 平台自动提供的内容安全保障服务;

此外,Dify 还有一套输出审核机制,在生成最终的输出内容时触发。通过这三重保障,确保了 AI 应用的内容安全。

输入审核

输入审核紧跟在第一次提示词组装之后,代码如下:

# 第一次提示词组装
prompt_messages, stop = self.organize_prompt_messages(...)

try:
  # 输入内容审核
  _, inputs, query = self.moderation_for_inputs(
    app_id=app_record.id,
    tenant_id=app_config.tenant_id,
    app_generate_entity=application_generate_entity,
    inputs=inputs,
    query=query or "",
    message_id=message.id,
  )
except ModerationError as e:
  # 审核失败,直接返回预设回复
  self.direct_output(
    queue_manager=queue_manager,
    app_generate_entity=application_generate_entity,
    prompt_messages=prompt_messages,
    text=str(e),  # 预设的错误回复
    stream=application_generate_entity.stream,
  )
  return

当审核检测到违规内容时,会抛出 ModerationError 异常,应用运行器捕获异常后调用 direct_output() 方法,直接向用户返回预设的安全回复,跳过后续的模型调用流程。这里的关键是 moderation_for_inputs() 方法,它位于基类 AppRunner 中:

def moderation_for_inputs(...) -> tuple[bool, Mapping[str, Any], str]:
  # 输入审核检查
  moderation_feature = InputModeration()
  return moderation_feature.check(...)

真正的审核逻辑位于 InputModeration 类的 check() 方法中:

def check(
    self,
    app_id: str,
    tenant_id: str,
    app_config: AppConfig,
    inputs: Mapping[str, Any],
    query: str,
    message_id: str,
    trace_manager: Optional[TraceQueueManager] = None,
) -> tuple[bool, Mapping[str, Any], str]:

  # 检查应用是否启用了内容审查功能
  if not app_config.sensitive_word_avoidance:
    return False, inputs, query

  # 获取内容审查配置
  sensitive_word_avoidance_config = app_config.sensitive_word_avoidance
  moderation_type = sensitive_word_avoidance_config.type

  # 创建审核工厂实例
  moderation_factory = ModerationFactory(
    name=moderation_type,
    app_id=app_id,
    tenant_id=tenant_id,
    config=sensitive_word_avoidance_config.config
  )

  # 执行审核,并记录耗时
  with measure_time() as timer:
    moderation_result = moderation_factory.moderation_for_inputs(inputs, query)

  # 添加追踪记录(用于调试和监控)
  if trace_manager:
    trace_manager.add_trace_task(
      TraceTask(
        TraceTaskName.MODERATION_TRACE,
        message_id=message_id,
        moderation_result=moderation_result,
        inputs=inputs,
        timer=timer,
      )
    )

  # 处理审核结果
  if not moderation_result.flagged:
    return False, inputs, query  # 审核通过

  # 根据审核动作进行相应处理
  if moderation_result.action == ModerationAction.DIRECT_OUTPUT:
    # 直接输出预设回复
    raise ModerationError(moderation_result.preset_response)
  elif moderation_result.action == ModerationAction.OVERRIDDEN:
    # 覆盖用户输入
    inputs = moderation_result.inputs
    query = moderation_result.query

  return True, inputs, query

这个方法的执行流程如下:

  1. 检查审核配置:从应用配置中获取内容审查设置,如果没有配置,则跳过输入审核;
  2. 创建审核实例:根据配置的审核策略,通过审核工厂 ModerationFactory 创建对应的审核实例;
  3. 执行审核检查:调用具体的审核实现进行内容检查,并通过追踪管理器将审核的输入、输出、耗时等添加到追踪记录;
  4. 处理审核结果:根据审核结果采取相应的行动,如果审核通过,继续后续流程;如果审核失败,支持 直接输出预设回复覆盖用户输入 两种处理手段;

审核策略详解

应用开发者可以在功能设置里开启 “内容审查” 开关:

features-moderation.png

Dify 通过工厂模式支持多种审核策略:

  • 关键词:这是最简单的一种审核策略,开发者可以定义需要审查的敏感词,当用户输入中包含这些关键词时触发,返回预设的回复内容;
  • OpenAI Moderation:调用 OpenAI 的 Moderation API 实现内容审查;
  • API 扩展:不同的企业内部往往有着不同的内容审查机制,Dify 支持通过 API 扩展的方式实现高度自定义的审核策略;

关键词策略

在应用编排页面,打开 “内容审查设置” 对话框,类别选择 “关键词”,在输入框中填上需要审查的敏感词,1 行 1 个,最多 100 行:

features-moderation-keywords.png

然后选择 “审查输入内容”,并填上预设回复,点击 “确定” 后,在调试面板进行验证:

features-moderation-keywords-test.png

关键词策略的实现比较简单,直接基于字符串匹配即可,不区分大小写:

class KeywordsModeration(Moderation):
  name: str = "keywords"

  def moderation_for_inputs(self, inputs: dict, query: str = "") -> ModerationInputsResult:
    flagged = False
    preset_response = ""

    if self.config["inputs_config"]["enabled"]:
      preset_response = self.config["inputs_config"]["preset_response"]

      # 将查询内容也加入检查范围
      if query:
        inputs["query__"] = query

      # 过滤掉空关键词
      keywords_list = [
        keyword for keyword in self.config["keywords"].split("\n")
        if keyword
      ]

      # 执行关键词违规检查
      flagged = self._is_violated(inputs, keywords_list)

    return ModerationInputsResult(
      flagged=flagged,
      action=ModerationAction.DIRECT_OUTPUT,
      preset_response=preset_response
    )

  def _is_violated(self, inputs: dict, keywords_list: list) -> bool:
    # 检查输入是否包含违规关键词
    return any(
      self._check_keywords_in_value(keywords_list, value)
      for value in inputs.values()
    )

  def _check_keywords_in_value(self, keywords_list: Sequence[str], value: Any) -> bool:
    # 在单个值中检查关键词(不区分大小写)
    return any(
      keyword.lower() in str(value).lower()
      for keyword in keywords_list
    )

关键词策略的特点是简单高效,是最基础也是最常用的审核策略。

审核结果处理

内容审核的结果通过 ModerationInputsResult 对象返回,它定义了审核系统的核心数据结构:

class ModerationInputsResult(BaseModel):
  flagged: bool = False              # 是否违反审核规则
  action: ModerationAction           # 执行动作
  preset_response: str = ""          # 预设回复
  inputs: dict = Field(default_factory=dict)  # 处理后的输入
  query: str = ""                    # 处理后的查询

调用方通过 flagged 字段判断输入是否违反审核规则,如果违反,则执行 action 对应的动作。Dify 支持两种审核动作:

  1. 直接输出(DIRECT_OUTPUT):这是最常见的处理方式,当检测到违规内容时,直接返回预设的回复消息;用户会看到类似 "您的输入包含不当内容,请重新输入" 这样的提示,而不会看到模型的任何响应;
  2. 内容覆盖(OVERRIDDEN):这种方式更加智能,它不是简单地拒绝用户输入,而是对内容进行修正或替换;这种方式允许审核器删除或替换敏感词,然后继续正常的处理流程。例如,可以将 "这个人是个XX" 修正为 "这个人是个不好的人";

我们可以开启应用追踪,在 LLMOps 平台查看审核的追踪记录:

moderation-langfuse.png

OpenAI Moderation 策略

在 “内容审查设置” 对话框中选择 “OpenAI Moderation” 类别:

features-moderation-openai.png

同样勾选 “审查输入内容”,并填上预设回复,点击 “确定” 后,在调试面板进行验证:

features-moderation-openai-test.png

OpenAI Moderation 策略的实现如下:

class OpenAIModeration(Moderation):
  name: str = "openai_moderation"

  def moderation_for_inputs(self, inputs: dict, query: str = "") -> ModerationInputsResult:
    flagged = False
    preset_response = ""

    if self.config["inputs_config"]["enabled"]:
      preset_response = self.config["inputs_config"]["preset_response"]

      # 将查询内容加入检查
      if query:
        inputs["query__"] = query

      # 使用 OpenAI Moderation API 进行内容检查
      flagged = self._is_violated(inputs)

    return ModerationInputsResult(
      flagged=flagged,
      action=ModerationAction.DIRECT_OUTPUT,
      preset_response=preset_response
    )

  def _is_violated(self, inputs: dict):
    # 将所有输入值合并为一个文本
    text = "\n".join(str(inputs.values()))

    # 获取 OpenAI Moderation 模型实例
    model_manager = ModelManager()
    model_instance = model_manager.get_model_instance(
      tenant_id=self.tenant_id,
      provider="openai",
      model_type=ModelType.MODERATION,
      model="text-moderation-stable"
    )

    # 调用 OpenAI Moderation API
    openai_moderation = model_instance.invoke_moderation(text=text)
    return openai_moderation

整体逻辑也很简单,直接调用 OpenAI 的 text-moderation-stable 模型即可。不过要注意的是,使用此策略需要提前在设置中添加 OpenAI 模型供应商,并启用对应的模型。

OpenAI Moderation 策略的优势是基于 OpenAI 训练的专业模型,能够更准确地识别各种类型的有害内容,包括暴力、性内容、自残、仇恨等多种分类。

学习 OpenAI Moderation API

这一节我们对 OpenAI 的 Moderation API 稍加介绍。其实,该 API 支持两种模型:

  1. omni-moderation-latest(推荐):最新的多模态模型,支持更多分类选项和文本+图像输入
  2. text-moderation-latest(遗留):仅支持文本输入的旧版模型

可以看到 Dify 使用的还是老版本的模型。

Moderation API 的基本用法如下:

from openai import OpenAI

client = OpenAI()

response = client.moderations.create(
  model="omni-moderation-latest",
  input="kill them all"
)

print(response)

对于图像和文本的混合内容审核:

response = client.moderations.create(
  model="omni-moderation-latest",
  input=[
    {"type": "text", "text": "需要检查的文本内容"},
    {
      "type": "image_url",
      "image_url": {
        "url": "https://example.com/image.png"
        # 也支持 base64 编码的图片: "data:image/jpeg;base64,abcdefg..."
      }
    }
  ]
)

API 响应结构如下:

{
  "id": "modr-6796",
  "model": "omni-moderation-latest",
  "results": [
    {
      "flagged": true, // 是否被标记为有害内容
      "categories": {
        "harassment": false,
        "harassment/threatening": false,
        "hate": false,
        "hate/threatening": false,
        "illicit": false,
        "illicit/violent": true,
        "self-harm": false,
        "self-harm/instructions": false,
        "self-harm/intent": false,
        "sexual": false,
        "sexual/minors": false,
        "violence": true,  // 检测到暴力内容
        "violence/graphic": false
      },
      "category_scores": {
        // 每个类别的置信度分数 (0-1)
        "harassment": 0.1996759395892913,
        "violence": 0.9430467818012114,
        // ... 其他分数
      },
      "category_applied_input_types": {
        // 指明哪些输入类型触发了特定类别
        "harassment": [ "text" ],
        "violence": [ "text" ],
        // ... 其他类别
      }
    }
  ],
  "usage": { // 消耗的 token 数
    "prompt_tokens": 6,
    "completion_tokens": 0,
    "total_tokens": 6
  }
}

Moderation API 支持以下内容分类:

分类描述支持的输入类型
harassment表达、煽动或促进对任何目标的骚扰语言仅文本
harassment/threatening包含暴力或严重伤害的骚扰内容仅文本
hate基于种族、性别、民族等的仇恨言论仅文本
hate/threatening包含暴力威胁的仇恨内容仅文本
illicit提供非法行为建议或指导的内容仅文本(仅 omni 模型)
illicit/violent涉及暴力或获取武器的非法内容仅文本(仅 omni 模型)
self-harm促进、鼓励或描述自残行为的内容文本和图像
self-harm/intent表达自残意图的内容文本和图像
self-harm/instructions教授自残方法的内容文本和图像
sexual旨在引起性兴奋的内容文本和图像
sexual/minors涉及18岁以下个体的性内容仅文本
violence描绘死亡、暴力或身体伤害的内容文本和图像
violence/graphic以图形方式描绘暴力的内容文本和图像

托管审核

除了输入内容审核,Dify 还在模型调用前还会进行一次托管审核检查,这是第二道安全防线:

hosting_moderation_result = self.check_hosting_moderation(
  application_generate_entity=application_generate_entity,
  queue_manager=queue_manager,
  prompt_messages=prompt_messages,
)

if hosting_moderation_result:
  return  # 审核失败,直接返回

托管审核 (Hosting Moderation) 是 Dify 提供的一个系统级别的额外安全层,由平台统一配置和管理,用户无法直接控制,属于平台安全策略的一部分。这个特性只在云托管环境下才生效,因此我们需要修改 EDITION 配置:

# 默认是 SELF_HOSTED 自托管
EDITION=CLOUD

并配置系统供应商:

HOSTED_OPENAI_TRIAL_ENABLED=true
HOSTED_OPENAI_QUOTA_LIMIT=999999
HOSTED_OPENAI_TRIAL_MODELS=
HOSTED_OPENAI_API_KEY=sk-...
HOSTED_OPENAI_API_BASE=...

然后启用托管审核:

HOSTED_MODERATION_ENABLED=true
HOSTED_MODERATION_PROVIDERS=openai

托管审核的核心逻辑位于 core/helper/moderation.py 文件:

def check_moderation(tenant_id: str, model_config: ModelConfigWithCredentialsEntity, text: str) -> bool:

  # 1. 检查托管审核配置是否启用
  moderation_config = hosting_configuration.moderation_config

  # 2. 验证 OpenAI 提供商是否可用
  openai_provider_name = f"{DEFAULT_PLUGIN_ID}/openai/openai"

  # 3. 检查当前模型提供商是否在审核范围内
  if using_provider_type == ProviderType.SYSTEM and provider_name in moderation_config.providers:

    # 4. 文本分块处理(每块2000字符)
    length = 2000
    text_chunks = [text[i:i + length] for i in range(0, len(text), length)]

    # 5. 随机选择一个文本块进行审核
    text_chunk = secrets.choice(text_chunks)

    # 6. 调用 OpenAI 的 omni-moderation-latest 模型
    moderation_result = model_type_instance.invoke(
      model="omni-moderation-latest",
      credentials=hosting_openai_config.credentials,
      text=text_chunk
    )

这里主要关注几点:

  1. 必须在 .env 文件中配置 OpenAI 系统供应商并启用托管审核,该功能才会生效;
  2. 由于托管审核针对的是第二次组装后的提示词,包含外部数据和知识库,完整的上下文可能非常长,因此这里对文本进行分块处理,并随机选择一个文本块进行审核;通过这种随机采样的策略,不仅提高程序性能,也降低 API 调用成本;
  3. 托管审核使用的是 OpenAI 最新的审核模型 omni-moderation-latest 模型,而不是输入审核的 text-moderation-stable 模型;
  4. 托管审核失败后直接返回固定的礼貌拒绝响应,而非用户自定义的消息;
  5. 托管审核是 Dify 平台自动提供的内容安全保障服务,不同于应用开发者只需关注用户的输入即可,平台侧需要关注模型的完整输入,包括开发者提供的知识库内容和外部接口数据,这也正是应用运行器中要进行两次提示词组装的根本原因。

用户审核和托管审核对比

下表对比了用户审核和托管审核的几点区别:

特征用户审核 (User Moderation)托管审核 (Hosting Moderation)
配置主体应用开发者Dify 平台
检查内容用户输入完整提示词
检查时机第一次提示词组装后模型调用前
配置方式应用级配置系统级配置
配置灵活性高度可定制固定策略
审核类型关键词、OpenAI、API扩展等仅 OpenAI omni-moderation
触发条件用户启用系统自动判断
失败响应用户自定义固定响应
性能策略全文审核随机采样

托管审核作为平台的最后一道防线,即使用户没有配置任何审核策略,或者用户配置的审核策略存在漏洞,平台仍能提供基础的内容安全保障,确保 Dify 平台的整体合规性和安全性。

输出审核

除了输入审核和托管审核,Dify 还支持对模型生成的输出内容进行审核。输出审核的实现位于 OutputModeration 类,当模型输出新的 token 时:

def append_new_token(self, token: str) -> None:
  self.buffer += token
  if not self.thread:
    self.thread = self.start_thread()

启动一个后台审核线程:

def worker(self, flask_app: Flask, buffer_size: int):
  while self.thread_running:
    moderation_buffer = self.buffer
    # 当缓冲区大小达到阈值或最终块时进行审核
    if chunk_length < buffer_size and not self.is_final_chunk:
      time.sleep(1)
      continue

    result = self.moderation(...)
    if result and result.flagged:
      # 触发替换事件
      self.queue_manager.publish(QueueMessageReplaceEvent(...))

输出审核的具体实现和输入审核是一样的,都是通过审核工厂 ModerationFactory 根据配置创建对应的审核实例,支持关键词、OpenAI Moderation 和 API 扩展三种策略。这其实都比较简单,输出审核真正的挑战 在于实时性要求,需要在内容流式传输过程中及时检测。Dify 的做法是将模型输出放到一个缓冲区中,然后启动一个后台审核线程,该线程持续监控缓冲区,当缓冲区大小达到 300 时触发审核,该值可以通过 MODERATION_BUFFER_SIZE 参数调整。一旦检测到违规内容,立即停止流式传输,并发送一个 QueueMessageReplaceEvent 替换事件。该事件会被发送到前端,前端直接将当前已显示内容替换为预设回复。

小结

今天我们深入分析了 Dify 应用运行器中的内容审核机制,从整体架构到具体实现,全面了解了 Dify 如何确保应用的内容安全。主要收获包括:

  1. 多层防护体系:Dify 设计了输入审核、托管审核和输出审核三道防线,确保全链路内容安全;
  2. 多样化的审核实现:支持多种审核方式的灵活配置,包括 OpenAI Moderation、关键词审核和 API 扩展三种方式,满足不同场景的需求;
  3. 灵活的处理策略:支持直接拒绝(DIRECT_OUTPUT)和内容覆盖(OVERRIDDEN)两种处理方式;
  4. 实时审核能力:针对流式输出场景,通过后台审核线程实现了分块实时审核机制。

细心的读者可能已经注意到了,关于内容审核还有一点没有讲到,那就是 API 扩展策略,我们将在下一篇文章中,学习外部数据集成的逻辑,到时候一起来看下 Dify 的扩展机制。


深入 Dify 的应用运行器之提示词组装

应用运行器(App Runner) 是 Dify 应用的核心执行器,负责处理具体的生成逻辑,今天,我们将继续深入应用运行器的内部实现。

应用运行器的概览

让我们以文本生成应用为例,深入 CompletionAppRunnerrun() 方法的实现:

def run(
  self,
  application_generate_entity: CompletionAppGenerateEntity,
  queue_manager: AppQueueManager,
  message: Message
) -> None:

  # 1. 提取配置和输入参数
  app_config = application_generate_entity.app_config
  inputs = application_generate_entity.inputs
  query = application_generate_entity.query
  files = application_generate_entity.files

  # 2. 组装提示词消息
  prompt_messages, stop = self.organize_prompt_messages(...)

  # 3. 内容审核
  try:
    _, inputs, query = self.moderation_for_inputs(...)
  except ModerationError as e:
    self.direct_output(queue_manager, ..., text=str(e))
    return

  # 4. 外部数据工具处理
  if app_config.external_data_variables:
    inputs = self.fill_in_inputs_from_external_data_tools(...)

  # 5. 知识库检索
  context = None
  if app_config.dataset and app_config.dataset.dataset_ids:
    dataset_retrieval = DatasetRetrieval(application_generate_entity)
    context = dataset_retrieval.retrieve(...)

  # 6. 重新组装包含上下文的提示词
  prompt_messages, stop = self.organize_prompt_messages(..., context=context)

  # 7. 托管审核检查
  if self.check_hosting_moderation(...):
    return

  # 8. 调整 Token 限制
  self.recalc_llm_max_tokens(...)

  # 9. 调用模型
  model_instance = ModelInstance(...)
  invoke_result = model_instance.invoke_llm(...)

  # 10. 处理调用结果
  self._handle_invoke_result(invoke_result, queue_manager, streaming)

其核心执行流程如下:

  • 配置和参数提取: 从应用生成实体中提取应用配置以及输入参数 (inputs)、查询 (query) 和文件 (files) 等;
  • 第一次提示词组装:将模板、输入、查询、文件组合成完整的提示消息,同时处理图片细节配置,生成初始的 prompt_messagesstop 序列;
  • 内容审核:对输入内容进行敏感词检测,如果检测到违规内容,直接返回错误消息;
  • 外部数据填充:从外部数据源获取变量值,动态补充应用输入参数;
  • 知识库检索:使用 DatasetRetrieval 进行向量检索,获取相关上下文;
  • 第二次提示词重组:整合所有信息:模板 + 输入 + 查询 + 文件 + 记忆 + 外部数据 + 知识库上下文,生成最终的提示消息;
  • 托管审核:检查提示消息是否符合托管方的内容政策,如果违规,直接返回标准回复;
  • 令牌重计算:计算提示令牌数量,如果 prompt_tokens + max_tokens > 模型上下文限制,则调整 max_tokens,确保请求不超过模型的令牌限制;
  • 模型调用:使用组装好的提示消息调用大模型,支持使用 stop 序列控制生成停止;
  • 结果处理:将模型调用结果通过队列管理器传递到主线程,并返回给最终用户,支持流式和非流式输出;

两次提示词组装

可以看出,运行器的整体流程还是蛮复杂的,其中经历了两次提示词组装:

  1. 第一次组装:使用基础的用户输入和查询,主要用于输入审核和知识库检索;
  2. 第二次组装:在获得完整上下文(包括外部数据和知识库检索结果)后重新组装;

提示词组装是应用运行器的核心功能之一,我们今天就详细地讲解下提示词组装的源码,同时对比其他几种应用运行器在提示词处理上的差异。让我们先看看两次提示词组装的具体实现:

# 第一次提示词组装 - 基础模板
prompt_messages, stop = self.organize_prompt_messages(
  app_record=app_record,
  model_config=application_generate_entity.model_conf,
  prompt_template_entity=app_config.prompt_template,
  inputs=inputs,
  files=files,
  query=query,
  image_detail_config=image_detail_config,
)

# 第二次提示词组装 - 包含完整上下文
prompt_messages, stop = self.organize_prompt_messages(
  app_record=app_record,
  model_config=application_generate_entity.model_conf,
  prompt_template_entity=app_config.prompt_template,
  inputs=inputs,   # 填充外部数据
  files=files,
  query=query,
  context=context, # 新增知识库上下文
  image_detail_config=image_detail_config,
)

这两段代码几乎一模一样,只是第二次多了个 context 字段,另外 inputs 字段相比于第一次填充了外部数据。提示词组装的核心逻辑在父类 AppRunnerorganize_prompt_messages() 方法中:

def organize_prompt_messages(...) -> tuple[list[PromptMessage], Optional[list[str]]]:

  # 根据提示词模板类型选择转换器
  if prompt_template_entity.prompt_type == PromptTemplateEntity.PromptType.SIMPLE:
    # 简单提示词模板:使用预设的模板格式
    prompt_transform = SimplePromptTransform()
    prompt_messages, stop = prompt_transform.get_prompt(
      prompt_template_entity=prompt_template_entity,
      ...
    )
  else:
    # 高级提示词模板:用户自定义的完整模板
    prompt_transform = AdvancedPromptTransform()
    prompt_messages = prompt_transform.get_prompt(
      prompt_template=prompt_template,
      ...
    )
    stop = model_config.stop

  return prompt_messages, stop

Dify 根据提示词模板类型选择不同的处理策略,它支持两种模式:

  • 简单模式(Simple Mode):适用于大多数基础场景,通过模板变量替换的方式生成提示词
  • 高级模式(Advanced Mode):提供了更精细的控制,支持多轮对话和角色定制

简单模式提示词组装

当我们在应用的编排页面配置提示词时,默认就是简单模式:

prompt-template-simple.png

简单模式由 SimplePromptTransform 实现,它负责处理大多数常见场景的提示词组装:

def get_prompt(...) -> tuple[list[PromptMessage], Optional[list[str]]]:

  # 根据模型类型选择处理方式
  model_mode = ModelMode(model_config.mode)
  if model_mode == ModelMode.CHAT:
    # Chat 模型:生成多条消息(系统提示 + 用户消息)
    prompt_messages, stops = self._get_chat_model_prompt_messages(...)
  else:
    # Completion 模型:生成单条完整提示词
    prompt_messages, stops = self._get_completion_model_prompt_messages(...)

  return prompt_messages, stops

它又将大模型分成两种类型:

  • 对于 对话模型(Chat),Dify 会构建结构化的消息列表,生成多条消息(系统提示 + 历史记录 + 用户消息);
  • 对于传统的 文本补全模型(Completion),Dify 会将所有内容拼在一起,生成单条完整的提示词;

目前市面上绝大多数的模型都是对话模型,通过结构化方式组装消息有几个优势:

  1. 清晰的角色分离:系统提示、历史记录、当前查询分别处理
  2. 更好的模型兼容性:充分利用 Chat 模型的对话能力
  3. 灵活的上下文管理:可以精确控制每部分内容的位置和格式

提示词模板

无论是对话模型,还是文本补全模型,Dify 都统一使用一套灵活的模板系统来构建提示词。提示词模板存放在 prompt_templates 目录下的 JSON 文件中:

  • common_completion.json: 文本生成应用的提示词模板规则
  • common_chat.json: 聊天应用或智能体应用的提示词模板规则

Dify 根据不同的应用切换不同的模板规则,以 common_chat.json 文件为例,它的内容如下:

{
  "human_prefix": "Human",
  "assistant_prefix": "Assistant",
  "context_prompt": "Use the following context as your learned knowledge...",
  "histories_prompt": "Here is the chat histories between human and assistant...",
  "system_prompt_orders": [
    "context_prompt",
    "pre_prompt",
    "histories_prompt"
  ],
  "query_prompt": "\n\nHuman: {{#query#}}\n\nAssistant: ",
  "stops": ["\nHuman:", "</histories>"]
}

提示规则包含以下核心字段:

  1. 提示词组装顺序

    • system_prompt_orders:定义系统提示的组装顺序,默认按照 context_prompt -> pre_prompt -> histories_prompt 这个顺序来组装,其中 pre_prompt 就是用户在编排页面自定义的提示词模板;
  2. 提示模板字段

    • context_prompt: 知识库上下文的提示模板
    • histories_prompt: 历史对话的提示模板
    • query_prompt: 用户查询的提示模板,拼接在系统提示之后
  3. 对话角色前缀 (仅聊天模型)

    • human_prefix: "Human" - 用户角色标识
    • assistant_prefix: "Assistant" - 助手角色标识
  4. 停止词

    • stops:定义模型生成时的停止标记,用于控制模型输出边界

注意,只有对话类应用的提示词模板设有停止词,这是因为对话应用需要明确的角色分工,使用停止词来防止模型继续生成不应该生成的内容,比如 "\nHuman:" 防止模型继续模拟用户发言,"</histories>" 防止模型破坏历史对话的 XML 标签结构。

提示词中预定义了几个占位符,用于变量替换:

  • {{#context#}}: 替换为实际上下文内容
  • {{#query#}}: 替换为用户查询
  • {{#histories#}}: 替换为对话历史

另外,它还针对百川大模型使用了定制的模板(其实就是翻译成中文):

  • baichuan_chat.json: 聊天应用的提示词模板规则(针对百川大模型定制)
  • baichuan_completion.json: 文本生成应用的提示词模板规则(针对百川大模型定制)

高级模式提示词组装

高级模式允许用户直接在页面上更精细的控制提示词顺序,而不是基于 JSON 配置文件。在老版本的编排页面,我们可以看到一个链接,点击后切换到 专家模式

prompt-template-adv-pre.png

不过这个模式在新版本中已经看不到了,不知道是废弃了?还是变成收费功能了?

尽管页面上看不到切换入口,但是相关的代码逻辑还没删,所以通过修改数据库中的应用配置,还可以切换到该模式。我们创建一个应用,然后在数据库中找到该应用的配置,将 prompt_type 改为 advanced,将 chat_prompt_config 改为 {"prompt":[]},刷新页面后就能看到专家模式了:

prompt-template-adv.png

高级模式由 AdvancedPromptTransform 实现,它的实现就比较简单,直接根据用户配置来组装提示词即可。

文件处理

在提示词的组装过程中,文件的处理至关重要,其实现位于 file_manager.to_prompt_message_content() 函数:

def to_prompt_message_content(f: File) -> PromptMessageContentUnionTypes:

  # 支持 4 种文件类型:图片、音频、视频、文件
  prompt_class_map: Mapping[FileType, type[PromptMessageContentUnionTypes]] = {
    FileType.IMAGE: ImagePromptMessageContent,
    FileType.AUDIO: AudioPromptMessageContent,
    FileType.VIDEO: VideoPromptMessageContent,
    FileType.DOCUMENT: DocumentPromptMessageContent,
  }

  # 对于不支持的文件类型,返回一句话描述
  if f.type not in prompt_class_map:
    return TextPromptMessageContent(data=f"[Unsupported file type: {f.filename} ({f.type.value})]")

  # 对于支持的文件类型,返回文件信息
  params = {
    "base64_data": _get_encoded_string(f) if dify_config.MULTIMODAL_SEND_FORMAT == "base64" else "",
    "url": _to_url(f) if dify_config.MULTIMODAL_SEND_FORMAT == "url" else "",
    "format": f.extension.removeprefix("."),
    "mime_type": f.mime_type,
    "filename": f.filename or "",
  }

  # 对于图片类型,增加 detail 参数
  if f.type == FileType.IMAGE:
    params["detail"] = image_detail_config or ImagePromptMessageContent.DETAIL.LOW

  return prompt_class_map[f.type].model_validate(params)

该函数将上传的文件转换为适合的 prompt 消息,它会根据配置将文件表示成 BASE64 编码或 URL 链接,默认使用 BASE64 方式:

MULTIMODAL_SEND_FORMAT=base64

值得注意的是,对于图片类型,还会增加一个 detail 参数,用于控制图片处理精度,支持 LOW 和 HIGH 两种:

  • 低精度:处理速度快,消耗资源少;适用于对图片细节要求不高的场景;比如图片分类、简单的图片理解任务;
  • 高精度:处理时间较长,消耗更多计算资源;适用于需要分析图片细节的场景;比如 OCR、详细图片分析、需要识别图片中细小文字或复杂内容;

模型供应商需要支持这个参数该特性才能生效,参考 OpenAI 的接口文档

openai-image-level.png

可以看到,模型供应商接收到 detail="low" 参数后,会将图片缩放到较低分辨率(如 512x512)进行处理,Dify 本身不会对图片做任何处理。

不同应用运行器的提示词组装对比

现在让我们对比分析不同类型应用运行器在提示词组装上的差异。首先,聊天应用和文本生成应用在提示词组装上的主要区别在于 记忆管理,在提示词组装时会传入记忆:

memory = None
if application_generate_entity.conversation_id:
  model_instance = ModelInstance(
    provider_model_bundle=application_generate_entity.model_conf.provider_model_bundle,
    model=application_generate_entity.model_conf.model,
  )
  # 创建基于会话的记忆缓冲区
  memory = TokenBufferMemory(conversation=conversation, model_instance=model_instance)

# 在提示词组装时传入记忆
prompt_messages, stop = self.organize_prompt_messages(
  # ... 其他参数
  memory=memory,  # ChatAppRunner 会传入记忆
)

智能体应用和聊天应用一样,在提示词组装时也会传入记忆,不过组装后的提示词仅仅用于内容审核。审核通过后,它会根据模型能力动态选择智能体策略,创建对应的 智能体运行器(Agent Runner),真正的提示词组装逻辑位于智能体运行器中:

def run(self, application_generate_entity: AgentChatAppGenerateEntity, ...):
  # 1. 基础提示词组装(与 ChatAppRunner 相同)
  prompt_messages, _ = self.organize_prompt_messages(
    app_record=app_record,
    model_config=application_generate_entity.model_conf,
    prompt_template_entity=app_config.prompt_template,
    inputs=dict(inputs),
    files=list(files),
    query=query,
    memory=memory,
  )

  # 2. 根据模型能力选择智能体策略
  model_instance = ModelInstance(...)
  llm_model = cast(LargeLanguageModel, model_instance.model_type_instance)
  model_schema = llm_model.get_model_schema(...)

  # 检查模型是否支持函数调用
  if {ModelFeature.MULTI_TOOL_CALL, ModelFeature.TOOL_CALL}.intersection(model_schema.features or []):
    agent_entity.strategy = AgentEntity.Strategy.FUNCTION_CALLING

  # 3. 选择对应的智能体运行器
  if agent_entity.strategy == AgentEntity.Strategy.FUNCTION_CALLING:
    runner_cls = FunctionCallAgentRunner
  elif agent_entity.strategy == AgentEntity.Strategy.CHAIN_OF_THOUGHT:
    # 根据 LLM 模式选择思维链实现
    if model_schema.model_properties.get(ModelPropertyKey.MODE) == LLMMode.CHAT.value:
      runner_cls = CotChatAgentRunner
    else:
      runner_cls = CotCompletionAgentRunner

  # 4. 创建专门的智能体运行器处理后续逻辑
  runner = runner_cls(...)
  invoke_result = runner.run(...)

因为需要支持工具调用,它有着更复杂的提示词组装逻辑,需要在提示词中集成工具描述和调用指令。

最后,对于工作流应用,运行器采用了完全不同的方式处理输入,它使用 变量池(Variable Pool) 而非传统提示词来管理整个工作流的数据流:

def run(self) -> None:
  inputs = self.application_generate_entity.inputs
  files = self.application_generate_entity.files

  # 创建系统变量
  system_inputs = SystemVariable(
    files=files,
    user_id=self._sys_user_id,
    app_id=app_config.app_id,
    workflow_id=app_config.workflow_id,
    workflow_execution_id=self.application_generate_entity.workflow_execution_id,
  )

  # 构建变量池
  variable_pool = VariablePool(
    system_variables=system_inputs,
    user_inputs=inputs,  # 用户输入变量
    environment_variables=self._workflow.environment_variables,  # 环境变量
    conversation_variables=[],  # 对话变量(对话流应用使用)
  )

  # 初始化工作流图
  graph = self._init_graph(graph_config=self._workflow.graph_dict)

  # 运行工作流
  workflow_entry = WorkflowEntry(
    tenant_id=self._workflow.tenant_id,
    # ... 其他参数
    variable_pool=variable_pool,
  )

  generator = workflow_entry.run(callbacks=workflow_callbacks)

工作流的每个节点首先从变量池中获取所需的输入数据,然后执行节点特定的逻辑(LLM 调用、工具执行、条件判断等),最后将处理结果写回变量池供后续节点使用。

关于智能体策略和工作流的执行细节,我们后面将专门学习,此处暂不展开。

小结

今天我们深入分析了 Dify 应用运行器的提示词组装机制,从 CompletionAppRunner 的双重组装策略开始,详细解析了简单提示词模板和高级提示词模板的处理流程,包括 Chat 和 Completion 模型的不同处理方式。我们也对比了不同应用运行器的提示词处理差异:

  1. CompletionAppRunner:专注单次文本生成
  2. ChatAppRunner:支持对话记忆,提供连续的多轮对话体验
  3. AgentChatAppRunner:结合工具调用能力,支持复杂的推理和执行流程
  4. WorkflowAppRunner:采用变量池机制,支持复杂的数据流处理

提示词组装是 Dify 的核心能力之一,它决定了用户输入如何转化为模型能够理解的格式。理解这一机制对于开发者深入使用 Dify 或定制应用逻辑具有重要意义。在后面的文章中,我们将继续探索应用运行器的其他核心功能,包括内容审核、外部数据扩展、知识库检索、模型调用,以及不同的智能体策略和工作流的执行细节等。


深入 Dify 的应用运行器

在前面的文章中,我们深入分析了 Dify 应用生成器的源码实现,从限流策略、流式响应、配置管理、文件上传处理,到追踪调试机制,逐步了解了 Dify 会话处理的完整流程。今天我们将继续深入 CompletionAppGeneratorgenerate() 方法,看看在创建好应用生成实体后,Dify 是如何通过 应用运行器(App Runner) 来执行具体的业务逻辑。

从生成器到运行器

让我们回顾一下 CompletionAppGeneratorgenerate() 方法,在完成配置管理、文件处理、追踪管理器初始化等前置工作后,接下来的步骤是创建 应用生成实体(App Generate Entity)

application_generate_entity = CompletionAppGenerateEntity(
  
  # 任务ID
  task_id=str(uuid.uuid4()),
  # 应用配置
  app_config=app_config,
  # 模型配置
  model_conf=ModelConfigConverter.convert(app_config),
  # 文件上传配置
  file_upload_config=file_extra_config,

  # 用户输入变量
  inputs=self._prepare_user_inputs(
    user_inputs=inputs, variables=app_config.variables, tenant_id=app_model.tenant_id
  ),
  # 用户查询
  query=query,
  # 上传文件
  files=list(file_objs),
  # 用户ID
  user_id=user.id,
  
  # 是否流式输出
  stream=streaming,
  # 调用来源
  invoke_from=invoke_from,

  # 扩展参数
  extras={},
  # 追踪管理器
  trace_manager=trace_manager,
)

应用生成实体包含了执行一次应用调用所需的所有信息,包括:

  • 配置信息:应用配置、模型配置、文件上传配置
  • 用户数据:输入变量、查询内容、上传文件
  • 执行控制:流式开关、调用来源
  • 附加功能:追踪管理器、扩展参数

后续的运行流程将分成两条线路:一条是我们昨天学习的追踪线程,通过 Celery 任务队列,离线记录业务运行时产生的数据,并发送到外部 Ops 工具:另一条则为工作线程,根据应用生成实体执行具体的生成逻辑:

# 初始化数据库记录(会话和消息)
(conversation, message) = self._init_generate_records(application_generate_entity)

# 初始化队列管理器
queue_manager = MessageBasedAppQueueManager(
  task_id=application_generate_entity.task_id,
  user_id=application_generate_entity.user_id,
  invoke_from=application_generate_entity.invoke_from,
  conversation_id=conversation.id,
  app_mode=conversation.mode,
  message_id=message.id,
)

# 创建工作线程,并传递 Flask 请求上下文
@copy_current_request_context
def worker_with_context():
  return self._generate_worker(
    flask_app=current_app._get_current_object(),  # type: ignore
    application_generate_entity=application_generate_entity,
    queue_manager=queue_manager,
    message_id=message.id,
  )

# 启动工作线程
worker_thread = threading.Thread(target=worker_with_context)
worker_thread.start()

# 主线程处理响应或流生成器
response = self._handle_response(
  application_generate_entity=application_generate_entity,
  queue_manager=queue_manager,
  conversation=conversation,
  message=message,
  user=user,
  stream=streaming,
)
return CompletionAppGenerateResponseConverter.convert(response=response, invoke_from=invoke_from)

这里首先根据应用生成实体创建两条数据库记录,一条是会话记录(conversations),一条是消息记录(messages),如果消息中带有文件,还会创建对应的消息文件记录(message_files);接着创建一个 队列管理器(App Queue Manager),它负责管理应用执行过程中的事件流,实现 生产者-消费者 模式的异步通信;最后启动工作线程,创建 应用运行器(App Runner),执行具体的生成逻辑,并通过队列管理器传递生成结果,同时主线程通过队列管理器监听执行结果,实现了请求处理和业务执行的解耦。整体流程如下:

app-generator-seq.png

至此,我们完成了整个生成器的处理流程的学习,开始进入运行器的学习:

app-generator-flow.png

队列管理器

队列管理器采用了 生产者-消费者 模式,通过 Python 的 queue.Queue 实现线程间的安全通信:

class AppQueueManager:
  def __init__(self, task_id: str, user_id: str, invoke_from: InvokeFrom):
    # 创建线程安全的队列
    self._q: queue.Queue = queue.Queue()

  def listen(self):
    # 监听队列事件,通过 yield 返回生成器
    listen_timeout = dify_config.APP_MAX_EXECUTION_TIME
    start_time = time.time()

    while True:
      try:
        # 从队列中获取消息,超时时间为 1 秒
        message = self._q.get(timeout=1)
        yield message
      except queue.Empty:
        # 检查是否超时或被停止
        elapsed_time = time.time() - start_time
        if elapsed_time >= listen_timeout or self._is_stopped():
          self.publish(QueueStopEvent(), PublishFrom.TASK_PIPELINE)

  def publish(self, event: AppQueueEvent, pub_from: PublishFrom):
    # 发布事件到队列
    self._q.put(event)

队列管理器支持多种类型的事件,包括:

  • QueuePingEvent:Ping 事件,心跳检测,保持连接活跃
  • QueueErrorEvent:错误事件,处理任务执行过程中的错误
  • QueueTextChunkEvent:文本块事件,处理流式文本输出
  • QueueLLMChunkEvent:LLM 流式响应块
  • QueueMessageEndEvent:消息结束事件
  • QueueStopEvent:停止事件

除此之外,还有很多关于工作流的事件,比如节点事件、并行分支事件、迭代事件、循环事件、控制事件等,参考 api/core/app/entities/queue_entities.py 文件。

在整个应用运行过程中,队列管理器扮演着重要的角色。它负责将运行过程中的各种事件发布到队列,实现服务端与客户端的实时通信,并统一处理和发布错误信息:

# 发布生成内容块事件
queue_manager.publish(
    QueueLLMChunkEvent(chunk=result), 
    PublishFrom.APPLICATION_MANAGER
)

# 发布消息结束事件
queue_manager.publish(
    QueueMessageEndEvent(llm_result=llm_result),
    PublishFrom.APPLICATION_MANAGER,
)

# 发布错误事件
queue_manager.publish_error(
    exception, 
    PublishFrom.APPLICATION_MANAGER
)

Flask 请求上下文传递

Flask 的请求上下文默认只在当前线程中有效,当你创建新线程时,新线程无法访问原始请求的信息。Dify 通过 @copy_current_request_context 装饰器解决这个问题:

from flask import copy_current_request_context

@copy_current_request_context
def worker_with_context():
  # 在这里可以访问 current_app、request 等 Flask 上下文对象
  return self._generate_worker(...)

这个装饰器会将当前请求的上下文(包括 current_apprequestsession 等)复制到新线程中,这样,工作线程就可以访问数据库连接、配置信息等依赖于 Flask 应用上下文的资源。比如在 _generate_worker() 函数中,使用 with flask_app.app_context() 手动创建并进入应用上下文:

def _generate_worker(...) -> None:
  with flask_app.app_context():
    # 在这里可以使用 Flask 应用相关的功能,如数据库操作
    message = self._get_message(message_id)
    # ...

注意,这个装饰器和之前学过的 stream_with_context 有所不同,后者用于确保在整个流式响应过程中都能访问请求上下文。

使用 contextvars 拷贝上下文

需要注意的是,前面的代码都是以文本生成应用为例的,和它类似的还有一个聊天应用,这两个应用都比较简单,因此直接使用 Flask 提供的 @copy_current_request_context 装饰器,复制请求上下文即可。

但是在智能体和工作流应用中,包含了更复杂的执行流程,可能涉及多个异步任务,除了 Flask 的请求上下文,还需要更全面的上下文保持。Dify 使用了 Python 3.7+ 的 contextvars 模块,复制所有的上下文变量。我们可以看下 AgentChatAppGenerator 的实现:

# new thread with request context and contextvars
context = contextvars.copy_context()
worker_thread = threading.Thread(
  target=self._generate_worker,
  kwargs={
    "flask_app": current_app._get_current_object(),  # type: ignore
    "context": context,
    "application_generate_entity": application_generate_entity,
    "queue_manager": queue_manager,
    "conversation_id": conversation.id,
    "message_id": message.id,
  },
)
worker_thread.start()

然后在 _generate_worker() 中使用:

with preserve_flask_contexts(flask_app, context_vars=context):
  runner = AgentChatAppRunner()
  runner.run(
    application_generate_entity=application_generate_entity,
    queue_manager=queue_manager,
    conversation=conversation,
    message=message,
  )

结合自定义的 preserve_flask_contexts() 函数,同时处理:

  • ContextVars 上下文 - Python 原生的上下文变量
  • Flask App 上下文 - Flask 应用上下文
  • 用户认证上下文 - Flask-Login 的用户对象

上下文变量

contextvars 是 Python 3.7 引入的标准库,用于管理 上下文变量(Context Variables),主要解决多线程或异步任务中变量传递的问题,主要应用场景有:

  1. 异步编程:在 asyncio 中,每个任务可以有独立的上下文变量
  2. Web开发:跟踪请求ID、用户身份等,无需在函数间显式传递
  3. 日志系统:自动在日志中包含上下文信息(如请求ID)

下面的代码演示了上下文变量的基本用法:

import contextvars

# 创建上下文变量
user_id = contextvars.ContextVar('user_id', default=None)

# 设置值(返回Token对象,用于后续重置)
token = user_id.set(123)

# 获取值
print(user_id.get())  # 输出: 123

# 重置值(使用之前保存的Token)
user_id.reset(token)
print(user_id.get())  # 输出: None(默认值)

# 重新设置值
token = user_id.set(456)

# 在函数中使用
def func():
    print(user_id.get())  # 输出: 456

func()

也可以模仿 Dify 的写法,在多线程中使用:

from contextlib import contextmanager

@contextmanager
def preserve_flask_contexts(context_vars: contextvars.Context):
  # Set context variables if provided
  if context_vars:
    for var, val in context_vars.items():
      var.set(val)
  yield

# 在新线程中使用
import threading

def func2(context: contextvars.Context):
  with preserve_flask_contexts(context_vars=context):
    print(user_id.get())  # 输出: 456

context = contextvars.copy_context()
worker_thread = threading.Thread(
  target=func2,
  kwargs={
    "context": context,
  },
)
worker_thread.start()

可以看出它和线程局部变量 threading.local 很像,两者区别如下:

特性contextvarsthreading.local
适用场景线程、异步任务仅线程
可复制性支持上下文复制不支持
异步友好

contextvars 特别适合需要在复杂调用链或异步任务中共享状态,但又不希望使用全局变量或显式参数传递的场景。但是上下文变量的查找速度略慢于普通变量,还可能使代码逻辑变得隐晦,在使用时需要特别注意,避免过度使用。

创建应用运行器

让我们继续看工作线程中创建应用运行器的部分:

def _generate_worker(
  self,
  flask_app: Flask,
  application_generate_entity: CompletionAppGenerateEntity,
  queue_manager: AppQueueManager,
  message_id: str,
) -> None:
  with flask_app.app_context():
    try:
      # 获取消息记录
      message = self._get_message(message_id)

      # 创建应用运行器并执行
      runner = CompletionAppRunner()
      runner.run(
        application_generate_entity=application_generate_entity,
        queue_manager=queue_manager,
        message=message,
      )
    except Exception as e:
      # 错误处理逻辑...

这里的代码是以文本生成应用为例的,其实,不同类型的应用(如聊天应用、智能体应用、工作流应用)都有对应的运行器实现,它们都遵循统一的接口规范:

app-runner-class.png

Dify 的应用运行器采用了清晰的继承结构,主要基于下面两个基类:

  • AppRunner(应用基础类):提供所有应用运行器的通用功能:提示消息组织、模型调用、内容审核、直接输出响应、外部数据集成等
  • WorkflowBasedAppRunner(工作流基础类):专门处理基于工作流的应用运行逻辑:图初始化、变量池管理、事件处理等

下面是具体的实现类:

  • ChatAppRunner(聊天应用):记忆管理、数据集检索、注释回复、外部数据工具、数据集检索等
  • CompletionAppRunner(文本生成应用):与聊天应用类似但没有对话记忆
  • AgentChatAppRunner(智能体):根据模型能力选择不同的智能体策略(函数调用或思维链)
  • WorkflowAppRunner(工作流):支持单次迭代运行和循环运行
  • AdvancedChatAppRunner(对话流):对话变量管理、输入审核、注释回复等

其中智能体的应用运行器比较特殊,它会根据模型能力选择不同的智能体策略,包括:

  • FunctionCallAgentRunner(函数调用):使用模型原生的函数调用能力,支持流式和非流式调用
  • CotAgentRunner(思维链):实现 ReAct (Reasoning + Acting) 模式的推理循环,它是一个抽象类,使用模板方法设计模式,定义了思维链的算法骨架,由子类实现具体步骤,包括:聊天模式的思维链(CotChatAgentRunner)和文本生成模式的思维链(CotCompletionAgentRunner

小结

今天我们深入学习了 Dify 的应用运行器机制。通过分析 CompletionAppGeneratorgenerate() 方法,我们了解了如何从生成器过渡到工作线程,并在其中创建应用运行器以执行具体生成任务。关键流程包括创建应用生成实体、初始化数据库记录、构建队列管理器以及启动工作线程等。总结如下:

  1. 了解队列管理器的异步管道机制,采用生产者-消费者模式,确保线程间安全通信,并实时处理和发布生成事件;
  2. 学习如何通过 @copy_current_request_contextcontextvars 实现跨线程的请求上下文传递,确保工作线程可以访问原始请求中的信息;
  3. 深入分析了不同类型的应用运行器及其继承结构,展示了如何根据具体应用需求选择不同的策略和实现类。

应用运行器是 Dify 的执行引擎,也是其核心所在。今天我们对这一机制进行了初步探索,明天我们将继续深入它的内部实现,揭示更多细节。


深入 Dify 应用的会话流程之追踪调试

在前面的几篇文章中,我们深入分析了 Dify 应用生成器的源码,包括限流策略、流式响应、配置管理以及上传文件的处理,今天我们将继续深入 CompletionAppGeneratorgenerate() 方法,学习另一个重要的组件 —— 追踪管理器(Trace Manager),通过它引出 LLMOps 的概念,并以 LangFuse 为例演示其使用方式,最后再详细讲解它的实现机制。

创建追踪管理器

让我们继续深入 generate() 方法的后续实现,在组装完成应用配置后,接下来就是创建追踪管理器:

trace_manager = TraceQueueManager(
  app_id=app_model.id,
  user_id=user.id if isinstance(user, Account) else user.session_id
)

这个 trace_manager 将贯穿整个会话流程,负责收集和记录各种运行时数据,为后续的分析和调试提供数据基础。接下来,将 trace_manager 注入到应用生成实体中:

application_generate_entity = CompletionAppGenerateEntity(
  task_id=str(uuid.uuid4()),
  app_config=app_config,
  model_conf=ModelConfigConverter.convert(app_config),
  file_upload_config=file_extra_config,
  inputs=self._prepare_user_inputs(
    user_inputs=inputs, variables=app_config.variables, tenant_id=app_model.tenant_id
  ),
  query=query,
  files=list(file_objs),
  user_id=user.id,
  stream=streaming,
  invoke_from=invoke_from,
  extras={},
  trace_manager=trace_manager,  # 将 trace_manager 注入到实体中
)

应用生成实体 CompletionAppGenerateEntity 是一个携带所有生成所需信息的数据载体,其中就包含了 trace_manager,这样在整个应用执行过程中,各个模块都可以通过这个实体访问到追踪管理器,记录相应的运行数据。

后续的运行流程将分成两条线路:一条为工作线程,执行具体的业务逻辑;另一条为追踪线程,通过 Celery 任务队列,离线记录业务运行时产生的数据,并发送到外部 Ops 工具:

trace-flow.png

什么是 LLMOps

LLMOps(Large Language Model Operations) 即大语言模型运维,是 MLOps 在大语言模型领域的扩展,它涵盖了从 LLM 应用的开发、部署、监控到维护的完整生命周期管理。尽管 LLM 拥有出色的推理和文本生成能力,但其内部运作机制仍然难以完全理解,这给基于 LLM 的应用开发带来了挑战,比如:

  • 评估模型输出质量
  • 降低推理成本
  • 减少模型响应延迟
  • 链式调用、Agent 和工具引入的调试复杂性

目前市面上已经涌现出不少 LLMOps 工具,比如 LangSmithLangfuse 等,能够为 LLM 应用提供全面的追踪和深度评估能力。LLMOps 的核心概念包括:

  • 模型管理: 版本控制、A/B 测试、模型切换
  • 监控观测: 性能指标、成本跟踪、用户行为分析
  • 数据管理: 训练数据、推理日志、反馈收集
  • 部署运维: 自动化部署、扩缩容、故障恢复

不过我们一般更关注监控观测部分,通过在 LLM 应用中埋点,能够实现对模型性能的精细化监控,从而及时发现和解决潜在问题。

Dify 支持多种外部 Ops 工具的集成,包括:

下面我们以 Langfuse 为例,演示如何在 Dify 中配置和使用 LLMOps 工具。

Langfuse 集成演示

Langfuse 是一个开源的 LLM 工程平台,可以帮助团队协作调试、分析和迭代他们的应用程序。

langfuse.png

它提供了以下核心功能:

  • 追踪(Tracing):记录 LLM 应用的完整执行过程
  • 观测(Observability):提供实时的性能监控和可视化
  • 评估(Evaluation):支持多种评估指标和人工标注
  • 数据集管理:管理测试数据和历史记录
  • 成本追踪:监控 Token 使用和费用

获取 Langfuse 的 API Key

我们首先访问 Langfuse 官网,注册账号并登录,然后创建一个组织:

langfuse-new-org.png

然后在组织下创建一个项目:

langfuse-new-project.png

创建成功后,接着为项目创建 API Key:

langfuse-new-apikey.png

点击创建按钮,获取以下三个重要参数:

  • Public Key:公开密钥,用于客户端身份验证
  • Secret Key:私钥,用于服务端 API 调用
  • Host:Langfuse 服务器地址

langfuse-new-apikey-2.png

Langfuse 是开源项目,我们也可以本地部署它。

在 Dify 中配置 Langfuse

接下来,我们再在 Dify 中配置 Langfuse,打开需要监测的应用,点击左侧的 监测 菜单:

dify-monitor.png

这个页面显示了该应用的统计指标,包括会话数、活跃用户数、平均会话互动数、Token 输出速度、用户满意度、费用消耗、全部消息数等。然后再点击右上角的 追踪应用性能 按钮:

dify-monitor-2.png

该选项默认是禁用的,我们点击 Langfuse 右侧的配置按钮,将上面获取的 API 凭据粘贴到配置中并保存:

dify-monitor-3.png

配置成功后,可以在页面中看到状态显示为已启用,表示正在监测。

查看监测数据

配置完成后,当你在 Dify 中调试或使用应用时,所有的执行数据都会自动发送到 Langfuse 平台:

dify-test.png

在 Langfuse 中可以看到详细的追踪数据:

langfuse-trace.png

trace_manager 的实现

现在让我们来看下 Dify 中追踪管理器 trace_manager 的实现机制,了解它是如何收集、处理和发送追踪数据的。它的初始化代码如下:

trace_manager_timer: Optional[threading.Timer] = None
trace_manager_queue: queue.Queue = queue.Queue()

class TraceQueueManager:
  def __init__(self, app_id=None, user_id=None):
    self.app_id = app_id
    self.user_id = user_id
    # 获取该 app 的追踪实例(如 Langfuse, LangSmith 等)
    self.trace_instance = OpsTraceManager.get_ops_trace_instance(app_id)
    # Flask 实例
    self.flask_app = current_app._get_current_object()
    # 启动定时器
    if trace_manager_timer is None:
      self.start_timer()

这里主要有两个关键步骤:

  1. 获取追踪实例:根据应用 ID 查询该应用的追踪配置,初始化对应供应商的追踪实例
  2. 定时处理追踪任务:创建一个定时任务,每隔一段时间扫描一次追踪队列,将队列中的追踪任务发送 Celery 异步处理

获取追踪实例

首先根据应用 ID 查询该应用的追踪配置,也就是 apps 表的 tracing 字段:

{
  "enabled": true,
  "tracing_provider": "langfuse"
}

获取到对应的追踪供应商(如 Langfuse, LangSmith 等),然后再查询 trace_app_config 表获取详细的供应商配置,Dify 通过插件化的架构支持多种追踪供应商:

case TracingProviderEnum.LANGFUSE:
  return {
    "config_class": LangfuseConfig,
    "secret_keys": ["public_key", "secret_key"],
    "other_keys": ["host", "project_key"],
    "trace_instance": LangFuseDataTrace,
  }
case TracingProviderEnum.LANGSMITH:
  return {
    "config_class": LangSmithConfig,
    "secret_keys": ["api_key"],
    "other_keys": ["project", "endpoint"],
    "trace_instance": LangSmithDataTrace,
  }
case TracingProviderEnum.OPIK:
  # ...
case TracingProviderEnum.WEAVE:
  # ...
case TracingProviderEnum.ARIZE:
  # ...
case TracingProviderEnum.PHOENIX:
  # ...
case TracingProviderEnum.ALIYUN:
  # ...
case _:
  raise KeyError(f"Unsupported tracing provider: {provider}")

每个供应商的实现都由四个部分组成:

  • config_class - 该供应商的配置类
  • secret_keys - 该供应商的密钥信息,比如 Langfuse 的 public_keysecret_key,注意这些信息 Dify 都做了加密处理,防止密钥的泄漏
  • other_keys - 其他无需加密处理的信息
  • trace_instance - 该供应商的具体实现

定时处理追踪任务

然后通过 threading.Timer 创建一个定时任务,默认每隔 5s 执行一次:

trace_manager_interval = int(os.getenv("TRACE_QUEUE_MANAGER_INTERVAL", 5))

def start_timer(self):
  trace_manager_timer = threading.Timer(trace_manager_interval, self.run)
  trace_manager_timer.name = f"trace_manager_timer_{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}"
  trace_manager_timer.daemon = False
  trace_manager_timer.start()

该任务的实现 run() 方法如下:

trace_manager_queue: queue.Queue = queue.Queue()
trace_manager_batch_size = int(os.getenv("TRACE_QUEUE_MANAGER_BATCH_SIZE", 100))

def run(self):
  tasks = self.collect_tasks()
  self.send_to_celery(tasks)

# 从队列中收集任务
def collect_tasks(self):
  tasks: list[TraceTask] = []
  while len(tasks) < trace_manager_batch_size and not trace_manager_queue.empty():
    # 批量收集,最多 batch_size 个
    task = trace_manager_queue.get_nowait()
    tasks.append(task)
    trace_manager_queue.task_done()
  return tasks

# 异步处理任务
def send_to_celery(self, tasks: list[TraceTask]):
  with self.flask_app.app_context():
    for task in tasks:

      # 获取任务数据
      trace_info = task.execute()
      task_data = TaskData(
        app_id=task.app_id,
        trace_info_type=type(trace_info).__name__,
        trace_info=trace_info.model_dump() if trace_info else None,
      )

      # 将数据保存到文件中
      file_id = uuid4().hex
      file_path = f"{OPS_FILE_PATH}{task.app_id}/{file_id}.json"
      storage.save(file_path, task_data.model_dump_json().encode("utf-8"))

      # 将任务发送到 Celery 队列进行异步处理
      file_info = {
        "file_id": file_id,
        "app_id": task.app_id,
      }
      process_trace_tasks.delay(file_info)

核心逻辑主要分为两个步骤:

  1. 收集任务:调用 collect_tasks() 从队列中收集一批待处理的追踪任务;该方法从全局队列 trace_manager_queue 中取出最多 trace_manager_batch_size 个任务;
  2. 发送任务:如果有任务,则调用 send_to_celery(tasks) 将任务发送到 Celery 异步任务队列;该方法首先将任务序列化后保存到存储系统,然后通过 process_trace_tasks.delay() 异步执行;

值得注意的是,这里的 process_trace_tasks 是一个 Celery 任务,因此可以调用 delay() 方法。它通过 @shared_task 装饰:

from celery import shared_task

@shared_task(queue="ops_trace")
def process_trace_tasks(file_info):
  #...

@shared_task 是 Celery 框架提供的装饰器,用于创建可以在不同 Celery 应用实例之间共享的异步任务。它最大的优势是应用无关性,不依赖特定的 Celery 应用实例,可以提高代码的可重用性。

添加追踪任务

上面提到,追踪管理器 trace_manager 被注入到应用生成实体中,因此在整个应用执行过程中,各个模块都可以通过追踪管理器记录相应的运行数据。当需要记录追踪数据时,通过 add_trace_task() 方法将任务添加到队列:

def add_trace_task(self, trace_task: TraceTask):
  global trace_manager_timer, trace_manager_queue
  try:
    if self.trace_instance:
      # 只有配置了追踪时才添加任务
      trace_task.app_id = self.app_id
      trace_manager_queue.put(trace_task)
  except Exception:
    logger.exception("Error adding trace task, trace_type %s", trace_task.trace_type)
  finally:
    # 确保定时器在运行
    self.start_timer()

Dify 支持多种类型的追踪任务,每种类型记录不同的运行时数据:

class TraceTaskName:
  # 会话追踪(未使用)
  CONVERSATION_TRACE = "conversation_trace"
  # 工作流追踪,记录工作流的执行详情
  WORKFLOW_TRACE = "workflow_trace"
  # 消息追踪,在消息处理过程中记录完整的对话数据
  MESSAGE_TRACE = "message_trace"
  # 审核追踪,记录内容审核的详细过程
  MODERATION_TRACE = "moderation_trace"
  # 建议问题追踪
  SUGGESTED_QUESTION_TRACE = "suggested_question_trace"
  # 数据集检索追踪,记录 RAG 检索的过程和结果
  DATASET_RETRIEVAL_TRACE = "dataset_retrieval_trace"
  # 工具调用追踪,记录智能体工具调用的详细信息
  TOOL_TRACE = "tool_trace"
  # 会话标题生成追踪
  GENERATE_NAME_TRACE = "generate_name_trace"

小结

今天我们深入学习了 Dify 的追踪调试机制,通过源码剖析,了解了 trace_manager 作为追踪系统的入口点,负责收集整个会话过程中的运行数据。此外,我们学习了 LLMOps 的概念以及常见的 LLMOps 工具,并通过 LangFuse 的实际使用,了解了如何在 Dify 中集成外部 Ops 工具,实现对应用运行状态的全面监控。

通过追踪管理器,Dify 实现了对应用执行过程的全面追踪,从消息对话、工具调用到知识库检索,不仅为开发者提供了强大的调试能力,也为生产环境下的性能优化和问题诊断奠定了坚实的基础。


深入 Dify 应用的会话流程之文件上传

我们之前曾学习过,当模型具备处理图片、文档、音视频的能力时(比如 Gemini 2.5 Pro),在应用的配置页面会多出三个配置开关:

file-settings.png

在 “视觉” 右侧有一个设置按钮,还可以进行更细致的配置:

vision.png

这个设置按钮虽然在 “视觉” 右侧,但是 “上传方式” 和 “上传数量限制” 对所有类型的文件都是生效的。

开启之后,我们就可以在对话时上传文件:

file-upload.png

今天我们将继续深入 Dify 应用生成器的内部实现,聚焦于文件上传部分,看看 Dify 是如何处理这几种不同类型的文件,以及 Dify 如何通过 SSRF 防护机制保障文件处理的安全性。

文件上传配置

我们昨天已经学习了 应用模型配置覆盖模型配置 的概念,它存储着应用的详细配置信息,其中就包括文件上传相关的配置:

{
  "image": {
    "detail": "high",
    "enabled": true,
    "number_limits": 3,
    "transfer_methods": [
      "remote_url",
      "local_file"
    ]
  },
  "enabled": true,
  "allowed_file_types": [
    "image"
  ],
  "allowed_file_extensions": [
    ".JPG", ".JPEG", ".PNG", ".GIF", ".WEBP", ".SVG", ".MP4", ".MOV", ".MPEG", ".WEBM"
  ],
  "allowed_file_upload_methods": [
    "remote_url",
    "local_file"
  ],
  "number_limits": 3,
  "fileUploadConfig": {
    "file_size_limit": 15,
    "batch_count_limit": 5,
    "image_file_size_limit": 10,
    "video_file_size_limit": 100,
    "audio_file_size_limit": 50,
    "workflow_file_upload_limit": 10
  }
}

这里的几个参数解释如下:

  • enabled 是否开启文件上传,当开启视觉、文档、音频三个开关中的任意一个时,该值即为 true
  • allowed_file_types 允许的文件类型,支持 image、document、audio 和 video 四种类型
  • allowed_file_extensions 允许的文件后缀,这个配置貌似没什么用,以 allowed_file_types 为准
  • allowed_file_upload_methods 允许的文件上传方式,支持 本地上传通过 URL 上传,默认两者都支持,对话框中上传文件的样式会根据这个参数而改变
  • number_limits 允许最多上传多少个文件
  • image.detail 图片分辨率设置,仅针对图片生效;低分辨率模式 将使模型接收图像的低分辨率版本,适用于对图片细节要求不高的场景,比如图片分类或简单的图片理解任务;高分辨率模式 处理速度慢,并消耗更多的处理资源,适用于需要分析图片细节的场景,比如 OCR、识别图片中的文字或复杂内容等

其中 fileUploadConfig 中的参数限制了前端上传各种类型文件的大小和数量,可以在 .env 文件中调整:

# Upload configuration
UPLOAD_FILE_SIZE_LIMIT=15
UPLOAD_FILE_BATCH_LIMIT=5
UPLOAD_IMAGE_FILE_SIZE_LIMIT=10
UPLOAD_VIDEO_FILE_SIZE_LIMIT=100
UPLOAD_AUDIO_FILE_SIZE_LIMIT=50

# Workflow file upload limit
WORKFLOW_FILE_UPLOAD_LIMIT=10

我们继续来看看 CompletionAppGeneratorgenerate() 方法,昨天从数据库中获取应用配置之后,接下来就是从应用配置中提取 文件上传配置

file_extra_config = FileUploadConfigManager.convert(
  override_model_config_dict or app_model_config.to_dict()
)

经过 convert() 方法后,得到 FileUploadConfig 对象:

class FileUploadConfig(BaseModel):
  image_config: Optional[ImageConfig] = None
  allowed_file_types: Sequence[FileType] = Field(default_factory=list)
  allowed_file_extensions: Sequence[str] = Field(default_factory=list)
  allowed_file_upload_methods: Sequence[FileTransferMethod] = Field(default_factory=list)
  number_limits: int = 0

文件工厂

当用户对话时传入了文件,在会话接口的入参中会多一个 files 参数,它是一个数组,格式如下:

[
  {
    "type": "image",
    "transfer_method": "local_file",
    "url": "",
    "upload_file_id": "d9341dfc-ceab-4041-9faf-a1a28579c589"
  },
  {
    "type": "image",
    "transfer_method": "remote_url",
    "url": "http://localhost:5001/files/90a2c3ad-d0c9-4d48-a7f9-b40e1dada22e/file-preview...",
    "upload_file_id": "90a2c3ad-d0c9-4d48-a7f9-b40e1dada22e"
  }
]

接下来的代码逻辑是,根据文件上传配置,将传入的文件转换为统一的 File 对象。这一步通过文件工厂的 build_from_mappings() 方法构建:

if file_extra_config:
  files = args["files"] if args.get("files") else []
  file_objs = file_factory.build_from_mappings(
    mappings=files,
    tenant_id=app_model.tenant_id,
    config=file_extra_config,
  )
else:
  file_objs = []

文件工厂通过一个简洁的分发机制来处理不同类型的文件:

def build_from_mapping(
  *,
  mapping: Mapping[str, Any],
  tenant_id: str,
  config: FileUploadConfig | None = None,
  strict_type_validation: bool = False,
) -> File:
  
  # 根据传输方式分发到不同的构建函数
  build_functions: dict[FileTransferMethod, Callable] = {
    FileTransferMethod.LOCAL_FILE: _build_from_local_file,
    FileTransferMethod.REMOTE_URL: _build_from_remote_url,
    FileTransferMethod.TOOL_FILE: _build_from_tool_file,
  }

  # 根据 transfer_method 找到对应的构建方法
  transfer_method = FileTransferMethod.value_of(mapping.get("transfer_method"))
  build_func = build_functions.get(transfer_method)

  # 构建文件对象
  file: File = build_func(
    mapping=mapping,
    tenant_id=tenant_id,
    transfer_method=transfer_method,
    strict_type_validation=strict_type_validation,
  )
  return file

可以看到不同的文件传输方式有不同的构建方法,Dify 支持三种文件传输方式:

  1. LOCAL_FILE - 本地文件
  2. REMOTE_URL - 远程文件
  3. TOOL_FILE - 工具文件

前两种我们在文章开篇已经见过了,在 Dify 的对话框中,支持两种文件上传方式:本地上传通过 URL 上传,他们都会获取并检查文件名,验证文件大小和类型,并生成一个唯一的文件键,保存到配置的存储后端(本地或云存储),同时还会创建一条数据库记录,保存到 upload_files 表中。

另外,Dify 中还支持通过工具生成文件,比如我们之前使用的文本转语音工具,它会生成一个音频文件,这个文件就是 工具文件,它同样保存在配置的存储后端,对应数据库中的 tool_files 表。

文件存储后端

Dify 支持本地存储、S3、阿里云 OSS、Azure Blob 等多种存储后端:

  • opendal (默认,推荐)
  • s3 (Amazon S3 或兼容 S3 的服务)
  • aliyun-oss (阿里云对象存储)
  • azure-blob (Azure Blob 存储)
  • google-storage (Google Cloud Storage)
  • tencent-cos (腾讯云对象存储)
  • huawei-obs (华为云对象存储)
  • baidu-obs (百度对象存储)
  • oci-storage (Oracle Cloud Infrastructure)
  • volcengine-tos (火山引擎对象存储)
  • supabase (Supabase 存储)
  • clickzetta-volume (ClickZetta 卷存储)
  • local (本地存储,已弃用)

默认使用的是 opendal 本地存储,存储路径位于 ./api/storage,文件上传后,完整的路径结构为:

./api/storage/upload_files/{tenant_id}/{uuid}.{extension}

工具生成的文件存储路径为:

./api/storage/tools/{tenant_id}/{uuid}.{extension}

Dify 的文件存储按租户隔离,确保租户的文件安全,并通过 SHA3-256 哈希值支持文件去重检测。

Apache OpenDAL(Open Data Access Layer) 是一个开源的数据访问层项目。它允许用户通过统一的 API 简单且高效地访问不同存储服务上的数据,其核心愿景是 One Layer, All Storage(一层接口,所有存储)

如果要切换到不同的存储后端,可以在 .env 文件中设置 STORAGE_TYPE 和相应的配置参数,比如切换到 Amazon S3:

# 基础配置
STORAGE_TYPE=s3

# S3 配置参数
S3_USE_AWS_MANAGED_IAM=false
S3_ENDPOINT=https://s3.amazonaws.com  # 或其他 S3 兼容服务
S3_BUCKET_NAME=your-bucket-name
S3_ACCESS_KEY=your-access-key
S3_SECRET_KEY=your-secret-key
S3_REGION=us-east-1
S3_ADDRESS_STYLE=auto  # auto, virtual, or path

构建本地文件

本地文件是用户通过 Dify 的文件上传界面上传到服务器存储的文件。这些文件会先存储在 Dify 的文件系统中(如本地磁盘、S3 等),并在数据库中记录相关元信息。接下来,我们看看本地文件的构建过程:

def _build_from_local_file(
  *,
  mapping: Mapping[str, Any],
  tenant_id: str,
  transfer_method: FileTransferMethod,
  strict_type_validation: bool = False,
) -> File:
  upload_file_id = mapping.get("upload_file_id")

  # 从数据库查询文件信息
  stmt = select(UploadFile).where(
    UploadFile.id == upload_file_id,
    UploadFile.tenant_id == tenant_id,
  )
  row = db.session.scalar(stmt)

  # 根据文件扩展和 MIME 类型获取文件类型
  # 文件类型可以是 IMAGE、DOCUMENT、AUDIO、VIDEO、CUSTOM
  file_type = _standardize_file_type(
    extension="." + row.extension,
    mime_type=row.mime_type
  )

  # 构建 File 对象
  return File(
    id=mapping.get("id"),
    filename=row.name,
    extension="." + row.extension,
    mime_type=row.mime_type,
    tenant_id=tenant_id,
    type=file_type,
    transfer_method=transfer_method,
    remote_url=row.source_url,
    related_id=mapping.get("upload_file_id"),
    size=row.size,
    storage_key=row.key,  # 存储系统中的键值
  )

本地文件的处理流程相对简单,主要包括:

  1. 查询文件记录:根据 upload_file_id 从数据库中查询文件的基本信息,包含文件名、文件大小、扩展名、MIME 类型以及存储系统中的键值等,这些信息是在文件上传时保存到数据库中的
  2. 类型检测:根据文件扩展名和 MIME 类型自动检测文件类型,文件类型是 IMAGE、DOCUMENT、AUDIO、VIDEO、CUSTOM 之一
  3. 构建 File 对象:将获取到的文件信息构建成统一的 File 对象,供后续使用

构建远程文件

远程文件是通过 URL 引用的外部文件,比如用户提供的图片链接、文档链接等。Dify 针对远程文件有两种处理方式,第一种是通过远程文件上传接口,根据 URL 从远程下载文件到存储系统,此时和本地文件的处理逻辑几乎一样;第二种是直接在会话接口中传入 URL 地址,这种方式不需要将文件上传到 Dify 服务器,而是在需要时动态获取。

def _build_from_remote_url(
  *,
  mapping: Mapping[str, Any],
  tenant_id: str,
  transfer_method: FileTransferMethod,
  strict_type_validation: bool = False,
) -> File:

  # 检查是否有关联的上传文件记录
  upload_file_id = mapping.get("upload_file_id")
  if upload_file_id:
    # 已缓存的远程文件,和本地文件一样处理...

  # 处理纯远程 URL
  url = mapping.get("url") or mapping.get("remote_url")

  # 获取远程文件信息
  mime_type, filename, file_size = _get_remote_file_info(url)
  extension = mimetypes.guess_extension(mime_type) or (
    "." + filename.split(".")[-1] if "." in filename else ".bin"
  )

  # 根据文件扩展和 MIME 类型获取文件类型
  file_type = _standardize_file_type(
    extension=extension, 
    mime_type=mime_type
  )

  # 构建 File 对象
  return File(
    id=mapping.get("id"),
    filename=filename,
    tenant_id=tenant_id,
    type=file_type,
    transfer_method=transfer_method,
    remote_url=url,
    mime_type=mime_type,
    extension=extension,
    size=file_size,
    storage_key="",  # 远程文件没有本地存储键
  )

远程文件的关键在于 _get_remote_file_info() 函数,它负责获取远程文件的元信息,包括 MIME 类型、文件名、文件大小:

def _get_remote_file_info(url: str):
  
  # 解析 URL 获取路径部分
  parsed_url = urllib.parse.urlparse(url)
  url_path = parsed_url.path

  # 从路径中提取文件名
  filename = os.path.basename(url_path)

  # 从文件名推测 MIME 类型
  mime_type, _ = mimetypes.guess_type(filename)

  # 初始化文件大小
  file_size = -1

  # 发送 HEAD 请求获取文件信息(注意这里使用了 ssrf_proxy)
  resp = ssrf_proxy.head(url, follow_redirects=True)
  if resp.status_code == httpx.codes.OK:
    # 从 Content-Disposition 头获取真实文件名
    if content_disposition := resp.headers.get("Content-Disposition"):
      filename = str(content_disposition.split("filename=")[-1].strip('"'))
      mime_type, _ = mimetypes.guess_type(filename)

    # 从 Content-Length 头获取文件大小
    file_size = int(resp.headers.get("Content-Length", file_size))

    # 从 Content-Type 头获取 MIME 类型
    if not mime_type:
      mime_type = resp.headers.get("Content-Type", "").split(";")[0].strip()

  return mime_type, filename, file_size

它的核心流程是,先通过 URL 解析,提取出初始文件名,基于文件名推测出 MIME 类型,接着再向远程 URL 发送 HEAD 请求,获取 HTTP 响应头,从响应头中获取准确的文件元信息:

  • Content-Disposition 头获取真实文件名
  • Content-Length 头获取文件大小
  • Content-Type 头获取 MIME 类型

这里 Dify 使用的两个技巧值得我们学习:

  1. Dify 使用 HEAD 请求而非 GET,只获取元数据不下载文件内容,可以提高效率
  2. Dify 使用 ssrf_proxy 而不是直接的 HTTP 请求,防止 SSRF 攻击,这是一个很重要的安全考虑,我们稍后会详细讨论这个安全机制

构建工具文件

工具文件是智能体或工具在执行过程中生成的临时文件,比如代码解释器生成的图表、文件处理工具创建的文档等。

def _build_from_tool_file(
  *,
  mapping: Mapping[str, Any],
  tenant_id: str,
  transfer_method: FileTransferMethod,
  strict_type_validation: bool = False,
) -> File:

  # 从数据库获取工具文件信息
  tool_file = db.session.scalar(
    select(ToolFile).where(
      ToolFile.id == mapping.get("tool_file_id"),
      ToolFile.tenant_id == tenant_id,
    )
  )

  # 根据文件扩展和 MIME 类型获取文件类型
  # 构建 File 对象

工具文件的构建过程和本地文件差不多,唯一的区别在于它存储在数据库的 tool_files 表中。

SSRF 防护机制

在远程文件处理中,我们注意到 Dify 使用了 ssrf_proxy.head() 而不是直接的 HTTP 请求。这涉及到一个重要的安全概念 —— SSRF(Server-Side Request Forgery,服务器端请求伪造)

什么是 SSRF

SSRF 是一种安全漏洞,攻击者可以诱使服务器代表他们向任意目标发起 HTTP 请求。这种攻击通常发生在 Web 应用需要获取用户提供的 URL 资源时,比如:

  • 图片预览功能
  • 网页截图服务
  • 文件下载功能
  • Webhook 回调

如果 Web 应用直接使用用户提供的 URL 而没有进行适当的验证和过滤,就可能遭受 SSRF 攻击。攻击者可以利用 SSRF 漏洞:

  1. 扫描内网:通过服务器访问内网地址(如 127.0.0.1192.168.x.x)来探测内网服务
  2. 绕过防火墙:利用服务器的网络位置访问被防火墙保护的资源
  3. 访问云元数据:在云环境中访问实例元数据服务(如 AWS EC2 的 169.254.169.254
  4. 端口扫描:探测服务器可访问的其他服务端口

例如,攻击者可能提供这样的恶意 URL:

  • http://127.0.0.1:6379/ - 访问本地 Redis
  • http://169.254.169.254/latest/meta-data/ - 访问 AWS 元数据服务
  • file:///etc/passwd - 读取本地文件

关于 SSRF 相关知识,推荐阅读这篇文章:

Dify 的 SSRF 防护策略

在之前的入门篇中,细心的读者可能已经注意到,在 docker-compose.yaml 文件中,一些服务配置了 SSRF_PROXYHTTP_PROXY 环境变量,全部指向一个 ssrf_proxy 容器:

ssrf_proxy:
  image: ubuntu/squid:latest
  restart: always
  volumes:
    - ./ssrf_proxy/squid.conf.template:/etc/squid/squid.conf.template
    - ./ssrf_proxy/docker-entrypoint.sh:/docker-entrypoint-mount.sh
  entrypoint:
    [ "sh", "-c", "docker-entrypoint.sh" ]
 environment:
   HTTP_PORT: ${SSRF_HTTP_PORT:-3128}
 networks:
   - ssrf_proxy_network
   - default

为避免不必要的风险,Dify 为所有可能引发 SSRF 攻击的服务配置了代理,并强制像 Sandbox 这样的沙盒服务只能通过代理访问外部网络,以确保数据和服务安全。在生产环境中,Dify 推荐使用 Squid 作为 SSRF 防护的代理服务器。

squid.png

默认情况下,该代理不会拦截任何本地请求,但我们可以通过修改其配置文件自定义代理行为。它的配置文件如下:

# 网络范围定义
# - 定义各种私有网络和本地网络范围,包括 RFC 1918 私有网络(10.x.x.x, 172.16-31.x.x, 192.168.x.x)
# - 包含 IPv6 本地网络范围和链路本地地址
acl localnet src 0.0.0.1-0.255.255.255    # RFC 1122 "this" network (LAN)
acl localnet src 10.0.0.0/8        # RFC 1918 local private network (LAN)
acl localnet src 100.64.0.0/10        # RFC 6598 shared address space (CGN)
acl localnet src 169.254.0.0/16     # RFC 3927 link-local (directly plugged) machines
acl localnet src 172.16.0.0/12        # RFC 1918 local private network (LAN)
acl localnet src 192.168.0.0/16        # RFC 1918 local private network (LAN)
acl localnet src fc00::/7           # RFC 4193 local private network range
acl localnet src fe80::/10          # RFC 4291 link-local (directly plugged) machines

# 端口访问控制
# - SSL_ports:允许的 SSL 端口(443)
# - Safe_ports:允许的安全端口(HTTP 80, HTTPS 443, FTP 21 等标准端口)
acl SSL_ports port 443
# acl SSL_ports port 1025-65535   # Enable the configuration to resolve this issue: https://github.com/langgenius/dify/issues/12792
acl Safe_ports port 80        # http
acl Safe_ports port 21        # ftp
acl Safe_ports port 443        # https
acl Safe_ports port 70        # gopher
acl Safe_ports port 210        # wais
acl Safe_ports port 1025-65535    # unregistered ports
acl Safe_ports port 280        # http-mgmt
acl Safe_ports port 488        # gss-http
acl Safe_ports port 591        # filemaker
acl Safe_ports port 777        # multiling http

# 方法和域名控制
# - CONNECT:CONNECT 方法控制
# - allowed_domains:只允许访问 .marketplace.dify.ai 域名
acl CONNECT method CONNECT
acl allowed_domains dstdomain .marketplace.dify.ai

# HTTP 访问规则,按优先级顺序:
# 1. 允许访问指定域名:允许访问 marketplace.dify.ai
# 2. 拒绝不安全端口:拒绝访问非安全端口
# 3. 限制 CONNECT 方法:只允许对 SSL 端口使用 CONNECT
# 4. 管理访问控制:只允许 localhost 进行管理
# 5. 拒绝所有其他访问:默认拒绝策略
http_access allow allowed_domains
http_access deny !Safe_ports
http_access deny CONNECT !SSL_ports
http_access allow localhost manager
http_access deny manager
http_access allow localhost
include /etc/squid/conf.d/*.conf
http_access deny all

# 为沙箱提供反向代理
http_port ${REVERSE_PROXY_PORT} accel vhost
cache_peer ${SANDBOX_HOST} parent ${SANDBOX_PORT} 0 no-query originserver
acl src_all src all
http_access allow src_all

通过 Squid 的访问控制列表(ACL),Dify 实现了有效的网络隔离和访问控制,大大降低了 SSRF 攻击的风险。这是一个值得学习和借鉴的安全最佳实践!

小结

我们今天学习了 Dify 对上传文件的处理过程,主要内容总结如下:

  • 支持 OpenDAL、S3、阿里云 OSS 等多样化的存储后端,默认按租户隔离文件,保障数据安全性;
  • 采用工厂模式统一文件处理逻辑,支持 LOCAL_FILEREMOTE_URLTOOL_FILE 三种不同的文件传输方式,统一转换为 File 对象;
  • 对于远程文件,Dify 引入 Squid 代理,通过 ACL 规则禁止访问内网地址与非安全端口,防范远程文件带来的 SSRF 风险;

在下一篇文章中,我们将继续深入会话流程的源码,探讨 Dify 是如何通过集成外部 Ops 工具,实现全面的追踪和深度评估能力。


深入 Dify 应用的会话流程之配置管理

在节前的文章中,我们深入分析了 Dify 会话处理流程的流式处理机制,学习了限流生成器、事件流转换和响应格式化的实现原理。通过分析 rate_limit.generate()convert_to_event_stream()compact_generate_response() 三个核心函数,我们理解了 Dify 是如何优雅地统一处理流式和非流式响应的。

今天我们将继续深入应用生成器的内部实现,也就是下面代码中的 “步骤 3”:

if app_model.mode == AppMode.COMPLETION.value:
  return rate_limit.generate(                        # 步骤1:限流生成器
    CompletionAppGenerator.convert_to_event_stream(  # 步骤2:事件流转换
      CompletionAppGenerator().generate(             # 步骤3:应用生成器
        app_model=app_model, user=user, args=args,
        invoke_from=invoke_from, streaming=streaming
      ),
    ),
    request_id=request_id,
  )

Python 中的方法重载机制

首先,在 CompletionAppGeneratorgenerate() 方法中,我们可以看到一个有意思的写法:

class CompletionAppGenerator(MessageBasedAppGenerator):
  
  @overload
  def generate(
    self,
    app_model: App,
    user: Union[Account, EndUser],
    args: Mapping[str, Any],
    invoke_from: InvokeFrom,
    streaming: Literal[True], # 明确指定为 True
  ) -> Generator[str | Mapping[str, Any], None, None]: ... # 返回生成器

  @overload
  def generate(
    self,
    app_model: App,
    user: Union[Account, EndUser],
    args: Mapping[str, Any],
    invoke_from: InvokeFrom,
    streaming: Literal[False], # 明确指定为 False
  ) -> Mapping[str, Any]: ... # 返回字典

  @overload
  def generate(
    self,
    app_model: App,
    user: Union[Account, EndUser],
    args: Mapping[str, Any],
    invoke_from: InvokeFrom,
    streaming: bool = False, # 通用情况
  ) -> Union[Mapping[str, Any], Generator[str | Mapping[str, Any], None, None]]: ...

  def generate(
    self,
    app_model: App,
    user: Union[Account, EndUser],
    args: Mapping[str, Any],
    invoke_from: InvokeFrom,
    streaming: bool = True,
  ) -> Union[Mapping[str, Any], Generator[str | Mapping[str, Any], None, None]]:
    # 实际的实现逻辑
    pass

可以看到这里定义了几种不同签名的 generate() 方法,并带有 @overload 装饰器,但是却没有真正的实现(实现部分只有 ...),这其实是 Python 中特殊的方法重载机制。

方法重载(Method Overloading) 在传统的强类型语言中是原生支持的特性,然而,Python 作为动态语言,并不支持方法重载。在 Python 中,如果在同一个类中定义多个同名方法,后定义的方法会覆盖先定义的方法:

class Example:
  def method(self, x: int):
    return f"Integer: {x}"

  def method(self, x: str):  # 覆盖了上面的方法
    return f"String: {x}"

example = Example()
print(example.method(42))  # 运行时错误!参数不匹配

为了解决这个问题,Python 3.5 引入了 typing.overload 装饰器,它不是真正的方法重载,而是为 静态类型检查器 提供类型提示的工具。@overload 为同一个函数提供多个类型签名,让类型检查器能够根据不同的参数类型组合推断出相应的返回类型。被 @overload 装饰的方法称为 重载签名,用于描述不同调用方式的类型信息,但这些方法本身不会被执行。最后一个不带 @overload 装饰器的同名方法才是真正的实现。

Python 的 @overload 并不是真正的方法重载,而是为静态类型检查器提供类型提示的工具。

通过 @overload 装饰器,可以提供几个好处:

  1. 类型安全性:在编译时就能检测出类型错误,避免运行时错误
  2. IDE 智能提示:IDE 可以根据参数类型提供更精确的代码提示

在上面的例子中:

  • streaming=True 时,返回类型是 Generator(流式响应)
  • streaming=False 时,返回类型是 Mapping(非流式响应)
  • streamingbool 类型时,返回联合类型

这样,当开发者调用 generate() 方法时,IDE 和类型检查器就能根据 streaming 参数的值自动推断出正确的返回类型,提升了代码的类型安全性和开发体验。

应用模型配置

我们继续看 generate() 的实现,首先是获取 应用模型配置(app model config),这里有三种情况。第一种是无会话状态的应用,比如文本生成,直接根据 应用模型(app model) 获取配置:

# get conversation
conversation = None

# get app model config
app_model_config = self._get_app_model_config(app_model=app_model, conversation=conversation)

第二种是有会话状态的应用,比如聊天助手或智能体,根据传入的会话 ID 获取配置:

# get conversation
conversation = None
conversation_id = args.get("conversation_id")
if conversation_id:
  conversation = ConversationService.get_conversation(
    app_model=app_model, conversation_id=conversation_id, user=user
  )

# get app model config
app_model_config = self._get_app_model_config(app_model=app_model, conversation=conversation)

第三种是工作流或对话流应用,它们没有应用模型配置,因此忽略这一步。

应用模型(app model)应用模型配置(app model config) 是两个不同的概念:应用模型对应数据库中的 apps 表,代表一个应用的基本信息和元数据,它的 ORM 模型如下:

class App(Base):
  __tablename__ = "apps"
  __table_args__ = (sa.PrimaryKeyConstraint("id", name="app_pkey"), sa.Index("app_tenant_id_idx", "tenant_id"))

  # 应用ID
  id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuid_generate_v4()"))
  # 租户ID
  tenant_id: Mapped[str] = mapped_column(StringUUID)
  # 应用名称
  name: Mapped[str] = mapped_column(String(255))
  # 应用描述
  description: Mapped[str] = mapped_column(sa.Text, server_default=sa.text("''::character varying"))
  # 应用模式 (chat, completion, workflow, etc.)
  mode: Mapped[str] = mapped_column(String(255))
  # 图标类型 (image, emoji)
  icon_type: Mapped[Optional[str]] = mapped_column(String(255))
  # 图标
  icon = mapped_column(String(255))
  # 关联的配置ID
  app_model_config_id = mapped_column(StringUUID, nullable=True)
  # 关联的工作流ID (可选)
  workflow_id = mapped_column(StringUUID, nullable=True)
  # 应用状态
  status: Mapped[str] = mapped_column(String(255), server_default=sa.text("'normal'::character varying"))
  # 是否启用站点
  enable_site: Mapped[bool] = mapped_column(sa.Boolean)
  # 是否启用API
  enable_api: Mapped[bool] = mapped_column(sa.Boolean)
  # ... 其他基础字段  

而应用模型配置对应数据库中的 app_model_configs 表,存储应用的详细配置信息,对应的 ORM 模型如下:

class AppModelConfig(Base):
  __tablename__ = "app_model_configs"
  __table_args__ = (sa.PrimaryKeyConstraint("id", name="app_model_config_pkey"), sa.Index("app_app_id_idx", "app_id"))

  # 配置ID
  id = mapped_column(StringUUID, server_default=sa.text("uuid_generate_v4()"))
  # 所属应用ID
  app_id = mapped_column(StringUUID, nullable=False)
  # 模型提供商
  provider = mapped_column(String(255), nullable=True)
  # 模型ID
  model_id = mapped_column(String(255), nullable=True)
  # 通用配置 (JSON格式)
  configs = mapped_column(sa.JSON, nullable=True)
  # 开场白 (JSON字符串)
  opening_statement = mapped_column(sa.Text)
  # 建议问题 (JSON字符串)
  suggested_questions = mapped_column(sa.Text)
  # 语音转文字 (JSON字符串)
  speech_to_text = mapped_column(sa.Text)
  # 文字转语音 (JSON字符串)
  text_to_speech = mapped_column(sa.Text)
  # 更多类似的 (JSON字符串)
  more_like_this = mapped_column(sa.Text)
  # 模型配置 (JSON字符串)
  model = mapped_column(sa.Text)
  # 用户输入表单 (JSON字符串)
  user_input_form = mapped_column(sa.Text)
  # 提示词模板
  pre_prompt = mapped_column(sa.Text)
  # 智能体模式 (JSON字符串)
  agent_mode = mapped_column(sa.Text)
  # 敏感词审查 (JSON字符串)
  sensitive_word_avoidance = mapped_column(sa.Text)
  # 引用和归属 (JSON字符串)
  retriever_resource = mapped_column(sa.Text)
  # 知识库配置 (JSON字符串)
  dataset_configs = mapped_column(sa.Text)
  # 文件上传配置 (JSON字符串)
  file_upload = mapped_column(sa.Text)
  # ... 其他基础字段

可以看到,应用模型配置里的大多数字段都是 JSON 字符串,因此 AppModelConfig 还提供了一些便捷的属性访问器(通过 @property 装饰器实现),将 JSON 字符串转换为 Python 对象:

@property
def model_dict(self) -> dict:
  """模型配置字典"""
  return json.loads(self.model) if self.model else {}

@property
def suggested_questions_list(self) -> list:
  """建议问题列表"""
  return json.loads(self.suggested_questions) if self.suggested_questions else []

@property
def file_upload_dict(self) -> dict:
  """文件上传配置字典"""
  return json.loads(self.file_upload) if self.file_upload else {
    "image": {
      "enabled": False,
      "number_limits": DEFAULT_FILE_NUMBER_LIMITS,
      "detail": "high",
      "transfer_methods": ["remote_url", "local_file"],
    }
  }

@property
def dataset_configs_dict(self) -> dict:
  """知识库配置字典"""
  if self.dataset_configs:
    dataset_configs = json.loads(self.dataset_configs)
    if "retrieval_model" not in dataset_configs:
      return {"retrieval_model": "single"}
    return dataset_configs
  return {"retrieval_model": "multiple"}

应用配置可能会被反复修改,因此一个应用可能会有多个版本的配置,每当点击 “发布更新” 时就会生成一条配置记录:

apps-publish.png

要特别注意的是,当用户第一次创建会话时,会话和应用配置关联,该会话的后续聊天内容都将以该配置为准,如果此时修改应用配置,可能会不生效,必须创建一个新会话。

SQLAlchemy 介绍

我们再继续看 _get_app_model_config() 方法的实现:

def _get_app_model_config(self, app_model: App, conversation: Optional[Conversation] = None) -> AppModelConfig:
  # 获取应用模型配置
  # 支持两种获取方式:会话配置或应用默认配置
  if conversation:
    # 1. 从特定会话获取配置
    stmt = select(AppModelConfig).where(
      AppModelConfig.id == conversation.app_model_config_id,
      AppModelConfig.app_id == app_model.id
    )
    app_model_config = db.session.scalar(stmt)
  else:
    # 2. 从应用默认配置获取(属性访问器)
    app_model_config = app_model.app_model_config

  return app_model_config

这里使用了 SQLAlchemy 访问数据库。

SQLAlchemy 是 Python 中最流行的 ORM(对象关系映射)工具之一,它提供了一套高层 API 用于操作关系型数据库,同时也支持底层的 SQL 表达式操作。它的核心优势在于将 Python 对象与数据库表结构进行映射,让开发者可以用面向对象的方式操作数据库,而不必直接编写复杂的 SQL 语句。它的主要特点有:

  1. ORM 功能:允许开发者定义 Python 类作为数据库表的映射,通过操作类实例来实现对数据库的增删改查
  2. SQL 表达式语言:提供了一种灵活的方式构建 SQL 语句,既保留了 SQL 的表达能力,又具备 Python 代码的可读性和可维护性
  3. 支持多种数据库:兼容 PostgreSQL、MySQL、SQLite、Oracle、Microsoft SQL Server 等主流数据库,且操作接口统一
  4. 事务支持:内置事务管理机制,确保数据库操作的原子性、一致性、隔离性和持久性(ACID)
  5. 连接池管理:自动管理数据库连接池,优化数据库连接的创建和释放,提升性能

以下是一个使用 SQLAlchemy ORM 操作 SQLite 数据库的简单示例:

from sqlalchemy import create_engine, select, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

# 创建基类
Base = declarative_base()

# 定义数据模型(映射到数据库表)
class User(Base):
  __tablename__ = 'users'  # 表名
  
  id = Column(Integer, primary_key=True)
  name = Column(String(50), nullable=False)
  age = Column(Integer)

# 创建数据库引擎(SQLite 数据库)
engine = create_engine('sqlite:///example.db')

# 创建所有表(根据定义的模型)
Base.metadata.create_all(engine)

# 创建会话工厂
Session = sessionmaker(bind=engine)
session = Session()

# 添加数据
new_user = User(name='Alice', age=30)
session.add(new_user)
new_user = User(name='Bob', age=31)
session.add(new_user)
session.commit()

# 查询数据(1.x 语法)
users = session.query(User).all()
for user in users:
  print(f"ID: {user.id}, Name: {user.name}, Age: {user.age}")

# 查询数据(2.0 语法)
stmt = select(User).where(
  User.name == "Alice",
)
user = session.scalar(stmt)
print(f"ID: {user.id}, Name: {user.name}, Age: {user.age}")

# 关闭会话
session.close()

这段代码非常简单,主要是了解下 SQLAlchemy 的几个核心概念:

  • Engine:数据库连接的核心,负责管理数据库连接池和执行 SQL 语句
  • Session:用于操作数据库的会话对象,类似数据库事务的上下文
  • Declarative Base:所有 ORM 模型类的基类,通过继承它可以定义数据库表结构
  • MetaData:用于描述数据库 schema(表、列、约束等)的元数据容器

SQLAlchemy 平衡了高层 ORM 的便捷性和底层 SQL 的灵活性,是 Python 后端开发中处理数据库的重要工具,广泛应用于 Flask、Django 等 Web 框架中。

查询语法

SQLAlchemy 2.0 引入了全新的现代查询语法,更加直观且符合 Python 风格。这种新语法将核心查询操作统一到了 select() 等函数中,并提供了更流畅的链式调用体验。

在 Dify 中使用了 SQLAlchemy 2.0 查询语法:

stmt = select(AppModelConfig).where(
  AppModelConfig.id == config_id,
  AppModelConfig.app_id == app_id
)
app_model_config = db.session.scalar(stmt)

不过也有不少地方使用的是 1.x 传统语法,比如根据应用获取默认配置:

@property
def app_model_config(self):
  if self.app_model_config_id:
    return db.session.query(AppModelConfig).where(
      AppModelConfig.id == self.app_model_config_id
    ).first()
  return None

SQLAlchemy 2.0 的现代查询语法更加直观,将所有查询操作都统一到了函数式的 API 中,避免了旧版本中 Query 对象与核心表达式之间的不一致性。这种新语法也更好地支持类型提示,提高了代码的可维护性和开发效率。大家在阅读源码时注意区分。

ext_database 扩展模块

Dify 通过 ext_database 扩展模块(位于 extensions/ext_database.py 文件)管理数据库连接:

def init_app(app: DifyApp):
  db.init_app(app)                # 初始化 Flask-SQLAlchemy
  _setup_gevent_compatibility()   # 设置 Gevent 兼容性

其中 dbFlask-SQLAlchemy 实例:

from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import MetaData

# PostgreSQL 索引命名约定
POSTGRES_INDEXES_NAMING_CONVENTION = {
  "ix": "%(column_0_label)s_idx",                    # 普通索引
  "uq": "%(table_name)s_%(column_0_name)s_key",      # 唯一约束
  "ck": "%(table_name)s_%(constraint_name)s_check",  # 检查约束
  "fk": "%(table_name)s_%(column_0_name)s_fkey",     # 外键约束
  "pk": "%(table_name)s_pkey",                       # 主键约束
}

metadata = MetaData(naming_convention=POSTGRES_INDEXES_NAMING_CONVENTION)
db = SQLAlchemy(metadata=metadata)

db.init_app() 则是用于初始化 Flask-SQLAlchemy 扩展,它有两个重要的配置参数:

@computed_field  # type: ignore[misc]
@property
def SQLALCHEMY_DATABASE_URI(self) -> str:
  db_extras = (
    f"{self.DB_EXTRAS}&client_encoding={self.DB_CHARSET}" if self.DB_CHARSET else self.DB_EXTRAS
  ).strip("&")
  db_extras = f"?{db_extras}" if db_extras else ""
  return (
    f"{self.SQLALCHEMY_DATABASE_URI_SCHEME}://"
    f"{quote_plus(self.DB_USERNAME)}:{quote_plus(self.DB_PASSWORD)}@{self.DB_HOST}:{self.DB_PORT}/{self.DB_DATABASE}"
    f"{db_extras}"
  )

@computed_field  # type: ignore[misc]
@property
def SQLALCHEMY_ENGINE_OPTIONS(self) -> dict[str, Any]:
  # Parse DB_EXTRAS for 'options'
  db_extras_dict = dict(parse_qsl(self.DB_EXTRAS))
  options = db_extras_dict.get("options", "")
  # Always include timezone
  timezone_opt = "-c timezone=UTC"
  if options:
    # Merge user options and timezone
    merged_options = f"{options} {timezone_opt}"
  else:
    merged_options = timezone_opt

  connect_args = {"options": merged_options}

  return {
    "pool_size": self.SQLALCHEMY_POOL_SIZE,
    "max_overflow": self.SQLALCHEMY_MAX_OVERFLOW,
    "pool_recycle": self.SQLALCHEMY_POOL_RECYCLE,
    "pool_pre_ping": self.SQLALCHEMY_POOL_PRE_PING,
    "connect_args": connect_args,
    "pool_use_lifo": self.SQLALCHEMY_POOL_USE_LIFO,
    "pool_reset_on_return": None,
  }

分别是数据库连接和连接池配置,可以根据需要,在 .env 文件中通过环境变量来修改默认值。

覆盖模型配置

获取应用模型配置之后,紧接着处理调试模式下的配置覆盖:

# 覆盖模型配置(仅在调试模式下允许)
override_model_config_dict = None
if args.get("model_config"):
  if invoke_from != InvokeFrom.DEBUGGER:
    raise ValueError("Only in App debug mode can override model config")

  # 验证配置的有效性,设置默认值
  override_model_config_dict = CompletionAppConfigManager.config_validate(
    tenant_id=app_model.tenant_id, config=args.get("model_config", {})
  )

在正常情况下,会话接口的入参类似下面这样:

{
  "response_mode": "streaming",
  "conversation_id": "773bc365-5faa-4716-b11d-6e77fffd2639",
  "files": [],
  "query": "<用户问题>",
  "inputs": {},
  "parent_message_id": "9841bc16-0a8a-4fad-812d-ee5703cde868"
}

但是在调试模式,会多一个 model_config 参数:

{
  "response_mode": "streaming",
  "conversation_id": "773bc365-5faa-4716-b11d-6e77fffd2639",
  "files": [],
  "query": "<用户问题>",
  "inputs": {},
  "parent_message_id": "9841bc16-0a8a-4fad-812d-ee5703cde868",
  "model_config": {
    // 和 AppModelConfig 基本一致
  }
}

组装应用配置

最后,将应用模型、应用模型配置和覆盖模型配置转换为统一的 应用配置(app config) 对象:

app_config = CompletionAppConfigManager.get_app_config(
  app_model=app_model,
  app_model_config=app_model_config,
  override_config_dict=override_model_config_dict
)

各个应用类型都有自己的配置类。所有简单 UI 的应用配置都继承自 EasyUIBasedAppConfig

# 文本生成应用配置  
class CompletionAppConfig(EasyUIBasedAppConfig):
  pass

# 聊天助手配置
class ChatAppConfig(EasyUIBasedAppConfig):
  pass

# 智能体配置
class AgentChatAppConfig(EasyUIBasedAppConfig):
  agent: Optional[AgentEntity] = None

而工作流和对话流的应用配置则是继承自 WorkflowUIBasedAppConfig

# 工作流配置
class WorkflowAppConfig(WorkflowUIBasedAppConfig):
  pass

# 对话流配置
class AdvancedChatAppConfig(WorkflowUIBasedAppConfig):
  pass

应用配置(AppConfig) 是经过解析和验证后的配置实体对象,为应用运行时提供类型安全的配置访问。Dify 为每种应用定义了各自的 运行器(AppRunner),应用配置组装完成后,就传给运行器开始执行。大致流程如下:

app-config.png

小结

今天,我们详细分析了 Dify 应用生成器的配置管理机制,从应用模型(app model)到应用模型配置(app model config),到覆盖模型配置(override model config),到最后统一的应用配置对象组装(app config)。在阅读代码的同时,我们穿插学习了 Python 中的方法重载机制,SQLAlchemy 的基本使用和查询语法相关的知识,以及基于 Flask-SQLAlchemy 的 ext_database 扩展模块。通过本文的学习,相信大家对 Dify 中配置相关的概念和实现原理都有了基本的了解。