Fork me on GitHub

分类 Dify 下的文章

学习 Dify 的代码沙箱

在上一篇文章中,我们讲到了 Dify 的工具系统,其中有一个代码执行的内置工具非常重要,无论是在工作流的代码节点中执行用户代码,还是在智能体中作为 Code Interpreter 调用,都离不开这个工具。为了执行用户代码,Dify 需要一个安全的、隔离的、可控的代码执行环境,这也就是本文的主角 —— Dify 代码沙箱

如果没有完善的沙箱隔离机制,恶意用户可以利用代码执行漏洞访问系统文件、盗取数据、甚至获得整个服务器的控制权。因此,Dify 在代码沙箱的设计和实现上下了不少功夫,其中有不少值得学习的点。本文将详细介绍 Dify 代码沙箱的工作原理,以及它所采用的各种安全隔离技术。

代码沙箱概述

代码沙箱(Code Sandbox) 是一个隔离的代码执行环境,允许在受控的环境中安全地运行不信任的代码。它的核心目标是在提供代码执行功能的同时,防止恶意或有缺陷的代码对系统造成危害。

在 Dify 中,代码沙箱主要用于以下场景:

  1. 工作流代码节点:用户可以在工作流中添加代码节点,用 Python 或 JavaScript 处理工作流中的数据
  2. 代码执行工具:作为智能体的工具,大模型可以自主调用代码执行器完成计算任务
  3. 模板转换:在数据处理过程中,使用代码对数据进行转换和清洗

这些场景的共同点是,代码来自于用户或 AI 生成,具有不可预测性,因此必须在沙箱中执行。

沙箱的安全需求

设计一个安全的代码沙箱需要满足以下需求:

需求说明
进程隔离用户代码运行在独立的进程中,不影响主程序
文件系统隔离代码无法访问主机系统的文件和目录
网络隔离代码可以根据配置选择性地访问网络
系统调用限制代码不能调用危险的系统调用(如 fork、exec 等)
权限限制代码以低权限用户身份运行
资源限制限制代码的 CPU、内存和执行时间
代码隐私用户的代码在传输和执行过程中受到保护

Dify 通过一个独立的沙箱服务 dify-sandbox 来实现这些需求,它采用了 Linux 提供的多种安全机制,构建了一套分层的防御体系。

代码执行的实现

在深入 Dify 的代码沙箱之前,让我们来看下代码执行相关的逻辑,其实现位于 CodeExecutor 类:

class CodeExecutor:

  @classmethod
  def execute_code(cls, language: CodeLanguage, preload: str, code: str) -> str:
    """
    调用代码沙箱,执行代码
    """

    # 接口地址
    url = code_execution_endpoint_url / "v1" / "sandbox" / "run"

    # 简单鉴权
    headers = {"X-Api-Key": dify_config.CODE_EXECUTION_API_KEY}

    # 接口入参
    data = {
      "language": cls.code_language_to_running_language.get(language),
      "code": code,
      "preload": preload,
      "enable_network": True,
    }

    # 发送请求
    response = post(
      str(url),
      json=data,
      headers=headers,
      timeout=Timeout(
        connect=dify_config.CODE_EXECUTION_CONNECT_TIMEOUT,
        read=dify_config.CODE_EXECUTION_READ_TIMEOUT,
        write=dify_config.CODE_EXECUTION_WRITE_TIMEOUT,
        pool=None,
      ),
    )

    # 获取 stdout 输出
    response_data = response.json()
    response_code = CodeExecutionResponse(**response_data)
    return response_code.data.stdout or ""

  @classmethod
  def execute_workflow_code_template(cls, language: CodeLanguage, code: str, inputs: Mapping[str, Any]):
    """
    执行工作流代码节点
    """

    # 将用户代码和输入参数嵌入预置的代码模板
    template_transformer = cls.code_template_transformers.get(language)
    runner, preload = template_transformer.transform_caller(code, inputs)

    try:
      # 调用代码执行
      response = cls.execute_code(language, preload, runner)
    except CodeExecutionError as e:
      raise e

    # 将执行结果转换为工作流的节点出参
    return template_transformer.transform_response(response)

其中 execute_workflow_code_template() 函数负责工作流中的代码节点的执行,注意它并没有直接执行用户的代码,而是做了一层模板转换。因为代码执行服务是通过标准输出获取执行结果的,而工作流的代码节点中用户输入的代码必须包含一个 main() 入口:

def main(x: int, y: int) -> dict:
  return {
    'sum' : x + y
  }

如果直接执行不会有任何输出,所以 Dify 在调用代码执行服务之前,先使用一段预置的代码模板将用户代码和输入参数包起来。模板如下:

# 用户代码,申明 main 函数
{cls._code_placeholder}

import json
from base64 import b64decode

# 输入参数
inputs_obj = json.loads(b64decode('{cls._inputs_placeholder}').decode('utf-8'))

# 执行 main 函数
output_obj = main(**inputs_obj)

# 将输出转换为 JSON 格式并打印输出
output_json = json.dumps(output_obj, indent=4)
result = f'''<<RESULT>>{{output_json}}<<RESULT>>'''
print(result)

最终的输出结果会被转换为 JSON 格式并通过 print 打印出来,打印的时候加上 <<RESULT>> 这个特殊标签,防止 main 函数里其他的 print 对结果造成干扰,方便解析。

另外,CodeExecutor 中还有一个 execute_code() 函数,这才是真正的代码执行入口,支持 Python 或 JavaScript 两种编程语言,它负责将代码发送到代码沙箱服务并处理返回结果:

$ curl -X POST http://127.0.0.1:8194/v1/sandbox/run \
  -H "X-Api-Key: dify-sandbox" \
  -H "Content-Type: application/json" \
  -d '{
    "language": "python3",
    "code": "print('"'"'hello'"'"')",
    "preload": "",
    "enable_network": true
  }'

该接口通过 X-Api-Key 头支持简单的鉴权,该值可以在 .env 文件中修改:

CODE_EXECUTION_ENDPOINT=http://127.0.0.1:8194
CODE_EXECUTION_API_KEY=dify-sandbox

Dify 代码沙箱架构

Dify 的代码沙箱服务以 Docker 容器运行,采用了 防御纵深(Defense in Depth) 的设计思想,实现了多层安全防御,即使某一层防御被突破,其他层仍然能够保护系统。其执行流程如下:

sandbox-flow.png

整个过程涉及多个安全层,下面我们逐一探讨。

第一层:代码加密与编码

Dify 的代码沙箱在执行用户代码时,首先会将用户代码写到一个临时文件中,然后再启动 Python 或 Node.js 去运行该脚本文件,通过捕获 Python 或 Node.js 进程的 stdoutstderr 获取代码执行结果。

为了防止用户的代码以明文形式出现在磁盘上,Dify 在写入临时文件时对代码做了一次简单的加密:

// 生成一个 512 字节的随机密钥
key_len := 64
key := make([]byte, key_len)
_, err := rand.Read(key)

// 加密代码:采用简单的 XOR 加密,将代码与密钥进行 XOR 操作
encrypted_code := make([]byte, len(code))
for i := 0; i < len(code); i++ {
    encrypted_code[i] = code[i] ^ key[i%key_len]
}

// 对加密后的代码进行 Base64 编码
code = base64.StdEncoding.EncodeToString(encrypted_code)
// 对密钥进行 Base64 编码
encoded_key := base64.StdEncoding.EncodeToString(key)

用户的代码也不是直接执行的,而是通过另一个模板文件 prescript.py 动态生成的,可以在这个模板文件中看到用户代码的解密过程:

from base64 import b64decode

# 解码密钥
key = b64decode(key)

# 解码用户代码
code = b64decode("{{code}}")

# 定义解密函数
def decrypt(code, key):
  key_len = len(key)
  code_len = len(code)
  code = bytearray(code)
  for i in range(code_len):
    code[i] = code[i] ^ key[i % key_len]  # XOR 操作
  return bytes(code)

# 解密用户代码
code = decrypt(code, key)

# 执行用户代码
exec(code)

虽然 XOR 加密的安全性相对较弱,但由于这个密钥是一次性的,考虑到所有操作都在容器中,以及配合其他安全机制,这种方案在实践中是足够的。

第二层:进程隔离

每次代码执行都会创建一个独立的子进程,用户代码在这个进程中运行,与沙箱服务的主进程完全隔离。这样,即使代码因某种原因崩溃或消耗过多资源,也不会影响沙箱服务本身。另外,参考 docker/volumes/sandbox/conf/config.yaml 文件,沙箱还支持配置以下资源限制:

配置项说明默认值
max_workers最大并发执行进程数4
max_requests请求队列大小50
worker_timeout单个代码执行的超时时间5 秒

超过超时时间的进程会被强制杀死,这是一个重要的资源保护机制。

第三层:文件系统隔离

chroot 是 Linux 提供的一个系统调用,可以改变进程的根目录。当一个进程执行 chroot("/some/path") 后,对于这个进程来说,/some/path 就变成了新的根目录,进程无法访问此目录之外的任何文件。

在 Dify 沙箱中,Python 代码执行时会被 chroot 到一个特殊的沙箱目录,如 /var/sandbox/sandbox-python 目录。这个目录包含了 Python 运行时所需的最小文件集合。这样,即使用户代码尝试执行 open("/etc/passwd"),它实际上会尝试打开 /var/sandbox/sandbox-python/etc/passwd,而这个文件并不存在,因此访问会被拒绝。

这个最小化的文件系统被称为 chroot 监狱,即使用户代码知道绝对路径,也无法访问这个监狱之外的文件。

Dify 沙箱通过 Go 语言的 syscall.Chroot() 系统调用实现该功能:

import "syscall"

func InitSeccomp(uid int, gid int, enable_network bool) error {

  err := syscall.Chroot(".")

  // 其他安全措施...
}

需要注意的是,chroot 本身并不是一个强大的安全机制,如果进程有能力调用 chdir() 和其他系统调用,可能找到逃逸的方式,需要配合其他机制来形成完整的防护。因此,Dify 在应用 chroot 的同时,还使用了 seccomp 来限制进程可以调用的系统调用,后面我们会具体介绍。

第四层:用户权限隔离

沙箱中的代码不应该以 root 身份运行。Dify 在代码执行时,会使用 setuidsetgid 系统调用,将进程的用户身份和组身份切换到一个非特权用户:

import "syscall"

func InitSeccomp(uid int, gid int, enable_network bool) error {

  // 其他安全措施...

  // setuid
  err = syscall.Setuid(uid)

  // setgid
  err = syscall.Setgid(gid)
}

这个非特权用户名为 sandbox,ID 为 65537,是由沙箱服务在启动时自动创建的。

这样做的好处是:

  1. 限制文件和目录的访问权限(基于文件的 Unix 权限位)
  2. 防止进程获得 root 权限进行的操作
  3. 减小代码逃逸后的影响范围

除了权限降级外,Dify 还使用了 prctl(PR_SET_NO_NEW_PRIVS) 系统调用来禁止进程及其所有子进程获得新的权限:

import "github.com/langgenius/dify-sandbox/internal/core/lib"

func InitSeccomp(uid int, gid int, enable_network bool) error {

  // 其他安全措施...

  lib.SetNoNewPrivs()

  // 其他安全措施...
}

其中 SetNoNewPrivs() 函数通过调用 Go 语言的 syscall.Syscall6() 系统调用设置进程的 PR_SET_NO_NEW_PRIVS 标志:

func SetNoNewPrivs() error {
  // syscall.SYS_PRCTL 表示 Linux 的 prctl 系统调用,用于操作进程的各种属性
  // 它的第一个参数 0x26 是 PR_SET_NO_NEW_PRIVS 标志常数的十六进制值
  // 它的第二个参数 1 表示启用该标志
  _, _, e := syscall.Syscall6(syscall.SYS_PRCTL, 0x26, 1, 0, 0, 0, 0)
}

这个标志的作用是,即使进程调用了一个设置了 setuid 位的二进制文件(如 sudo),也无法获得额外的权限。这是一个额外的保护层,防止通过特殊的二进制文件实现权限提升。

第五层:系统调用隔离

Seccomp(Secure Computing Mode) 是 Linux 内核提供的一个强大的安全机制,它允许进程通过 BPF(Berkeley Packet Filter) 程序来过滤系统调用。当进程执行被禁用的系统调用时,内核会立即终止该进程或返回错误。

Seccomp 使用 BPF 字节码来实现系统调用过滤,当进程执行系统调用时,内核会执行这个 BPF 程序来决定是否允许该调用。BPF 程序的结果有几种可能:

返回值说明
SECCOMP_RET_ALLOW允许系统调用执行
SECCOMP_RET_ERRNO返回错误码,进程继续执行
SECCOMP_RET_KILL_PROCESS杀死整个进程(推荐)
SECCOMP_RET_KILL_THREAD杀死当前线程
SECCOMP_RET_TRAP发送信号给进程

Dify 采用 白名单模式,即只有显式允许的系统调用才能执行,其他所有系统调用都会被拒绝。Dify 针对不同的编程语言(Python 和 Node.js)以及不同的系统架构(ARM64 和 AMD64)提供了不同的白名单:

  • internal/static/python_syscall/ - Python 允许的系统调用
  • internal/static/nodejs_syscall/ - Node.js 允许的系统调用

比如针对 ARM64 架构下的 Python 语言,白名单如下:

allow-syscalls.png

这个白名单分三个部分:

  • ALLOW_SYSCALLS 允许的系统调用
  • ALLOW_ERROR_SYSCALLS 允许的系统调用,但是返回报错
  • ALLOW_NETWORK_SYSCALLS 允许的网络系统调用

可以看到,危险的系统调用如 execveforkptrace 等都被禁止了,这确保了用户代码无法执行任意的系统命令或创建新进程。

系统调用隔离的实现同样位于 InitSeccomp 函数中:

import "github.com/langgenius/dify-sandbox/internal/core/lib"

func InitSeccomp(uid int, gid int, enable_network bool) error {

  // 其他安全措施...

  allowed_syscalls = append(allowed_syscalls, python_syscall.ALLOW_SYSCALLS...)
  if enable_network {
    allowed_syscalls = append(allowed_syscalls, python_syscall.ALLOW_NETWORK_SYSCALLS...)
  }
  allowed_not_kill_syscalls = append(allowed_not_kill_syscalls, python_syscall.ALLOW_ERROR_SYSCALLS...)

  err = lib.Seccomp(allowed_syscalls, allowed_not_kill_syscalls)

  // 其他安全措施...
}

其中 lib.Seccomp() 函数的实现如下:

import (
    "syscall"
    sg "github.com/seccomp/libseccomp-golang"
)

func Seccomp(allowed_syscalls []int, allowed_not_kill_syscalls []int) error {
    
  // 初始化 Seccomp 过滤器
  ctx, err := sg.NewFilter(sg.ActKillProcess)

  // 添加规则:允许的系统调用
  for _, syscall := range allowed_syscalls {
    ctx.AddRule(sg.ScmpSyscall(syscall), sg.ActAllow)
  }

  // 添加规则:报错的系统调用
  for _, syscall := range allowed_not_kill_syscalls {
    ctx.AddRule(sg.ScmpSyscall(syscall), sg.ActErrno)
  }

  // 将过滤器规则导出成字节码
  file := os.NewFile(uintptr(writer.Fd()), "pipe")
  ctx.ExportBPF(file)

  // 应用 Seccomp 规则
  _, _, err2 := syscall.Syscall(
    SYS_SECCOMP,
    uintptr(SeccompSetModeFilter),
    uintptr(SeccompFilterFlagTSYNC),
    uintptr(unsafe.Pointer(&bpf)),
  )

它通过 libseccomp-golang 库,创建 Seccomp 过滤器并添加对应的白名单规则。

第六层:网络隔离

除了上述安全机制外,Dify 还允许根据配置选择性地启用或禁用网络访问:

enable_network: True

这是通过上面的 Seccomp 过滤器来实现的,当禁用网络后,所有和网络相关的系统调用(比如 socketconnect 等)都将被拒绝。

另外,细心的朋友可能会注意到,在 Docker Compose 部署中,沙箱服务是运行在一个隔离的网络中,使用 SSRF 代理容器来控制出站连接:

sandbox:
  image: langgenius/dify-sandbox:0.2.12
  environment:
    ENABLE_NETWORK: ${SANDBOX_ENABLE_NETWORK:-true}
    HTTP_PROXY: ${SANDBOX_HTTP_PROXY:-http://ssrf_proxy:3128}
    HTTPS_PROXY: ${SANDBOX_HTTPS_PROXY:-http://ssrf_proxy:3128}
  networks:
    - ssrf_proxy_network

因此即使代码绕过了 Seccomp 限制(理论上不可能),Docker 网络层也会提供额外的防护。

共享库的奥秘

看到这里,大家可能会疑惑,上述大多数的安全措施都位于 InitSeccomp() 函数中,这是一个 Go 函数,它是如何作用在用户编写的 Python 或 Node.js 脚本上的呢?

答案在于 python.so 这个 C 共享库。

Dify 通过 CGO(Go 的 C 互操作机制)将 InitSeccomp() 中的防护逻辑编译成一个 so 库文件,编译命令如下:

CGO_ENABLED=1 GOOS=linux GOARCH=amd64 go build \
  -o internal/core/runner/python/python.so \
  -buildmode=c-shared \
  -ldflags="-s -w" \
  cmd/lib/python/main.go

其中 CGO_ENABLED=1 表示启用 CGO,允许 Go 调用 C 代码;GOOS=linux 表示目标操作系统为 Linux,Dify 的沙箱服务只能跑在 Linux 环境下;-buildmode=c-shared 表示编译为 C 共享库格式;最后的 -ldflags="-s -w" 用于移除符号表和调试信息,减小文件大小。

编译后的 python.so 文件主要导出函数是 DifySeccomp,它在 cmd/lib/python/main.go 中定义:

package main

import (
  "github.com/langgenius/dify-sandbox/internal/core/lib/python"
)
import "C"

// 导出的函数
func DifySeccomp(uid int, gid int, enable_network bool) {
  python.InitSeccomp(uid, gid, enable_network)
}

func main() {}

这个函数被 Python 脚本通过 ctypes 调用,在用户代码执行前完成所有的安全隔离设置:

import ctypes

# 动态加载 python.so,这是 Dify 沙箱的 C 共享库
lib = ctypes.CDLL("./python.so")
lib.DifySeccomp.argtypes = [ctypes.c_uint32, ctypes.c_uint32, ctypes.c_bool]
lib.DifySeccomp.restype = None

# 应用安全隔离:chroot + seccomp + setuid + SetNoNewPrivs
lib.DifySeccomp({{uid}}, {{gid}}, {{enable_network}})

# 执行用户代码 ...
code = decrypt(code, key)
exec(code)

实际应用示例

让我们通过一个例子来理解 Seccomp 的实际效果,假设用户在工作流的代码节点中尝试执行以下恶意操作:

def main() -> dict:    
  return {
    "result": "hello",
  }

# 尝试删除整个系统
import os
os.system("rm -rf /")

当 Python 执行 os.system() 时,实际上会调用 execve 系统调用来启动新的进程。但由于 execve 被 Seccomp 过滤器拒绝了,内核会立即终止这个进程并返回错误:

code-execute-error.png

这种保护是在内核级别进行的,任何绕过尝试都会失败。

小结

代码沙箱作为 AI 应用安全的基石,其重要性不言而喻。我们今天详细学习了 Dify 代码沙箱的工作原理,它巧妙地结合了 Linux 提供的多种安全机制:

  • Chroot 提供文件系统隔离
  • Setuid/Setgid 实现权限降级
  • SetNoNewPrivs 防止权限提升
  • Seccomp 在系统调用级别进行细粒度的控制

通过这些机制的组合,Dify 实现了进程隔离、文件系统隔离、用户权限隔离、系统调用隔离及网络隔离等多层防御,打造了一个既安全又实用的代码执行环境,允许 AI 应用能够动态执行代码,同时避免恶意代码对系统造成伤害。


学习 Dify 的工具系统

在这一个月的时间里,我们通过源码深入学习了 Dify 的整个会话流程,了解了从应用生成器到运行器的核心机制,从限流控制、文件处理、跟踪调试,到提示词组装、内容审核、外部数据扩展,再到最后的知识库检索和模型调用。通过这个完整的会话过程,我们几乎把 Dify 的各个方面都摸了个遍。

不过,关于 Dify 还有很多未展开的话题值得深入研究,比如智能体、工作流、对话流这些应用的实现,不过这里面的细节也很多,为了防止这个系列过于冗长,我决定不再继续讲解应用的源码,而是挑一些比较重要且有意思的点来写,比如今天将要学习的工具系统。

工具使得 AI 应用能够跳出纯文本对话的局限,连接外部世界的各种服务和功能。无论是在智能体应用中自主决策调用工具,还是在工作流中精确编排工具的执行,工具系统都是 Dify 应用能力的重要体现。本文将详细介绍 Dify 工具的使用和开发,包括内置工具、自定义工具、MCP 工具、工作流工具、工具插件等,带你全面认识 Dify 的工具系统。

内置工具

我们之前在实战智能体的时候,曾经使用过 Dify 内置的几个小工具:

  • 语音转写:支持 TTS 文本转语音和 ASR 语音转文本;
  • 代码执行:运行 Python 代码并返回执行结果;
  • 时间计算:各种时间小工具,比如获取当前时间、时区转换、时间戳转换、计算星期几等;
  • 网页抓取:获取指定 URL 的网页内容;

我们打开 “工具” 页面,可以看到所有内置的工具以及来自 Dify 市场的工具,用户可以择需安装:

builtin-tools.png

所有内置工具的实现位于 api/core/tools/builtin_tool/providers 目录,以约定的目录结构进行组织:

.
├── audio              # 按供应商分组
│   ├── _assets        # 图标资源
│   │   └── icon.svg
│   ├── audio.py       # 供应商代码,主要是认证鉴权
│   ├── audio.yaml     # 供应商配置文件,名称、描述、作者、图标、标签等
│   └── tools
│       ├── asr.py     # 工具的具体实现代码
│       ├── asr.yaml   # 工具配置文件,名称、描述以及参数的详细信息
│       ├── tts.py
│       └── tts.yaml
├── code
├── time
└── webscraper

可以看到,内置工具按供应商进行分组,每个供应商可以包含多个工具。供应商和工具都有对应的 YAML 配置文件,用于定义其基本信息和参数,以及对应的 Python 实现代码。

在这里 “供应商” 这个词可能不太合适,使用 “工具箱” 可能更容易理解一点。

Dify 在启动时会自动扫描并加载这个目录下的所有工具,因此对于在本地部署 Dify 的开发人员来说,完全可以在这个目录下添加并实现自己的内置工具。

自定义工具

自定义工具允许用户通过导入 OpenAPI 规范 的 API 文档,快速集成任何 RESTful API 作为工具:

custom-tools.png

其实,考虑到兼容性,Dify 支持好几种不同的 API 规范:

  • OpenAPI 3.0/3.1:业界标准的 API 文档格式
  • Swagger 2.0:OpenAPI 的前身规范
  • OpenAI Plugin:OpenAI 插件规范

对 Schema 的解析可以参考 ApiBasedToolSchemaParser 的源码。下面是一个天气查询接口的 Schema 示例:

{
  "openapi": "3.1.0",
  "info": {
    "title": "天气查询",
    "description": "天气查询",
    "version": "v1.0.0"
  },
  "servers": [
    {
      "url": "https://query.asilu.com"
    }
  ],
  "paths": {
    "/weather/baidu": {
      "get": {
        "description": "查询某个城市的天气信息",
        "operationId": "QueryWeather",
        "parameters": [
          {
            "name": "city",
            "in": "query",
            "description": "城市名称",
            "required": true,
            "schema": {
              "type": "string"
            }
          }
        ],
        "deprecated": false
      }
    }
  },
  "components": {
    "schemas": {}
  }
}

其中 operationId 会作为工具的名称,description 是工具的描述,点击 “测试” 按钮可以对接口进行调试:

custom-tools-debug.png

自定义工具的实现位于 api/core/tools/custom_tool/,系统会自动解析 API 规范并调用相应的工具接口。值得注意的是,对外部接口的调用,统一走 SSRF 代理,防止 SSRF 攻击。

MCP 工具

MCP 工具是一种更灵活的工具集成方式,通过 Model Context Protocol 标准,允许连接任何实现了 MCP 规范的外部服务器。它目前已经是大模型调用工具的事实性规范了,几乎所有的接口提供方都在争相推出他们的 MCP Server。

我们以 高德地图 MCP Server 为例,演示下如何添加 MCP 工具。首先,需要在高德官网上申请 API KEY,然后点击 “添加 MCP 服务”:

mcp-tools.png

依次填写名称、图标、服务端点、服务器标识符等信息,再点击 “添加并授权”,就可以看到 MCP Server 下的 15 个工具都已添加成功:

mcp-tools-list.png

MCP 的完整实现位于 api/core/mcp/,包括客户端、会话管理、认证等完整的生态支持。

工作流工具

当我们开发完一个工作流并点击发布时,可以选择将其发布为一个工具:

workflow-publish.png

我们需要对工具的名称、描述和参数进行配置:

workflow-tool.png

配置完成之后,我们就可以在 “工具管理” 页面看到这个工具,并且可以在智能体和工作流等应用中使用它了。

工具插件

我们昨天曾学习过 Dify 的插件机制,并演示了如何从零开始开发一个模型插件。除了模型插件,Dify 也支持工具插件,我们完全可以按照昨天的步骤再开发一个工具插件,注意在选择插件模板时选择 “tool” 模板:

dify-plugin-init.png

生成的模板目录结构如下:

calculator
├── GUIDE.md
├── PRIVACY.md
├── README.md
├── _assets
│   ├── icon-dark.svg
│   └── icon.svg
├── main.py
├── manifest.yaml
├── provider
│   ├── calculator.py
│   └── calculator.yaml
├── requirements.txt
└── tools
    ├── add.py
    └── add.yaml

同样是先将插件清单、供应商配置、工具配置这几个 YAML 文件调整下,最后编辑 add.py 文件,完成工具的实现:

class AddTool(Tool):
  
  def _invoke(self, tool_parameters: dict[str, Any]) -> Generator[ToolInvokeMessage]:
    """
    加法运算
    """
    x = tool_parameters.get("x", 0)
    y = tool_parameters.get("y", 0)
    result = str(x + y)
    yield self.create_text_message(result)

这里的 create_text_message() 表示工具返回的是文本消息,Dify 定义了多种工具返回消息类型,支持文本、链接、图片、文件等丰富的内容:

# 创建文本消息
self.create_text_message(text="Hello, World!")

# 创建 JSON 消息
self.create_json_message(data={"key": "value"})

# 创建链接消息
self.create_link_message(link="https://example.com")

# 创建图片消息
self.create_image_message(image="https://example.com/image.jpg")

# 创建文件 BLOB 消息
self.create_blob_message(blob=file_bytes, meta={"mime_type": "application/pdf"})

然后将插件打包上传,就可以在智能体或工作流中使用了:

plugin-tool.png

验证工具能否正常工作:

plugin-tool-use.png

插件签名

有一点我们昨天没有提,当使用 dify plugin package 打包插件并上传到 Dify 平台时,可能会遇到下面这个签名错误:

plugin-upload-fail.png

可以在 docker/middleware.env 环境变量文件的末尾添加如下配置参数:

# 关闭插件签名
FORCE_VERIFYING_SIGNATURE=false

如果你不是通过源码部署的,可以修改 /docker/.env 文件。

然后重启 Dify 的插件服务即可:

$ docker compose -f docker-compose.middleware.yaml up -d \
  --force-recreate \
  --no-deps \
  plugin_daemon

不过这种方法将允许安装所有未审核的插件,可能存在安全隐患,最好的做法是对插件进行签名。插件的开发者首先需要创建密钥对:

$ dify signature generate -f demo

这个命令将生成 demo.private.pemdemo.public.pem 两个文件,一个私钥,一个公钥。然后开发者使用私钥对插件进行签名:

$ dify signature sign calculator.difypkg -p demo.private.pem

这个命令将生成一个带签名的插件文件 calculator.signed.difypkg,不过这个时候插件还上传不了。开发者需要将公钥提交给 Dify 平台的管理员,管理员审核通过后,将该开发者的公钥放在 public_keys 目录下,并修改平台配置:

# 开启插件签名
FORCE_VERIFYING_SIGNATURE=true
THIRD_PARTY_SIGNATURE_VERIFICATION_ENABLED=true
THIRD_PARTY_SIGNATURE_VERIFICATION_PUBLIC_KEYS=/app/storage/public_keys/demo.public.pem

重启插件服务后,开发者就可以上传带签名的插件了。

小结

Dify 的工具系统是一个分层次、多维度的生态,从内置工具的开箱即用,到自定义 API 工具的快速集成,再到 MCP 工具的标准化接入,以及最后的插件工具开发,提供了从简单到复杂、从低门槛到高灵活性的完整解决方案。

无论你是想快速为应用增强功能,还是想开发一个功能丰富的工具插件供社区使用,Dify 的工具系统都能满足你的需求。

关于本文中所提到的内置工具的开发,以及工具插件的示例,我已经将完整源码发布到 Github 上了,感兴趣的同学可以参考:


深入 Dify 的应用运行器之模型调用

在前面的几篇文章中,我们学习了 Dify 应用运行器中的外部数据扩展和知识库检索相关的内容,至此,万事俱备,只欠东风,我们已经到达应用运行器的收尾阶段。接下来就是重新组装提示词,调用大模型,完成对用户问题的回答。

我们今天就来深入学习模型调用相关的逻辑,以及 Dify 是如何通过插件化的架构来管理和调用各种大语言模型的,顺便看下 Dify 插件的实现原理,并通过一个自定义模型插件演示如何从零开发 Dify 插件。

模型实例

回顾 CompletionAppRunnerrun() 方法,在处理完外部数据和知识库检索后,核心的模型调用逻辑如下:

# 重新组装提示词,包含外部数据和知识库上下文
prompt_messages, stop = self.organize_prompt_messages(...)

# 创建模型实例
model_instance = ModelInstance(
  provider_model_bundle=application_generate_entity.model_conf.provider_model_bundle,
  model=application_generate_entity.model_conf.model,
)

# 调用大语言模型
invoke_result = model_instance.invoke_llm(
  prompt_messages=prompt_messages, # 处理后的提示消息
  model_parameters=application_generate_entity.model_conf.parameters,  # 模型参数(温度、max_tokens等)
  stop=stop, # 停止词
  stream=application_generate_entity.stream, # 是否流式输出
  user=application_generate_entity.user_id, # 用户ID(用于追踪和限流)
)

在 Dify 中,所有的模型调用都通过 ModelInstance 类来完成,这个类位于 api/core/model_manager.py,它是模型调用的统一入口。ModelInstance 不仅支持大语言模型,还支持多种其他类型的模型:

class ModelInstance:
  
  # 大语言模型调用
  def invoke_llm(self, prompt_messages, model_parameters=None, tools=None, stop=None,
        stream=True, user=None, callbacks=None) -> Union[LLMResult, Generator]:
    # 调用大语言模型,支持文本生成、对话、工具调用等
    pass

  # 文本嵌入模型调用
  def invoke_text_embedding(self, texts: list[str], user: Optional[str] = None) -> TextEmbeddingResult:
    # 将文本转换为向量表示,用于语义搜索和相似度计算
    pass

  # 重排序模型调用
  def invoke_rerank(self, query: str, docs: list[str], score_threshold: Optional[float] = None,
          top_n: Optional[int] = None, user: Optional[str] = None) -> RerankResult:
    # 对文档进行重新排序,提高检索质量
    pass

  # 内容审核模型调用
  def invoke_moderation(self, text: str, user: Optional[str] = None) -> ModerationResult:
    # 检测文本中的不当内容,如暴力、色情、仇恨言论等
    pass

  # 语音转文本模型调用
  def invoke_speech2text(self, file: IO[bytes], user: Optional[str] = None) -> str:
    # 将音频文件转换为文本
    pass

  # 文本转语音模型调用
  def invoke_tts(self, content_text: str, voice: str, user: Optional[str] = None) -> bytes:
    # 将文本转换为音频,支持多种语音选择
    pass

这种统一的接口设计让开发者可以用一致的方式调用不同类型的 AI 模型。此外,ModelInstance 还实现了模型调用的负载均衡和容错机制,当用户配置了多个 API Key 时,系统会自动进行负载均衡;如果某个 API Key 遇到速率限制或认证错误时,系统会自动将其放入冷却期,并切换到其他可用的 API Key,确保服务的连续性。

不同类型的模型实现

所有的模型实现都继承自 AIModel 基类:

class AIModel(BaseModel):
  tenant_id: str          # 租户 ID
  model_type: ModelType   # 模型类型(LLM、Embedding 等)
  plugin_id: str          # 插件 ID
  provider_name: str      # 供应商名称
  plugin_model_provider: PluginModelProviderEntity  # 插件模型供应商实体

具体的模型类型实现包括:

  • LargeLanguageModel - 大语言模型
  • TextEmbeddingModel - 文本嵌入模型
  • RerankModel - 重排序模型
  • ModerationModel - 内容审核模型
  • Speech2TextModel - 语音转文本模型
  • TTSModel - 文本转语音模型

每种模型类型都实现了对应的 invoke() 方法,并且它们的共同点是都会通过 PluginModelClient 与插件守护进程通信。以大语言模型为例,LargeLanguageModel 的实现如下:

class LargeLanguageModel(AIModel):

  def invoke(...) -> Union[LLMResult, Generator[LLMResultChunk, None, None]]:
    # 调用大语言模型
    plugin_model_manager = PluginModelClient()
    result = plugin_model_manager.invoke_llm(
      tenant_id=self.tenant_id,
      user_id=user or "unknown",
      plugin_id=self.plugin_id,
      provider=self.provider_name,
      model=model,
      credentials=credentials,
      model_parameters=model_parameters,
      prompt_messages=prompt_messages,
      tools=tools,
      stop=list(stop) if stop else None,
      stream=stream,
    )
    # 返回流式或非流式结果 ...

其中,PluginModelClient 是 Dify 应用与插件守护进程的通信桥梁,它负责将模型调用请求转发给插件守护进程:

class PluginModelClient(BasePluginClient):
  def invoke_llm(...) -> Generator[LLMResultChunk, None, None]:
    # 调用插件中的大语言模型
    response = self._request_with_plugin_daemon_response_stream(
      method="POST",
      path=f"plugin/{tenant_id}/dispatch/llm/invoke",
      type=LLMResultChunk,
      data=jsonable_encoder(
        # 构造请求数据
        {
          "user_id": user_id,
          "data": {
            "provider": provider,
            "model_type": "llm",
            "model": model,
            "credentials": credentials,
            "prompt_messages": prompt_messages,
            "model_parameters": model_parameters,
            "tools": tools,
            "stop": stop,
            "stream": stream,
          },
        }
      ),
      headers={
        "X-Plugin-ID": plugin_id,
        "Content-Type": "application/json",
      },
    )

    # 流式返回结果
    yield from response

Dify 插件化架构

可以看出,真正的模型调用是由插件守护进程执行的,这也是 Dify 最具创新性的设计之一,将所有模型供应商都以插件的形式封装在独立的 Dify Plugin Daemon 服务中。通过这种插件化的架构设计,Dify 构建了一个开放、可扩展的模型生态系统。

当然,除了模型供应商,Dify 支持多种类型的插件:

  1. 模型插件(Models):集成各种 AI 模型,如 LLM、Embedding、TTS 等
  2. 工具插件(Tools):为 Agent 和工作流提供专业能力,如数据分析、内容处理等
  3. Agent 策略插件(Agent Strategies):创建自定义推理策略(ReAct, CoT, ToT),增强 Agent 能力
  4. 扩展插件(Extensions):通过 HTTP Webhook 集成外部服务
  5. 包插件(Bundles):将多个插件组合打包

Dify 的插件系统的由两部分组成:

  1. Dify Plugin Daemon - 独立的插件守护进程,是一个 Go 开发的 Web 服务,提供了插件管理、插件调用等接口
  2. api/core/plugin/** - Dify 主服务中的客户端集成代码,包括上面我们介绍的 PluginModelClient,通过 HTTP 请求调用插件服务的各项功能

尽管插件守护进程是用 Go 开发的,但是每个插件还是用 Python 开发的。插件服务为每个插件创建一个独立的 Python 虚拟环境,并拥有独立的工作目录,每个插件运行在独立的进程中。我们进到 dify-plugin-daemon 容器中,使用 ps 命令可以看到运行的每个插件进程:

dify-plugin-daemon-ps.png

从零开发一个模型插件

让我们通过一个具体的例子来了解如何开发一个模型插件。假设我们要接入一个新的大语言模型 "MockGPT"。

第一步:准备开发环境

首先安装 Dify 插件开发工具:

$ brew tap langgenius/dify
$ brew install dify

然后运行 dify version 验证安装

$ dify version
v0.3.1

如果能正确输出版本号,则说明安装成功。

第二步:初始化插件项目

运行 dify plugin init 创建新插件项目:

dify-plugin-init.png

根据提示依次填写插件名称、作者、描述等信息,按回车进入下一步:

dify-plugin-init-2.png

看提示 Dify 是计划支持 Python 和 Go 两种编程语言来开发插件的,只不过目前仅支持 Python,继续回车进入下一步:

dify-plugin-init-3.png

这里对 Dify 的插件类型做了一个简单的介绍,并为我们准备了几个常见的插件模版,选择 llm 模版继续:

dify-plugin-init-4.png

这里是对插件权限的配置,比如允许插件反向调用 Dify 中的工具或其他模型等,我们暂时不用管,保持默认即可。再次回车项目初始化完成:

[INFO]plugin demo created successfully, you can refer to `demo/GUIDE.md` for more information about how to develop it

生成的插件目录结构如下:

demo
├── GUIDE.md
├── PRIVACY.md
├── README.md
├── _assets
│   ├── icon-dark.svg
│   └── icon.svg
├── main.py
├── manifest.yaml
├── models
│   └── llm
│       ├── llm.py
│       └── llm.yaml
├── provider
│   ├── demo.py
│   └── demo.yaml
└── requirements.txt

第三步:插件清单

编辑 manifest.yaml 文件,完善插件信息(有一些基础信息上面已经填过),包括标签、描述、图标、资源等:

version: 0.0.1
type: plugin
author: aneasystone
name: demo
label:
  en_US: Demo AI Provider                       # 修改
  zh_Hans: 演示 AI 供应商                         # 修改
description:
  en_US: A demo AI model provider for learning   # 修改
  zh_Hans: 用于学习的演示 AI 模型供应商             # 修改
icon: icon.svg
icon_dark: icon-dark.svg
resource:
  memory: 268435456
  permission: {}
plugins:
  models:
    - provider/demo.yaml
meta:
  version: 0.0.1
  arch:
    - amd64
    - arm64
  runner:
    language: python
    version: "3.12"
    entrypoint: main
  minimum_dify_version: null
created_at: 2025-10-22T07:45:17.322263+08:00
privacy: PRIVACY.md
verified: false

第四步:模型供应商配置

编辑 provider/demo.yaml 文件,修改其中的 iconprovider_source 两个地方(因为自动生成文件名不存在),其他参数保持不变:

provider: demo
label:
  en_US: "Demo"
description:
  en_US: "Models provided by demo."
  zh_Hans: "Demo 提供的模型。"
icon_small:
  en_US: "icon.svg"  # 保证 _assets 目录下存在
icon_large:
  en_US: "icon.svg"  # 保证 _assets 目录下存在
background: "#E5E7EB"
help:
  title:
    en_US: "Get your API Key from demo"
    zh_Hans: "从 Demo 获取 API Key"
  url:
    en_US: "https://__put_your_url_here__/account/api-keys"
supported_model_types:
  - llm
configurate_methods:
  - predefined-model
  - customizable-model
model_credential_schema:
  model:
    label:
      en_US: Model Name
      zh_Hans: 模型名称
    placeholder:
      en_US: Enter your model name
      zh_Hans: 输入模型名称
  credential_form_schemas:
    - variable: openai_api_key
      label:
        en_US: API Key
      type: secret-input
      required: true
      placeholder:
        zh_Hans: 在此输入您的 API Key
        en_US: Enter your API Key
provider_credential_schema:
  credential_form_schemas:
    - variable: openai_api_key
      label:
        en_US: API Key
      type: secret-input
      required: true
      placeholder:
        zh_Hans: 在此输入您的 API Key
        en_US: Enter your API Key
models:
  llm:
    predefined:
      - "models/llm/*.yaml"
extra:
  python:
    provider_source: provider/demo.py # 修改
    model_sources:
      - "models/llm/llm.py"

第五步:模型配置

编辑 models/llm/llm.yaml 文件,修改模型名称,并根据实际情况配置你的模型特性、参数和价格:

model: mock-gpt-v1     # 修改
label:
  zh_Hans: 模拟大模型 v1 # 修改
  en_US: Mock GPT v1   # 修改
model_type: llm
features:
  - multi-tool-call
  - agent-thought
  - stream-tool-call
model_properties:
  mode: chat
  context_size: 16385
parameter_rules:
  - name: temperature
    use_template: temperature
  - name: top_p
    use_template: top_p
  - name: presence_penalty
    use_template: presence_penalty
  - name: frequency_penalty
    use_template: frequency_penalty
  - name: max_tokens
    use_template: max_tokens
    default: 512
    min: 1
    max: 16385
  - name: response_format
    use_template: response_format
pricing:
  input: '0.003'
  output: '0.004'
  unit: '0.001'
  currency: USD

第六步:实现 MockGPT

编辑 models/llm/llm.py 文件,完成 MockGPT 模型的实现,代码逻辑很简单,随机选择一段话,并模拟流式输出:

class MockGptLargeLanguageModel(LargeLanguageModel):
  """
  MockGPT 实现
  """

  def _invoke(...) -> Union[LLMResult, Generator]:
    """
    调用大语言模型
    """

    # 模拟响应内容
    demo_responses = [
      "这是一个演示模型的回复。我可以帮助您了解 Dify 插件的工作原理。",
      "作为演示模型,我会生成模拟的响应内容来展示插件功能。",
      "您好!这是 Demo AI 模型的模拟输出,用于演示插件开发流程。"
    ]

    response_text = random.choice(demo_responses)

    if stream:
      return self._handle_stream_response(model, prompt_messages, response_text)
    else:
      return self._handle_sync_response(model, prompt_messages, response_text)
   
  def _handle_stream_response(self, model: str, prompt_messages: List[PromptMessage], response_text: str) -> Generator:
    """
    处理流式响应
    """

    # 模拟流式输出
    words = response_text.split()
    for i, word in enumerate(words):
      chunk_text = word + (" " if i < len(words) - 1 else "")

      delta = LLMResultChunkDelta(
        index=0,
        message=AssistantPromptMessage(content=chunk_text),
        finish_reason=None if i < len(words) - 1 else "stop",
        usage=self._calc_usage(response_text) if i == len(words) - 1 else None
      )

      yield LLMResultChunk(
        model=model,
        prompt_messages=prompt_messages,
        system_fingerprint=None,
        delta=delta
      )

      # 模拟网络延迟
      time.sleep(0.1)

  def _handle_sync_response(self, model: str, prompt_messages: List[PromptMessage], response_text: str) -> LLMResult:
    """
    处理同步响应
    """
    return LLMResult(
      model=model,
      prompt_messages=prompt_messages,
      message=AssistantPromptMessage(content=response_text),
      usage=self._calc_usage(response_text),
      system_fingerprint=None
    )

  def _calc_usage(self, text: str) -> LLMUsage:
    """
    计算使用量(模拟)
    """
    prompt_tokens = 50  # 模拟
    completion_tokens = len(text.split())

    return LLMUsage(
      prompt_tokens=prompt_tokens,
      prompt_unit_price=0.001,
      prompt_price_unit=1000,
      prompt_price=0.00005,
      completion_tokens=completion_tokens,
      completion_unit_price=0.002,
      completion_price_unit=1000,
      completion_price=completion_tokens * 0.000002,
      total_tokens=prompt_tokens + completion_tokens,
      total_price=0.00005 + completion_tokens * 0.000002,
      currency="USD",
      latency=1.5
    )
   
  def get_num_tokens(...) -> int:
    """
    计算 token 数量(模拟)
    """
    total_text = ""
    for message in prompt_messages:
      if isinstance(message.content, str):
        total_text += message.content

    # 简单估算:中文字符算1个token,英文单词算1个token
    return len(total_text.split()) + len([c for c in total_text if '\u4e00' <= c <= '\u9fff'])

  def validate_credentials(self, model: str, credentials: dict) -> None:
    """
    验证模型凭据
    """
    try:
      pass
    except Exception as ex:
      raise CredentialsValidateFailedError(str(ex))

  def get_customizable_model_schema(self, model: str, credentials: dict) -> AIModelEntity:
    """
    返回模型 Schema
    """
    entity = AIModelEntity(
      model=model,
      label=I18nObject(zh_Hans=model, en_US=model),
      model_type=ModelType.LLM,
      features=[],
      fetch_from=FetchFrom.CUSTOMIZABLE_MODEL,
      model_properties={},
      parameter_rules=[],
    )

    return entity

  @property
  def _invoke_error_mapping(self) -> dict:
    """
    错误映射
    """
    return {
      InvokeError: [Exception]
    }

插件调试

至此,一个简单的模型插件就开发好了,接下来需要测试插件是否可以正常运行。Dify 提供便捷地远程调试方式,帮助你快速在测试环境中验证插件功能。

首先点击右上角的 “插件” 进入 “插件管理” 页面,在这里可以获取插件服务的地址和调试 Key:

plugins-debug.png

回到插件项目代码,拷贝 .env.example 文件并重命名为 .env,将获取的插件服务地址和调试 Key 等信息填入其中:

INSTALL_METHOD=remote
REMOTE_INSTALL_URL=localhost:5003
REMOTE_INSTALL_KEY=0cd54aa2-7731-4368-bac4-6d2ed1299087

然后运行 python -m main 命令启动插件:

$ python -m main
{"event": "log", "data": {"level": "INFO", "message": "Installed model: demo", "timestamp": 1761101795.533712}}
INFO:dify_plugin.plugin:Installed model: demo

刷新插件列表,此时就可以看到该插件了:

plugins-debug-2.png

插件守护进程支持三种运行时:本地运行时(Local Runtime),通过运行 Python 子进程,监听 STDIN/STDOUT 管道进行通信,从而实现插件的调用;调试运行时(Debug Runtime),通过 TCP 服务器模式,监听插件连接,支持全双工通信,从而实现开发时的实时调试;无服务器运行时(Serverless Runtime),支持 AWS Lambda 等云函数平台,通过 HTTP 调用模式,可以实现插件服务的自动部署和扩缩容。

插件验证

我们进入 “模型供应商” 页面,找到 Demo 后,配置 API 密钥:

plugins-test.png

由于我们并没有做这块的校验,因此可以随便填。配置之后,进入应用页面,在模型列表中选择 “模拟大模型 v1”:

plugins-test-2.png

然后发起对话,验证模型是否可以正常输出:

plugins-test-3.png

可以看到,该模型会随机输出我们预先内置的话术,符合预期。

插件打包和发布

如果一切验证 OK,我们就可以将插件打包并发布出去。通过 dify plugin package 可以将插件打包成 difypkg 文件:

$ dify plugin package demo
[INFO]plugin packaged successfully, output path: demo.difypkg

然后在 Dify 的 “插件管理” 页面上传并安装这个插件,安装完成后,使用就和上面调试验证步骤基本一样了。

Dify 支持三种插件安装方式:

  • Marketplace:Dify 官方提供的插件市场,用户可以在此浏览、搜索并一键安装各类插件;
  • GitHub 仓库:将插件开源或托管在 GitHub 上,方便他人查看、下载和安装;
  • 本地安装:将插件打包成本地文件(正如上面的 difypkg 文件),通过文件分享的方式供他人安装;

下面是 Dify 市场的官方地址:

另外,Dify 的官方插件也都开源了,可以作为开发参考:

本文的 MockGPT 模型插件我也发布到 Github 上了,感兴趣的同学也可以参考:

小结

今天我们深入探索了 Dify 应用运行器中模型调用的核心机制。从 CompletionAppRunnerrun() 方法开始,我们了解了 ModelInstance 作为统一的模型调用入口的设计,以及不同类型的模型实现,然后学习了 Dify 的插件化架构,Dify 的插件化架构构建了一个开放的生态系统,各模型供应商可以作为独立的插件运行,这种设计提供了出色的稳定性、可扩展性和维护性。

最后,我们通过一个实际例子学习了如何从零开发一个模型插件,并学习了插件的调试、验证、打包和发布等流程,通过模型插件的实战,相信大家对 Dify 的模型调用流程有了更深入的认识。


深入 Dify 的应用运行器之知识库检索(续)

在上一篇文章中,我们从界面操作的角度了解了 Dify 知识库的功能特性,包括创建知识库、配置分段设置、选择索引方式和检索方法,以及如何在应用中集成知识库。通过这些配置,我们可以让 AI 应用获得外部知识的支持,实现更准确、更专业的回答。

今天我们将继续深入 CompletionAppRunnerrun() 方法源码,详细分析知识库检索的具体实现原理,了解 Dify 如何将用户的查询转化为向量检索、如何处理多知识库场景,以及背后的技术机制。

应用运行器回顾

让我们先回顾一下 CompletionAppRunner 中知识库检索的流程:

def run(...) -> None:

  # 1. 第一次提示词组装
  self.organize_prompt_messages(...)

  # 2. 输入内容审核
  self.moderation_for_inputs(...)

  # 3. 外部数据工具处理
  inputs = self.fill_in_inputs_from_external_data_tools(...)

  # 4. 知识库检索
  context = None
  if app_config.dataset and app_config.dataset.dataset_ids:

    # 创建回调处理器,用于记录检索命中信息
    hit_callback = DatasetIndexToolCallbackHandler(...)

    # 创建知识库检索器并执行检索
    dataset_retrieval = DatasetRetrieval(application_generate_entity)
    context = dataset_retrieval.retrieve(...)

  # 5. 第二次提示词组装,包含知识库上下文
  prompt_messages, stop = self.organize_prompt_messages(
    ..., context=context, ...
  )

  # 6. 后续处理:托管审核、令牌重计算、模型调用等
  ...

从代码中可以看出,知识库检索发生在外部数据工具处理之后,第二次提示词组装之前,确保将检索到的上下文信息整合到最终的提示词中。

知识库检索的核心逻辑

检索的核心逻辑封装在 DatasetRetrieval 类的 retrieve() 方法中:

def retrieve(...) -> Optional[str]:
  
  # 从知识库列表中筛选出所有可用的知识库
  available_datasets = []
  for dataset_id in dataset_ids:
    # 从数据库获取知识库信息
    dataset_stmt = select(Dataset).where(Dataset.tenant_id == tenant_id, Dataset.id == dataset_id)
    dataset = db.session.scalar(dataset_stmt)
    # 跳过无效知识库
    if not dataset:
      continue
    # 跳过没有文档的内部知识库(外部知识库除外)
    if dataset and dataset.available_document_count == 0 and dataset.provider != "external":
      continue
    available_datasets.append(dataset)

  # 元数据过滤
  available_datasets_ids = [dataset.id for dataset in available_datasets]
  metadata_filter_document_ids, metadata_condition = self.get_metadata_filter_condition(
    available_datasets_ids,
    query,
    ...
  )

  # 单库检索策略 vs. 多库检索策略
  all_documents = []
  if retrieve_config.retrieve_strategy == DatasetRetrieveConfigEntity.RetrieveStrategy.SINGLE:
    # 判断模型是否支持工具调用,使用不同的选择策略
    planning_strategy = PlanningStrategy.REACT_ROUTER
    if ModelFeature.TOOL_CALL in features or ModelFeature.MULTI_TOOL_CALL in features:
        planning_strategy = PlanningStrategy.ROUTER
    # 单库检索策略:智能选择一个最相关的知识库
    all_documents = self.single_retrieve(...)
  elif retrieve_config.retrieve_strategy == DatasetRetrieveConfigEntity.RetrieveStrategy.MULTIPLE:
    # 多库检索策略:从所有知识库中检索并合并结果
    all_documents = self.multiple_retrieve(...)

  dify_documents = [item for item in all_documents if item.provider == "dify"]
  external_documents = [item for item in all_documents if item.provider == "external"]

  # 分离内部和外部文档
  retrieval_resource_list: list[RetrievalSourceMetadata] = ...
  document_context_list: list[DocumentContext] = ...
  
  # 引用归属
  if hit_callback and retrieval_resource_list:
    retrieval_resource_list = sorted(retrieval_resource_list, key=lambda x: x.score or 0.0, reverse=True)
    for position, item in enumerate(retrieval_resource_list, start=1):
      item.position = position
    hit_callback.return_retriever_resource_info(retrieval_resource_list)

  # 按分数排序并合并为最终上下文
  if document_context_list:
    document_context_list = sorted(document_context_list, key=lambda x: x.score or 0.0, reverse=True)
    return str("\n".join([document_context.content for document_context in document_context_list]))
  return ""

其主要逻辑包括:

  • 知识库有效性验证:首先,对知识库列表进行筛选,过滤掉无效知识库,以及没有文档的内部知识库;需要注意的是,外部知识库(provider == "external")即使文档数量为 0 也会被保留,因为它们的文档数量统计可能不准确;
  • 元数据过滤处理:根据文档的属性(如标签、类别、时间等)对文档进行精确过滤,支持用户自定义过滤条件和大模型智能生成过滤条件;
  • 执行检索策略:在前文中我们提到,一个应用可以关联多个知识库,当面对多个知识库时,Dify 提供了两种检索策略:

    • 单库检索策略(SINGLE):采用智能路由的方式,从多个候选知识库中自动选择最适合的那一个进行检索;这种策略的优势是减少了检索时间和计算开销,特别适用于知识库间差异较大、各自专注于不同领域的场景;
    • 多库检索策略(MULTIPLE):并行检索所有配置的知识库,然后将结果进行合并、重排序和过滤;这种策略能够获得更全面的检索结果,适用于知识库间存在互补关系的场景;
  • 结果格式化:将检索到的文档转换为大模型可以理解的上下文格式;

元数据过滤功能

在实际检索之前,Dify 会先进行基于元数据的文档过滤,这是一个强大的功能,允许用户根据文档的属性(如标签、类别、时间等)进行过滤,这对于大型知识库的精准检索非常重要。系统支持三种元数据过滤模式:

  1. 禁用模式(disabled):不使用元数据过滤,检索所有文档
  2. 自动模式(automatic):系统根据用户问题自动生成过滤条件
  3. 手动模式(manual):用户手动配置过滤条件
def get_metadata_filter_condition(...) -> tuple[Optional[dict[str, list[str]]], Optional[MetadataCondition]]:
  
  if metadata_filtering_mode == "disabled":
    # 禁用模式
    return None, None
  elif metadata_filtering_mode == "automatic":
    # 自动模式,根据用户问题自动生成过滤条件
    automatic_metadata_filters = self._automatic_metadata_filter_func(
      dataset_ids, query, tenant_id, user_id, metadata_model_config
    )
    for sequence, filter in enumerate(automatic_metadata_filters):
      filters = self._process_metadata_filter_func(...)
  elif metadata_filtering_mode == "manual":
    # 手动模式,用户手动配置过滤条件
    if metadata_filtering_conditions:
      for sequence, condition in enumerate(metadata_filtering_conditions.conditions):
        # 支持变量替换
        expected_value = condition.value
        if expected_value is not None and condition.comparison_operator not in ("empty", "not empty"):
          if isinstance(expected_value, str):
            expected_value = self._replace_metadata_filter_value(expected_value, inputs)
        filters = self._process_metadata_filter_func(...)
  else:
    raise ValueError("Invalid metadata filtering mode")

  # 根据过滤条件筛选文档
  if filters:
    if metadata_filtering_conditions and metadata_filtering_conditions.logical_operator == "and":
      document_query = document_query.where(and_(*filters))
    else:
      document_query = document_query.where(or_(*filters))
  documents = document_query.all()
  # 根据知识库 ID 分组
  metadata_filter_document_ids = defaultdict(list) if documents else None
  for document in documents:
    metadata_filter_document_ids[document.dataset_id].append(document.id)
  return metadata_filter_document_ids, metadata_condition

其中手动模式没什么好讲的,直接根据用户配置组装过滤条件,唯一值得注意的一点是,在手动设置条件时可以使用变量,因此这里先对配置的值进行变量替换;而自动模式则是通过大模型实现,使用了专门的提示词模板,要求大模型分析用户查询并输出 JSON 格式的元数据过滤条件:

### 职位描述
您是一个文本元数据提取引擎,需根据用户输入提取文本的元数据,并设定元数据值。

### 任务
您的任务仅从提供的元数据列表中,提取输入文本中实际存在的元数据;并使用以下运算符来表达逻辑关系:

["contains", "not contains", "start with", "end with", "is", "is not", "empty", "not empty", "=", "≠", ">", "<", "≥", "≤", "before", "after"]

然后以 JSON 格式返回结果,其中键包括:

- metadata_fields(元数据字段)
- metadata_field_value(元数据字段值)
- comparison_operator(比较运算符)

### 格式
输入文本存储在变量 input_text 中,元数据以列表形式在变量 metadata_fields 中指定。

### 约束
您的响应中不得包含 JSON 数组以外的任何内容。

大模型从用户输入中提取出元数据并设置过滤条件,支持各种类型的比较操作,包括:

  1. 文本比较(contains, start with, end with, is, is not)
  2. 数值比较(=, ≠, >, <, ≥, ≤)
  3. 时间比较(before, after)
  4. 存在性检查(empty, not empty)

比如,用户输入这样的查询:

总结2023年的财务报告

大模型输出类似这样的结果:

{
  "metadata_map": [
    {
      "metadata_field_name": "year",
      "metadata_field_value": "2023",
      "comparison_operator": "="
    },
    {
      "metadata_field_name": "category",
      "metadata_field_value": "财务",
      "comparison_operator": "contains"
    }
  ]
}

单库检索策略

单库检索策略适用于知识库主题相对独立的场景,系统会先从多个知识库中智能地选择一个最相关的知识库,然后再在该知识库中进行检索。选择策略根据模型的能力分为两种:

  • ROUTER 策略:适用于支持工具调用的模型(如 GPT-4、Claude-3.5 等),使用函数调用方式让模型自主选择合适的知识库;这种方式下,每个知识库都被包装成一个工具,工具名称是知识库ID,工具描述是知识库的描述信息;模型会根据用户查询的内容,自主选择最合适的工具(即知识库)进行调用;
  • REACT_ROUTER 策略:适用于不支持工具调用的模型,使用 ReAct(Reasoning + Acting)方式,通过推理步骤指导模型选择知识库;这种方式,系统会构造一个包含所有可用知识库工具的提示词,要求模型以特定的 JSON 格式输出决策;
def single_retrieve(...) -> list[Document]:

  # 1. 构建知识库工具描述
  tools = []
  for dataset in available_datasets:
    description = dataset.description or f"useful for when you want to answer queries about the {dataset.name}"
    message_tool = PromptMessageTool(
      name=dataset.id,
      description=description,
      parameters={"type": "object", "properties": {}, "required": []},
    )
    tools.append(message_tool)

  # 2. 根据策略选择知识库
  dataset_id = None
  if planning_strategy == PlanningStrategy.REACT_ROUTER:
    # ReAct 方式
    react_multi_dataset_router = ReactMultiDatasetRouter()
    dataset_id = react_multi_dataset_router.invoke(query, tools, model_config, model_instance, user_id, tenant_id)
  elif planning_strategy == PlanningStrategy.ROUTER:
    # Function Call 方式
    function_call_router = FunctionCallMultiDatasetRouter()
    dataset_id = function_call_router.invoke(query, tools, model_config, model_instance)

  # ...

当选定知识库后,接下来就针对该库执行具体的检索操作:

  # 3. 执行检索
  if dataset_id:
    dataset = db.session.scalar(select(Dataset).where(Dataset.id == dataset_id))
    if dataset:
      if dataset.provider == "external":
        # 外部知识库检索
        external_documents = ExternalDatasetService.fetch_external_knowledge_retrieval(...)
        results = [Document(page_content=doc.get("content"), metadata=doc.get("metadata"), provider="external") for doc in external_documents]
      else:
        # 内部知识库检索,根据索引技术选择检索方法
        retrieval_model_config = dataset.retrieval_model or default_retrieval_model
        if dataset.indexing_technique == "economy":
          retrieval_method = "keyword_search"
        else:
          retrieval_method = retrieval_model_config["search_method"]

        # 调用 RetrievalService 执行检索
        results = RetrievalService.retrieve(
          retrieval_method=retrieval_method,
          dataset_id=dataset.id,
          query=query,
          ...
        )
      return results
  return []

Dify 的知识库分为外部知识库和内部知识库:针对外部知识库,使用统一的 外部知识库 API 进行检索;针对内部知识库,根据知识库的索引配置选择合适的检索方法,最终调用 RetrievalService 进行实际的向量检索或关键词检索。

多库检索策略

多库检索策略适用于知识库内容互补的场景,系统会同时从多个知识库中检索,并通过重排序获得最佳结果。实际上,我们在 Dify 页面上创建的应用,默认都是使用多库检索策略,好像没有看到哪里可以设置单库检索策略。

多库检索策略的核心逻辑位于 multiple_retrieve() 方法:

def multiple_retrieve(...):

  # 1. 一致性检查
  index_type_check = all(
    item.indexing_technique == available_datasets[0].indexing_technique
    for item in available_datasets
  )
  if not index_type_check and (not reranking_enable or reranking_mode != RerankMode.RERANKING_MODEL):
    raise ValueError("不同索引技术的知识库需要设置重排序模型")

  # 2. 为每个知识库创建检索线程
  threads = []
  all_documents: list[Document] = []
  for dataset in available_datasets:
    retrieval_thread = threading.Thread(
      target=self._retriever,
      kwargs={
        "flask_app": current_app._get_current_object(),
        "dataset_id": dataset.id,
        "query": query,
        "top_k": top_k,
        "all_documents": all_documents,  # 共享结果列表
        "document_ids_filter": document_ids_filter,
        "metadata_condition": metadata_condition,
      },
    )
    threads.append(retrieval_thread)
    retrieval_thread.start()

  # 3. 等待所有检索线程完成
  for thread in threads:
    thread.join()

  # 4. 结果融合和重排序
  if reranking_enable:
    # 开启重排序
    data_post_processor = DataPostProcessor(tenant_id, reranking_mode, reranking_model, weights, False)
    all_documents = data_post_processor.invoke(
      query=query, documents=all_documents,
      score_threshold=score_threshold, top_n=top_k
    )
  else:
    # 根据索引类型进行简单排序
    if index_type == "economy":
      all_documents = self.calculate_keyword_score(query, all_documents, top_k)
    elif index_type == "high_quality":
      all_documents = self.calculate_vector_score(all_documents, top_k, score_threshold)
    else:
      all_documents = all_documents[:top_k] if top_k else all_documents
  return all_documents

多库检索策略的流程主要分为三步:

  1. 一致性检查:确保所有知识库使用相同的索引技术,或者配置了重排序模型;
  2. 并发检索:开启多线程并发检索,每个线程检索一个知识库;根据知识库的类型调用不同的检索方法,使用 ExternalDatasetService 检索外部知识库,使用 RetrievalService 检索内部知识库,具体的逻辑和单库检索策略类似;
  3. 结果融合和排序:如果开启了重排序,通过 DataPostProcessor 对来自不同知识库的结果进行融合和排序;否则根据索引类型进行简单排序;

检索服务

RetrievalService 是 Dify 检索功能的核心服务类,支持多种检索方法的并发执行:

class RetrievalService:
  
  # 核心检索方法
  @classmethod
  def retrieve(
    cls,
    retrieval_method: str,
    dataset_id: str,
    query: str,
    top_k: int,
    score_threshold: Optional[float] = 0.0,
    reranking_model: Optional[dict] = None,
    reranking_mode: str = "reranking_model",
    weights: Optional[dict] = None,
    document_ids_filter: Optional[list[str]] = None,
  ):

    # 使用线程池并发执行不同的检索方法
    with ThreadPoolExecutor(max_workers=dify_config.RETRIEVAL_SERVICE_EXECUTORS) as executor:  # type: ignore
      futures = []
      if retrieval_method == "keyword_search":
        # 关键词检索
        futures.append(
          executor.submit(cls.keyword_search, ...)
        )
      if RetrievalMethod.is_support_semantic_search(retrieval_method):
        # 语义检索(向量检索)
        futures.append(
          executor.submit(cls.embedding_search, ...)
        )
      if RetrievalMethod.is_support_fulltext_search(retrieval_method):
        # 全文检索
        futures.append(
          executor.submit(cls.full_text_index_search, ...)
        )
      # 等待所有检索任务完成
      concurrent.futures.wait(futures, timeout=30, return_when=concurrent.futures.ALL_COMPLETED)

    # 混合检索需要后处理
    if retrieval_method == RetrievalMethod.HYBRID_SEARCH.value:
      data_post_processor = DataPostProcessor(
        str(dataset.tenant_id), reranking_mode, reranking_model, weights, False
      )
      # 重排序
      all_documents = data_post_processor.invoke(
        query=query,
        documents=all_documents,
        score_threshold=score_threshold,
        top_n=top_k,
      )

    return all_documents

Dify 支持四种不同的检索方法:关键词检索(Keyword Search)向量检索(Embedding Search)全文检索(Full-Text Search)混合检索(Hybrid Search)。其中混合检索就是同时执行向量检索和全文检索,因此这里使用线程池来并发执行,如果不是混合检索,这里实际上就是单线程。最后,针对混合检索的结果,还需要经过后处理,包括格式化、去重、重排序和阈值过滤等操作。

关键词检索

该检索方法适用于经济型索引,经济型索引使用关键词提取技术,为每个文档片段提取 10 个关键词,然后通过倒排索引进行检索。虽然准确度相对较低,但成本更加经济。其核心代码位于 Keyword.search() 方法:

class Keyword:
  def __init__(self, dataset: Dataset):
    self._dataset = dataset
    self._keyword_processor = self._init_keyword()

  def _init_keyword(self) -> BaseKeyword:
    # 根据配置初始化关键词实现,目前仅支持基于 Jieba 分词的实现
    keyword_type = dify_config.KEYWORD_STORE
    keyword_factory = self.get_keyword_factory(keyword_type)
    return keyword_factory(self._dataset)

  @staticmethod
  def get_keyword_factory(keyword_type: str) -> type[BaseKeyword]:
    match keyword_type:
      case KeyWordType.JIEBA:
        # 使用 Jieba 库提取关键词
        from core.rag.datasource.keyword.jieba.jieba import Jieba
        return Jieba
      case _:
        raise ValueError(f"Keyword store {keyword_type} is not supported.")
  
  # 执行关键词检索
  def search(self, query: str, **kwargs: Any) -> list[Document]:
    return self._keyword_processor.search(query, **kwargs)

它通过 Jieba 库的 jieba.analyse.extract_tags() 方法提取用户问题的关键词,并和文档片段中的关键词进行匹配,根据匹配度计算得分,然后按得分排序得到检索结果。

class JiebaKeywordTableHandler:
  
  def __init__(self):
    import jieba.analyse  # type: ignore
    from core.rag.datasource.keyword.jieba.stopwords import STOPWORDS
    jieba.analyse.default_tfidf.stop_words = STOPWORDS  # type: ignore

  def extract_keywords(self, text: str, max_keywords_per_chunk: Optional[int] = 10) -> set[str]:
    # 使用 Jieba 库提取关键词
    import jieba.analyse  # type: ignore
    keywords = jieba.analyse.extract_tags(
      sentence=text,
      topK=max_keywords_per_chunk,
    )

    return set(self._expand_tokens_with_subtokens(set(keywords)))

向量检索

这是一种基于向量相似度的检索方法,适用于高质量索引。其核心代码位于 embedding_search() 方法:

@classmethod
def embedding_search(...):
  with flask_app.app_context():
    dataset = cls._get_dataset(dataset_id)
    vector = Vector(dataset=dataset)

    # 执行向量相似度搜索
    documents = vector.search_by_vector(
      query,
      search_type="similarity_score_threshold",
      top_k=top_k,
      score_threshold=score_threshold,
      filter={"group_id": [dataset.id]},
      document_ids_filter=document_ids_filter,
    )

    # 应用重排序(如果配置)
    if reranking_model and retrieval_method == RetrievalMethod.SEMANTIC_SEARCH.value:
      data_post_processor = DataPostProcessor(...)
      all_documents.extend(data_post_processor.invoke(...))
    else:
      all_documents.extend(documents)

Dify 支持多种向量数据库,通过工厂模式动态选择合适的向量存储实现,包括:

  • 开源方案:Chroma、Qdrant、Milvus、Weaviate、PGVector
  • 云服务:Pinecone、Elasticsearch、OpenSearch
  • 企业级:AnalyticDB、OceanBase、Oracle、TiDB

每种向量数据库都有对应的实现类,位于 api/core/rag/datasource/vdb/ 目录下。它们都实现了统一的向量搜索接口:

class VectorBase:

  def search_by_vector(
    self,
    query: str,
    search_type: str,
    top_k: int,
    score_threshold: Optional[float] = None,
    filter: Optional[dict] = None,
    document_ids_filter: Optional[list[str]] = None,
  ) -> list[Document]:
    # 向量相似度检索实现
    pass

全文检索

基于倒排索引的全文检索:

@classmethod
def full_text_index_search(...):
  with flask_app.app_context():
    dataset = cls._get_dataset(dataset_id)
    vector_processor = Vector(dataset=dataset)

    # 执行全文搜索
    documents = vector_processor.search_by_full_text(
      cls.escape_query_for_search(query),
      top_k=top_k,
      document_ids_filter=document_ids_filter
    )

    # 同样支持重排序
    if reranking_model and retrieval_method == RetrievalMethod.FULL_TEXT_SEARCH.value:
      data_post_processor = DataPostProcessor(...)
      all_documents.extend(data_post_processor.invoke(...))
    else:
      all_documents.extend(documents)

Dify 的全文检索也是基于向量数据库实现的,它们同样也实现了统一的全文搜索接口:

class VectorBase:

  def search_by_full_text(
    self,
    query: str,
    top_k: int,
    document_ids_filter: Optional[list[str]] = None,
  ) -> list[Document]:
    # 全文检索实现
    pass

混合检索

混合检索同时使用向量检索和全文检索,能够结合语义理解和精确匹配的优势,在大多数场景下提供更好的检索效果。

hybrid-search.png

混合检索完成后,系统需要对结果进行后处理,包括格式化、去重、重排序和阈值过滤等操作。这个过程由 DataPostProcessor 负责执行:

class DataPostProcessor:

  def invoke(...) -> list[Document]:
    if self.rerank_runner:
      documents = self.rerank_runner.run(query, documents, score_threshold, top_n, user)
    if self.reorder_runner:
      documents = self.reorder_runner.run(documents)
    return documents

Dify 将重排序分为两种:

  • 重排序(Reranker) - 这是比较常见的重排序策略,根据用户配置的重排序模式进行处理,又分为两种情况,一种是使用外部重排序模型(如 CohereJina AI 等),一种是使用权重分数融合,最后排序后的结果通过分数阈值进行过滤,并截取 Top-N 结果;
  • 重新排列(Reorder) - 这是一种特殊的文档重排序策略,首先将文档按奇偶索引分离,然后将偶数位置的文档列表进行反转,最后将奇数位置文档和反转后的偶数位置文档合并;譬如原始文档顺序是 [1,2,3,4,5,6],重排序后的结果是 [1,3,5,6,4,2];这种排序策略可以缓解大模型的位置偏见,避免总是优先考虑列表前面的文档,不过这个策略默认并没有启用,可能只是一种实验性功能。

小结

经过上面的步骤,我们最终检索出和用户问题最相关的文档,然后我们还需要将检索到的文档转换为大模型可用的上下文格式。根据不同的分段模式,处理过程略有不同:

  1. 普通分段:直接根据 index_node_id 查找对应的文档分段
  2. 父子分段:先查找子分段,再根据 segment_id 查找父分段

上下文转换完成之后,经过第二次提示词组装,最终调用大模型,完成对用户问题的回答。

下面对今天的学习内容做一个总结:

  • 元数据过滤:支持自动模式和手动模式;自动模式 通过大模型分析查询自动生成过滤条件,智能且动态;手动模式 支持复杂的过滤条件配置和变量替换,灵活性强;
  • 多层次检索策略:支持单库检索策略和多库检索策略;单库检索策略 智能选择最相关的知识库,精准度高;多库检索策略 并发检索多个知识库,覆盖面广;
  • 智能路由机制:针对单库检索策略,系统根据模型特性自动选择函数调用或 ReAct 方式;函数调用方式 利用支持工具调用的模型原生能力,准确性更高;ReAct 方式 通过结构化提示词实现推理,兼容性更好;
  • 多元化检索方法:通过检索服务,Dify 集成了多种检索技术;关键词检索 基于精确匹配,适用于经济型索引;向量检索 基于向量相似度,理解能力强;全文检索 基于倒排索引,检索效率高;混合检索 融合了向量检索和全文检索两种方法的优势,并通过重排序得到更好的检索效果;
  • 重排序优化:支持外部重排序模型和权重分数融合两种重排序策略;
  • 上下文构建:将检索结果转换为大模型友好的格式,注意父子分段的处理过程;

至此,我们已经完成了 Dify 应用会话流程中几乎所有的源码解剖,目前还差最后一步,模型调用,我们明天继续。


深入 Dify 的应用运行器之知识库检索

在构建 AI 应用时,如何让大模型能够准确回答特定领域的问题,一直是开发者面临的挑战。虽然大模型具有丰富的通用知识,但在处理企业内部文档、产品规范或专业领域的问题时,往往存在信息过时、不够准确或无法涵盖最新动态的问题。Dify 的知识库功能就是为了解决这个痛点,它利用 RAG(检索增强生成) 技术,将静态的文档内容转化为可被动态检索和使用的知识源,从而提供给大模型用于回复用户的问题。

Dify 的所有应用类型都支持关联知识库。对于聊天应用和文本生成应用,我们之前在应用运行器的 run() 方法中已经看到,第二次提示词组装时,会将知识库检索内容作为上下文喂给大模型;对于智能体应用,知识库的使用稍有不同,它是作为工具提供给大模型动态调用的;而对于工作流应用,知识库则是一个独立的检索组件,可以灵活地被编排在应用流程中。

创建知识库

无论是哪种情况,我们首先得创建一个知识库。在 Dify 平台顶部导航中点击 “知识库”,然后点击 “创建知识库” 即可开始。Dify 支持三种方式来创建知识库:

  1. 导入已有文本:支持批量上传多种格式的文档,包括 TXT、Markdown、DOCX、HTML、JSON、PDF、CSV、Excel 等;
  2. 同步自 Notion 内容:支持将 Notion 内容导入到知识库,同步 Notion 内容前,须先绑定 Notion 空间;
  3. 同步自 Web 站点:支持 Jina ReaderFirecrawlWaterCrawl 等网页内容提取工具,抓取 Web 站点的内容到知识库;

dataset-create.png

另外,Dify 还支持通过 API 和知识库 ID 连接到外部知识库,比如 AWS Bedrock 知识库LlamaCloud 知识库 等,感兴趣的同学可以参考官方文档自行尝试。

dataset-connect.png

我们这里就以 Dify 文档中的 术语表 页面为例,演示下 Dify 知识库的基本用法。

分段设置

由于大语言模型的上下文窗口有限,无法一次性处理整个知识库的内容,因此需要对文档中的长文本进行分段处理。即使部分大模型已支持上传完整的文档文件,实验表明检索效率依然弱于检索单个内容分段。

合理的分段大小非常关键,它能够帮助模型准确地找到与问题最相关的内容,减少噪音信息。过大或过小的分段都可能影响召回的效果。

Dify 提供了两种分段模式,以适应不同类型的文档结构和应用场景:

通用模式

系统按照用户自定义的规则将内容拆分为独立的分段。当用户输入问题后,系统自动分析关键词,并计算与知识库中各内容分段的相关度,选取最相关的内容分段发送给大模型。

chunk-setting-general.png

在通用模式下,你可以配置以下设置:

  • 分段标识符:用于文本分割的字符,默认值为 \n\n,即按照文章段落进行分段;
  • 分段最大长度:指定分段内的文本字符数最大上限,超出该长度时将强制分段,默认值为 1024 Tokens,最大上限为 4000 Tokens;
  • 分段重叠长度:设置分段之间的重叠长度可以保留分段之间的语义关系,建议设置为最大分段长度的 10%-25%,有助于提高召回效果;

为了保证知识库质量,Dify 还提供了两种文本预处理规则,用于过滤知识库中部分无意义的内容:

  • 替换连续的空格、换行符和制表符
  • 删除所有 URL 和电子邮件地址

父子模式

父子模式采用双层分段结构,在精确度和上下文信息之间取得平衡。它包含两个层次:

  • 父区块:保持较大的文本单位(如段落、章节甚至整个文档),提供丰富的上下文信息
  • 子区块:较小的文本单位(如句子),用于精确检索,能更加精准地匹配用户所输入的问题

父子模式的配置包括父块的配置和子块的配置,如下所示:

chunk-setting-parent-child.png

其工作原理是:系统首先通过子区块进行精确检索以确保相关性,然后获取对应的父区块来补充上下文信息,从而在生成响应时既保证准确性又能提供完整的背景信息。

父子模式的优势在于:

  • 子分段能精准匹配用户问题
  • 父分段提供完整的背景信息
  • 检索效果优于传统的单层检索方式

Q&A 分段

在通用分段模式下,还有一个 Q&A 模式,开启该模式后,系统首先会对已上传的文本进行分段,然后自动为每个分段生成 Q&A 对。与常见的 Q2P(用户问题匹配文本段落) 策略不同,Q&A 模式采用 Q2Q(问题匹配问题) 策略。

q2p-vs-q2q.jpg

当用户提问时,系统会找出与之最相似的问题,然后返回对应的分段作为答案。这种方式更加精确,因为它直接针对用户问题进行匹配,可以更准确地帮助用户检索真正需要的信息。

注意,Q&A 模式要选择语言,生成对应语言的问题,启用该模式后会消耗更多的 Tokens,并且无法使用经济型索引方法。

索引方式

选定内容的分段模式后,接下来需要设置知识库的索引方法与检索设置。Dify 提供了两种索引方法:

高质量索引

使用 Embedding 嵌入模型将分段的文本块转换为数字向量,帮助更有效地压缩与存储大量文本信息,使得用户问题与文本之间的匹配更加精准。

index-setting-high.png

高质量索引支持 向量检索全文检索混合检索 三种检索设置。

经济索引

在经济模式下,每个区块使用 10 个关键词进行检索,降低了准确度但无需产生费用。仅提供 倒排索引 方式选择最相关的区块。

检索设置

不用的索引方式支持的检索设置也不同,高质量索引支持 向量检索全文检索混合检索 三种,经济索引仅支持 倒排索引 一种。

向量检索

将用户问题向量化,查询知识库中向量距离与之最接近的文本分段,也就是最相似的内容。

search-setting-vector.png

其配置参数有:

  • Rerank 模型:开启后将使用第三方 Rerank 模型再一次重排序由向量检索召回的内容分段,以优化排序结果;
  • TopK:用于筛选与用户问题相似度最高的文本片段,系统同时会根据模型的上下文窗口大小动态调整片段数量;
  • Score 阈值:用于设置文本片段筛选的相似度阈值,只召回超过设置分数的文本片段;

全文检索

也被称为 关键词检索,即索引文档中的所有词汇,用户输入问题后,通过关键词匹配知识库内对应的文本片段,返回符合关键词的文本片段。

search-setting-fulltext.png

混合检索

同时执行全文检索和向量检索,从查询结果中选择匹配用户问题的最佳结果。

search-setting-hybrid.png

Dify 支持两种混合模式:

  • 权重设置:允许用户设置语义优先(向量检索)和关键词优先(关键词检索)的权重,可以不断调试二者的权重,找到符合业务场景的最佳比例;
  • Rerank 模型:开启后将使用第三方 Rerank 模型再一次重排序由混合检索召回的内容分段,以优化排序结果;

倒排索引

倒排索引是一种用于快速检索文档中关键词的索引结构,常用于在线搜索引擎。倒排索引仅支持 TopK 设置项。

search-setting-inverted-index.png

在应用中集成知识库

至此,我们的 “术语表” 知识库已经创建完毕,接下来,我们要将该知识库集成到我们的 AI 应用中。我们进入工作台,创建一个聊天助手应用,取名为 “翻译专家”:

apps-chat.png

虽然大模型已经具备不错的翻译能力,但在遇到专业术语或领域知识时,往往词不达意,比如我们问 ToT 是什么意思?

apps-chat-debug-prev.png

很显然它不知道我们要问的是其实是思维树的缩写,为解决这个问题,我们可以在知识库设置部分点击 “添加” 按钮,引用刚刚创建的 “术语表” 知识库,一个应用可以引用多个知识库;再点击 “召回设置” 配置检索方式:

apps-chat-kb-setting.png

这个设置页面和上面的混合检索很像,只不过这里使用多路召回,系统会从多个知识库中检索知识,然后通过 Rerank 策略找到最适合的内容。

我们保持默认参数即可,然后在右侧的调试面板进行验证:

apps-chat-debug.png

这次大模型就能正常回答我们的问题了。

引用和归属

在应用的功能选项中有一个 “引用和归属” 开关,默认是开启的:

apps-chat-ref.png

当助手回答用户问题后,若涉及已关联的知识库文档,将将回复内容下方标注引用来源,用户可查看到具体的引用段落信息,包括原始分段文本、分段序号、匹配度等:

apps-chat-debug-ref.png

点击引用分段上方的 “跳转至知识库”,可以快捷访问该分段所在的知识库分段列表,方便开发者进行调试编辑。

元数据过滤

Dify 的知识库支持元数据功能,我们可以在知识库的管理界面,创建、修改和删除元数据字段(比如标签、类别、作者、时间等),并设置元数据的值:

dify-kb-metadata.png

然后我们就可以在应用中使用这些元数据对文档进行精确过滤。一共有三种模式:

  • 禁用模式:不使用元数据过滤
  • 自动模式:系统根据用户问题自动生成过滤条件
  • 手动模式:用户手动配置过滤条件

apps-chat-kb-metadata.png

通过元数据,我们可以:

  • 提升搜索效率:用户可以根据元数据标签快速筛选和查找相关信息,节省时间并提高工作效率
  • 增强数据安全性:通过元数据设置访问权限,确保只有授权用户能访问敏感信息,保障数据的安全性
  • 优化数据管理能力:元数据帮助企业或组织有效分类和存储数据,提高数据的管理和检索能力,增强数据的可用性和一致性
  • 支持自动化流程:元数据在文档管理、数据分析等场景中可以自动触发任务或操作,简化流程并提高整体效率

未完待续

Dify 的知识库功能为开发者提供了一套完整的 RAG 解决方案,从文档上传、分段处理到检索配置,通过可视化的界面,开发者可以轻松创建和管理知识库,从而让 AI 应用能够访问实时、准确的外部知识,提升回答的质量和可信度。

今天主要是对 Dify 的知识库功能走马观花地过了一遍,对其有一个整体感性的认识,明天我们将继续深入源码,重点看下知识库检索这块的实现。


深入 Dify 的应用运行器之外部数据扩展

在创建 Dify 应用时,我们可以在提示词中嵌入用户自定义变量,提高应用的灵活性和功能性。比如在之前的文本生成应用中,我们使用 {{lang}}{{query}} 两个变量,实现了一个简单的中英互译小助手:

text-generator-config.png

Dify 支持文本、段落、下拉选项、数字和复选框等字段类型:

apps-gen-variables.png

除此之外,它还支持以 API 扩展的方式创建 基于 API 的变量

今天我们就来学习下 Dify 中与扩展相关的内容。

使用 API 扩展

首先我们进入 “设置” 页面,打开 “API 扩展” 菜单:

setting-api-extension.png

点击 “新增” 按钮,添加已开发好的 API 扩展:

setting-api-extension-add.png

然后创建一个 “天气查询” 应用,类型为文本生成:

apps-gen.png

我们为该应用添加两个变量:

  • city - 城市名称,字段类型为文本
  • weather_result - 天气查询结果,字段类型为基于 API 的变量,选择上一步添加的 API 扩展

apps-gen-variable-api.png

接着编写如下提示词:

根据下面的查询结果:
{{weather_result}} 

回答问题:
今天 {{city}} 的天气怎么样?

一个简单的天气查询助手就开发好了:

apps-gen-test.png

可以看出,Dify 会自动调用 API 扩展的接口,并将获取到的外部数据组装至提示词中作为大模型的额外信息。

开发 API 扩展

接下来,让我们来看下这个 API 扩展是如何实现的?

首先我们要明确下扩展的概念,不同于 Dify 的插件机制,Dify 的 API 扩展主要针对这两个模块:

  • external_data_tool 外部数据工具
  • moderation 敏感内容审计

Dify 的 API 扩展遵循一定的规范,它会按照下面的格式调用你填写的接口地址:

POST {Your-API-Endpoint}

Authorization: Bearer {api_key}
Content-Type: application/json

{
  "point":  string, //  扩展点,不同模块可能包含多个扩展点
  "params": {
    ...  // 各模块扩展点传入参数
  }
}

其中 point 为扩展点,params 为不同扩展点对应的传入参数。Dify 支持下面几类不同的扩展点:

  • ping:测试接口,在添加 API 扩展时通过该扩展点验证接口的可用性;当 API 接收到 point=ping 时,接口应返回 result=pong 固定值;
  • app.external_data_tool.query:应用外部数据工具查询扩展点,将用户传入的变量和对话内容作为参数,传给 API;开发者需要实现对应工具的查询逻辑,并返回字符串类型的查询结果;
  • app.moderation.input:输入内容审查扩展点,用于审查用户传入的变量以及对话内容;
  • app.moderation.output:输出内容审查扩展点,用于审查大模型输出的内容,当输出为流式时,输出的内容将以 100 个字符为一个批次进行请求 API;

要实现基于 API 的变量,其实就是实现 app.external_data_tool.query 扩展点。该扩展点的输入参数如下:

{
  "point": "app.external_data_tool.query", // 扩展点类型,此处固定为 app.external_data_tool.query
  "params": {
    "app_id": string,  // 应用 ID
    "tool_variable": string,  // 外部数据工具变量名称,表示对应变量工具调用来源
    "inputs": {  // 用户传入的变量,key 为变量名,value 为变量值
      "var_1": "value_1",
      "var_2": "value_2",
      ...
    },
    "query": string | null  // 用户当前的对话内容
  }
}

该扩展点直接输出一个 result 字符串即可:

{
  "result": string
}

天气查询示例

这里是一个简单的外部数据工具示例,场景为根据城市获取天气信息作为上下文。创建 main.py 文件:

import json
from fastapi import FastAPI, HTTPException, Header

app = FastAPI()

@app.post("/api/weather")
async def query_weather(data: dict, authorization: str = Header(None)):
    
  # 简单鉴权
  expected_api_key = "123456"
  auth_scheme, _, api_key = authorization.partition(' ')
  if auth_scheme.lower() != "bearer" or api_key != expected_api_key:
    raise HTTPException(status_code=401, detail="Unauthorized")

  print("接受到请求:" + json.dumps(data))
  
  # 扩展点
  point = data["point"]
  
  if point == "ping":
    # 测试
    return {
      "result": "pong"
    }

  if point == "app.external_data_tool.query":
    # 外部数据扩展
    return handle_app_external_data_tool_query(params=data["params"])
  
  raise HTTPException(status_code=400, detail="Not implemented")

def handle_app_external_data_tool_query(params: dict):
  # 模拟天气查询接口
  inputs = params.get("inputs")
  if inputs.get("city") == "合肥":
    return {
      "result": "天气晴,西北风,温度10-24摄氏度"
    }
  else:
    return {
      "result": "未知城市"
    }

代码比较简单,按照接口规范编写即可,它实现了 pingapp.external_data_tool.query 两个扩展点,并配置了 API Key 为 123456。这段代码基于 Python 的 FastAPI 框架,因此需要安装对应的依赖:

$ pip install fastapi[all] uvicorn

然后通过 uvicorn 启动 API 服务:

$ uvicorn main:app --reload --host 0.0.0.0

默认端口为 8000,通过 curl 验证之:

$ curl 'http://127.0.0.1:8000/api/weather' \
  -H 'Authorization: Bearer 123456' \
  -H 'Content-Type: application/json' \
  -d '{
    "point": "ping"
  }'

验证 OK 后,就可以在 Dify 配置页面添加该 API 扩展,然后在应用中选择它。调试时,通过日志可以看到,Dify 发送的请求内容如下:

{
  "point": "app.external_data_tool.query",
  "params": {
    "app_id": "6187c87a-9495-4412-8d22-0a11ec409376",
    "tool_variable": "weather_result",
    "inputs": {
      "city": "合肥"
    },
    "query": ""
  }
}

内容审核扩展

我们在上一篇文章中学习过,Dify 支持多种内容审核方式,除了 OpenAI Moderation 和关键词审核,它还支持 API 扩展方式来实现更加定制化的内容审核策略:

moderation-api-extension.png

当开启输入内容审查时,Dify 会给相应的 API 扩展发送 HTTP POST 请求,我们必须实现 app.moderation.input 扩展点:

{
  "point": "app.moderation.input", // 扩展点类型,此处固定为 app.moderation.input
  "params": {
    "app_id": string,  // 应用 ID
    "inputs": {  // 用户传入的变量,key 为变量名,value 为变量值
      "var_1": "value_1",
      "var_2": "value_2",
      ...
    },
    "query": string | null  // 用户当前的对话内容
  }
}

接口返回需满足如下规范:

{
  "flagged": bool,  // 是否违反校验规则
  "action": string, // 动作,direct_output 直接输出预设回答; overridden 覆写传入变量值
  "preset_response": string,  // 预设回答(仅当 action=direct_output 返回)
  "inputs": {  // 覆写用户传入的变量,key 为变量名,value 为变量值(仅当 action=overridden 返回)
    "var_1": "value_1",
    "var_2": "value_2",
    ...
  },
  "query": string | null  // 覆写用户的对话内容。(仅当 action=overridden 返回)
}

当开启输出内容审查时,我们必须实现 app.moderation.output 扩展点:

{
  "point": "app.moderation.output", // 扩展点类型,此处固定为 app.moderation.output
  "params": {
    "app_id": string,  // 应用 ID
    "text": string  // 大模型的回答内容。当输出为流式时,此处为 100 字为一个分段的内容。
  }
}

接口返回需满足如下规范:

{
  "flagged": bool,  // 是否违反校验规则
  "action": string, // 动作,direct_output 直接输出预设回答; overridden 覆写传入变量值
  "preset_response": string,  // 预设回答(仅当 action=direct_output 返回)
  "text": string  // 覆写大模型的回答内容。(仅当 action=overridden 返回)
}

代码扩展

对于在本地部署 Dify 的开发人员来说,使用 API 扩展还是过于麻烦了,针对每个 API 扩展都要独立部署对应的 HTTP 服务。因此 Dify 还提供了另一种扩展方式 —— 代码扩展,通过代码扩展,你可以在不破坏 Dify 原始代码逻辑的情况下,以代码形式扩展或增强程序的功能。

代码扩展也支持外部数据工具和敏感内容审核两个模块,我们只需要在对应模块下添加代码,遵循一定的接口规范,从而实现平台的横向扩展。

还是以天气查询扩展为例,我们可以在 api/core/external_data_tool 目录下新建相关的目录和文件:

api/core/external_data_tool/weather_search/
├── __init__.py
├── weather_search.py
└── schema.json

其中 schema.json 文件用于定义前端组件的样式:

{
  "label": {
    "en-US": "Weather Search",
    "zh-Hans": "天气查询"
  },
  "form_schema": [
    {
      "type": "select",
      "label": {
        "en-US": "Temperature Unit",
        "zh-Hans": "温度单位"
      },
      "variable": "temperature_unit",
      "required": true,
      "options": [
        {
          "label": {"en-US": "Celsius", "zh-Hans": "摄氏度"},
          "value": "celsius"
        },
        {
          "label": {"en-US": "Fahrenheit", "zh-Hans": "华氏度"},
          "value": "fahrenheit"
        }
      ],
      "default": "celsius"
    }
  ]
}

weather_search.py 文件为具体的扩展实现,外部工具必须继承 ExternalDataTool 类:

class WeatherSearch(ExternalDataTool):
  # 天气查询外部数据工具
  # 注意:name 必须与目录名和文件名保持一致
  name: str = "weather_search"

  @classmethod
  def validate_config(cls, tenant_id: str, config: dict) -> None:
    # 验证配置的有效性
    if not config.get('temperature_unit'):
      raise ValueError('temperature unit is required')

  def query(self, inputs: dict, query: Optional[str] = None) -> str:
    # 执行天气查询
    city = inputs.get('city')
    temperature_unit = self.config.get('temperature_unit')

    # 模拟天气 API 调用
    if temperature_unit == 'fahrenheit':
      return f'Weather in {city} is 32°F'
    else:
      return f'Weather in {city} is 0°C'

重启 Dify 应用,在添加基于 API 的变量时,就可以看到我们自定义的 “天气查询” 变量类型:

apps-gen-variable-api-2.png

内容审核扩展和外部数据工具的实现基本类似,我们可以在 api/core/moderation 目录下新建相关的目录和文件,定义前端样式,实现审核接口,具体的内容此处不再展开,感兴趣的朋友可参考官方的文档:

当内容审核扩展开发就绪后,在 “内容审查” 的设置页面会多一个选项:

moderation-code-extension.png

小结

我们今天主要学习了 Dify 的扩展机制在 外部数据工具敏感内容审计 两个模块的应用,通过 API 扩展和代码扩展,开发者可以实现自定义的应用逻辑,创造高定制化的应用解决方案。

回顾 CompletionAppRunnerrun() 方法,在第一次提示词组装和输入内容审核之后,接着就是外部数据的填充,通过扩展机制从外部数据源获取额外信息,动态地补充到应用的输入参数中,从而重新组装出最终的提示词。在第二次提示词的组装中,除了外部数据,还有另一个重要部分,那就是知识库的检索结果,我们明天将继续学习这一部分。


深入 Dify 的应用运行器之内容审核

在上一篇文章中,我们学习了 Dify 应用运行器的提示词组装机制,了解了从用户输入到模型调用的完整转换流程。今天我们将继续深入 CompletionAppRunnerrun() 方法源码,详细讲解其中的内容审核相关逻辑,包括输入审核、托管审核和输出审核,以及审核的三种实现方式。

应用运行器流程回顾

让我们先回顾一下 CompletionAppRunnerrun() 方法的核心流程,在这个流程中,内容审核扮演着重要的安全守门员角色:

moderation-flow.png

可以看到,Dify 在输入处理时设置了两道内容安全防线:

  1. 输入审核:在第一次提示词组装后,根据已配置的审核策略,检查用户输入是否包含违规内容;
  2. 托管审核:在第二次提示词组装后,模型调用前,对完整的提示词进行合规性检查;托管审核是 Dify 提供的一个系统级别的额外安全层,它是在用户配置的常规审核机制之外,由 Dify 平台自动提供的内容安全保障服务;

此外,Dify 还有一套输出审核机制,在生成最终的输出内容时触发。通过这三重保障,确保了 AI 应用的内容安全。

输入审核

输入审核紧跟在第一次提示词组装之后,代码如下:

# 第一次提示词组装
prompt_messages, stop = self.organize_prompt_messages(...)

try:
  # 输入内容审核
  _, inputs, query = self.moderation_for_inputs(
    app_id=app_record.id,
    tenant_id=app_config.tenant_id,
    app_generate_entity=application_generate_entity,
    inputs=inputs,
    query=query or "",
    message_id=message.id,
  )
except ModerationError as e:
  # 审核失败,直接返回预设回复
  self.direct_output(
    queue_manager=queue_manager,
    app_generate_entity=application_generate_entity,
    prompt_messages=prompt_messages,
    text=str(e),  # 预设的错误回复
    stream=application_generate_entity.stream,
  )
  return

当审核检测到违规内容时,会抛出 ModerationError 异常,应用运行器捕获异常后调用 direct_output() 方法,直接向用户返回预设的安全回复,跳过后续的模型调用流程。这里的关键是 moderation_for_inputs() 方法,它位于基类 AppRunner 中:

def moderation_for_inputs(...) -> tuple[bool, Mapping[str, Any], str]:
  # 输入审核检查
  moderation_feature = InputModeration()
  return moderation_feature.check(...)

真正的审核逻辑位于 InputModeration 类的 check() 方法中:

def check(
    self,
    app_id: str,
    tenant_id: str,
    app_config: AppConfig,
    inputs: Mapping[str, Any],
    query: str,
    message_id: str,
    trace_manager: Optional[TraceQueueManager] = None,
) -> tuple[bool, Mapping[str, Any], str]:

  # 检查应用是否启用了内容审查功能
  if not app_config.sensitive_word_avoidance:
    return False, inputs, query

  # 获取内容审查配置
  sensitive_word_avoidance_config = app_config.sensitive_word_avoidance
  moderation_type = sensitive_word_avoidance_config.type

  # 创建审核工厂实例
  moderation_factory = ModerationFactory(
    name=moderation_type,
    app_id=app_id,
    tenant_id=tenant_id,
    config=sensitive_word_avoidance_config.config
  )

  # 执行审核,并记录耗时
  with measure_time() as timer:
    moderation_result = moderation_factory.moderation_for_inputs(inputs, query)

  # 添加追踪记录(用于调试和监控)
  if trace_manager:
    trace_manager.add_trace_task(
      TraceTask(
        TraceTaskName.MODERATION_TRACE,
        message_id=message_id,
        moderation_result=moderation_result,
        inputs=inputs,
        timer=timer,
      )
    )

  # 处理审核结果
  if not moderation_result.flagged:
    return False, inputs, query  # 审核通过

  # 根据审核动作进行相应处理
  if moderation_result.action == ModerationAction.DIRECT_OUTPUT:
    # 直接输出预设回复
    raise ModerationError(moderation_result.preset_response)
  elif moderation_result.action == ModerationAction.OVERRIDDEN:
    # 覆盖用户输入
    inputs = moderation_result.inputs
    query = moderation_result.query

  return True, inputs, query

这个方法的执行流程如下:

  1. 检查审核配置:从应用配置中获取内容审查设置,如果没有配置,则跳过输入审核;
  2. 创建审核实例:根据配置的审核策略,通过审核工厂 ModerationFactory 创建对应的审核实例;
  3. 执行审核检查:调用具体的审核实现进行内容检查,并通过追踪管理器将审核的输入、输出、耗时等添加到追踪记录;
  4. 处理审核结果:根据审核结果采取相应的行动,如果审核通过,继续后续流程;如果审核失败,支持 直接输出预设回复覆盖用户输入 两种处理手段;

审核策略详解

应用开发者可以在功能设置里开启 “内容审查” 开关:

features-moderation.png

Dify 通过工厂模式支持多种审核策略:

  • 关键词:这是最简单的一种审核策略,开发者可以定义需要审查的敏感词,当用户输入中包含这些关键词时触发,返回预设的回复内容;
  • OpenAI Moderation:调用 OpenAI 的 Moderation API 实现内容审查;
  • API 扩展:不同的企业内部往往有着不同的内容审查机制,Dify 支持通过 API 扩展的方式实现高度自定义的审核策略;

关键词策略

在应用编排页面,打开 “内容审查设置” 对话框,类别选择 “关键词”,在输入框中填上需要审查的敏感词,1 行 1 个,最多 100 行:

features-moderation-keywords.png

然后选择 “审查输入内容”,并填上预设回复,点击 “确定” 后,在调试面板进行验证:

features-moderation-keywords-test.png

关键词策略的实现比较简单,直接基于字符串匹配即可,不区分大小写:

class KeywordsModeration(Moderation):
  name: str = "keywords"

  def moderation_for_inputs(self, inputs: dict, query: str = "") -> ModerationInputsResult:
    flagged = False
    preset_response = ""

    if self.config["inputs_config"]["enabled"]:
      preset_response = self.config["inputs_config"]["preset_response"]

      # 将查询内容也加入检查范围
      if query:
        inputs["query__"] = query

      # 过滤掉空关键词
      keywords_list = [
        keyword for keyword in self.config["keywords"].split("\n")
        if keyword
      ]

      # 执行关键词违规检查
      flagged = self._is_violated(inputs, keywords_list)

    return ModerationInputsResult(
      flagged=flagged,
      action=ModerationAction.DIRECT_OUTPUT,
      preset_response=preset_response
    )

  def _is_violated(self, inputs: dict, keywords_list: list) -> bool:
    # 检查输入是否包含违规关键词
    return any(
      self._check_keywords_in_value(keywords_list, value)
      for value in inputs.values()
    )

  def _check_keywords_in_value(self, keywords_list: Sequence[str], value: Any) -> bool:
    # 在单个值中检查关键词(不区分大小写)
    return any(
      keyword.lower() in str(value).lower()
      for keyword in keywords_list
    )

关键词策略的特点是简单高效,是最基础也是最常用的审核策略。

审核结果处理

内容审核的结果通过 ModerationInputsResult 对象返回,它定义了审核系统的核心数据结构:

class ModerationInputsResult(BaseModel):
  flagged: bool = False              # 是否违反审核规则
  action: ModerationAction           # 执行动作
  preset_response: str = ""          # 预设回复
  inputs: dict = Field(default_factory=dict)  # 处理后的输入
  query: str = ""                    # 处理后的查询

调用方通过 flagged 字段判断输入是否违反审核规则,如果违反,则执行 action 对应的动作。Dify 支持两种审核动作:

  1. 直接输出(DIRECT_OUTPUT):这是最常见的处理方式,当检测到违规内容时,直接返回预设的回复消息;用户会看到类似 "您的输入包含不当内容,请重新输入" 这样的提示,而不会看到模型的任何响应;
  2. 内容覆盖(OVERRIDDEN):这种方式更加智能,它不是简单地拒绝用户输入,而是对内容进行修正或替换;这种方式允许审核器删除或替换敏感词,然后继续正常的处理流程。例如,可以将 "这个人是个XX" 修正为 "这个人是个不好的人";

我们可以开启应用追踪,在 LLMOps 平台查看审核的追踪记录:

moderation-langfuse.png

OpenAI Moderation 策略

在 “内容审查设置” 对话框中选择 “OpenAI Moderation” 类别:

features-moderation-openai.png

同样勾选 “审查输入内容”,并填上预设回复,点击 “确定” 后,在调试面板进行验证:

features-moderation-openai-test.png

OpenAI Moderation 策略的实现如下:

class OpenAIModeration(Moderation):
  name: str = "openai_moderation"

  def moderation_for_inputs(self, inputs: dict, query: str = "") -> ModerationInputsResult:
    flagged = False
    preset_response = ""

    if self.config["inputs_config"]["enabled"]:
      preset_response = self.config["inputs_config"]["preset_response"]

      # 将查询内容加入检查
      if query:
        inputs["query__"] = query

      # 使用 OpenAI Moderation API 进行内容检查
      flagged = self._is_violated(inputs)

    return ModerationInputsResult(
      flagged=flagged,
      action=ModerationAction.DIRECT_OUTPUT,
      preset_response=preset_response
    )

  def _is_violated(self, inputs: dict):
    # 将所有输入值合并为一个文本
    text = "\n".join(str(inputs.values()))

    # 获取 OpenAI Moderation 模型实例
    model_manager = ModelManager()
    model_instance = model_manager.get_model_instance(
      tenant_id=self.tenant_id,
      provider="openai",
      model_type=ModelType.MODERATION,
      model="text-moderation-stable"
    )

    # 调用 OpenAI Moderation API
    openai_moderation = model_instance.invoke_moderation(text=text)
    return openai_moderation

整体逻辑也很简单,直接调用 OpenAI 的 text-moderation-stable 模型即可。不过要注意的是,使用此策略需要提前在设置中添加 OpenAI 模型供应商,并启用对应的模型。

OpenAI Moderation 策略的优势是基于 OpenAI 训练的专业模型,能够更准确地识别各种类型的有害内容,包括暴力、性内容、自残、仇恨等多种分类。

学习 OpenAI Moderation API

这一节我们对 OpenAI 的 Moderation API 稍加介绍。其实,该 API 支持两种模型:

  1. omni-moderation-latest(推荐):最新的多模态模型,支持更多分类选项和文本+图像输入
  2. text-moderation-latest(遗留):仅支持文本输入的旧版模型

可以看到 Dify 使用的还是老版本的模型。

Moderation API 的基本用法如下:

from openai import OpenAI

client = OpenAI()

response = client.moderations.create(
  model="omni-moderation-latest",
  input="kill them all"
)

print(response)

对于图像和文本的混合内容审核:

response = client.moderations.create(
  model="omni-moderation-latest",
  input=[
    {"type": "text", "text": "需要检查的文本内容"},
    {
      "type": "image_url",
      "image_url": {
        "url": "https://example.com/image.png"
        # 也支持 base64 编码的图片: "data:image/jpeg;base64,abcdefg..."
      }
    }
  ]
)

API 响应结构如下:

{
  "id": "modr-6796",
  "model": "omni-moderation-latest",
  "results": [
    {
      "flagged": true, // 是否被标记为有害内容
      "categories": {
        "harassment": false,
        "harassment/threatening": false,
        "hate": false,
        "hate/threatening": false,
        "illicit": false,
        "illicit/violent": true,
        "self-harm": false,
        "self-harm/instructions": false,
        "self-harm/intent": false,
        "sexual": false,
        "sexual/minors": false,
        "violence": true,  // 检测到暴力内容
        "violence/graphic": false
      },
      "category_scores": {
        // 每个类别的置信度分数 (0-1)
        "harassment": 0.1996759395892913,
        "violence": 0.9430467818012114,
        // ... 其他分数
      },
      "category_applied_input_types": {
        // 指明哪些输入类型触发了特定类别
        "harassment": [ "text" ],
        "violence": [ "text" ],
        // ... 其他类别
      }
    }
  ],
  "usage": { // 消耗的 token 数
    "prompt_tokens": 6,
    "completion_tokens": 0,
    "total_tokens": 6
  }
}

Moderation API 支持以下内容分类:

分类描述支持的输入类型
harassment表达、煽动或促进对任何目标的骚扰语言仅文本
harassment/threatening包含暴力或严重伤害的骚扰内容仅文本
hate基于种族、性别、民族等的仇恨言论仅文本
hate/threatening包含暴力威胁的仇恨内容仅文本
illicit提供非法行为建议或指导的内容仅文本(仅 omni 模型)
illicit/violent涉及暴力或获取武器的非法内容仅文本(仅 omni 模型)
self-harm促进、鼓励或描述自残行为的内容文本和图像
self-harm/intent表达自残意图的内容文本和图像
self-harm/instructions教授自残方法的内容文本和图像
sexual旨在引起性兴奋的内容文本和图像
sexual/minors涉及18岁以下个体的性内容仅文本
violence描绘死亡、暴力或身体伤害的内容文本和图像
violence/graphic以图形方式描绘暴力的内容文本和图像

托管审核

除了输入内容审核,Dify 还在模型调用前还会进行一次托管审核检查,这是第二道安全防线:

hosting_moderation_result = self.check_hosting_moderation(
  application_generate_entity=application_generate_entity,
  queue_manager=queue_manager,
  prompt_messages=prompt_messages,
)

if hosting_moderation_result:
  return  # 审核失败,直接返回

托管审核 (Hosting Moderation) 是 Dify 提供的一个系统级别的额外安全层,由平台统一配置和管理,用户无法直接控制,属于平台安全策略的一部分。这个特性只在云托管环境下才生效,因此我们需要修改 EDITION 配置:

# 默认是 SELF_HOSTED 自托管
EDITION=CLOUD

并配置系统供应商:

HOSTED_OPENAI_TRIAL_ENABLED=true
HOSTED_OPENAI_QUOTA_LIMIT=999999
HOSTED_OPENAI_TRIAL_MODELS=
HOSTED_OPENAI_API_KEY=sk-...
HOSTED_OPENAI_API_BASE=...

然后启用托管审核:

HOSTED_MODERATION_ENABLED=true
HOSTED_MODERATION_PROVIDERS=openai

托管审核的核心逻辑位于 core/helper/moderation.py 文件:

def check_moderation(tenant_id: str, model_config: ModelConfigWithCredentialsEntity, text: str) -> bool:

  # 1. 检查托管审核配置是否启用
  moderation_config = hosting_configuration.moderation_config

  # 2. 验证 OpenAI 提供商是否可用
  openai_provider_name = f"{DEFAULT_PLUGIN_ID}/openai/openai"

  # 3. 检查当前模型提供商是否在审核范围内
  if using_provider_type == ProviderType.SYSTEM and provider_name in moderation_config.providers:

    # 4. 文本分块处理(每块2000字符)
    length = 2000
    text_chunks = [text[i:i + length] for i in range(0, len(text), length)]

    # 5. 随机选择一个文本块进行审核
    text_chunk = secrets.choice(text_chunks)

    # 6. 调用 OpenAI 的 omni-moderation-latest 模型
    moderation_result = model_type_instance.invoke(
      model="omni-moderation-latest",
      credentials=hosting_openai_config.credentials,
      text=text_chunk
    )

这里主要关注几点:

  1. 必须在 .env 文件中配置 OpenAI 系统供应商并启用托管审核,该功能才会生效;
  2. 由于托管审核针对的是第二次组装后的提示词,包含外部数据和知识库,完整的上下文可能非常长,因此这里对文本进行分块处理,并随机选择一个文本块进行审核;通过这种随机采样的策略,不仅提高程序性能,也降低 API 调用成本;
  3. 托管审核使用的是 OpenAI 最新的审核模型 omni-moderation-latest 模型,而不是输入审核的 text-moderation-stable 模型;
  4. 托管审核失败后直接返回固定的礼貌拒绝响应,而非用户自定义的消息;
  5. 托管审核是 Dify 平台自动提供的内容安全保障服务,不同于应用开发者只需关注用户的输入即可,平台侧需要关注模型的完整输入,包括开发者提供的知识库内容和外部接口数据,这也正是应用运行器中要进行两次提示词组装的根本原因。

用户审核和托管审核对比

下表对比了用户审核和托管审核的几点区别:

特征用户审核 (User Moderation)托管审核 (Hosting Moderation)
配置主体应用开发者Dify 平台
检查内容用户输入完整提示词
检查时机第一次提示词组装后模型调用前
配置方式应用级配置系统级配置
配置灵活性高度可定制固定策略
审核类型关键词、OpenAI、API扩展等仅 OpenAI omni-moderation
触发条件用户启用系统自动判断
失败响应用户自定义固定响应
性能策略全文审核随机采样

托管审核作为平台的最后一道防线,即使用户没有配置任何审核策略,或者用户配置的审核策略存在漏洞,平台仍能提供基础的内容安全保障,确保 Dify 平台的整体合规性和安全性。

输出审核

除了输入审核和托管审核,Dify 还支持对模型生成的输出内容进行审核。输出审核的实现位于 OutputModeration 类,当模型输出新的 token 时:

def append_new_token(self, token: str) -> None:
  self.buffer += token
  if not self.thread:
    self.thread = self.start_thread()

启动一个后台审核线程:

def worker(self, flask_app: Flask, buffer_size: int):
  while self.thread_running:
    moderation_buffer = self.buffer
    # 当缓冲区大小达到阈值或最终块时进行审核
    if chunk_length < buffer_size and not self.is_final_chunk:
      time.sleep(1)
      continue

    result = self.moderation(...)
    if result and result.flagged:
      # 触发替换事件
      self.queue_manager.publish(QueueMessageReplaceEvent(...))

输出审核的具体实现和输入审核是一样的,都是通过审核工厂 ModerationFactory 根据配置创建对应的审核实例,支持关键词、OpenAI Moderation 和 API 扩展三种策略。这其实都比较简单,输出审核真正的挑战 在于实时性要求,需要在内容流式传输过程中及时检测。Dify 的做法是将模型输出放到一个缓冲区中,然后启动一个后台审核线程,该线程持续监控缓冲区,当缓冲区大小达到 300 时触发审核,该值可以通过 MODERATION_BUFFER_SIZE 参数调整。一旦检测到违规内容,立即停止流式传输,并发送一个 QueueMessageReplaceEvent 替换事件。该事件会被发送到前端,前端直接将当前已显示内容替换为预设回复。

小结

今天我们深入分析了 Dify 应用运行器中的内容审核机制,从整体架构到具体实现,全面了解了 Dify 如何确保应用的内容安全。主要收获包括:

  1. 多层防护体系:Dify 设计了输入审核、托管审核和输出审核三道防线,确保全链路内容安全;
  2. 多样化的审核实现:支持多种审核方式的灵活配置,包括 OpenAI Moderation、关键词审核和 API 扩展三种方式,满足不同场景的需求;
  3. 灵活的处理策略:支持直接拒绝(DIRECT_OUTPUT)和内容覆盖(OVERRIDDEN)两种处理方式;
  4. 实时审核能力:针对流式输出场景,通过后台审核线程实现了分块实时审核机制。

细心的读者可能已经注意到了,关于内容审核还有一点没有讲到,那就是 API 扩展策略,我们将在下一篇文章中,学习外部数据集成的逻辑,到时候一起来看下 Dify 的扩展机制。


深入 Dify 的应用运行器之提示词组装

应用运行器(App Runner) 是 Dify 应用的核心执行器,负责处理具体的生成逻辑,今天,我们将继续深入应用运行器的内部实现。

应用运行器的概览

让我们以文本生成应用为例,深入 CompletionAppRunnerrun() 方法的实现:

def run(
  self,
  application_generate_entity: CompletionAppGenerateEntity,
  queue_manager: AppQueueManager,
  message: Message
) -> None:

  # 1. 提取配置和输入参数
  app_config = application_generate_entity.app_config
  inputs = application_generate_entity.inputs
  query = application_generate_entity.query
  files = application_generate_entity.files

  # 2. 组装提示词消息
  prompt_messages, stop = self.organize_prompt_messages(...)

  # 3. 内容审核
  try:
    _, inputs, query = self.moderation_for_inputs(...)
  except ModerationError as e:
    self.direct_output(queue_manager, ..., text=str(e))
    return

  # 4. 外部数据工具处理
  if app_config.external_data_variables:
    inputs = self.fill_in_inputs_from_external_data_tools(...)

  # 5. 知识库检索
  context = None
  if app_config.dataset and app_config.dataset.dataset_ids:
    dataset_retrieval = DatasetRetrieval(application_generate_entity)
    context = dataset_retrieval.retrieve(...)

  # 6. 重新组装包含上下文的提示词
  prompt_messages, stop = self.organize_prompt_messages(..., context=context)

  # 7. 托管审核检查
  if self.check_hosting_moderation(...):
    return

  # 8. 调整 Token 限制
  self.recalc_llm_max_tokens(...)

  # 9. 调用模型
  model_instance = ModelInstance(...)
  invoke_result = model_instance.invoke_llm(...)

  # 10. 处理调用结果
  self._handle_invoke_result(invoke_result, queue_manager, streaming)

其核心执行流程如下:

  • 配置和参数提取: 从应用生成实体中提取应用配置以及输入参数 (inputs)、查询 (query) 和文件 (files) 等;
  • 第一次提示词组装:将模板、输入、查询、文件组合成完整的提示消息,同时处理图片细节配置,生成初始的 prompt_messagesstop 序列;
  • 内容审核:对输入内容进行敏感词检测,如果检测到违规内容,直接返回错误消息;
  • 外部数据填充:从外部数据源获取变量值,动态补充应用输入参数;
  • 知识库检索:使用 DatasetRetrieval 进行向量检索,获取相关上下文;
  • 第二次提示词重组:整合所有信息:模板 + 输入 + 查询 + 文件 + 记忆 + 外部数据 + 知识库上下文,生成最终的提示消息;
  • 托管审核:检查提示消息是否符合托管方的内容政策,如果违规,直接返回标准回复;
  • 令牌重计算:计算提示令牌数量,如果 prompt_tokens + max_tokens > 模型上下文限制,则调整 max_tokens,确保请求不超过模型的令牌限制;
  • 模型调用:使用组装好的提示消息调用大模型,支持使用 stop 序列控制生成停止;
  • 结果处理:将模型调用结果通过队列管理器传递到主线程,并返回给最终用户,支持流式和非流式输出;

两次提示词组装

可以看出,运行器的整体流程还是蛮复杂的,其中经历了两次提示词组装:

  1. 第一次组装:使用基础的用户输入和查询,主要用于输入审核和知识库检索;
  2. 第二次组装:在获得完整上下文(包括外部数据和知识库检索结果)后重新组装;

提示词组装是应用运行器的核心功能之一,我们今天就详细地讲解下提示词组装的源码,同时对比其他几种应用运行器在提示词处理上的差异。让我们先看看两次提示词组装的具体实现:

# 第一次提示词组装 - 基础模板
prompt_messages, stop = self.organize_prompt_messages(
  app_record=app_record,
  model_config=application_generate_entity.model_conf,
  prompt_template_entity=app_config.prompt_template,
  inputs=inputs,
  files=files,
  query=query,
  image_detail_config=image_detail_config,
)

# 第二次提示词组装 - 包含完整上下文
prompt_messages, stop = self.organize_prompt_messages(
  app_record=app_record,
  model_config=application_generate_entity.model_conf,
  prompt_template_entity=app_config.prompt_template,
  inputs=inputs,   # 填充外部数据
  files=files,
  query=query,
  context=context, # 新增知识库上下文
  image_detail_config=image_detail_config,
)

这两段代码几乎一模一样,只是第二次多了个 context 字段,另外 inputs 字段相比于第一次填充了外部数据。提示词组装的核心逻辑在父类 AppRunnerorganize_prompt_messages() 方法中:

def organize_prompt_messages(...) -> tuple[list[PromptMessage], Optional[list[str]]]:

  # 根据提示词模板类型选择转换器
  if prompt_template_entity.prompt_type == PromptTemplateEntity.PromptType.SIMPLE:
    # 简单提示词模板:使用预设的模板格式
    prompt_transform = SimplePromptTransform()
    prompt_messages, stop = prompt_transform.get_prompt(
      prompt_template_entity=prompt_template_entity,
      ...
    )
  else:
    # 高级提示词模板:用户自定义的完整模板
    prompt_transform = AdvancedPromptTransform()
    prompt_messages = prompt_transform.get_prompt(
      prompt_template=prompt_template,
      ...
    )
    stop = model_config.stop

  return prompt_messages, stop

Dify 根据提示词模板类型选择不同的处理策略,它支持两种模式:

  • 简单模式(Simple Mode):适用于大多数基础场景,通过模板变量替换的方式生成提示词
  • 高级模式(Advanced Mode):提供了更精细的控制,支持多轮对话和角色定制

简单模式提示词组装

当我们在应用的编排页面配置提示词时,默认就是简单模式:

prompt-template-simple.png

简单模式由 SimplePromptTransform 实现,它负责处理大多数常见场景的提示词组装:

def get_prompt(...) -> tuple[list[PromptMessage], Optional[list[str]]]:

  # 根据模型类型选择处理方式
  model_mode = ModelMode(model_config.mode)
  if model_mode == ModelMode.CHAT:
    # Chat 模型:生成多条消息(系统提示 + 用户消息)
    prompt_messages, stops = self._get_chat_model_prompt_messages(...)
  else:
    # Completion 模型:生成单条完整提示词
    prompt_messages, stops = self._get_completion_model_prompt_messages(...)

  return prompt_messages, stops

它又将大模型分成两种类型:

  • 对于 对话模型(Chat),Dify 会构建结构化的消息列表,生成多条消息(系统提示 + 历史记录 + 用户消息);
  • 对于传统的 文本补全模型(Completion),Dify 会将所有内容拼在一起,生成单条完整的提示词;

目前市面上绝大多数的模型都是对话模型,通过结构化方式组装消息有几个优势:

  1. 清晰的角色分离:系统提示、历史记录、当前查询分别处理
  2. 更好的模型兼容性:充分利用 Chat 模型的对话能力
  3. 灵活的上下文管理:可以精确控制每部分内容的位置和格式

提示词模板

无论是对话模型,还是文本补全模型,Dify 都统一使用一套灵活的模板系统来构建提示词。提示词模板存放在 prompt_templates 目录下的 JSON 文件中:

  • common_completion.json: 文本生成应用的提示词模板规则
  • common_chat.json: 聊天应用或智能体应用的提示词模板规则

Dify 根据不同的应用切换不同的模板规则,以 common_chat.json 文件为例,它的内容如下:

{
  "human_prefix": "Human",
  "assistant_prefix": "Assistant",
  "context_prompt": "Use the following context as your learned knowledge...",
  "histories_prompt": "Here is the chat histories between human and assistant...",
  "system_prompt_orders": [
    "context_prompt",
    "pre_prompt",
    "histories_prompt"
  ],
  "query_prompt": "\n\nHuman: {{#query#}}\n\nAssistant: ",
  "stops": ["\nHuman:", "</histories>"]
}

提示规则包含以下核心字段:

  1. 提示词组装顺序

    • system_prompt_orders:定义系统提示的组装顺序,默认按照 context_prompt -> pre_prompt -> histories_prompt 这个顺序来组装,其中 pre_prompt 就是用户在编排页面自定义的提示词模板;
  2. 提示模板字段

    • context_prompt: 知识库上下文的提示模板
    • histories_prompt: 历史对话的提示模板
    • query_prompt: 用户查询的提示模板,拼接在系统提示之后
  3. 对话角色前缀 (仅聊天模型)

    • human_prefix: "Human" - 用户角色标识
    • assistant_prefix: "Assistant" - 助手角色标识
  4. 停止词

    • stops:定义模型生成时的停止标记,用于控制模型输出边界

注意,只有对话类应用的提示词模板设有停止词,这是因为对话应用需要明确的角色分工,使用停止词来防止模型继续生成不应该生成的内容,比如 "\nHuman:" 防止模型继续模拟用户发言,"</histories>" 防止模型破坏历史对话的 XML 标签结构。

提示词中预定义了几个占位符,用于变量替换:

  • {{#context#}}: 替换为实际上下文内容
  • {{#query#}}: 替换为用户查询
  • {{#histories#}}: 替换为对话历史

另外,它还针对百川大模型使用了定制的模板(其实就是翻译成中文):

  • baichuan_chat.json: 聊天应用的提示词模板规则(针对百川大模型定制)
  • baichuan_completion.json: 文本生成应用的提示词模板规则(针对百川大模型定制)

高级模式提示词组装

高级模式允许用户直接在页面上更精细的控制提示词顺序,而不是基于 JSON 配置文件。在老版本的编排页面,我们可以看到一个链接,点击后切换到 专家模式

prompt-template-adv-pre.png

不过这个模式在新版本中已经看不到了,不知道是废弃了?还是变成收费功能了?

尽管页面上看不到切换入口,但是相关的代码逻辑还没删,所以通过修改数据库中的应用配置,还可以切换到该模式。我们创建一个应用,然后在数据库中找到该应用的配置,将 prompt_type 改为 advanced,将 chat_prompt_config 改为 {"prompt":[]},刷新页面后就能看到专家模式了:

prompt-template-adv.png

高级模式由 AdvancedPromptTransform 实现,它的实现就比较简单,直接根据用户配置来组装提示词即可。

文件处理

在提示词的组装过程中,文件的处理至关重要,其实现位于 file_manager.to_prompt_message_content() 函数:

def to_prompt_message_content(f: File) -> PromptMessageContentUnionTypes:

  # 支持 4 种文件类型:图片、音频、视频、文件
  prompt_class_map: Mapping[FileType, type[PromptMessageContentUnionTypes]] = {
    FileType.IMAGE: ImagePromptMessageContent,
    FileType.AUDIO: AudioPromptMessageContent,
    FileType.VIDEO: VideoPromptMessageContent,
    FileType.DOCUMENT: DocumentPromptMessageContent,
  }

  # 对于不支持的文件类型,返回一句话描述
  if f.type not in prompt_class_map:
    return TextPromptMessageContent(data=f"[Unsupported file type: {f.filename} ({f.type.value})]")

  # 对于支持的文件类型,返回文件信息
  params = {
    "base64_data": _get_encoded_string(f) if dify_config.MULTIMODAL_SEND_FORMAT == "base64" else "",
    "url": _to_url(f) if dify_config.MULTIMODAL_SEND_FORMAT == "url" else "",
    "format": f.extension.removeprefix("."),
    "mime_type": f.mime_type,
    "filename": f.filename or "",
  }

  # 对于图片类型,增加 detail 参数
  if f.type == FileType.IMAGE:
    params["detail"] = image_detail_config or ImagePromptMessageContent.DETAIL.LOW

  return prompt_class_map[f.type].model_validate(params)

该函数将上传的文件转换为适合的 prompt 消息,它会根据配置将文件表示成 BASE64 编码或 URL 链接,默认使用 BASE64 方式:

MULTIMODAL_SEND_FORMAT=base64

值得注意的是,对于图片类型,还会增加一个 detail 参数,用于控制图片处理精度,支持 LOW 和 HIGH 两种:

  • 低精度:处理速度快,消耗资源少;适用于对图片细节要求不高的场景;比如图片分类、简单的图片理解任务;
  • 高精度:处理时间较长,消耗更多计算资源;适用于需要分析图片细节的场景;比如 OCR、详细图片分析、需要识别图片中细小文字或复杂内容;

模型供应商需要支持这个参数该特性才能生效,参考 OpenAI 的接口文档

openai-image-level.png

可以看到,模型供应商接收到 detail="low" 参数后,会将图片缩放到较低分辨率(如 512x512)进行处理,Dify 本身不会对图片做任何处理。

不同应用运行器的提示词组装对比

现在让我们对比分析不同类型应用运行器在提示词组装上的差异。首先,聊天应用和文本生成应用在提示词组装上的主要区别在于 记忆管理,在提示词组装时会传入记忆:

memory = None
if application_generate_entity.conversation_id:
  model_instance = ModelInstance(
    provider_model_bundle=application_generate_entity.model_conf.provider_model_bundle,
    model=application_generate_entity.model_conf.model,
  )
  # 创建基于会话的记忆缓冲区
  memory = TokenBufferMemory(conversation=conversation, model_instance=model_instance)

# 在提示词组装时传入记忆
prompt_messages, stop = self.organize_prompt_messages(
  # ... 其他参数
  memory=memory,  # ChatAppRunner 会传入记忆
)

智能体应用和聊天应用一样,在提示词组装时也会传入记忆,不过组装后的提示词仅仅用于内容审核。审核通过后,它会根据模型能力动态选择智能体策略,创建对应的 智能体运行器(Agent Runner),真正的提示词组装逻辑位于智能体运行器中:

def run(self, application_generate_entity: AgentChatAppGenerateEntity, ...):
  # 1. 基础提示词组装(与 ChatAppRunner 相同)
  prompt_messages, _ = self.organize_prompt_messages(
    app_record=app_record,
    model_config=application_generate_entity.model_conf,
    prompt_template_entity=app_config.prompt_template,
    inputs=dict(inputs),
    files=list(files),
    query=query,
    memory=memory,
  )

  # 2. 根据模型能力选择智能体策略
  model_instance = ModelInstance(...)
  llm_model = cast(LargeLanguageModel, model_instance.model_type_instance)
  model_schema = llm_model.get_model_schema(...)

  # 检查模型是否支持函数调用
  if {ModelFeature.MULTI_TOOL_CALL, ModelFeature.TOOL_CALL}.intersection(model_schema.features or []):
    agent_entity.strategy = AgentEntity.Strategy.FUNCTION_CALLING

  # 3. 选择对应的智能体运行器
  if agent_entity.strategy == AgentEntity.Strategy.FUNCTION_CALLING:
    runner_cls = FunctionCallAgentRunner
  elif agent_entity.strategy == AgentEntity.Strategy.CHAIN_OF_THOUGHT:
    # 根据 LLM 模式选择思维链实现
    if model_schema.model_properties.get(ModelPropertyKey.MODE) == LLMMode.CHAT.value:
      runner_cls = CotChatAgentRunner
    else:
      runner_cls = CotCompletionAgentRunner

  # 4. 创建专门的智能体运行器处理后续逻辑
  runner = runner_cls(...)
  invoke_result = runner.run(...)

因为需要支持工具调用,它有着更复杂的提示词组装逻辑,需要在提示词中集成工具描述和调用指令。

最后,对于工作流应用,运行器采用了完全不同的方式处理输入,它使用 变量池(Variable Pool) 而非传统提示词来管理整个工作流的数据流:

def run(self) -> None:
  inputs = self.application_generate_entity.inputs
  files = self.application_generate_entity.files

  # 创建系统变量
  system_inputs = SystemVariable(
    files=files,
    user_id=self._sys_user_id,
    app_id=app_config.app_id,
    workflow_id=app_config.workflow_id,
    workflow_execution_id=self.application_generate_entity.workflow_execution_id,
  )

  # 构建变量池
  variable_pool = VariablePool(
    system_variables=system_inputs,
    user_inputs=inputs,  # 用户输入变量
    environment_variables=self._workflow.environment_variables,  # 环境变量
    conversation_variables=[],  # 对话变量(对话流应用使用)
  )

  # 初始化工作流图
  graph = self._init_graph(graph_config=self._workflow.graph_dict)

  # 运行工作流
  workflow_entry = WorkflowEntry(
    tenant_id=self._workflow.tenant_id,
    # ... 其他参数
    variable_pool=variable_pool,
  )

  generator = workflow_entry.run(callbacks=workflow_callbacks)

工作流的每个节点首先从变量池中获取所需的输入数据,然后执行节点特定的逻辑(LLM 调用、工具执行、条件判断等),最后将处理结果写回变量池供后续节点使用。

关于智能体策略和工作流的执行细节,我们后面将专门学习,此处暂不展开。

小结

今天我们深入分析了 Dify 应用运行器的提示词组装机制,从 CompletionAppRunner 的双重组装策略开始,详细解析了简单提示词模板和高级提示词模板的处理流程,包括 Chat 和 Completion 模型的不同处理方式。我们也对比了不同应用运行器的提示词处理差异:

  1. CompletionAppRunner:专注单次文本生成
  2. ChatAppRunner:支持对话记忆,提供连续的多轮对话体验
  3. AgentChatAppRunner:结合工具调用能力,支持复杂的推理和执行流程
  4. WorkflowAppRunner:采用变量池机制,支持复杂的数据流处理

提示词组装是 Dify 的核心能力之一,它决定了用户输入如何转化为模型能够理解的格式。理解这一机制对于开发者深入使用 Dify 或定制应用逻辑具有重要意义。在后面的文章中,我们将继续探索应用运行器的其他核心功能,包括内容审核、外部数据扩展、知识库检索、模型调用,以及不同的智能体策略和工作流的执行细节等。


深入 Dify 的应用运行器

在前面的文章中,我们深入分析了 Dify 应用生成器的源码实现,从限流策略、流式响应、配置管理、文件上传处理,到追踪调试机制,逐步了解了 Dify 会话处理的完整流程。今天我们将继续深入 CompletionAppGeneratorgenerate() 方法,看看在创建好应用生成实体后,Dify 是如何通过 应用运行器(App Runner) 来执行具体的业务逻辑。

从生成器到运行器

让我们回顾一下 CompletionAppGeneratorgenerate() 方法,在完成配置管理、文件处理、追踪管理器初始化等前置工作后,接下来的步骤是创建 应用生成实体(App Generate Entity)

application_generate_entity = CompletionAppGenerateEntity(
  
  # 任务ID
  task_id=str(uuid.uuid4()),
  # 应用配置
  app_config=app_config,
  # 模型配置
  model_conf=ModelConfigConverter.convert(app_config),
  # 文件上传配置
  file_upload_config=file_extra_config,

  # 用户输入变量
  inputs=self._prepare_user_inputs(
    user_inputs=inputs, variables=app_config.variables, tenant_id=app_model.tenant_id
  ),
  # 用户查询
  query=query,
  # 上传文件
  files=list(file_objs),
  # 用户ID
  user_id=user.id,
  
  # 是否流式输出
  stream=streaming,
  # 调用来源
  invoke_from=invoke_from,

  # 扩展参数
  extras={},
  # 追踪管理器
  trace_manager=trace_manager,
)

应用生成实体包含了执行一次应用调用所需的所有信息,包括:

  • 配置信息:应用配置、模型配置、文件上传配置
  • 用户数据:输入变量、查询内容、上传文件
  • 执行控制:流式开关、调用来源
  • 附加功能:追踪管理器、扩展参数

后续的运行流程将分成两条线路:一条是我们昨天学习的追踪线程,通过 Celery 任务队列,离线记录业务运行时产生的数据,并发送到外部 Ops 工具:另一条则为工作线程,根据应用生成实体执行具体的生成逻辑:

# 初始化数据库记录(会话和消息)
(conversation, message) = self._init_generate_records(application_generate_entity)

# 初始化队列管理器
queue_manager = MessageBasedAppQueueManager(
  task_id=application_generate_entity.task_id,
  user_id=application_generate_entity.user_id,
  invoke_from=application_generate_entity.invoke_from,
  conversation_id=conversation.id,
  app_mode=conversation.mode,
  message_id=message.id,
)

# 创建工作线程,并传递 Flask 请求上下文
@copy_current_request_context
def worker_with_context():
  return self._generate_worker(
    flask_app=current_app._get_current_object(),  # type: ignore
    application_generate_entity=application_generate_entity,
    queue_manager=queue_manager,
    message_id=message.id,
  )

# 启动工作线程
worker_thread = threading.Thread(target=worker_with_context)
worker_thread.start()

# 主线程处理响应或流生成器
response = self._handle_response(
  application_generate_entity=application_generate_entity,
  queue_manager=queue_manager,
  conversation=conversation,
  message=message,
  user=user,
  stream=streaming,
)
return CompletionAppGenerateResponseConverter.convert(response=response, invoke_from=invoke_from)

这里首先根据应用生成实体创建两条数据库记录,一条是会话记录(conversations),一条是消息记录(messages),如果消息中带有文件,还会创建对应的消息文件记录(message_files);接着创建一个 队列管理器(App Queue Manager),它负责管理应用执行过程中的事件流,实现 生产者-消费者 模式的异步通信;最后启动工作线程,创建 应用运行器(App Runner),执行具体的生成逻辑,并通过队列管理器传递生成结果,同时主线程通过队列管理器监听执行结果,实现了请求处理和业务执行的解耦。整体流程如下:

app-generator-seq.png

至此,我们完成了整个生成器的处理流程的学习,开始进入运行器的学习:

app-generator-flow.png

队列管理器

队列管理器采用了 生产者-消费者 模式,通过 Python 的 queue.Queue 实现线程间的安全通信:

class AppQueueManager:
  def __init__(self, task_id: str, user_id: str, invoke_from: InvokeFrom):
    # 创建线程安全的队列
    self._q: queue.Queue = queue.Queue()

  def listen(self):
    # 监听队列事件,通过 yield 返回生成器
    listen_timeout = dify_config.APP_MAX_EXECUTION_TIME
    start_time = time.time()

    while True:
      try:
        # 从队列中获取消息,超时时间为 1 秒
        message = self._q.get(timeout=1)
        yield message
      except queue.Empty:
        # 检查是否超时或被停止
        elapsed_time = time.time() - start_time
        if elapsed_time >= listen_timeout or self._is_stopped():
          self.publish(QueueStopEvent(), PublishFrom.TASK_PIPELINE)

  def publish(self, event: AppQueueEvent, pub_from: PublishFrom):
    # 发布事件到队列
    self._q.put(event)

队列管理器支持多种类型的事件,包括:

  • QueuePingEvent:Ping 事件,心跳检测,保持连接活跃
  • QueueErrorEvent:错误事件,处理任务执行过程中的错误
  • QueueTextChunkEvent:文本块事件,处理流式文本输出
  • QueueLLMChunkEvent:LLM 流式响应块
  • QueueMessageEndEvent:消息结束事件
  • QueueStopEvent:停止事件

除此之外,还有很多关于工作流的事件,比如节点事件、并行分支事件、迭代事件、循环事件、控制事件等,参考 api/core/app/entities/queue_entities.py 文件。

在整个应用运行过程中,队列管理器扮演着重要的角色。它负责将运行过程中的各种事件发布到队列,实现服务端与客户端的实时通信,并统一处理和发布错误信息:

# 发布生成内容块事件
queue_manager.publish(
    QueueLLMChunkEvent(chunk=result), 
    PublishFrom.APPLICATION_MANAGER
)

# 发布消息结束事件
queue_manager.publish(
    QueueMessageEndEvent(llm_result=llm_result),
    PublishFrom.APPLICATION_MANAGER,
)

# 发布错误事件
queue_manager.publish_error(
    exception, 
    PublishFrom.APPLICATION_MANAGER
)

Flask 请求上下文传递

Flask 的请求上下文默认只在当前线程中有效,当你创建新线程时,新线程无法访问原始请求的信息。Dify 通过 @copy_current_request_context 装饰器解决这个问题:

from flask import copy_current_request_context

@copy_current_request_context
def worker_with_context():
  # 在这里可以访问 current_app、request 等 Flask 上下文对象
  return self._generate_worker(...)

这个装饰器会将当前请求的上下文(包括 current_apprequestsession 等)复制到新线程中,这样,工作线程就可以访问数据库连接、配置信息等依赖于 Flask 应用上下文的资源。比如在 _generate_worker() 函数中,使用 with flask_app.app_context() 手动创建并进入应用上下文:

def _generate_worker(...) -> None:
  with flask_app.app_context():
    # 在这里可以使用 Flask 应用相关的功能,如数据库操作
    message = self._get_message(message_id)
    # ...

注意,这个装饰器和之前学过的 stream_with_context 有所不同,后者用于确保在整个流式响应过程中都能访问请求上下文。

使用 contextvars 拷贝上下文

需要注意的是,前面的代码都是以文本生成应用为例的,和它类似的还有一个聊天应用,这两个应用都比较简单,因此直接使用 Flask 提供的 @copy_current_request_context 装饰器,复制请求上下文即可。

但是在智能体和工作流应用中,包含了更复杂的执行流程,可能涉及多个异步任务,除了 Flask 的请求上下文,还需要更全面的上下文保持。Dify 使用了 Python 3.7+ 的 contextvars 模块,复制所有的上下文变量。我们可以看下 AgentChatAppGenerator 的实现:

# new thread with request context and contextvars
context = contextvars.copy_context()
worker_thread = threading.Thread(
  target=self._generate_worker,
  kwargs={
    "flask_app": current_app._get_current_object(),  # type: ignore
    "context": context,
    "application_generate_entity": application_generate_entity,
    "queue_manager": queue_manager,
    "conversation_id": conversation.id,
    "message_id": message.id,
  },
)
worker_thread.start()

然后在 _generate_worker() 中使用:

with preserve_flask_contexts(flask_app, context_vars=context):
  runner = AgentChatAppRunner()
  runner.run(
    application_generate_entity=application_generate_entity,
    queue_manager=queue_manager,
    conversation=conversation,
    message=message,
  )

结合自定义的 preserve_flask_contexts() 函数,同时处理:

  • ContextVars 上下文 - Python 原生的上下文变量
  • Flask App 上下文 - Flask 应用上下文
  • 用户认证上下文 - Flask-Login 的用户对象

上下文变量

contextvars 是 Python 3.7 引入的标准库,用于管理 上下文变量(Context Variables),主要解决多线程或异步任务中变量传递的问题,主要应用场景有:

  1. 异步编程:在 asyncio 中,每个任务可以有独立的上下文变量
  2. Web开发:跟踪请求ID、用户身份等,无需在函数间显式传递
  3. 日志系统:自动在日志中包含上下文信息(如请求ID)

下面的代码演示了上下文变量的基本用法:

import contextvars

# 创建上下文变量
user_id = contextvars.ContextVar('user_id', default=None)

# 设置值(返回Token对象,用于后续重置)
token = user_id.set(123)

# 获取值
print(user_id.get())  # 输出: 123

# 重置值(使用之前保存的Token)
user_id.reset(token)
print(user_id.get())  # 输出: None(默认值)

# 重新设置值
token = user_id.set(456)

# 在函数中使用
def func():
    print(user_id.get())  # 输出: 456

func()

也可以模仿 Dify 的写法,在多线程中使用:

from contextlib import contextmanager

@contextmanager
def preserve_flask_contexts(context_vars: contextvars.Context):
  # Set context variables if provided
  if context_vars:
    for var, val in context_vars.items():
      var.set(val)
  yield

# 在新线程中使用
import threading

def func2(context: contextvars.Context):
  with preserve_flask_contexts(context_vars=context):
    print(user_id.get())  # 输出: 456

context = contextvars.copy_context()
worker_thread = threading.Thread(
  target=func2,
  kwargs={
    "context": context,
  },
)
worker_thread.start()

可以看出它和线程局部变量 threading.local 很像,两者区别如下:

特性contextvarsthreading.local
适用场景线程、异步任务仅线程
可复制性支持上下文复制不支持
异步友好

contextvars 特别适合需要在复杂调用链或异步任务中共享状态,但又不希望使用全局变量或显式参数传递的场景。但是上下文变量的查找速度略慢于普通变量,还可能使代码逻辑变得隐晦,在使用时需要特别注意,避免过度使用。

创建应用运行器

让我们继续看工作线程中创建应用运行器的部分:

def _generate_worker(
  self,
  flask_app: Flask,
  application_generate_entity: CompletionAppGenerateEntity,
  queue_manager: AppQueueManager,
  message_id: str,
) -> None:
  with flask_app.app_context():
    try:
      # 获取消息记录
      message = self._get_message(message_id)

      # 创建应用运行器并执行
      runner = CompletionAppRunner()
      runner.run(
        application_generate_entity=application_generate_entity,
        queue_manager=queue_manager,
        message=message,
      )
    except Exception as e:
      # 错误处理逻辑...

这里的代码是以文本生成应用为例的,其实,不同类型的应用(如聊天应用、智能体应用、工作流应用)都有对应的运行器实现,它们都遵循统一的接口规范:

app-runner-class.png

Dify 的应用运行器采用了清晰的继承结构,主要基于下面两个基类:

  • AppRunner(应用基础类):提供所有应用运行器的通用功能:提示消息组织、模型调用、内容审核、直接输出响应、外部数据集成等
  • WorkflowBasedAppRunner(工作流基础类):专门处理基于工作流的应用运行逻辑:图初始化、变量池管理、事件处理等

下面是具体的实现类:

  • ChatAppRunner(聊天应用):记忆管理、数据集检索、注释回复、外部数据工具、数据集检索等
  • CompletionAppRunner(文本生成应用):与聊天应用类似但没有对话记忆
  • AgentChatAppRunner(智能体):根据模型能力选择不同的智能体策略(函数调用或思维链)
  • WorkflowAppRunner(工作流):支持单次迭代运行和循环运行
  • AdvancedChatAppRunner(对话流):对话变量管理、输入审核、注释回复等

其中智能体的应用运行器比较特殊,它会根据模型能力选择不同的智能体策略,包括:

  • FunctionCallAgentRunner(函数调用):使用模型原生的函数调用能力,支持流式和非流式调用
  • CotAgentRunner(思维链):实现 ReAct (Reasoning + Acting) 模式的推理循环,它是一个抽象类,使用模板方法设计模式,定义了思维链的算法骨架,由子类实现具体步骤,包括:聊天模式的思维链(CotChatAgentRunner)和文本生成模式的思维链(CotCompletionAgentRunner

小结

今天我们深入学习了 Dify 的应用运行器机制。通过分析 CompletionAppGeneratorgenerate() 方法,我们了解了如何从生成器过渡到工作线程,并在其中创建应用运行器以执行具体生成任务。关键流程包括创建应用生成实体、初始化数据库记录、构建队列管理器以及启动工作线程等。总结如下:

  1. 了解队列管理器的异步管道机制,采用生产者-消费者模式,确保线程间安全通信,并实时处理和发布生成事件;
  2. 学习如何通过 @copy_current_request_contextcontextvars 实现跨线程的请求上下文传递,确保工作线程可以访问原始请求中的信息;
  3. 深入分析了不同类型的应用运行器及其继承结构,展示了如何根据具体应用需求选择不同的策略和实现类。

应用运行器是 Dify 的执行引擎,也是其核心所在。今天我们对这一机制进行了初步探索,明天我们将继续深入它的内部实现,揭示更多细节。


深入 Dify 应用的会话流程之追踪调试

在前面的几篇文章中,我们深入分析了 Dify 应用生成器的源码,包括限流策略、流式响应、配置管理以及上传文件的处理,今天我们将继续深入 CompletionAppGeneratorgenerate() 方法,学习另一个重要的组件 —— 追踪管理器(Trace Manager),通过它引出 LLMOps 的概念,并以 LangFuse 为例演示其使用方式,最后再详细讲解它的实现机制。

创建追踪管理器

让我们继续深入 generate() 方法的后续实现,在组装完成应用配置后,接下来就是创建追踪管理器:

trace_manager = TraceQueueManager(
  app_id=app_model.id,
  user_id=user.id if isinstance(user, Account) else user.session_id
)

这个 trace_manager 将贯穿整个会话流程,负责收集和记录各种运行时数据,为后续的分析和调试提供数据基础。接下来,将 trace_manager 注入到应用生成实体中:

application_generate_entity = CompletionAppGenerateEntity(
  task_id=str(uuid.uuid4()),
  app_config=app_config,
  model_conf=ModelConfigConverter.convert(app_config),
  file_upload_config=file_extra_config,
  inputs=self._prepare_user_inputs(
    user_inputs=inputs, variables=app_config.variables, tenant_id=app_model.tenant_id
  ),
  query=query,
  files=list(file_objs),
  user_id=user.id,
  stream=streaming,
  invoke_from=invoke_from,
  extras={},
  trace_manager=trace_manager,  # 将 trace_manager 注入到实体中
)

应用生成实体 CompletionAppGenerateEntity 是一个携带所有生成所需信息的数据载体,其中就包含了 trace_manager,这样在整个应用执行过程中,各个模块都可以通过这个实体访问到追踪管理器,记录相应的运行数据。

后续的运行流程将分成两条线路:一条为工作线程,执行具体的业务逻辑;另一条为追踪线程,通过 Celery 任务队列,离线记录业务运行时产生的数据,并发送到外部 Ops 工具:

trace-flow.png

什么是 LLMOps

LLMOps(Large Language Model Operations) 即大语言模型运维,是 MLOps 在大语言模型领域的扩展,它涵盖了从 LLM 应用的开发、部署、监控到维护的完整生命周期管理。尽管 LLM 拥有出色的推理和文本生成能力,但其内部运作机制仍然难以完全理解,这给基于 LLM 的应用开发带来了挑战,比如:

  • 评估模型输出质量
  • 降低推理成本
  • 减少模型响应延迟
  • 链式调用、Agent 和工具引入的调试复杂性

目前市面上已经涌现出不少 LLMOps 工具,比如 LangSmithLangfuse 等,能够为 LLM 应用提供全面的追踪和深度评估能力。LLMOps 的核心概念包括:

  • 模型管理: 版本控制、A/B 测试、模型切换
  • 监控观测: 性能指标、成本跟踪、用户行为分析
  • 数据管理: 训练数据、推理日志、反馈收集
  • 部署运维: 自动化部署、扩缩容、故障恢复

不过我们一般更关注监控观测部分,通过在 LLM 应用中埋点,能够实现对模型性能的精细化监控,从而及时发现和解决潜在问题。

Dify 支持多种外部 Ops 工具的集成,包括:

下面我们以 Langfuse 为例,演示如何在 Dify 中配置和使用 LLMOps 工具。

Langfuse 集成演示

Langfuse 是一个开源的 LLM 工程平台,可以帮助团队协作调试、分析和迭代他们的应用程序。

langfuse.png

它提供了以下核心功能:

  • 追踪(Tracing):记录 LLM 应用的完整执行过程
  • 观测(Observability):提供实时的性能监控和可视化
  • 评估(Evaluation):支持多种评估指标和人工标注
  • 数据集管理:管理测试数据和历史记录
  • 成本追踪:监控 Token 使用和费用

获取 Langfuse 的 API Key

我们首先访问 Langfuse 官网,注册账号并登录,然后创建一个组织:

langfuse-new-org.png

然后在组织下创建一个项目:

langfuse-new-project.png

创建成功后,接着为项目创建 API Key:

langfuse-new-apikey.png

点击创建按钮,获取以下三个重要参数:

  • Public Key:公开密钥,用于客户端身份验证
  • Secret Key:私钥,用于服务端 API 调用
  • Host:Langfuse 服务器地址

langfuse-new-apikey-2.png

Langfuse 是开源项目,我们也可以本地部署它。

在 Dify 中配置 Langfuse

接下来,我们再在 Dify 中配置 Langfuse,打开需要监测的应用,点击左侧的 监测 菜单:

dify-monitor.png

这个页面显示了该应用的统计指标,包括会话数、活跃用户数、平均会话互动数、Token 输出速度、用户满意度、费用消耗、全部消息数等。然后再点击右上角的 追踪应用性能 按钮:

dify-monitor-2.png

该选项默认是禁用的,我们点击 Langfuse 右侧的配置按钮,将上面获取的 API 凭据粘贴到配置中并保存:

dify-monitor-3.png

配置成功后,可以在页面中看到状态显示为已启用,表示正在监测。

查看监测数据

配置完成后,当你在 Dify 中调试或使用应用时,所有的执行数据都会自动发送到 Langfuse 平台:

dify-test.png

在 Langfuse 中可以看到详细的追踪数据:

langfuse-trace.png

trace_manager 的实现

现在让我们来看下 Dify 中追踪管理器 trace_manager 的实现机制,了解它是如何收集、处理和发送追踪数据的。它的初始化代码如下:

trace_manager_timer: Optional[threading.Timer] = None
trace_manager_queue: queue.Queue = queue.Queue()

class TraceQueueManager:
  def __init__(self, app_id=None, user_id=None):
    self.app_id = app_id
    self.user_id = user_id
    # 获取该 app 的追踪实例(如 Langfuse, LangSmith 等)
    self.trace_instance = OpsTraceManager.get_ops_trace_instance(app_id)
    # Flask 实例
    self.flask_app = current_app._get_current_object()
    # 启动定时器
    if trace_manager_timer is None:
      self.start_timer()

这里主要有两个关键步骤:

  1. 获取追踪实例:根据应用 ID 查询该应用的追踪配置,初始化对应供应商的追踪实例
  2. 定时处理追踪任务:创建一个定时任务,每隔一段时间扫描一次追踪队列,将队列中的追踪任务发送 Celery 异步处理

获取追踪实例

首先根据应用 ID 查询该应用的追踪配置,也就是 apps 表的 tracing 字段:

{
  "enabled": true,
  "tracing_provider": "langfuse"
}

获取到对应的追踪供应商(如 Langfuse, LangSmith 等),然后再查询 trace_app_config 表获取详细的供应商配置,Dify 通过插件化的架构支持多种追踪供应商:

case TracingProviderEnum.LANGFUSE:
  return {
    "config_class": LangfuseConfig,
    "secret_keys": ["public_key", "secret_key"],
    "other_keys": ["host", "project_key"],
    "trace_instance": LangFuseDataTrace,
  }
case TracingProviderEnum.LANGSMITH:
  return {
    "config_class": LangSmithConfig,
    "secret_keys": ["api_key"],
    "other_keys": ["project", "endpoint"],
    "trace_instance": LangSmithDataTrace,
  }
case TracingProviderEnum.OPIK:
  # ...
case TracingProviderEnum.WEAVE:
  # ...
case TracingProviderEnum.ARIZE:
  # ...
case TracingProviderEnum.PHOENIX:
  # ...
case TracingProviderEnum.ALIYUN:
  # ...
case _:
  raise KeyError(f"Unsupported tracing provider: {provider}")

每个供应商的实现都由四个部分组成:

  • config_class - 该供应商的配置类
  • secret_keys - 该供应商的密钥信息,比如 Langfuse 的 public_keysecret_key,注意这些信息 Dify 都做了加密处理,防止密钥的泄漏
  • other_keys - 其他无需加密处理的信息
  • trace_instance - 该供应商的具体实现

定时处理追踪任务

然后通过 threading.Timer 创建一个定时任务,默认每隔 5s 执行一次:

trace_manager_interval = int(os.getenv("TRACE_QUEUE_MANAGER_INTERVAL", 5))

def start_timer(self):
  trace_manager_timer = threading.Timer(trace_manager_interval, self.run)
  trace_manager_timer.name = f"trace_manager_timer_{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}"
  trace_manager_timer.daemon = False
  trace_manager_timer.start()

该任务的实现 run() 方法如下:

trace_manager_queue: queue.Queue = queue.Queue()
trace_manager_batch_size = int(os.getenv("TRACE_QUEUE_MANAGER_BATCH_SIZE", 100))

def run(self):
  tasks = self.collect_tasks()
  self.send_to_celery(tasks)

# 从队列中收集任务
def collect_tasks(self):
  tasks: list[TraceTask] = []
  while len(tasks) < trace_manager_batch_size and not trace_manager_queue.empty():
    # 批量收集,最多 batch_size 个
    task = trace_manager_queue.get_nowait()
    tasks.append(task)
    trace_manager_queue.task_done()
  return tasks

# 异步处理任务
def send_to_celery(self, tasks: list[TraceTask]):
  with self.flask_app.app_context():
    for task in tasks:

      # 获取任务数据
      trace_info = task.execute()
      task_data = TaskData(
        app_id=task.app_id,
        trace_info_type=type(trace_info).__name__,
        trace_info=trace_info.model_dump() if trace_info else None,
      )

      # 将数据保存到文件中
      file_id = uuid4().hex
      file_path = f"{OPS_FILE_PATH}{task.app_id}/{file_id}.json"
      storage.save(file_path, task_data.model_dump_json().encode("utf-8"))

      # 将任务发送到 Celery 队列进行异步处理
      file_info = {
        "file_id": file_id,
        "app_id": task.app_id,
      }
      process_trace_tasks.delay(file_info)

核心逻辑主要分为两个步骤:

  1. 收集任务:调用 collect_tasks() 从队列中收集一批待处理的追踪任务;该方法从全局队列 trace_manager_queue 中取出最多 trace_manager_batch_size 个任务;
  2. 发送任务:如果有任务,则调用 send_to_celery(tasks) 将任务发送到 Celery 异步任务队列;该方法首先将任务序列化后保存到存储系统,然后通过 process_trace_tasks.delay() 异步执行;

值得注意的是,这里的 process_trace_tasks 是一个 Celery 任务,因此可以调用 delay() 方法。它通过 @shared_task 装饰:

from celery import shared_task

@shared_task(queue="ops_trace")
def process_trace_tasks(file_info):
  #...

@shared_task 是 Celery 框架提供的装饰器,用于创建可以在不同 Celery 应用实例之间共享的异步任务。它最大的优势是应用无关性,不依赖特定的 Celery 应用实例,可以提高代码的可重用性。

添加追踪任务

上面提到,追踪管理器 trace_manager 被注入到应用生成实体中,因此在整个应用执行过程中,各个模块都可以通过追踪管理器记录相应的运行数据。当需要记录追踪数据时,通过 add_trace_task() 方法将任务添加到队列:

def add_trace_task(self, trace_task: TraceTask):
  global trace_manager_timer, trace_manager_queue
  try:
    if self.trace_instance:
      # 只有配置了追踪时才添加任务
      trace_task.app_id = self.app_id
      trace_manager_queue.put(trace_task)
  except Exception:
    logger.exception("Error adding trace task, trace_type %s", trace_task.trace_type)
  finally:
    # 确保定时器在运行
    self.start_timer()

Dify 支持多种类型的追踪任务,每种类型记录不同的运行时数据:

class TraceTaskName:
  # 会话追踪(未使用)
  CONVERSATION_TRACE = "conversation_trace"
  # 工作流追踪,记录工作流的执行详情
  WORKFLOW_TRACE = "workflow_trace"
  # 消息追踪,在消息处理过程中记录完整的对话数据
  MESSAGE_TRACE = "message_trace"
  # 审核追踪,记录内容审核的详细过程
  MODERATION_TRACE = "moderation_trace"
  # 建议问题追踪
  SUGGESTED_QUESTION_TRACE = "suggested_question_trace"
  # 数据集检索追踪,记录 RAG 检索的过程和结果
  DATASET_RETRIEVAL_TRACE = "dataset_retrieval_trace"
  # 工具调用追踪,记录智能体工具调用的详细信息
  TOOL_TRACE = "tool_trace"
  # 会话标题生成追踪
  GENERATE_NAME_TRACE = "generate_name_trace"

小结

今天我们深入学习了 Dify 的追踪调试机制,通过源码剖析,了解了 trace_manager 作为追踪系统的入口点,负责收集整个会话过程中的运行数据。此外,我们学习了 LLMOps 的概念以及常见的 LLMOps 工具,并通过 LangFuse 的实际使用,了解了如何在 Dify 中集成外部 Ops 工具,实现对应用运行状态的全面监控。

通过追踪管理器,Dify 实现了对应用执行过程的全面追踪,从消息对话、工具调用到知识库检索,不仅为开发者提供了强大的调试能力,也为生产环境下的性能优化和问题诊断奠定了坚实的基础。