Fork me on GitHub

分类 Dify 下的文章

深入 Dify 应用的会话流程之文件上传

我们之前曾学习过,当模型具备处理图片、文档、音视频的能力时(比如 Gemini 2.5 Pro),在应用的配置页面会多出三个配置开关:

file-settings.png

在 “视觉” 右侧有一个设置按钮,还可以进行更细致的配置:

vision.png

这个设置按钮虽然在 “视觉” 右侧,但是 “上传方式” 和 “上传数量限制” 对所有类型的文件都是生效的。

开启之后,我们就可以在对话时上传文件:

file-upload.png

今天我们将继续深入 Dify 应用生成器的内部实现,聚焦于文件上传部分,看看 Dify 是如何处理这几种不同类型的文件,以及 Dify 如何通过 SSRF 防护机制保障文件处理的安全性。

文件上传配置

我们昨天已经学习了 应用模型配置覆盖模型配置 的概念,它存储着应用的详细配置信息,其中就包括文件上传相关的配置:

{
  "image": {
    "detail": "high",
    "enabled": true,
    "number_limits": 3,
    "transfer_methods": [
      "remote_url",
      "local_file"
    ]
  },
  "enabled": true,
  "allowed_file_types": [
    "image"
  ],
  "allowed_file_extensions": [
    ".JPG", ".JPEG", ".PNG", ".GIF", ".WEBP", ".SVG", ".MP4", ".MOV", ".MPEG", ".WEBM"
  ],
  "allowed_file_upload_methods": [
    "remote_url",
    "local_file"
  ],
  "number_limits": 3,
  "fileUploadConfig": {
    "file_size_limit": 15,
    "batch_count_limit": 5,
    "image_file_size_limit": 10,
    "video_file_size_limit": 100,
    "audio_file_size_limit": 50,
    "workflow_file_upload_limit": 10
  }
}

这里的几个参数解释如下:

  • enabled 是否开启文件上传,当开启视觉、文档、音频三个开关中的任意一个时,该值即为 true
  • allowed_file_types 允许的文件类型,支持 image、document、audio 和 video 四种类型
  • allowed_file_extensions 允许的文件后缀,这个配置貌似没什么用,以 allowed_file_types 为准
  • allowed_file_upload_methods 允许的文件上传方式,支持 本地上传通过 URL 上传,默认两者都支持,对话框中上传文件的样式会根据这个参数而改变
  • number_limits 允许最多上传多少个文件
  • image.detail 图片分辨率设置,仅针对图片生效;低分辨率模式 将使模型接收图像的低分辨率版本,适用于对图片细节要求不高的场景,比如图片分类或简单的图片理解任务;高分辨率模式 处理速度慢,并消耗更多的处理资源,适用于需要分析图片细节的场景,比如 OCR、识别图片中的文字或复杂内容等

其中 fileUploadConfig 中的参数限制了前端上传各种类型文件的大小和数量,可以在 .env 文件中调整:

# Upload configuration
UPLOAD_FILE_SIZE_LIMIT=15
UPLOAD_FILE_BATCH_LIMIT=5
UPLOAD_IMAGE_FILE_SIZE_LIMIT=10
UPLOAD_VIDEO_FILE_SIZE_LIMIT=100
UPLOAD_AUDIO_FILE_SIZE_LIMIT=50

# Workflow file upload limit
WORKFLOW_FILE_UPLOAD_LIMIT=10

我们继续来看看 CompletionAppGeneratorgenerate() 方法,昨天从数据库中获取应用配置之后,接下来就是从应用配置中提取 文件上传配置

file_extra_config = FileUploadConfigManager.convert(
  override_model_config_dict or app_model_config.to_dict()
)

经过 convert() 方法后,得到 FileUploadConfig 对象:

class FileUploadConfig(BaseModel):
  image_config: Optional[ImageConfig] = None
  allowed_file_types: Sequence[FileType] = Field(default_factory=list)
  allowed_file_extensions: Sequence[str] = Field(default_factory=list)
  allowed_file_upload_methods: Sequence[FileTransferMethod] = Field(default_factory=list)
  number_limits: int = 0

文件工厂

当用户对话时传入了文件,在会话接口的入参中会多一个 files 参数,它是一个数组,格式如下:

[
  {
    "type": "image",
    "transfer_method": "local_file",
    "url": "",
    "upload_file_id": "d9341dfc-ceab-4041-9faf-a1a28579c589"
  },
  {
    "type": "image",
    "transfer_method": "remote_url",
    "url": "http://localhost:5001/files/90a2c3ad-d0c9-4d48-a7f9-b40e1dada22e/file-preview...",
    "upload_file_id": "90a2c3ad-d0c9-4d48-a7f9-b40e1dada22e"
  }
]

接下来的代码逻辑是,根据文件上传配置,将传入的文件转换为统一的 File 对象。这一步通过文件工厂的 build_from_mappings() 方法构建:

if file_extra_config:
  files = args["files"] if args.get("files") else []
  file_objs = file_factory.build_from_mappings(
    mappings=files,
    tenant_id=app_model.tenant_id,
    config=file_extra_config,
  )
else:
  file_objs = []

文件工厂通过一个简洁的分发机制来处理不同类型的文件:

def build_from_mapping(
  *,
  mapping: Mapping[str, Any],
  tenant_id: str,
  config: FileUploadConfig | None = None,
  strict_type_validation: bool = False,
) -> File:
  
  # 根据传输方式分发到不同的构建函数
  build_functions: dict[FileTransferMethod, Callable] = {
    FileTransferMethod.LOCAL_FILE: _build_from_local_file,
    FileTransferMethod.REMOTE_URL: _build_from_remote_url,
    FileTransferMethod.TOOL_FILE: _build_from_tool_file,
  }

  # 根据 transfer_method 找到对应的构建方法
  transfer_method = FileTransferMethod.value_of(mapping.get("transfer_method"))
  build_func = build_functions.get(transfer_method)

  # 构建文件对象
  file: File = build_func(
    mapping=mapping,
    tenant_id=tenant_id,
    transfer_method=transfer_method,
    strict_type_validation=strict_type_validation,
  )
  return file

可以看到不同的文件传输方式有不同的构建方法,Dify 支持三种文件传输方式:

  1. LOCAL_FILE - 本地文件
  2. REMOTE_URL - 远程文件
  3. TOOL_FILE - 工具文件

前两种我们在文章开篇已经见过了,在 Dify 的对话框中,支持两种文件上传方式:本地上传通过 URL 上传,他们都会获取并检查文件名,验证文件大小和类型,并生成一个唯一的文件键,保存到配置的存储后端(本地或云存储),同时还会创建一条数据库记录,保存到 upload_files 表中。

另外,Dify 中还支持通过工具生成文件,比如我们之前使用的文本转语音工具,它会生成一个音频文件,这个文件就是 工具文件,它同样保存在配置的存储后端,对应数据库中的 tool_files 表。

文件存储后端

Dify 支持本地存储、S3、阿里云 OSS、Azure Blob 等多种存储后端:

  • opendal (默认,推荐)
  • s3 (Amazon S3 或兼容 S3 的服务)
  • aliyun-oss (阿里云对象存储)
  • azure-blob (Azure Blob 存储)
  • google-storage (Google Cloud Storage)
  • tencent-cos (腾讯云对象存储)
  • huawei-obs (华为云对象存储)
  • baidu-obs (百度对象存储)
  • oci-storage (Oracle Cloud Infrastructure)
  • volcengine-tos (火山引擎对象存储)
  • supabase (Supabase 存储)
  • clickzetta-volume (ClickZetta 卷存储)
  • local (本地存储,已弃用)

默认使用的是 opendal 本地存储,存储路径位于 ./api/storage,文件上传后,完整的路径结构为:

./api/storage/upload_files/{tenant_id}/{uuid}.{extension}

工具生成的文件存储路径为:

./api/storage/tools/{tenant_id}/{uuid}.{extension}

Dify 的文件存储按租户隔离,确保租户的文件安全,并通过 SHA3-256 哈希值支持文件去重检测。

Apache OpenDAL(Open Data Access Layer) 是一个开源的数据访问层项目。它允许用户通过统一的 API 简单且高效地访问不同存储服务上的数据,其核心愿景是 One Layer, All Storage(一层接口,所有存储)

如果要切换到不同的存储后端,可以在 .env 文件中设置 STORAGE_TYPE 和相应的配置参数,比如切换到 Amazon S3:

# 基础配置
STORAGE_TYPE=s3

# S3 配置参数
S3_USE_AWS_MANAGED_IAM=false
S3_ENDPOINT=https://s3.amazonaws.com  # 或其他 S3 兼容服务
S3_BUCKET_NAME=your-bucket-name
S3_ACCESS_KEY=your-access-key
S3_SECRET_KEY=your-secret-key
S3_REGION=us-east-1
S3_ADDRESS_STYLE=auto  # auto, virtual, or path

构建本地文件

本地文件是用户通过 Dify 的文件上传界面上传到服务器存储的文件。这些文件会先存储在 Dify 的文件系统中(如本地磁盘、S3 等),并在数据库中记录相关元信息。接下来,我们看看本地文件的构建过程:

def _build_from_local_file(
  *,
  mapping: Mapping[str, Any],
  tenant_id: str,
  transfer_method: FileTransferMethod,
  strict_type_validation: bool = False,
) -> File:
  upload_file_id = mapping.get("upload_file_id")

  # 从数据库查询文件信息
  stmt = select(UploadFile).where(
    UploadFile.id == upload_file_id,
    UploadFile.tenant_id == tenant_id,
  )
  row = db.session.scalar(stmt)

  # 根据文件扩展和 MIME 类型获取文件类型
  # 文件类型可以是 IMAGE、DOCUMENT、AUDIO、VIDEO、CUSTOM
  file_type = _standardize_file_type(
    extension="." + row.extension,
    mime_type=row.mime_type
  )

  # 构建 File 对象
  return File(
    id=mapping.get("id"),
    filename=row.name,
    extension="." + row.extension,
    mime_type=row.mime_type,
    tenant_id=tenant_id,
    type=file_type,
    transfer_method=transfer_method,
    remote_url=row.source_url,
    related_id=mapping.get("upload_file_id"),
    size=row.size,
    storage_key=row.key,  # 存储系统中的键值
  )

本地文件的处理流程相对简单,主要包括:

  1. 查询文件记录:根据 upload_file_id 从数据库中查询文件的基本信息,包含文件名、文件大小、扩展名、MIME 类型以及存储系统中的键值等,这些信息是在文件上传时保存到数据库中的
  2. 类型检测:根据文件扩展名和 MIME 类型自动检测文件类型,文件类型是 IMAGE、DOCUMENT、AUDIO、VIDEO、CUSTOM 之一
  3. 构建 File 对象:将获取到的文件信息构建成统一的 File 对象,供后续使用

构建远程文件

远程文件是通过 URL 引用的外部文件,比如用户提供的图片链接、文档链接等。Dify 针对远程文件有两种处理方式,第一种是通过远程文件上传接口,根据 URL 从远程下载文件到存储系统,此时和本地文件的处理逻辑几乎一样;第二种是直接在会话接口中传入 URL 地址,这种方式不需要将文件上传到 Dify 服务器,而是在需要时动态获取。

def _build_from_remote_url(
  *,
  mapping: Mapping[str, Any],
  tenant_id: str,
  transfer_method: FileTransferMethod,
  strict_type_validation: bool = False,
) -> File:

  # 检查是否有关联的上传文件记录
  upload_file_id = mapping.get("upload_file_id")
  if upload_file_id:
    # 已缓存的远程文件,和本地文件一样处理...

  # 处理纯远程 URL
  url = mapping.get("url") or mapping.get("remote_url")

  # 获取远程文件信息
  mime_type, filename, file_size = _get_remote_file_info(url)
  extension = mimetypes.guess_extension(mime_type) or (
    "." + filename.split(".")[-1] if "." in filename else ".bin"
  )

  # 根据文件扩展和 MIME 类型获取文件类型
  file_type = _standardize_file_type(
    extension=extension, 
    mime_type=mime_type
  )

  # 构建 File 对象
  return File(
    id=mapping.get("id"),
    filename=filename,
    tenant_id=tenant_id,
    type=file_type,
    transfer_method=transfer_method,
    remote_url=url,
    mime_type=mime_type,
    extension=extension,
    size=file_size,
    storage_key="",  # 远程文件没有本地存储键
  )

远程文件的关键在于 _get_remote_file_info() 函数,它负责获取远程文件的元信息,包括 MIME 类型、文件名、文件大小:

def _get_remote_file_info(url: str):
  
  # 解析 URL 获取路径部分
  parsed_url = urllib.parse.urlparse(url)
  url_path = parsed_url.path

  # 从路径中提取文件名
  filename = os.path.basename(url_path)

  # 从文件名推测 MIME 类型
  mime_type, _ = mimetypes.guess_type(filename)

  # 初始化文件大小
  file_size = -1

  # 发送 HEAD 请求获取文件信息(注意这里使用了 ssrf_proxy)
  resp = ssrf_proxy.head(url, follow_redirects=True)
  if resp.status_code == httpx.codes.OK:
    # 从 Content-Disposition 头获取真实文件名
    if content_disposition := resp.headers.get("Content-Disposition"):
      filename = str(content_disposition.split("filename=")[-1].strip('"'))
      mime_type, _ = mimetypes.guess_type(filename)

    # 从 Content-Length 头获取文件大小
    file_size = int(resp.headers.get("Content-Length", file_size))

    # 从 Content-Type 头获取 MIME 类型
    if not mime_type:
      mime_type = resp.headers.get("Content-Type", "").split(";")[0].strip()

  return mime_type, filename, file_size

它的核心流程是,先通过 URL 解析,提取出初始文件名,基于文件名推测出 MIME 类型,接着再向远程 URL 发送 HEAD 请求,获取 HTTP 响应头,从响应头中获取准确的文件元信息:

  • Content-Disposition 头获取真实文件名
  • Content-Length 头获取文件大小
  • Content-Type 头获取 MIME 类型

这里 Dify 使用的两个技巧值得我们学习:

  1. Dify 使用 HEAD 请求而非 GET,只获取元数据不下载文件内容,可以提高效率
  2. Dify 使用 ssrf_proxy 而不是直接的 HTTP 请求,防止 SSRF 攻击,这是一个很重要的安全考虑,我们稍后会详细讨论这个安全机制

构建工具文件

工具文件是智能体或工具在执行过程中生成的临时文件,比如代码解释器生成的图表、文件处理工具创建的文档等。

def _build_from_tool_file(
  *,
  mapping: Mapping[str, Any],
  tenant_id: str,
  transfer_method: FileTransferMethod,
  strict_type_validation: bool = False,
) -> File:

  # 从数据库获取工具文件信息
  tool_file = db.session.scalar(
    select(ToolFile).where(
      ToolFile.id == mapping.get("tool_file_id"),
      ToolFile.tenant_id == tenant_id,
    )
  )

  # 根据文件扩展和 MIME 类型获取文件类型
  # 构建 File 对象

工具文件的构建过程和本地文件差不多,唯一的区别在于它存储在数据库的 tool_files 表中。

SSRF 防护机制

在远程文件处理中,我们注意到 Dify 使用了 ssrf_proxy.head() 而不是直接的 HTTP 请求。这涉及到一个重要的安全概念 —— SSRF(Server-Side Request Forgery,服务器端请求伪造)

什么是 SSRF

SSRF 是一种安全漏洞,攻击者可以诱使服务器代表他们向任意目标发起 HTTP 请求。这种攻击通常发生在 Web 应用需要获取用户提供的 URL 资源时,比如:

  • 图片预览功能
  • 网页截图服务
  • 文件下载功能
  • Webhook 回调

如果 Web 应用直接使用用户提供的 URL 而没有进行适当的验证和过滤,就可能遭受 SSRF 攻击。攻击者可以利用 SSRF 漏洞:

  1. 扫描内网:通过服务器访问内网地址(如 127.0.0.1192.168.x.x)来探测内网服务
  2. 绕过防火墙:利用服务器的网络位置访问被防火墙保护的资源
  3. 访问云元数据:在云环境中访问实例元数据服务(如 AWS EC2 的 169.254.169.254
  4. 端口扫描:探测服务器可访问的其他服务端口

例如,攻击者可能提供这样的恶意 URL:

  • http://127.0.0.1:6379/ - 访问本地 Redis
  • http://169.254.169.254/latest/meta-data/ - 访问 AWS 元数据服务
  • file:///etc/passwd - 读取本地文件

关于 SSRF 相关知识,推荐阅读这篇文章:

Dify 的 SSRF 防护策略

在之前的入门篇中,细心的读者可能已经注意到,在 docker-compose.yaml 文件中,一些服务配置了 SSRF_PROXYHTTP_PROXY 环境变量,全部指向一个 ssrf_proxy 容器:

ssrf_proxy:
  image: ubuntu/squid:latest
  restart: always
  volumes:
    - ./ssrf_proxy/squid.conf.template:/etc/squid/squid.conf.template
    - ./ssrf_proxy/docker-entrypoint.sh:/docker-entrypoint-mount.sh
  entrypoint:
    [ "sh", "-c", "docker-entrypoint.sh" ]
 environment:
   HTTP_PORT: ${SSRF_HTTP_PORT:-3128}
 networks:
   - ssrf_proxy_network
   - default

为避免不必要的风险,Dify 为所有可能引发 SSRF 攻击的服务配置了代理,并强制像 Sandbox 这样的沙盒服务只能通过代理访问外部网络,以确保数据和服务安全。在生产环境中,Dify 推荐使用 Squid 作为 SSRF 防护的代理服务器。

squid.png

默认情况下,该代理不会拦截任何本地请求,但我们可以通过修改其配置文件自定义代理行为。它的配置文件如下:

# 网络范围定义
# - 定义各种私有网络和本地网络范围,包括 RFC 1918 私有网络(10.x.x.x, 172.16-31.x.x, 192.168.x.x)
# - 包含 IPv6 本地网络范围和链路本地地址
acl localnet src 0.0.0.1-0.255.255.255    # RFC 1122 "this" network (LAN)
acl localnet src 10.0.0.0/8        # RFC 1918 local private network (LAN)
acl localnet src 100.64.0.0/10        # RFC 6598 shared address space (CGN)
acl localnet src 169.254.0.0/16     # RFC 3927 link-local (directly plugged) machines
acl localnet src 172.16.0.0/12        # RFC 1918 local private network (LAN)
acl localnet src 192.168.0.0/16        # RFC 1918 local private network (LAN)
acl localnet src fc00::/7           # RFC 4193 local private network range
acl localnet src fe80::/10          # RFC 4291 link-local (directly plugged) machines

# 端口访问控制
# - SSL_ports:允许的 SSL 端口(443)
# - Safe_ports:允许的安全端口(HTTP 80, HTTPS 443, FTP 21 等标准端口)
acl SSL_ports port 443
# acl SSL_ports port 1025-65535   # Enable the configuration to resolve this issue: https://github.com/langgenius/dify/issues/12792
acl Safe_ports port 80        # http
acl Safe_ports port 21        # ftp
acl Safe_ports port 443        # https
acl Safe_ports port 70        # gopher
acl Safe_ports port 210        # wais
acl Safe_ports port 1025-65535    # unregistered ports
acl Safe_ports port 280        # http-mgmt
acl Safe_ports port 488        # gss-http
acl Safe_ports port 591        # filemaker
acl Safe_ports port 777        # multiling http

# 方法和域名控制
# - CONNECT:CONNECT 方法控制
# - allowed_domains:只允许访问 .marketplace.dify.ai 域名
acl CONNECT method CONNECT
acl allowed_domains dstdomain .marketplace.dify.ai

# HTTP 访问规则,按优先级顺序:
# 1. 允许访问指定域名:允许访问 marketplace.dify.ai
# 2. 拒绝不安全端口:拒绝访问非安全端口
# 3. 限制 CONNECT 方法:只允许对 SSL 端口使用 CONNECT
# 4. 管理访问控制:只允许 localhost 进行管理
# 5. 拒绝所有其他访问:默认拒绝策略
http_access allow allowed_domains
http_access deny !Safe_ports
http_access deny CONNECT !SSL_ports
http_access allow localhost manager
http_access deny manager
http_access allow localhost
include /etc/squid/conf.d/*.conf
http_access deny all

# 为沙箱提供反向代理
http_port ${REVERSE_PROXY_PORT} accel vhost
cache_peer ${SANDBOX_HOST} parent ${SANDBOX_PORT} 0 no-query originserver
acl src_all src all
http_access allow src_all

通过 Squid 的访问控制列表(ACL),Dify 实现了有效的网络隔离和访问控制,大大降低了 SSRF 攻击的风险。这是一个值得学习和借鉴的安全最佳实践!

小结

我们今天学习了 Dify 对上传文件的处理过程,主要内容总结如下:

  • 支持 OpenDAL、S3、阿里云 OSS 等多样化的存储后端,默认按租户隔离文件,保障数据安全性;
  • 采用工厂模式统一文件处理逻辑,支持 LOCAL_FILEREMOTE_URLTOOL_FILE 三种不同的文件传输方式,统一转换为 File 对象;
  • 对于远程文件,Dify 引入 Squid 代理,通过 ACL 规则禁止访问内网地址与非安全端口,防范远程文件带来的 SSRF 风险;

在下一篇文章中,我们将继续深入会话流程的源码,探讨 Dify 是如何通过集成外部 Ops 工具,实现全面的追踪和深度评估能力。


深入 Dify 应用的会话流程之配置管理

在节前的文章中,我们深入分析了 Dify 会话处理流程的流式处理机制,学习了限流生成器、事件流转换和响应格式化的实现原理。通过分析 rate_limit.generate()convert_to_event_stream()compact_generate_response() 三个核心函数,我们理解了 Dify 是如何优雅地统一处理流式和非流式响应的。

今天我们将继续深入应用生成器的内部实现,也就是下面代码中的 “步骤 3”:

if app_model.mode == AppMode.COMPLETION.value:
  return rate_limit.generate(                        # 步骤1:限流生成器
    CompletionAppGenerator.convert_to_event_stream(  # 步骤2:事件流转换
      CompletionAppGenerator().generate(             # 步骤3:应用生成器
        app_model=app_model, user=user, args=args,
        invoke_from=invoke_from, streaming=streaming
      ),
    ),
    request_id=request_id,
  )

Python 中的方法重载机制

首先,在 CompletionAppGeneratorgenerate() 方法中,我们可以看到一个有意思的写法:

class CompletionAppGenerator(MessageBasedAppGenerator):
  
  @overload
  def generate(
    self,
    app_model: App,
    user: Union[Account, EndUser],
    args: Mapping[str, Any],
    invoke_from: InvokeFrom,
    streaming: Literal[True], # 明确指定为 True
  ) -> Generator[str | Mapping[str, Any], None, None]: ... # 返回生成器

  @overload
  def generate(
    self,
    app_model: App,
    user: Union[Account, EndUser],
    args: Mapping[str, Any],
    invoke_from: InvokeFrom,
    streaming: Literal[False], # 明确指定为 False
  ) -> Mapping[str, Any]: ... # 返回字典

  @overload
  def generate(
    self,
    app_model: App,
    user: Union[Account, EndUser],
    args: Mapping[str, Any],
    invoke_from: InvokeFrom,
    streaming: bool = False, # 通用情况
  ) -> Union[Mapping[str, Any], Generator[str | Mapping[str, Any], None, None]]: ...

  def generate(
    self,
    app_model: App,
    user: Union[Account, EndUser],
    args: Mapping[str, Any],
    invoke_from: InvokeFrom,
    streaming: bool = True,
  ) -> Union[Mapping[str, Any], Generator[str | Mapping[str, Any], None, None]]:
    # 实际的实现逻辑
    pass

可以看到这里定义了几种不同签名的 generate() 方法,并带有 @overload 装饰器,但是却没有真正的实现(实现部分只有 ...),这其实是 Python 中特殊的方法重载机制。

方法重载(Method Overloading) 在传统的强类型语言中是原生支持的特性,然而,Python 作为动态语言,并不支持方法重载。在 Python 中,如果在同一个类中定义多个同名方法,后定义的方法会覆盖先定义的方法:

class Example:
  def method(self, x: int):
    return f"Integer: {x}"

  def method(self, x: str):  # 覆盖了上面的方法
    return f"String: {x}"

example = Example()
print(example.method(42))  # 运行时错误!参数不匹配

为了解决这个问题,Python 3.5 引入了 typing.overload 装饰器,它不是真正的方法重载,而是为 静态类型检查器 提供类型提示的工具。@overload 为同一个函数提供多个类型签名,让类型检查器能够根据不同的参数类型组合推断出相应的返回类型。被 @overload 装饰的方法称为 重载签名,用于描述不同调用方式的类型信息,但这些方法本身不会被执行。最后一个不带 @overload 装饰器的同名方法才是真正的实现。

Python 的 @overload 并不是真正的方法重载,而是为静态类型检查器提供类型提示的工具。

通过 @overload 装饰器,可以提供几个好处:

  1. 类型安全性:在编译时就能检测出类型错误,避免运行时错误
  2. IDE 智能提示:IDE 可以根据参数类型提供更精确的代码提示

在上面的例子中:

  • streaming=True 时,返回类型是 Generator(流式响应)
  • streaming=False 时,返回类型是 Mapping(非流式响应)
  • streamingbool 类型时,返回联合类型

这样,当开发者调用 generate() 方法时,IDE 和类型检查器就能根据 streaming 参数的值自动推断出正确的返回类型,提升了代码的类型安全性和开发体验。

应用模型配置

我们继续看 generate() 的实现,首先是获取 应用模型配置(app model config),这里有三种情况。第一种是无会话状态的应用,比如文本生成,直接根据 应用模型(app model) 获取配置:

# get conversation
conversation = None

# get app model config
app_model_config = self._get_app_model_config(app_model=app_model, conversation=conversation)

第二种是有会话状态的应用,比如聊天助手或智能体,根据传入的会话 ID 获取配置:

# get conversation
conversation = None
conversation_id = args.get("conversation_id")
if conversation_id:
  conversation = ConversationService.get_conversation(
    app_model=app_model, conversation_id=conversation_id, user=user
  )

# get app model config
app_model_config = self._get_app_model_config(app_model=app_model, conversation=conversation)

第三种是工作流或对话流应用,它们没有应用模型配置,因此忽略这一步。

应用模型(app model)应用模型配置(app model config) 是两个不同的概念:应用模型对应数据库中的 apps 表,代表一个应用的基本信息和元数据,它的 ORM 模型如下:

class App(Base):
  __tablename__ = "apps"
  __table_args__ = (sa.PrimaryKeyConstraint("id", name="app_pkey"), sa.Index("app_tenant_id_idx", "tenant_id"))

  # 应用ID
  id: Mapped[str] = mapped_column(StringUUID, server_default=sa.text("uuid_generate_v4()"))
  # 租户ID
  tenant_id: Mapped[str] = mapped_column(StringUUID)
  # 应用名称
  name: Mapped[str] = mapped_column(String(255))
  # 应用描述
  description: Mapped[str] = mapped_column(sa.Text, server_default=sa.text("''::character varying"))
  # 应用模式 (chat, completion, workflow, etc.)
  mode: Mapped[str] = mapped_column(String(255))
  # 图标类型 (image, emoji)
  icon_type: Mapped[Optional[str]] = mapped_column(String(255))
  # 图标
  icon = mapped_column(String(255))
  # 关联的配置ID
  app_model_config_id = mapped_column(StringUUID, nullable=True)
  # 关联的工作流ID (可选)
  workflow_id = mapped_column(StringUUID, nullable=True)
  # 应用状态
  status: Mapped[str] = mapped_column(String(255), server_default=sa.text("'normal'::character varying"))
  # 是否启用站点
  enable_site: Mapped[bool] = mapped_column(sa.Boolean)
  # 是否启用API
  enable_api: Mapped[bool] = mapped_column(sa.Boolean)
  # ... 其他基础字段  

而应用模型配置对应数据库中的 app_model_configs 表,存储应用的详细配置信息,对应的 ORM 模型如下:

class AppModelConfig(Base):
  __tablename__ = "app_model_configs"
  __table_args__ = (sa.PrimaryKeyConstraint("id", name="app_model_config_pkey"), sa.Index("app_app_id_idx", "app_id"))

  # 配置ID
  id = mapped_column(StringUUID, server_default=sa.text("uuid_generate_v4()"))
  # 所属应用ID
  app_id = mapped_column(StringUUID, nullable=False)
  # 模型提供商
  provider = mapped_column(String(255), nullable=True)
  # 模型ID
  model_id = mapped_column(String(255), nullable=True)
  # 通用配置 (JSON格式)
  configs = mapped_column(sa.JSON, nullable=True)
  # 开场白 (JSON字符串)
  opening_statement = mapped_column(sa.Text)
  # 建议问题 (JSON字符串)
  suggested_questions = mapped_column(sa.Text)
  # 语音转文字 (JSON字符串)
  speech_to_text = mapped_column(sa.Text)
  # 文字转语音 (JSON字符串)
  text_to_speech = mapped_column(sa.Text)
  # 更多类似的 (JSON字符串)
  more_like_this = mapped_column(sa.Text)
  # 模型配置 (JSON字符串)
  model = mapped_column(sa.Text)
  # 用户输入表单 (JSON字符串)
  user_input_form = mapped_column(sa.Text)
  # 提示词模板
  pre_prompt = mapped_column(sa.Text)
  # 智能体模式 (JSON字符串)
  agent_mode = mapped_column(sa.Text)
  # 敏感词审查 (JSON字符串)
  sensitive_word_avoidance = mapped_column(sa.Text)
  # 引用和归属 (JSON字符串)
  retriever_resource = mapped_column(sa.Text)
  # 知识库配置 (JSON字符串)
  dataset_configs = mapped_column(sa.Text)
  # 文件上传配置 (JSON字符串)
  file_upload = mapped_column(sa.Text)
  # ... 其他基础字段

可以看到,应用模型配置里的大多数字段都是 JSON 字符串,因此 AppModelConfig 还提供了一些便捷的属性访问器(通过 @property 装饰器实现),将 JSON 字符串转换为 Python 对象:

@property
def model_dict(self) -> dict:
  """模型配置字典"""
  return json.loads(self.model) if self.model else {}

@property
def suggested_questions_list(self) -> list:
  """建议问题列表"""
  return json.loads(self.suggested_questions) if self.suggested_questions else []

@property
def file_upload_dict(self) -> dict:
  """文件上传配置字典"""
  return json.loads(self.file_upload) if self.file_upload else {
    "image": {
      "enabled": False,
      "number_limits": DEFAULT_FILE_NUMBER_LIMITS,
      "detail": "high",
      "transfer_methods": ["remote_url", "local_file"],
    }
  }

@property
def dataset_configs_dict(self) -> dict:
  """知识库配置字典"""
  if self.dataset_configs:
    dataset_configs = json.loads(self.dataset_configs)
    if "retrieval_model" not in dataset_configs:
      return {"retrieval_model": "single"}
    return dataset_configs
  return {"retrieval_model": "multiple"}

应用配置可能会被反复修改,因此一个应用可能会有多个版本的配置,每当点击 “发布更新” 时就会生成一条配置记录:

apps-publish.png

要特别注意的是,当用户第一次创建会话时,会话和应用配置关联,该会话的后续聊天内容都将以该配置为准,如果此时修改应用配置,可能会不生效,必须创建一个新会话。

SQLAlchemy 介绍

我们再继续看 _get_app_model_config() 方法的实现:

def _get_app_model_config(self, app_model: App, conversation: Optional[Conversation] = None) -> AppModelConfig:
  # 获取应用模型配置
  # 支持两种获取方式:会话配置或应用默认配置
  if conversation:
    # 1. 从特定会话获取配置
    stmt = select(AppModelConfig).where(
      AppModelConfig.id == conversation.app_model_config_id,
      AppModelConfig.app_id == app_model.id
    )
    app_model_config = db.session.scalar(stmt)
  else:
    # 2. 从应用默认配置获取(属性访问器)
    app_model_config = app_model.app_model_config

  return app_model_config

这里使用了 SQLAlchemy 访问数据库。

SQLAlchemy 是 Python 中最流行的 ORM(对象关系映射)工具之一,它提供了一套高层 API 用于操作关系型数据库,同时也支持底层的 SQL 表达式操作。它的核心优势在于将 Python 对象与数据库表结构进行映射,让开发者可以用面向对象的方式操作数据库,而不必直接编写复杂的 SQL 语句。它的主要特点有:

  1. ORM 功能:允许开发者定义 Python 类作为数据库表的映射,通过操作类实例来实现对数据库的增删改查
  2. SQL 表达式语言:提供了一种灵活的方式构建 SQL 语句,既保留了 SQL 的表达能力,又具备 Python 代码的可读性和可维护性
  3. 支持多种数据库:兼容 PostgreSQL、MySQL、SQLite、Oracle、Microsoft SQL Server 等主流数据库,且操作接口统一
  4. 事务支持:内置事务管理机制,确保数据库操作的原子性、一致性、隔离性和持久性(ACID)
  5. 连接池管理:自动管理数据库连接池,优化数据库连接的创建和释放,提升性能

以下是一个使用 SQLAlchemy ORM 操作 SQLite 数据库的简单示例:

from sqlalchemy import create_engine, select, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

# 创建基类
Base = declarative_base()

# 定义数据模型(映射到数据库表)
class User(Base):
  __tablename__ = 'users'  # 表名
  
  id = Column(Integer, primary_key=True)
  name = Column(String(50), nullable=False)
  age = Column(Integer)

# 创建数据库引擎(SQLite 数据库)
engine = create_engine('sqlite:///example.db')

# 创建所有表(根据定义的模型)
Base.metadata.create_all(engine)

# 创建会话工厂
Session = sessionmaker(bind=engine)
session = Session()

# 添加数据
new_user = User(name='Alice', age=30)
session.add(new_user)
new_user = User(name='Bob', age=31)
session.add(new_user)
session.commit()

# 查询数据(1.x 语法)
users = session.query(User).all()
for user in users:
  print(f"ID: {user.id}, Name: {user.name}, Age: {user.age}")

# 查询数据(2.0 语法)
stmt = select(User).where(
  User.name == "Alice",
)
user = session.scalar(stmt)
print(f"ID: {user.id}, Name: {user.name}, Age: {user.age}")

# 关闭会话
session.close()

这段代码非常简单,主要是了解下 SQLAlchemy 的几个核心概念:

  • Engine:数据库连接的核心,负责管理数据库连接池和执行 SQL 语句
  • Session:用于操作数据库的会话对象,类似数据库事务的上下文
  • Declarative Base:所有 ORM 模型类的基类,通过继承它可以定义数据库表结构
  • MetaData:用于描述数据库 schema(表、列、约束等)的元数据容器

SQLAlchemy 平衡了高层 ORM 的便捷性和底层 SQL 的灵活性,是 Python 后端开发中处理数据库的重要工具,广泛应用于 Flask、Django 等 Web 框架中。

查询语法

SQLAlchemy 2.0 引入了全新的现代查询语法,更加直观且符合 Python 风格。这种新语法将核心查询操作统一到了 select() 等函数中,并提供了更流畅的链式调用体验。

在 Dify 中使用了 SQLAlchemy 2.0 查询语法:

stmt = select(AppModelConfig).where(
  AppModelConfig.id == config_id,
  AppModelConfig.app_id == app_id
)
app_model_config = db.session.scalar(stmt)

不过也有不少地方使用的是 1.x 传统语法,比如根据应用获取默认配置:

@property
def app_model_config(self):
  if self.app_model_config_id:
    return db.session.query(AppModelConfig).where(
      AppModelConfig.id == self.app_model_config_id
    ).first()
  return None

SQLAlchemy 2.0 的现代查询语法更加直观,将所有查询操作都统一到了函数式的 API 中,避免了旧版本中 Query 对象与核心表达式之间的不一致性。这种新语法也更好地支持类型提示,提高了代码的可维护性和开发效率。大家在阅读源码时注意区分。

ext_database 扩展模块

Dify 通过 ext_database 扩展模块(位于 extensions/ext_database.py 文件)管理数据库连接:

def init_app(app: DifyApp):
  db.init_app(app)                # 初始化 Flask-SQLAlchemy
  _setup_gevent_compatibility()   # 设置 Gevent 兼容性

其中 dbFlask-SQLAlchemy 实例:

from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import MetaData

# PostgreSQL 索引命名约定
POSTGRES_INDEXES_NAMING_CONVENTION = {
  "ix": "%(column_0_label)s_idx",                    # 普通索引
  "uq": "%(table_name)s_%(column_0_name)s_key",      # 唯一约束
  "ck": "%(table_name)s_%(constraint_name)s_check",  # 检查约束
  "fk": "%(table_name)s_%(column_0_name)s_fkey",     # 外键约束
  "pk": "%(table_name)s_pkey",                       # 主键约束
}

metadata = MetaData(naming_convention=POSTGRES_INDEXES_NAMING_CONVENTION)
db = SQLAlchemy(metadata=metadata)

db.init_app() 则是用于初始化 Flask-SQLAlchemy 扩展,它有两个重要的配置参数:

@computed_field  # type: ignore[misc]
@property
def SQLALCHEMY_DATABASE_URI(self) -> str:
  db_extras = (
    f"{self.DB_EXTRAS}&client_encoding={self.DB_CHARSET}" if self.DB_CHARSET else self.DB_EXTRAS
  ).strip("&")
  db_extras = f"?{db_extras}" if db_extras else ""
  return (
    f"{self.SQLALCHEMY_DATABASE_URI_SCHEME}://"
    f"{quote_plus(self.DB_USERNAME)}:{quote_plus(self.DB_PASSWORD)}@{self.DB_HOST}:{self.DB_PORT}/{self.DB_DATABASE}"
    f"{db_extras}"
  )

@computed_field  # type: ignore[misc]
@property
def SQLALCHEMY_ENGINE_OPTIONS(self) -> dict[str, Any]:
  # Parse DB_EXTRAS for 'options'
  db_extras_dict = dict(parse_qsl(self.DB_EXTRAS))
  options = db_extras_dict.get("options", "")
  # Always include timezone
  timezone_opt = "-c timezone=UTC"
  if options:
    # Merge user options and timezone
    merged_options = f"{options} {timezone_opt}"
  else:
    merged_options = timezone_opt

  connect_args = {"options": merged_options}

  return {
    "pool_size": self.SQLALCHEMY_POOL_SIZE,
    "max_overflow": self.SQLALCHEMY_MAX_OVERFLOW,
    "pool_recycle": self.SQLALCHEMY_POOL_RECYCLE,
    "pool_pre_ping": self.SQLALCHEMY_POOL_PRE_PING,
    "connect_args": connect_args,
    "pool_use_lifo": self.SQLALCHEMY_POOL_USE_LIFO,
    "pool_reset_on_return": None,
  }

分别是数据库连接和连接池配置,可以根据需要,在 .env 文件中通过环境变量来修改默认值。

覆盖模型配置

获取应用模型配置之后,紧接着处理调试模式下的配置覆盖:

# 覆盖模型配置(仅在调试模式下允许)
override_model_config_dict = None
if args.get("model_config"):
  if invoke_from != InvokeFrom.DEBUGGER:
    raise ValueError("Only in App debug mode can override model config")

  # 验证配置的有效性,设置默认值
  override_model_config_dict = CompletionAppConfigManager.config_validate(
    tenant_id=app_model.tenant_id, config=args.get("model_config", {})
  )

在正常情况下,会话接口的入参类似下面这样:

{
  "response_mode": "streaming",
  "conversation_id": "773bc365-5faa-4716-b11d-6e77fffd2639",
  "files": [],
  "query": "<用户问题>",
  "inputs": {},
  "parent_message_id": "9841bc16-0a8a-4fad-812d-ee5703cde868"
}

但是在调试模式,会多一个 model_config 参数:

{
  "response_mode": "streaming",
  "conversation_id": "773bc365-5faa-4716-b11d-6e77fffd2639",
  "files": [],
  "query": "<用户问题>",
  "inputs": {},
  "parent_message_id": "9841bc16-0a8a-4fad-812d-ee5703cde868",
  "model_config": {
    // 和 AppModelConfig 基本一致
  }
}

组装应用配置

最后,将应用模型、应用模型配置和覆盖模型配置转换为统一的 应用配置(app config) 对象:

app_config = CompletionAppConfigManager.get_app_config(
  app_model=app_model,
  app_model_config=app_model_config,
  override_config_dict=override_model_config_dict
)

各个应用类型都有自己的配置类。所有简单 UI 的应用配置都继承自 EasyUIBasedAppConfig

# 文本生成应用配置  
class CompletionAppConfig(EasyUIBasedAppConfig):
  pass

# 聊天助手配置
class ChatAppConfig(EasyUIBasedAppConfig):
  pass

# 智能体配置
class AgentChatAppConfig(EasyUIBasedAppConfig):
  agent: Optional[AgentEntity] = None

而工作流和对话流的应用配置则是继承自 WorkflowUIBasedAppConfig

# 工作流配置
class WorkflowAppConfig(WorkflowUIBasedAppConfig):
  pass

# 对话流配置
class AdvancedChatAppConfig(WorkflowUIBasedAppConfig):
  pass

应用配置(AppConfig) 是经过解析和验证后的配置实体对象,为应用运行时提供类型安全的配置访问。Dify 为每种应用定义了各自的 运行器(AppRunner),应用配置组装完成后,就传给运行器开始执行。大致流程如下:

app-config.png

小结

今天,我们详细分析了 Dify 应用生成器的配置管理机制,从应用模型(app model)到应用模型配置(app model config),到覆盖模型配置(override model config),到最后统一的应用配置对象组装(app config)。在阅读代码的同时,我们穿插学习了 Python 中的方法重载机制,SQLAlchemy 的基本使用和查询语法相关的知识,以及基于 Flask-SQLAlchemy 的 ext_database 扩展模块。通过本文的学习,相信大家对 Dify 中配置相关的概念和实现原理都有了基本的了解。


深入 Dify 应用的会话流程之流式处理

在昨天的文章中,我们学习了 Dify 会话处理流程的核心服务 AppGenerateService,并通过分析它的 generate() 方法了解了 Dify 如何通过系统级和应用级两层限流策略来保障服务的稳定性。不过这个方法昨天只分析了一半,今天我们继续来看看剩下的部分。

Dify 会根据不同的应用类型走不同的应用生成器逻辑,通过源码可以发现,无论是文本生成、聊天对话、智能体、工作流还是对话流,都遵循着固定的流式处理流程。比如文本生成:

if app_model.mode == AppMode.COMPLETION.value:
  return rate_limit.generate(                        # 步骤1:限流包装
    CompletionAppGenerator.convert_to_event_stream(  # 步骤2:事件流转换
      CompletionAppGenerator().generate(             # 步骤3:具体应用逻辑
        app_model=app_model, user=user, args=args,
        invoke_from=invoke_from, streaming=streaming
      ),
    ),
    request_id=request_id,
  )

我们先不急着看具体的应用生成器,而是先看看外面的两层方法,学习 Dify 是如何统一处理流式和非流式响应的。

限流生成器

generate() 方法最后的 finally 语句里,有一点特别值得注意,这里有一个判断很奇怪,只有当请求是非流式时才调用 exit() 释放限流:

  finally:
    if not streaming:
      rate_limit.exit(request_id)

在传统的非流式处理中,在 finally 块中释放限流确实是可以的;但在流式处理中,当 generate() 方法返回时,请求实际上还没有结束! 客户端会持续从生成器中读取数据,直到生成器耗尽或发生异常。因此,我们需要一个机制来确保只有当生成器正常结束或异常时才释放限流,并且不影响原有生成器的任何功能。

我们可以看看 Dify 是如何处理流式请求的限流的,核心就在 rate_limit.generate() 方法里:

def generate(self, generator: Union[Generator[str, None, None], Mapping[str, Any]], request_id: str):
  if isinstance(generator, Mapping):
    # 非流式响应直接返回
    return generator
  else:
    # 流式响应使用 RateLimitGenerator 包装
    return RateLimitGenerator(
      rate_limit=self,
      generator=generator,
      request_id=request_id,
    )

这里首先通过 generator 的类型自动区分流式响应(Generator)和非流式响应(Mapping),对于非流式响应直接返回,而对于流式响应,使用 RateLimitGenerator 进行包装而不影响原有接口。它的实现如下:

class RateLimitGenerator:
  def __init__(self, rate_limit: RateLimit, generator: Generator[str, None, None], request_id: str):
    self.rate_limit = rate_limit
    self.generator = generator
    self.request_id = request_id
    self.closed = False

  def __iter__(self):
    return self

  def __next__(self):
    if self.closed:
      raise StopIteration
    try:
      # 正常情况下透传数据
      return next(self.generator)
    except Exception:
      # 异常时自动释放资源
      self.close()
      raise

  def close(self):
    if not self.closed:
      self.closed = True
      # 从 Redis 中移除活跃请求记录
      self.rate_limit.exit(self.request_id)
      if self.generator is not None and hasattr(self.generator, "close"):
        self.generator.close()

可以看到 RateLimitGenerator 是一个实现了 迭代器协议 的包装类,它最精妙的地方在于可以监听到生成器的异常(迭代器结束也会抛出异常),这样就可以在流式响应结束时释放限流。

迭代器和生成器

在 Python 中,迭代器(Iterator)生成器(Generator) 都是用于处理可迭代对象的工具,但它们在实现方式、功能和使用场景上有明显区别。

迭代器 是实现了迭代器协议的对象,需要手动实现 __iter__()__next__() 这两个方法:

  • __iter__() 方法返回迭代器自身
  • __next__() 方法返回下一个元素,没有元素时抛出 StopIteration 异常

生成器 是一种特殊的迭代器,无需手动实现迭代器协议,通过 yield 关键字创建。只要函数中包含 yield 语句即为生成器函数,调用生成器函数时返回生成器对象:

def func():
    yield "a"
    yield "b"
    yield "c"

print(type(func()))
# 输出为 <class 'generator'>

迭代器通常需要在初始化时准备好所有元素或定义元素的生成逻辑,可能占用较多内存(如存储一个大列表);它需要自行维护迭代状态(如当前位置索引),实现较复杂。而生成器采用 惰性计算(Lazy Evaluation),只在需要时生成下一个元素,不提前存储所有元素,内存效率更高,特别适合处理大数据流或无限序列;并且它可以自动维护状态,每次执行到 yield 时暂停并保存当前状态,下次调用时从暂停处继续执行。

从 Dify 的代码可以看出,底层的应用生成器返回的是生成器对象,rate_limit.generate() 方法将其封装成迭代器,供上层使用。当上层消费结束后,释放流式响应的限流。

事件流转换

我们接着看 convert_to_event_stream() 方法,它位于 BaseAppGenerator 类中,所有的应用生成器都继承自它。该方法负责将生成器输出转换为标准的 SSE 格式:

@classmethod
def convert_to_event_stream(cls, generator: Union[Mapping, Generator[Mapping | str, None, None]]):
  if isinstance(generator, dict):
    # 非流式响应直接返回
    return generator
  else:
    # 将流式响应转换为 SSE 格式
    def gen():
      for message in generator:
        if isinstance(message, Mapping | dict):
          # 结构化数据使用 JSON 格式,通过 data 字段传输
          yield f"data: {orjson_dumps(message)}\n\n"
        else:
          # 简单字符串使用 event 字段传输
          yield f"event: {message}\n\n"

    return gen()

这个实现有几个关键之处:

  1. 自动识别流式响应和非流式响应,非流式响应直接返回,流式响应转换为 SSE 格式
  2. 使用高性能的 orjson 库对 JSON 进行序列化
  3. 内部的 gen() 函数是另一个生成器,保持流式特性

SSE 协议介绍

SSE(Server-Sent Events,服务器发送事件) 是一种 基于 HTTP 的轻量级实时通信协议,旨在解决 服务器向客户端单向、持续推送数据 的场景需求。与 WebSocket 的双向通信不同,它专注于服务器到客户端的单向通信,无需客户端频繁轮询,同时保持了 HTTP 协议的简洁性和兼容性,是实时消息通知、数据更新、AI 对话等场景的高效解决方案。它的核心特性如下:

  1. 基于 HTTP 协议:SSE 复用 HTTP 协议的传输层,无需额外创建新协议(如 WebSocket 的 ws:// 协议),可直接使用现有的 HTTP 基础设施(如 Nginx 反向代理、CDN、防火墙规则),部署成本低;
  2. 单向通信:通信方向固定为服务器 → 客户端,仅服务器能主动向客户端推送数据;
  3. 自动重连机制:若 SSE 连接因网络波动断开,浏览器会自动尝试重新连接,无需开发者手动实现重连逻辑;在重新连接时,客户端可以通过 HTTP 请求头 Last-Event-ID 发送最后接收到的事件 ID 给服务器,服务器可以根据这个 ID 来恢复中断处的数据流,避免重复发送;
  4. 支持事件分类:服务器可推送不同类型的事件,客户端可按事件类型分别监听,实现消息的分类处理;
  5. 轻量级数据格式:SSE 推送的数据以 文本流(UTF-8 编码) 传输,格式简单(由 field: value 键值对组成),无需复杂的序列化和反序列化,解析成本低;

SSE 对服务器的响应头有特殊要求,必须通过 Content-Type: text/event-stream 声明响应体是 SSE 格式的事件流,否则客户端无法识别。服务器推送的每一条消息由多个 field: value 键值对组成,field 为字段名,value 为文本格式的数据。SSE 支持的核心字段有:

  • data - 消息的核心数据,为文本内容,可多行,每行以 data: 开头,客户端接收时会自动合并多行 data 为一个字符串;
  • event - 自定义消息的事件类型,客户端可按类型监听,比如 log notification 等,若不指定 event,客户端默认触发 onmessage
  • id - 消息的唯一标识(可选),用于断线重连时恢复数据,客户端会记录最后一条消息的 id,重连时通过 Last-Event-ID 请求头告知服务器,服务器可据此补发未接收的消息;
  • retry - 客户端重连间隔(单位:毫秒),覆盖默认的 3 秒,仅当连接断开时生效,客户端会按此间隔重连;
  • : - 注释行,客户端会忽略,用于保持连接活跃,若服务器长时间无数据推送,可定期发送注释行避免连接被网关或防火墙断开;

下面是一些消息的示例:

: 这是一条测试消息

data: 这是一条消息

data: 这是另一条
data: 多行消息

event: userconnect
data: {"username": "bobby", "time": "02:33:48"}

event: usermessage
data: {"username": "bobby", "time": "02:34:11", "text": "Hi everyone."}

综上,SSE 是一种 轻量、简洁、低成本 的实时单向通信方案,适合无需客户端交互的场景,如果需要双向通信,可选择 WebSocket 协议;另外,SSE 仅能传输 UTF-8 编码的文本数据,若需推送图片、音频等二进制内容,需先将其编码为 Base64 文本(会增加 33% 的带宽开销),此时 WebSocket 更优;还有一点要注意的是,若使用 Nginx 等反向代理 SSE 连接,需配置禁用缓存,避免数据被缓存后延迟推送,并保持连接持久化,避免代理提前关闭 SSE 流。示例 Nginx 配置:

location /api/sse-stream {
  proxy_pass http://localhost:3000;
  proxy_http_version 1.1;

  # 禁用连接关闭
  proxy_set_header Connection '';

  # 禁用缓冲(避免数据被缓存后延迟推送)
  proxy_buffering off; 
  proxy_cache off;

  # 禁用分块编码
  chunked_transfer_encoding off;
}

高性能 JSON 库 orjson

正如上一节所学,SSE 的输出是文本内容,因此我们需要将结构化的对象序列化为 JSON 字符串返回。在流式处理中,JSON 序列化的性能至关重要,因为每个流式消息都需要序列化,高并发场景下的性能差异会被放大。因此 Dify 选择 orjson 而不是 Python 内置的 json 库来做 JSON 的序列化。

orjson 是一个用 Rust 编写的高性能 JSON 库,专为 Python 设计。它是标准库 json 模块的直接替代品,但性能更优秀。根据官方的测试数据,orjson 的序列化速度比 json 快了十几倍:

orjson-vs-json.png

反序列化也有 2 到 6 倍的提升:

orjson-vs-json-2.png

我也写了一个简单的例子来测试它们之间的性能差异:

import json
import orjson
import time

# 测试数据,构造大对象
data = {
  "users": [
    {"id": i, "name": f"user_{i}", "email": f"user_{i}@example.com"}
    for i in range(10000)
  ]
}

# 性能对比
def benchmark_json():
  start = time.time()
  for _ in range(10000):
    json.dumps(data)
  return time.time() - start

def benchmark_orjson():
  start = time.time()
  for _ in range(10000):
    orjson.dumps(data).decode('utf-8')
  return time.time() - start

json_time = benchmark_json()
orjson_time = benchmark_orjson()
print(f"json 耗时:{json_time}")
print(f"orjson 耗时:{orjson_time}")
print(f"orjson 比 json 快了 {json_time / orjson_time:.2f} 倍")

我构造了一个大对象用于测试,测试结果如下:

json 耗时:36.9989538192749
orjson 耗时:3.828395128250122
orjson 比 json 快了 9.66 倍

可以看出,虽然没有官网宣称的十几倍,但 orjson 确实比 json 快了一个数量级。

响应格式化

我们准备好 SSE 的消息格式之后,最后一个环节是响应格式化,位于控制器层的 helper.compact_generate_response() 函数,它负责将前面处理的结果转换为标准的 HTTP 响应,也就是 Flask 的 Response 对象:

def compact_generate_response(response: Union[Mapping, Generator, RateLimitGenerator]) -> Response:
  if isinstance(response, dict):
    # 模式1:非流式响应 - 返回标准 JSON
    return Response(
      response=json.dumps(jsonable_encoder(response)),
      status=200,
      mimetype="application/json"
    )
  else:
    # 模式2:流式响应 - 返回 Server-Sent Events
    def generate() -> Generator:
      yield from response  # 使用 yield from 委托

    return Response(
      stream_with_context(generate()), # 注入请求上下文
      status=200,
      mimetype="text/event-stream"
    )

这个函数虽然简洁,但有两点值得关注:

  • 通过 yield from 实现生成器委托
  • 通过 stream_with_context 保持流式处理时的请求上下文

yield from 生成器委托

在 Python 中,yield from 是 Python 3.3 引入的语法,用于简化生成器中的迭代操作,主要作用是将一个 可迭代对象(iterable) 的元素逐个返回,相当于在生成器内部对这个可迭代对象进行了 for 循环并逐个 yield 其元素。

因此下面两个生成器函数的功能完全相同:

# 使用 for 循环 + yield
def generator1(iterable):
  for item in iterable:
    yield item

# 使用 yield from
def generator2(iterable):
  yield from iterable

yield from 更强大的功能是实现 生成器委托(generator delegation),即允许一个生成器将部分操作委托给另一个生成器或可迭代对象。比如当需要迭代嵌套的可迭代对象时,yield from 可以简化代码:

def flatten(nested_iterable):
  for item in nested_iterable:
    if isinstance(item, list):
      # 委托给 flatten 处理嵌套列表
      yield from flatten(item)
    else:
      yield item

# 测试
nested = [1, [2, 3], [4, [5, 6]]]
for num in flatten(nested):
  print(num, end=' ')  # 输出:1 2 3 4 5 6 

通过前面的章节我们知道,限流返回的 RateLimitGenerator 是一个迭代器,Dify 在这里使用 yield from 主要目的是将迭代器转换为生成器,为后面的 stream_with_context 注入请求上下文。

流式响应的上下文管理

在 Flask 应用中,请求上下文的生命周期管理 非常重要,在流式响应或多线程处理时,我们往往面临着上下文丢失的挑战:

def problematic_stream():
  def generate():
    # 这里可能无法访问 request、session、g 等对象
    user_id = request.user.id  # 可能报错!
    yield f"data: {{\"user_id\": \"{user_id}\"}}\n\n"

  return Response(generate(), mimetype="text/event-stream")

Flask 的 stream_with_context 解决了 Web 框架中流式响应的上下文管理问题:

  • 上下文传递:确保请求上下文在整个流的生命周期中可用
  • 资源访问:数据库、认证、日志等资源持续可用
  • 错误隔离:异常处理机制正常工作
from flask import stream_with_context

def proper_stream():
  def generate():
    # 现在可以安全访问请求上下文
    user_id = request.user.id  # 正常工作
    yield f"data: {{\"user_id\": \"{user_id}\"}}\n\n"

  return Response(
    stream_with_context(generate()),  # 保持上下文
    mimetype="text/event-stream"
  )

小结

至此,我们已经将 Dify 应用的会话接口外围都扫清了,重点剖析了 rate_limit.generate()convert_to_event_stream()compact_generate_response() 三个函数的实现原理。

让我们用一个完整的序列图来总结下整个流式处理的流程:

chat-flow.png

接下来我们将继续深入具体的应用生成器源码,看看五种应用的具体实现。


深入 Dify 应用的会话流程之限流策略

在前面的文章中,我们梳理了 Dify 应用的各种会话接口,了解了不同蓝图下五种应用类型的接口实现,以及各自的认证方式和用户类型。如果我们仔细阅读源码就会发现,所有应用的会话接口背后处理逻辑几乎一样:

def post(self, app_model: App, end_user: EndUser):
  response = AppGenerateService.generate(
    app_model=app_model, user=end_user, args=args, invoke_from=InvokeFrom.SERVICE_API, streaming=streaming
  )
  return helper.compact_generate_response(response)

都是通过统一调用核心服务 AppGenerateServicegenerate() 方法,然后返回响应。今天我们就看看这个核心服务的具体实现,深入探索 Dify 的会话流程,解析其中涉及的关键技术要点。

统一的会话入口

正如上文提到的,无论是 Console、Web 还是 Service API 蓝图下的会话接口,最终都会调用到 AppGenerateServicegenerate() 方法。这个方法是 Dify 会话处理的核心枢纽,负责协调不同类型应用的生成流程。

让我们来看看这个方法的实现逻辑:

@classmethod
def generate(
  cls, 
  app_model: App, 
  user: Union[Account, EndUser], 
  args: Mapping[str, Any], 
  invoke_from: InvokeFrom, 
  streaming: bool = True,
):
  # 如果是 sandbox 订阅计划
  # 开启系统级限流:总请求数限流
  if dify_config.BILLING_ENABLED:
    limit_info = BillingService.get_info(app_model.tenant_id)
    if limit_info["subscription"]["plan"] == "sandbox":
      if cls.system_rate_limiter.is_rate_limited(app_model.tenant_id):
        raise InvokeRateLimitError(
          "Rate limit exceeded, please upgrade your plan "
          f"or your RPD was {dify_config.APP_DAILY_RATE_LIMIT} requests/day"
        )
      cls.system_rate_limiter.increment_rate_limit(app_model.tenant_id)

  # 应用级限流:并发请求数限流
  max_active_request = cls._get_max_active_requests(app_model)
  rate_limit = RateLimit(app_model.id, max_active_request)
  request_id = RateLimit.gen_request_key()
  try:
    request_id = rate_limit.enter(request_id)

    # 调用不同类型的应用生成器
    if app_model.mode == AppMode.COMPLETION.value:
      # 文本生成
    elif app_model.mode == AppMode.AGENT_CHAT.value or app_model.is_agent:
      # 智能体
    elif app_model.mode == AppMode.CHAT.value:
      # 聊天助手
    elif app_model.mode == AppMode.ADVANCED_CHAT.value:
      # 对话流
    elif app_model.mode == AppMode.WORKFLOW.value:
      # 工作流
    else:
      raise ValueError(f"Invalid app mode {app_model.mode}")

  # 退出应用级限流
  except Exception:
    rate_limit.exit(request_id)
    raise
  finally:
    if not streaming:
      rate_limit.exit(request_id)

从这个核心方法可以看出,Dify 的会话处理流程包含以下几个关键步骤:

  1. 系统级限流:基于应用的租户进行全局限流控制,开启账单功能后才有这个功能
  2. 应用级限流:基于应用的并发请求数量限制
  3. 应用类型路由:根据应用模式选择对应的生成器,统一的流式响应处理

系统级限流

Dify 支持开启账单功能,在 .env 文件中添加:

BILLING_ENABLED=true

然后重启 API 服务,用户设置页面就会多一个 “账单” 菜单:

billing.png

用户默认是 sandbox 订阅计划,对话时就会经过系统级限流,用于限制某个租户下的应用一天的总请求数量。

注意 Dify 的限流不是针对请求的用户,而是以应用或应用的租户作为维度。

系统级限流器的定义如下:

system_rate_limiter = RateLimiter(
  "app_daily_rate_limiter", 
  dify_config.APP_DAILY_RATE_LIMIT, 
  86400
)

Dify 的限流基于 Redis 实现,这里的 app_daily_rate_limiter 是 Redis 键的前缀,APP_DAILY_RATE_LIMIT 表示最大请求数,默认是 5000 次,86400 是时间窗口,单位秒,表示一天 24 小时。

系统级限流的核心算法是滑动窗口,通过 Redis 的 Sorted Set 实现。每次用户会话时增加请求计数:

def increment_rate_limit(self, email: str):

  # 生成 Redis 键: "app_daily_rate_limiter:tenant_id"
  key = self._get_key(email)

  # 在 Sorted Set 中添加当前时间戳
  current_time = int(time.time())
  redis_client.zadd(key, {current_time: current_time})

  # 设置 key 过期时间,为时间窗口的 2 倍,防止数据残留
  redis_client.expire(key, self.time_window * 2)

实际上就是使用 ZADD 命令将当前时间戳添加到 Sorted Set 中,key 为 app_daily_rate_limiter:tenant_id,可按租户隔离限流,同时为 key 设置一个过期时间,防止数据残留。限流的校验逻辑如下:

def is_rate_limited(self, email: str) -> bool:

  # 生成 Redis 键: "app_daily_rate_limiter:tenant_id"
  key = self._get_key(email)  
  
  # 清除时间窗口外的记录
  current_time = int(time.time())
  window_start_time = current_time - self.time_window
  redis_client.zremrangebyscore(key, "-inf", window_start_time)

  # 统计当前时间窗口内的请求数
  attempts = redis_client.zcard(key)

  # 判断是否超限
  if attempts and int(attempts) >= self.max_attempts:
    return True
  return False

这里比较巧妙的通过 Sorted Set 的 ZREMRANGEBYSCORE 命令,由于存储的分数都是时间戳,因此很容易根据时间清除时间窗口外的记录,然后再通过 ZCARD 命令获取集合大小,也就是当前时间窗口内的请求数,判断是否超出限制。

这是一个经典的分布式限流实现,使用 Redis 的 Sorted Set 实现了 24 小时的滑动窗口限流,每个请求都有精确的时间戳记录,既保证了精确性又具备良好的性能。

学习 Sorted Set 数据结构

Sorted Set(有序集合) 是 Redis 的核心数据结构之一,结合了 Set 的唯一性和排序功能,具有如下特点:

  • 有序性: 元素按分数(score)排序
  • 唯一性: 成员(member)不能重复
  • 双重索引: 支持按分数和成员查找
  • 高效操作: 大部分操作时间复杂度为 O(log N)

它的常用命令包括:

添加元素

我们创建一个有序集合 leaderboard 表示分数排行榜,并添加三个人的数据:

> ZADD leaderboard 100 "alice" 200 "bob" 150 "charlie"
(integer) 3

获取元素

按排名获取,根据分数从小到大排序:

> ZRANGE leaderboard 0 2 WITHSCORES
1) "alice"
2) "100"
3) "charlie"
4) "150"
5) "bob"
6) "200"

按排名获取,根据分数从大到小排序:

> ZREVRANGE leaderboard 0 2 WITHSCORES
1) "bob"
2) "200"
3) "charlie"
4) "150"
5) "alice"
6) "100"

按分数范围查询

按分数范围获取:

> ZRANGEBYSCORE leaderboard 100 180 WITHSCORES
1) "alice"
2) "100"
3) "charlie"
4) "150"

统计计数

统计总元素数:

> ZCARD leaderboard
(integer) 3

统计分数范围内元素数:

> ZCOUNT leaderboard 100 180
(integer) 2

分数和排名

获取成员分数:

> ZSCORE leaderboard "alice"
"100"

获取成员排名(从0开始,小到大):

> ZRANK leaderboard "alice"
(integer) 0

获取成员排名(从0开始,大到小):

> ZREVRANK leaderboard "alice"
(integer) 2

修改操作

增加分数,下面的命令将 alice 分数增加 50:

> ZINCRBY leaderboard 50 "alice"
"150"

删除元素

删除指定成员:

> ZREM leaderboard "alice"

按排名删除,比如下面的命令表示删除排名最低的:

> ZREMRANGEBYRANK leaderboard 0 0

按分数范围删除:

> ZREMRANGEBYSCORE leaderboard 0 100

应用级限流

通过系统级限流之后,会话流程还会经过一层应用级限流。可以在 .env 文件中通过 APP_MAX_ACTIVE_REQUESTS 变量设置限额,默认是 0 不限制:

APP_MAX_ACTIVE_REQUESTS=0

Dify 为每次会话生成一个请求 ID,然后调用 enter() 方法:

def enter(self, request_id: Optional[str] = None) -> str:

  # 检查当前活跃请求数
  active_requests_count = redis_client.hlen(self.active_requests_key)
  if active_requests_count >= self.max_active_requests:
    raise AppInvokeQuotaExceededError(...)

  # 记录新的活跃请求
  redis_client.hset(self.active_requests_key, request_id, str(time.time()))
  return request_id

这里使用了 Redis 的 Hash(哈希表) 数据结构来记录活跃请求,每次会话时,使用 HSET 命令将请求 ID 和当前时间存入 active_requests_key,这个键的格式是 dify:rate_limit:{}:active_requests,如果这个 Hash 的长度超出 max_active_requests 则抛出额度超限的异常。

当请求结束或异常,通过 exit() 方法将请求从 Redis 中移除:

def exit(self, request_id: str):

  # 从活跃请求集合中移除
  redis_client.hdel(self.active_requests_key, request_id)

可以看出,应用级限流采用并发请求数控制模式,与系统级的滑动窗口不同,它控制的是 同时进行的活跃请求数量,防止某个应用同一时间请求数过大。

Dify 的 ext_redis 扩展

这里还有一个知识点值得一提,在限流的代码里,Dify 通过 redis_client 执行 Redis 操作:

from extensions.ext_redis import redis_client

我们前面介绍过 Dify 的扩展系统,这个变量就是在 ext_redis 扩展中初始化的:

def init_app(app: DifyApp):
  
  global redis_client

  # 哨兵模式
  if dify_config.REDIS_USE_SENTINEL:
    redis_params = _get_base_redis_params()
    client = _create_sentinel_client(redis_params)
  # 集群模式
  elif dify_config.REDIS_USE_CLUSTERS:
    client = _create_cluster_client()
  # 单例模式
  else:
    redis_params = _get_base_redis_params()
    client = _create_standalone_client(redis_params)

  # 初始化 redis_client
  redis_client.initialize(client)
  app.extensions["redis"] = redis_client

从初始化代码可以看出,Dify 支持 哨兵集群单例 三种 Redis 模式,这种统一的处理方式很值得我们学习和借鉴。

客户端缓存

此外,在 Reids 的配置参数中,还有一点比较有意思:

def _get_cache_configuration() -> CacheConfig | None:

  # 是否开启客户端缓存
  if not dify_config.REDIS_ENABLE_CLIENT_SIDE_CACHE:
    return None
  # 必须是 RESP3 协议
  resp_protocol = dify_config.REDIS_SERIALIZATION_PROTOCOL
  if resp_protocol < 3:
    raise ValueError("Client side cache is only supported in RESP3")
  return CacheConfig()

def _get_base_redis_params() -> dict[str, Any]:
  return {
    "username": dify_config.REDIS_USERNAME,
    "password": dify_config.REDIS_PASSWORD or None,
    "db": dify_config.REDIS_DB,
    "encoding": "utf-8",
    "encoding_errors": "strict",
    "decode_responses": False,
    # 协议
    "protocol": dify_config.REDIS_SERIALIZATION_PROTOCOL,
    # 缓存配置
    "cache_config": _get_cache_configuration(),
  }

这里使用了 Redis 的 客户端缓存(Client-side caching) 特性,这是从 Redis 6.0 开始引入的一个功能特性,允许客户端在本地缓存从服务器读取的数据,具有如下特性:

  • 本地缓存: 数据存储在客户端内存中
  • 自动失效: 服务器数据变化时自动通知客户端
  • 透明操作: 对应用代码透明,自动命中缓存
  • 性能提升: 减少网络往返,提高读取性能

可以在 .env 文件中通过 REDIS_ENABLE_CLIENT_SIDE_CACHE 配置开启:

REDIS_ENABLE_CLIENT_SIDE_CACHE=true
REDIS_SERIALIZATION_PROTOCOL=3

注意开启这个功能需要 Redis 支持 RESP3 协议。RESP(REdis Serialization Protocol) 是 Redis 的通信协议,目前有 RESP2RESP3 两个版本:RESP2 协议比较老,后面大概率是要废弃的,它只支持简单的字符串、整数、大字符串、数组这些数据类型;而 RESP3 是新一代 Redis 通信协议,支持更丰富的数据类型,支持发布订阅模式,并支持服务器推送能力,这也是实现客户端缓存的关键。

开启客户端缓存后,工作流程如下:

  1. 客户端首次读取时,从服务器获取,并存储到本地缓存
  2. 后续读取时,直接从本地缓存返回,无网络开销,大大提升程序的读取性能
  3. 当服务器缓存发生变化时,会向客户端发送缓存失效通知,客户端清除本地缓存
  4. 下次读取时,客户端会重新从服务器获取,更新本地缓存

redis-client-side-cache.png

客户端缓存非常适合频繁读取且变化较少的场景,比如应用的配置、用户的权限、模型的参数等。通过该特性,可以大幅减少网络延迟,降低 Redis 服务器的压力,提高应用性能。但由于客户端需要额外的内存存储缓存,因此在使用时,需要特别关注客户端的内存使用情况,

小结

我们今天开始深入分析 Dify 的会话处理流程,了解了所有类型的应用都是通过 AppGenerateService.generate() 方法进行统一处理的,并经过了两层限流策略:

  • 系统级限流:基于 Redis Sorted Set 实现的滑动窗口算法,按租户维度限制每日总请求数
  • 应用级限流:基于 Redis Hash 实现的并发控制,限制应用同时活跃的请求数量

两者对比如下:

特性系统级限流应用级限流
算法滑动窗口并发控制
维度租户ID应用ID
限制24小时请求总数同时活跃请求数
数据结构Redis ZSETRedis Hash
清理机制时间窗口自动清理请求结束时清理

在学习过程中,我们看到 Dify 应用了大量的 Redis 技术:

  • Sorted Set:用于实现精确的滑动窗口限流,巧妙利用时间戳作为分数进行时间窗口管理
  • Hash:用于追踪活跃请求,实现并发数控制
  • 客户端初始化:支持多种 Redis 部署模式(单例、哨兵、集群),使用统一的客户端管理 Redis 连接
  • 客户端缓存:基于 RESP3 协议的服务器推送特性,适合读多写少的场景

这些内容都很值得我们在构建高并发系统时学习和借鉴。在下一篇文章中,我们将继续深入探讨 Dify 应用的会话流程,了解每种应用生成器的执行流程。


梳理 Dify 应用的会话接口

在前面的文章中,我们学习了 Dify 的代码结构和路由系统,了解了 Dify 是如何通过 Flask Blueprint 和 Flask-RESTX 来组织其复杂的 API 架构的。今天,我们将深入探索 Dify 应用的会话 API 接口,分析其具体的实现细节,并结合我们之前学习的内容,看看不同蓝图下的实现以及各自的区别。

五种应用 + 三大蓝图

我们知道,Dify 有五种不同的应用类型:

class AppMode(StrEnum):
  # 聊天助手
  CHAT = "chat"
  # 文本生成
  COMPLETION = "completion"
  # 智能体
  AGENT_CHAT = "agent-chat"
  # 工作流
  WORKFLOW = "workflow"
  # 对话流
  ADVANCED_CHAT = "advanced-chat"

不同的应用类型对应不同的会话接口,而根据应用的调用方式,这些会话接口又会出现在不同的蓝图下。因此 Dify 的会话接口有很多,如果第一次接触源码的话会很懵。会话接口主要分布在三个蓝图下:

  • Console 蓝图:从管理控制台调用,又分为从工作室的预览调试和探索的已发布应用两个地方调用,接口统一以 /console/api 为前缀
  • Web 蓝图:从前端的 Web 应用调用,Dify 会为每个应用生成一个公开可访问的页面,这个页面的调用统一以 /api 为前缀
  • Service API 蓝图:第三方通过 HTTP API 或 SDK 集成调用,接口以 /v1 为前缀

不同蓝图下虽然接口各异,但都实现了相同的核心功能,它们只是在认证方式和用户类型上有所区别,为不同的使用场景提供最合适的调用方式。下面就来对每个蓝图下的会话接口逐一分析。

Console 蓝图

当我们在 “工作室” 的预览页面对应用进行调试时:

apps.png

调用下面这些接口:

# 文本生成
api.add_resource(
  CompletionMessageApi, 
  "/apps/<uuid:app_id>/completion-messages"
)
# 聊天助手 或 Agent 应用
api.add_resource(
  ChatMessageApi, 
  "/apps/<uuid:app_id>/chat-messages"
)
# 对话流
api.add_resource(
  AdvancedChatDraftWorkflowRunApi, 
  "/apps/<uuid:app_id>/advanced-chat/workflows/draft/run"
)
# 工作流
api.add_resource(
  DraftWorkflowRunApi, 
  "/apps/<uuid:app_id>/workflows/draft/run"
)

我们还可以在 “探索” 中打开应用:

explore.png

此时调用接口如下:

# 文本生成
api.add_resource(
  CompletionApi, 
  "/installed-apps/<uuid:installed_app_id>/completion-messages", 
  endpoint="installed_app_completion"
)
# 聊天助手、Agent 应用 或 对话流
api.add_resource(
  ChatApi, 
  "/installed-apps/<uuid:installed_app_id>/chat-messages", 
  endpoint="installed_app_chat_completion"
)
# 工作流
api.add_resource(
  InstalledAppWorkflowRunApi, 
  "/installed-apps/<uuid:installed_app_id>/workflows/run"
)

Console 蓝图下的对话接口最大特点是 URL 上带有 app_id 参数,接口直接对该应用发起会话即可。另外,这个蓝图下的接口主要面向平台用户,需要对用户进行登录认证,所以接口的实现上都带有 @login_required 装饰器:

class ChatMessageApi(Resource):
  @setup_required
  @login_required
  @account_initialization_required
  @get_app_model(mode=[AppMode.CHAT, AppMode.AGENT_CHAT])
  def post(self, app_model):
    # ...

Web 蓝图

在应用的配置页,我们可以将应用发布成 Web 应用:

web.png

该功能默认开启,会自动生成一个能够在互联网上公开访问的网址。Web 应用的页面适配了不同尺寸的设备,包括 PC,平板和手机,使用者无需登陆,即可使用。当用户通过 Web 应用页面和助手对话时:

web-2.png

调用的接口如下:

# 文本生成
api.add_resource(CompletionApi, "/completion-messages")
# 聊天助手、Agent 应用 或 对话流
api.add_resource(ChatApi, "/chat-messages")
# 工作流
api.add_resource(WorkflowRunApi, "/workflows/run")

Dify 还支持将你的 Web 应用嵌入到业务网站中,你可以在几分钟内制作具有业务数据的官网 AI 客服、业务知识问答等应用:

web-iframe.png

Dify 提供了 3 种不同的嵌入方式,分别为:

  • <iframe> 标签方式:将 <iframe> 代码复制到你网站用于显示 AI 应用的标签中,如 <div><section> 等标签
  • <script> 标签方式:将 <script> 代码复制到你网站 <head><body> 标签中,这会在你的网站上显示一个 Dify 聊天机器人气泡按钮
  • 安装 Dify Chrome 浏览器扩展方式:前往 Chrome 应用商店,安装 Dify Chatbot 扩展即可

Web 应用的嵌入有不少高级技巧,比如自定义聊天机器人的气泡按钮,向 Web 应用传值,甚至可以基于前端模板对 Web 应用进行二次开发。

当用户通过这些方式访问我们的应用时,都统一使用 Web 蓝图下的接口。这个蓝图下的会话接口有着完善的接口文档:

class ChatApi(WebApiResource):
  @api.doc("Create Chat Message")
  @api.doc(description="Create a chat message for conversational applications.")
  @api.doc(
    params={
      "inputs": {"description": "Input variables for the chat", "type": "object", "required": True},
      "query": {"description": "User query/message", "type": "string", "required": True},
      "files": {"description": "Files to be processed", "type": "array", "required": False},
      "response_mode": {
        "description": "Response mode: blocking or streaming",
        "type": "string",
        "enum": ["blocking", "streaming"],
        "required": False,
      },
      "conversation_id": {"description": "Conversation UUID", "type": "string", "required": False},
      "parent_message_id": {"description": "Parent message UUID", "type": "string", "required": False},
      "retriever_from": {"description": "Source of retriever", "type": "string", "required": False},
    }
  )
  @api.doc(
    responses={
      200: "Success",
      400: "Bad Request",
      401: "Unauthorized",
      403: "Forbidden",
      404: "App Not Found",
      500: "Internal Server Error",
    }
  )
  def post(self, app_model, end_user):
    # ...

值得注意的是,这个接口没有 app_id 参数,那么 Dify 是怎么区分用户是在和哪个应用进行对话呢?答案在于 WebApiResource 这个父类:

class WebApiResource(Resource):
  method_decorators = [validate_jwt_token]

WebApiResource 继承自 Flask-RESTX 的 Resource 类,method_decorators 是该类的一个属性,用于为资源类中的所有 HTTP 方法自动应用装饰器。Dify 在这里定义了一个 validate_jwt_token 装饰器,对 Web 蓝图下的接口验证 JWT 令牌,从而得到应用信息。

Web 应用的访问地址一般是 http://ip:port/chat/{app_code},当用户第一次访问 Web 应用时,会调用 api/passport 接口,通过 HTTP 请求头 X-App-Code 创建一个 JWT 令牌,后续对话时都会带上这个令牌。

Service API 蓝图

Dify 基于 后端即服务 理念为所有应用提供了 API 接口,为应用开发者带来了诸多便利。可以在应用配置页开启:

api.png

Dify 为每个应用提供了详尽的接口文档:

api-2.png

同时 Dify 还提供了多语言的 SDK,包括 PythonPHPNode.jsJavaGoRuby,方便开发者在不同技术栈中集成 Dify 的能力。

通过 API 访问的接口如下:

# 文本生成
@service_api_ns.route("/completion-messages")
class CompletionApi(Resource):

# 聊天助手、Agent 应用 或 对话流
@service_api_ns.route("/chat-messages")
class ChatApi(Resource):

# 工作流
@service_api_ns.route("/workflows/run")
class WorkflowRunApi(Resource):

不过要访问这些 API 接口,我们得先创建 API 密钥,可以点击文档页面右上角的 “API 密钥” 按钮创建。然后按照接口文档,发送请求如下:

curl -X POST 'http://localhost:5001/v1/chat-messages' \
  --header 'Authorization: Bearer app-cSZlATg5dglJSxVLLaJoXkgX' \
  --header 'Content-Type: application/json' \
  --data-raw '{
    "inputs": {},
    "query": "本是同根生,相煎何太急",
    "response_mode": "blocking",
    "user": "abc-123"
  }'

得到应用的响应如下:

{
  "event": "message",
  "task_id": "e705b358-5a75-4fc4-9dfe-c8bbc8095be7",
  "id": "005f66cf-08eb-4aac-9f19-3a6255ecdb66",
  "message_id": "005f66cf-08eb-4aac-9f19-3a6255ecdb66",
  "conversation_id": "5cd66e81-05fd-4a11-8596-586258653213",
  "mode": "chat",
  "answer": "Being born from the same root, why rush to fry each other to death?",
  "metadata": {
    "annotation_reply": null,
    "retriever_resources": [],
    "usage": {
      "prompt_tokens": 63,
      "prompt_unit_price": "5",
      "prompt_price_unit": "0.000001",
      "prompt_price": "0.000315",
      "completion_tokens": 17,
      "completion_unit_price": "15",
      "completion_price_unit": "0.000001",
      "completion_price": "0.000255",
      "total_tokens": 80,
      "total_price": "0.00057",
      "currency": "USD",
      "latency": 2.1960244579822756
    }
  }
}

很明显,Service API 蓝图下的接口要实现 API 密钥的认证:

class ChatApi(Resource):
  @service_api_ns.expect(chat_parser)
  @service_api_ns.doc("create_chat_message")
  @service_api_ns.doc(description="Send a message in a chat conversation")
  @service_api_ns.doc(
    responses={
      200: "Message sent successfully",
      400: "Bad request - invalid parameters or workflow issues",
      401: "Unauthorized - invalid API token",
      404: "Conversation or workflow not found",
      429: "Rate limit exceeded",
      500: "Internal server error",
    }
  )
  @validate_app_token(fetch_user_arg=FetchUserArg(fetch_from=WhereisUserArg.JSON, required=True))
  def post(self, app_model: App, end_user: EndUser):
    # ...

当我们创建 API 密钥时,Dify 会将 API 密钥和 app_id 保存到数据库中,然后用户调用 API 接口时,必须传入 API 密钥,接口通过 @validate_app_token 从数据库中获取 API 密钥对应的应用信息。

小结

今天我们主要对 Dify 的会话接口做了一个简单的梳理,学习了 Dify 应用的不同访问方式以及各自蓝图下的接口定义,它们之间的差异对比如下:

蓝图认证方式装饰器用户类型
ConsoleSession@login_requiredAccount(开发者)
WebJWT TokenWebApiResource 基类处理EndUser(终端用户),来自浏览器
Service APIAPI Key@validate_app_tokenEndUser(终端用户),来自 API

在下一篇文章中,我们将继续研究会话接口的源码,来具体看下不同应用的会话是如何实现的。


学习 Dify 的路由系统

我们昨天学习了 Dify 的代码架构和三种启动模式,不过在应用启动过程中,我们并没有看到路由注册的相关代码,这里的关键就在于它模块化的扩展系统。Dify 通过 ext_blueprints 模块注册路由,使用 Flask 的 Blueprint 和 Flask-RESTX 的 Namespace 实现模块化的 API 路由管理。我们今天就来学习这部分内容。

Flask 框架

在深入分析 Dify 的路由系统之前,我们先简单了解一下 Flask 框架。Flask 是一个基于 Python 的轻量级 Web 应用框架,它设计简洁、易于扩展,特别适合构建 API 服务。

flask.png

Flask 的主要优势包括:

  • 简单易用:代码简洁,学习曲线平缓
  • 灵活扩展:可以根据需要选择和集成各种扩展
  • 成熟稳定:经过多年发展,社区生态丰富
  • 适合 API:非常适合构建 RESTful API 服务

Flask 的设计哲学是 微框架,它只提供 Web 开发的核心功能,其他功能通过扩展来实现。在 Dify 项目中采用了大量的 Flask 扩展,比如:

  • Flask-RESTX:用于构建 RESTful API 和自动生成文档
  • Flask-SQLAlchemy:ORM 数据库操作
  • Flask-Migrate:数据库迁移管理
  • Flask-Login:用户认证和会话管理
  • Flask-CORS:跨域资源共享支持
  • Flask-Compress:自动压缩 HTTP 响应内容
  • Flask-orjson:使用 orjson(一个高性能的 JSON 库)替换 Flask 默认的 JSON 编码器和解码器

Flask 基本使用

下面是 Flask 框架的基本用法:

# 创建应用
from flask import Flask
app = Flask(__name__)

# 注册路由
@app.route('/user/profile')
def user_profile():
  return 'user profile'

@app.route('/user/settings')
def user_settings():
  return 'user settings'

if __name__ == "__main__":
  app.run(host="0.0.0.0", port=6001)

寥寥几行代码就可以创建一个 RESTful API 服务。可以看到,这里使用了 Flask 最原始的装饰器方式 @app.route 来注册路由,Flask 也支持手动注册路由,这种方式更灵活:

# 手动注册路由
app.add_url_rule('/user/prifile', 'user_profile', user_profile)
app.add_url_rule('/user/settings', 'user_settings', user_settings)

使用 Blueprint 注册路由

不过上述两种方式都存在着明显的缺点,所有路由都注册到同一个 app 对象上,难以按功能模块分离,导致代码组织混乱;如果是大型项目,路由会分散在各处,而且无法统一为一组相关路由设置 URL 前缀,维护起来很困难。

于是 Flask 引入了 Blueprint 功能,也被称为 蓝图,允许你将相关的路由、视图函数、模板和静态文件组织在一起,形成一个可重用的组件。通过模块化和代码隔离,很好地解决了这些问题。

下面演示下蓝图的基本用法,首先我们创建一个独立的 user.py 文件,内容如下:

# 创建蓝图
from flask import Blueprint
user_bp = Blueprint('user', __name__, url_prefix='/user')

# 定义路由
@user_bp.route('/profile')
def profile():
  return 'user profile'

@user_bp.route('/settings')
def settings():
  return 'user settings'

在这个文件中,我们创建了一个名为 user 的蓝图,并约定了统一的 URL 前缀,然后通过装饰器 @user_bp.route 定义路由。接着我们创建主程序:

# 创建应用
from flask import Flask
app = Flask(__name__)

# 注册蓝图
from user import user_bp
app.register_blueprint(user_bp)

if __name__ == "__main__":
  app.run(host="0.0.0.0", port=6001)

在主程序中,我们导入刚刚的蓝图,并通过 app.register_blueprint() 注册蓝图,这样蓝图中定义的路由就注册好了。

Flask 的 Blueprint 功能允许我们将大型应用拆分成多个模块,每个模块负责处理特定的功能,优势如下:

  • 模块化:将大型应用拆分成多个模块
  • URL 前缀管理:为一组路由添加统一的 URL 前缀
  • 代码复用:相同功能可以在不同应用中重复使用
  • 团队协作:不同开发者可以独立开发不同的 Blueprint

Dify 充分利用了这一特性来组织其复杂的 API 结构。

Dify 的 ext_blueprints 扩展

在 Dify 中,所有的蓝图都在 api/extensions/ext_blueprints.py 文件中统一注册:

def init_app(app: DifyApp):
    
  # 导入所有蓝图
  from controllers.console import bp as console_app_bp     # 管理控制台
  from controllers.web import bp as web_bp                 # Web 应用
  from controllers.service_api import bp as service_api_bp # 服务 API
  from controllers.files import bp as files_bp             # 文件操作
  from controllers.inner_api import bp as inner_api_bp     # 内部 API
  from controllers.mcp import bp as mcp_bp                 # MCP 协议

  # 为不同蓝图配置 CORS 策略
  from flask_cors import CORS
  CORS(service_api_bp,
     allow_headers=["Content-Type", "Authorization", "X-App-Code"])
  CORS(web_bp,
     resources={r"/*": {"origins": dify_config.WEB_API_CORS_ALLOW_ORIGINS}},
     supports_credentials=True)
  CORS(console_app_bp,
     resources={r"/*": {"origins": dify_config.CONSOLE_CORS_ALLOW_ORIGINS}},
     supports_credentials=True)
  
  # 注册所有蓝图到 Flask 应用
  app.register_blueprint(service_api_bp) # /v1/*
  app.register_blueprint(web_bp)         # /api/*
  app.register_blueprint(console_app_bp) # /console/api/*
  app.register_blueprint(files_bp)       # /files/*
  app.register_blueprint(inner_api_bp)   # /inner/api/*
  app.register_blueprint(mcp_bp)         # /mcp/*

可以看出,Dify 将 API 按照使用场景分成了几个主要的蓝图:

  • console:管理控制台的 API,用于应用管理、配置等
  • web:Web 端使用的 API,用于应用展示和交互
  • service_api:对外提供的服务 API,供第三方集成使用
  • files:文件上传下载相关的 API
  • inner_api:内部服务间通信的 API
  • mcp:MCP(Model Context Protocol)相关的 API

每个蓝图都对应自己的 URL 前缀:

dify-blueprints.png

而且 Dify 为不同蓝图配置了不同的 CORS 策略,比如 console 蓝图用于管理控制台 API,只允许管理控制台域名访问;web 蓝图用于前端应用 API,只允许配置的前端域名访问;service_api 蓝图用于第三方开发者 API,无 origins 限制,允许任何域名访问,并支持 X-App-Code 头,用于应用身份验证。这种分层的接口安全设计,同样得益于 Flask Blueprint 的模块化特性。

此外,不同的蓝图还使用了不同的认证策略:

Blueprint认证方式用途示例端点
consoleJWT管理后台操作/console/api/apps
webToken/Session前端应用调用/api/chat-messages
service_apiAPI Key第三方服务调用/v1/chat-messages
files多重认证文件上传下载/files/upload
inner_api内网限制服务间通信/inner/api/health

使用 Flask-RESTX 增强 API 开发体验

虽然 Flask 本身已经很适合构建 API,但 Dify 选择了 Flask-RESTX 这个扩展来进一步增强 API 开发体验。Flask-RESTX 是 Flask-RESTPlus 的社区维护版本,它在 Flask 的基础上提供了更多 RESTful API 开发的便利功能。

Flask-RESTX 的主要特性包括:

  • 自动 API 文档生成:通过装饰器自动生成 Swagger/OpenAPI 文档
  • 请求参数验证:内置参数验证和序列化功能
  • 命名空间管理:支持 API 版本控制和模块化组织
  • 响应模型定义:可以定义标准化的响应格式

使用 Api 注册路由

Flask-RESTX 在 Flask 的基础上,引入了 ResourceApi 的概念:Resource 用于定义 RESTful 资源,可以更方便地创建 RESTful 接口;而 Api 则是对 Flask 的 Blueprint 的扩展,专门用于添加 RESTful 资源类,它能够自动处理 HTTP 方法映射(GET、POST、PUT、DELETE 等),支持自动生成 API 文档和 Swagger UI,支持参数的解析和验证。下面是使用 Api 注册路由的示例代码:

from flask import Blueprint
from flask_restx import Api, Resource

# 创建 Blueprint 和 API
bp = Blueprint('api', __name__, url_prefix='/api')
api = Api(bp, doc='/docs')  # 启用 Swagger UI

# 定义 API 资源
class UserResource(Resource):
  
  def get(self):
    # 获取用户列表
    return []

  def post(self):
    # 创建新用户
    return {"message": "用户创建成功"}, 201

# 注册资源
api.add_resource(UserResource, '/users')

和直接在蓝图中添加路由不同,这里需要先定义 API 资源,它会自动生成资源对应的增删改查 RESTful 接口,然后通过 api.add_resource() 添加资源。而本质上 Api 还是对蓝图进行操作的,因此我们只要在主程序中通过 app.register_blueprint() 注册蓝图即可,和普通的蓝图用法一样。

使用 Namespace 管理路由

Flask-RESTX 的另一大特点是增加了 Namespace 的功能,可以对蓝图下的 API 端点进一步组织和分组。下面的示例演示了 Namespace 的基本用法:

from flask import Blueprint
from flask_restx import Api, Namespace, Resource, fields

# 创建 Blueprint 和 API
bp = Blueprint('api', __name__, url_prefix='/api')
api = Api(bp, doc='/docs')  # 启用 Swagger UI

# 创建命名空间
user_ns = Namespace('users', description='用户管理相关操作')

# 定义数据模型
user_model = user_ns.model('User', {
    'id': fields.Integer(description='用户ID'),
    'name': fields.String(required=True, description='用户名')
})

# 定义 API 资源
@user_ns.route('/')
class UserList(Resource):
    @user_ns.doc('get_users')
    @user_ns.marshal_list_with(user_model)
    def get(self):
        """获取用户列表"""
        return []

    @user_ns.doc('create_user')
    @user_ns.expect(user_model)
    def post(self):
        """创建新用户"""
        return {"message": "用户创建成功"}, 201

# 注册命名空间
api.add_namespace(user_ns, path='/users')

和上面 api.add_resource() 添加资源不同的是,这里是通过 api.add_namespace() 注册命名空间,Api 会将命名空间下的所有路由注册到蓝图中。主程序还是一样,通过 app.register_blueprint() 注册蓝图即可。

此外,我们还可以通过 @user_ns 对接口进行增强,比如定义数据模型,增加参数验证。访问 /api/docs 地址,可以查看 Swagger 页面:

api-docs.png

Dify 如何组织路由

我们上面已经了解到,Dify 将 API 分成了 consolewebservice_apifilesinner_apimcp 六个蓝图,所有的蓝图注册逻辑都位于 api/controllers 目录。让我们以 console 蓝图为例,看看它是如何组织的:

from flask import Blueprint
from libs.external_api import ExternalApi

# 创建 console Blueprint
bp = Blueprint("console", __name__, url_prefix="/console/api")
api = ExternalApi(bp)

# 文件相关 API
api.add_resource(FileApi, "/files/upload")
api.add_resource(FilePreviewApi, "/files/<uuid:file_id>/preview")

# 应用导入相关 API
api.add_resource(AppImportApi, "/apps/imports")
api.add_resource(AppImportConfirmApi, "/apps/imports/<string:import_id>/confirm")

# 导入各个子模块的控制器
from . import admin, apikey, extension, feature, ping, setup, version
# 导入应用相关的控制器
from .app import app, workflow, completion, conversation, message, model_config, statistic
# 导入认证相关的控制器
from .auth import login, oauth, forgot_password
# 导入数据集相关的控制器
from .datasets import datasets, datasets_document, hit_testing

从代码可以看出,console 蓝图都是通过 api.add_resource() 的方式注册路由的。我们再看看 service_api 蓝图:

from flask import Blueprint
from flask_restx import Namespace
from libs.external_api import ExternalApi

# 创建 service_api Blueprint
bp = Blueprint("service_api", __name__, url_prefix="/v1")

api = ExternalApi(
  bp,
  version="1.0",
  title="Service API",
  description="API for application services",
  doc="/docs",  # Enable Swagger UI at /v1/docs
)

# 创建 service_api Namespace
service_api_ns = Namespace("service_api", description="Service operations", path="/")

from . import index
# 导入应用相关的控制器
from .app import annotation, app, audio, completion, conversation, file, file_preview, message, site, workflow
# 导入数据集相关的控制器
from .dataset import dataset, document, hit_testing, metadata, segment, upload_file
# 导入模型相关的控制器
from .workspace import models

# 添加 Namespace
api.add_namespace(service_api_ns)

这里有点奇怪的是,service_apiconsole 不一样,它是通过 api.add_namespace() 方式注册路由的,而且如果你仔细阅读其他几个蓝图的代码会发现,所有的蓝图都使用了 Namespace 特性,唯独 console 是个例外,不清楚是历史遗留问题,还是设计如此,如果是设计如此,又是基于什么考虑的呢?有清楚的朋友,欢迎评论区交流~

小结

今天我们深入学习了 Dify 的路由系统。Dify 基于 Flask 的 Blueprint 功能,将复杂的 API 按照 consolewebservice_api 等不同场景进行模块化拆分。这种设计不仅使代码结构更加清晰,还允许为不同模块配置独立的 CORS 和认证策略,增强了系统的安全性。此外,它还借助 Flask-RESTX 扩展,通过 ApiResourceNamespace 等概念进一步优化了 API 的组织方式,并实现了 API 文档的自动生成和参数验证等功能。

明天我们将接着研究 Dify 的源码,来看看具体的接口实现,先从用户和助手对话时的后端逻辑开始。


学习 Dify 的代码结构

在前面的系列文章中,我们从实用的角度学习了 Dify 的部署方式、应用创建和各种应用类型的使用方法。今天,我们将深入 Dify 的源码,从技术架构的角度来理解这个 LLM 应用开发平台是如何构建的。

目录结构

首先,让我们从 Dify 的源码目录结构开始,了解整个项目的组织方式:

dify/
├── api/                    # 后端 API 服务 (Python Flask)
├── web/                    # 前端 Web 应用 (Next.js 15 + React 19)
├── sdks/                   # 多语言 SDK
│   ├── python-client/      # Python SDK
│   ├── nodejs-client/      # Node.js SDK
│   └── php-client/         # PHP SDK
├── dev/                    # 开发工具脚本
└── docker/                 # Docker 部署配置

从这个目录结构可以看出,Dify 采用了经典的前后端分离架构。后端 API 使用 Python Flask 框架提供 RESTful API 服务,前端 Web 使用 Next.js 构建现代化的用户界面,两者通过 HTTP API 进行通信。此外,Dify 还提供了多语言的 SDK,方便开发者在不同技术栈中集成 Dify 的能力。

后端架构

让我们继续深入后端 api/ 目录,这里是 Dify 的核心逻辑所在:

api/
├── app.py                 # 应用入口文件
├── app_factory.py         # Flask 应用工厂
├── dify_app.py            # 自定义 Flask 应用类
├── controllers/           # 控制器层 (路由处理)
├── services/              # 服务层 (业务逻辑)
├── models/                # 模型层 (数据模型)
├── repositories/          # 仓储层 (数据访问)
├── core/                  # 核心功能模块
├── extensions/            # Flask 扩展
├── libs/                  # 工具库
├── configs/               # 配置管理
├── tasks/                 # 异步任务
├── migrations/            # 数据库迁移
└── tests/                 # 测试代码

很显然 Dify 采用了分层的设计思想,将代码按照功能职责进行了清晰的分层:

  • 控制器层(Controllers):处理 HTTP 请求和响应,负责参数验证和路由分发;
  • 服务层(Services):包含核心业务逻辑,协调各个组件完成复杂的业务操作;
  • 仓储层(Repositories):封装数据访问逻辑,提供统一的数据操作接口;
  • 模型层(Models):定义数据模型和实体关系;

应用启动入口

其中 app.py 文件是 Dify 的启动入口,核心代码如下:

if is_db_command():
  from app_factory import create_migrations_app
  app = create_migrations_app()
else:
  from app_factory import create_app
  app = create_app()
  celery = app.extensions["celery"]

if __name__ == "__main__":
  app.run(host="0.0.0.0", port=5001)

Dify 应用有三种启动模式:

数据库迁移模式

在入门篇里我们学习过,首次运行 Dify 之前,需要运行下面的命令初始化数据库:

$ flask db upgrade

Dify 通过判断命令行参数是否包含 flask db 子命令,如果包含则调用 create_migrations_app() 执行数据库迁移逻辑。

这里使用了 Flask-SQLAlchemyFlask-Migrate 库,其核心是通过 SQLAlchemy 的迁移工具 Alembic 管理数据库结构的版本化变更。在软件开发过程中,我们经常会对数据模型进行修改,这时如果手动写 SQL 同步表结构,极有可能会导致不一致或误操作。而 Flask-Migrate 会自动跟踪数据库模型的变更、生成版本化迁移脚本、执行脚本更新数据库表结构,让数据迁移变得非常容易。

除了 flask db upgrade 命令,Flask-Migrate 还支持很多其他的 flask db 子命令,比如 initmigratedowngrade 等。

API 服务模式

正常情况下我们通过 flask run 运行程序时,Dify 会调用 create_app() 函数来创建 DifyApp 实例(其实就是 Flask 应用实例):

def create_app() -> DifyApp:

  # 1. 创建 Flask 应用并加载配置
  dify_app = DifyApp(__name__)
  dify_app.config.from_mapping(dify_config.model_dump())

  # 2. 初始化扩展系统
  initialize_extensions(app)

  return app

应用创建流程分为两个关键步骤:

  1. 创建 Flask 应用:初始化 DifyApp 实例,并加载配置
  2. 初始化扩展系统:按顺序加载所有功能扩展,这里是程序启动的关键

要注意的是 flask run 一般用于开发调试,在生产环境部署 Dify 时,建议使用 Gunicorn

$ gunicorn \
    --bind "0.0.0.0:5001" \
    --workers 1 \
    --worker-class gevent \
    --worker-connections 10 \
    --timeout 200 \
    app:app

flask run 是为开发阶段设计的轻量级工具,而 gunicorn 是为生产环境优化的专业 WSGI 应用服务器,专门解决生产场景下的高并发、稳定性和资源管理等问题。

gunicorn.png

Celery 任务模式

此外,Dify 支持通过 Celery 处理异步任务和定时任务,它利用 Redis 作为消息中间件,把一些耗时的操作(比如发送邮件、处理图片、数据分析等)放到后台去执行,而不会阻塞主程序的运行。

celery.png

我们可以执行下面的命令启动 Celery 的任务处理服务:

$ celery -A app.celery worker <其他参数>

或执行下面的命令启动 Celery 的定时任务调度器:

$ celery -A app.celery beat <其他参数>

这里的 -A 参数使用 Python 的模块导入机制,app.celery 指向 app.py 模块的 celery 变量,它其实是一个 Celery 应用,通过 ext_celery 扩展系统初始化的。

模块化的扩展系统

Dify 的扩展系统是其架构的一大亮点,通过模块化的方式组织各种功能:

def initialize_extensions(app: DifyApp):

  # 按顺序初始化所有扩展
  extensions = [
    ext_timezone,        # 时区设置
    ext_logging,         # 日志系统
    ext_warnings,        # 警告处理
    ext_import_modules,  # 模块导入
    ext_orjson,         # JSON 序列化
    ext_set_secretkey,   # 密钥设置
    ext_compress,        # 响应压缩
    ext_code_based_extension,  # 代码扩展
    ext_database,        # 数据库连接
    ext_app_metrics,     # 应用指标
    ext_migrate,         # 数据库迁移
    ext_redis,          # Redis 缓存
    ext_storage,        # 文件存储
    ext_celery,         # 异步任务队列
    ext_login,          # 登录认证
    ext_mail,           # 邮件服务
    ext_hosting_provider, # 托管提供商
    ext_sentry,         # 错误监控
    ext_proxy_fix,      # 代理修复
    ext_blueprints,     # 路由蓝图注册
    ext_commands,       # CLI 命令
    ext_otel,           # OpenTelemetry
    ext_request_logging, # 请求日志
  ]

  for ext in extensions:
    is_enabled = ext.is_enabled() if hasattr(ext, "is_enabled") else True
    if not is_enabled:
      continue  # 跳过禁用的扩展
    ext.init_app(app)  # 初始化扩展

扩展系统按照扩展之间的依赖关系顺序执行,确保正确初始化,并支持动态地启用或禁用扩展。

小结

通过对 Dify 代码结构的分析,我们可以看到:

  1. 前后端分离架构:后端使用 Python Flask 提供 API 服务,前端使用 Next.js 构建用户界面
  2. 分层设计思想:控制器、服务、仓储、模型层职责清晰,代码组织良好
  3. 灵活的启动模式:支持数据库迁移、API 服务和 Celery 任务三种运行模式
  4. 模块化扩展系统:通过扩展机制组织各种功能,支持按需加载和禁用

不过细心的读者可能会发现一个问题:在应用启动过程中,我们并没有看到路由注册的相关代码。那么 Dify 是如何处理 HTTP 请求路由的呢?

答案就在扩展系统中的 ext_blueprints 模块。Dify 通过 Flask 的 Blueprint 机制和 Flask-RESTX 的 Namespace 来组织和注册所有的 API 路由,我们明天就来看看这部分内容。


学习 Dify 的工作流和对话流应用

在前面的文章中,我们学习了 Dify 的聊天助手、文本生成和 Agent 应用,这些应用虽然功能强大,但在处理一些复杂业务逻辑时仍有局限性。今天,我们将学习 Dify 的 工作流(Workflow)对话流(Chatflow) 应用,了解如何通过可视化编排构建更加复杂和灵活的 AI 应用。

工作流基本概念

工作流是 Dify 提供的一种可视化应用构建方式,它通过将复杂的任务分解成较小的步骤(节点)来降低系统复杂度,减少了对提示词技术和模型推理能力的依赖,提高了 LLM 应用面向复杂任务的性能,同时提升了系统的可解释性、稳定性和容错性。

Dify 将工作流分为两种类型,每种都有不同的适用场景:

  • Workflow(工作流):面向自动化和批处理情景,比如高质量翻译、数据分析、内容生成、电子邮件自动化等,它的交互特点是单次输入输出,无法进行多轮对话;
  • Chatflow(对话流):面向对话类情景,比如客户服务、语义搜索、需要多步逻辑的对话式应用,它的交互特点是支持多轮对话交互,可以调整生成结果;

Workflow 这个单词翻译成中文是工作流,但是在 Dify 的概念中,工作流指的是 Workflow 和 Chatflow 两种。不过我感觉这种分类容易让人混淆,所以我倾向于把 Workflow 就叫做工作流,Chatflow 叫做对话流。

创建工作流

我们进入 “工作室” 页面,点击 “创建空白应用”,应用类型选择 “工作流”:

workflow.png

我们拿之前学习 Coze Studio 时用过的天气查询的例子,来体验下 Dify 的工作流。填写助手名称,以及可选的图标和描述,进入应用的配置页面:

workflow-nodes.png

可以看出 Dify 和 Coze Studio 工作流差不多,也提供了大量的节点可供使用,包括:

基础节点

  • LLM:调用大语言模型的能力,根据给定的提示词处理广泛的任务类型,比如意图识别、文本生成、内容分类、文本转换、代码生成、RAG、图片理解等;
  • 知识检索:从知识库中检索与用户问题相关的文本内容,可作为下游 LLM 节点的上下文来使用,实现 RAG 问答;
  • 结束:每一个工作流在完整执行后都需要至少一个结束节点,用于输出完整执行的最终结果;若流程中出现条件分叉,则需要定义多个结束节点;
  • Agent:通过集成不同的 Agent 推理策略,使大语言模型能够在运行时动态选择并执行工具,从而实现多步推理;

问题理解

  • 问题分类器:通过大模型对用户输入进行分类,类似于条件分支,只不过是用自然语言来定义分支;

逻辑

  • 条件分支:根据 if 条件表达式将工作流程拆分成多个分支;
  • 迭代:对数组中的元素依次执行相同的操作步骤,直至输出所有结果,可以理解为任务批处理器,支持并行模式;
  • 循环:用于执行依赖前一轮结果的重复任务,直到满足退出条件或达到最大循环次数;

转换

  • 代码执行:支持执行 Python 或 JavaScript 代码,对输入变量进行处理;
  • 模版转换:借助 Jinja2 模板引擎灵活地进行数据转换、文本处理等;
  • 变量聚合器:将多路分支的变量聚合为一个变量,确保无论哪个分支被执行,其结果都能通过一个统一的变量来引用和访问;
  • 文档提取器:解析并读取文件,返回文件内容,支持 TXT、Markdown、PDF、HTML、DOCX 等格式;
  • 变量赋值:将工作流内的变量赋值到会话变量中用于临时存储,并可以在后续对话中持续引用;
  • 参数提取器:利用 LLM 从自然语言推理并提取结构化参数,用于后置的工具调用或 HTTP 请求;

工具

  • HTTP 请求:向指定的网络地址发送定制化的 HTTP 请求,实现与各种外部服务的互联互通;
  • 列表操作:对列表进行过滤、排序、取第 N 项或取前 N 项等操作;
  • Dify 工具:支持调用平台上所有的工具,包括插件工具、自定义工具、工作流工具、MCP 工具等;

相比于 Coze Studio 的工作流,Dify 的工作流少了文本处理(字符串拼接和分割)、知识库写入以及一些数据库相关的组件。

编排工作流

接下来我们开始创建 “天气小助手” 工作流,我们首先在 “开始节点” 上定义输入字段 city 表示要查询的城市名称:

workflow-node-start.png

在 “开始节点” 中,除了我们定义的输入字段外,还有很多内置的系统变量,都以 sys 开头:

  • sys.files:用户上传的文件
  • sys.user_id:用户唯一标识
  • sys.app_id:应用唯一标识
  • sys.workflow_id:工作流标识
  • sys.workflow_run_id:工作流运行标识

这些参数面向具备开发能力的用户,以区分调用工作流的用户和应用或查询工作流或工作流的运行情况。

假设我们要对接 高德的天气查询接口,这个接口通过城市编码查询该城市的天气详情。因此第二步是想办法将用户输入的城市名称转换为城市编码,这可以通过 “代码执行” 节点实现:

workflow-node-code.png

这个节点的输入参数为 city_name 引用自 “开始节点”,输出参数为 city_id,转换的代码如下:

# 城市编码表,可以从高德官网下载
city_list = [
  "北京市,110000",
  "天津市,120000",
  "石家庄市,130100",
  # ...
]
def main(city_name: str) -> dict:
  city_id = '110000'
  for city_info in city_list:
    city_info = city_info.split(',')
    if city_name in city_info[0]:
      city_id = city_info[1]
      break
  return {
    'city_id': city_id
  }

代码写好后,可以点击上面的小三角测试该节点,验证没问题后,接着再在代码节点后面加一个 “HTTP 请求” 节点,调用高德的天气查询接口:

workflow-node-http.png

该节点配置如下:

  • 接口方式 - GET
  • API 地址 - https://restapi.amap.com/v3/weather/weatherInfo
  • 请求参数

    • city - 引用自代码节点的 city_id 参数;
    • extensions - 填写 all,表示返回的天气类型,base 返回实况天气,all 返回预报天气;
    • output - 填写 json 表示希望接口返回 JSON 格式;
    • key - 填写高德 API KEY,可以从高德开放平台免费申请;

这里的 key 我并没有直接填写 API KEY,而是使用了环境变量,可以点击上面的 “ENV” 按钮创建,类型选择 “Secret”:

workflow-env.png

使用环境变量的好处是,可以保护工作流内所涉及的敏感信息,例如运行工作流时所涉及的 API 密钥、数据库密码等。导出工作流时会提示用户,可以剔除掉 “Secret” 类型的变量值。

同样点击小三角验证,测试通过后,在 “HTTP 请求” 节点后再接一个 “LLM” 节点:

workflow-node-llm.png

使用大模型将 HTTP 返回的 JSON 结果转换为自然语言,方便用户查看。最后将 “LLM” 节点和 “结束” 节点连接起来,在结束节点上添加 text 参数,并引用 “LLM” 节点的出参:

workflow-node-end.png

至此,整个工作流搭建完成:

workflow-export.png

可以点击上方的 “运行” 按钮,对整个工作流进行测试:

workflow-output.png

在输出面板上,我们还可以点击 “详情” 查看工作流完整的入参和出参:

workflow-output-detail.png

以及工作流中每个节点的执行情况:

workflow-output-trace.png

调试完成后,点击右上角的发布按钮:

workflow-publish.png

可以将工作流发布为不同形式:

  • 直接运行:通过 Web 界面直接使用
  • 批量运行:支持 CSV 文件批量处理
  • 在 “探索” 中打开:在 “探索” 页面使用
  • API 访问:通过 REST API 调用
  • 发布为工具:这是工作流和其他应用的一大区别,可以将其发布成工具,可以在其他应用中复用

创建对话流

接下来我们再看看对话流。我们进入 “工作室” 页面,点击 “创建空白应用”,应用类型选择 “Chatflow”:

chatflow.png

还是以天气助手为例,填写助手名称,以及可选的图标和描述,进入应用的配置页面:

chatflow-nodes.png

可以看出,对话流和工作流的节点几乎一样,只不过工作流的 “结束” 节点在这里变成了 “直接回复” 节点,它和 “结束” 节点的最大区别在于,“直接回复” 节点可以不作为最终的输出节点,作为流程过程节点时,可以在中间步骤流式输出结果。

编排对话流

由于对话流直接是以对话框的形式交互,因此我们无需在 “开始节点” 定义输入字段:

chatflow-start.png

其实,对话流的 “开始节点” 也可以添加输入字段,如果添加的话,就会在对话框的上方多出一个表单,供用户填写。

和工作流一样,对话流的 “开始节点” 也有很多以 sys 开头的系统变量,相比于工作流,它还多了几个和会话相关的变量:

  • sys.query:用户输入内容
  • sys.dialogue_count:对话的轮次,每轮对话后自动加 1
  • sys.conversation_id:会话唯一标识

除此之外,对话流还多了一个 “会话变量” 的功能,用于临时存储一些信息,比如上下文、用户偏好等,确保在多轮对话中都能够引用该信息。会话变量为可读写变量,我们可以通过 “变量赋值” 节点修改会话变量的内容。

然后再 “开始节点” 后加入一个 “问题分类” 节点:

chatflow-classifier.png

我们添加两个分类,分类 1 为天气查询,分类 2 为其他。天气查询分类后接一个参数提取器:

chatflow-extract.png

该节点的作用是从用户问题中提取出城市名称,这样后面的流程就和之前的工作流一样了,通过代码节点转换城市编码,再通过高德接口查询天气,最后通过大模型输出润色结果,这里不再赘述。

其他分类后接 “LLM” 节点:

chatflow-llm.png

对话流的 “LLM” 节点和工作流也有一点区别,多了一个 “记忆” 选项,可以开启,这样在多轮对话时,大模型就能记住之前的对话内容。

至此,对话版的天气小助手就开发好了:

chatflow-export.png

点击上面的 “预览” 按钮,和小助手进行对话:

chatflow-output.png

调试完成后,点击右上角的发布按钮,可以将对话流发布为不同形式:

  • 直接运行:通过 Web 界面直接使用
  • 嵌入网站:通过 iframe 嵌入到其他网站
  • 在 “探索” 中打开:在 “探索” 页面使用
  • API 访问:通过 REST API 调用

小结

我们今天学习了 Dify 的工作流和对话流应用,通过可视化拖拽的方式连接各个节点,实现复杂的业务逻辑:

  • Workflow(工作流):单次输入输出,适用于自动化和批处理场景;
  • Chatflow(对话流):支持会话变量、对话记忆等功能,更适合多轮交互场景;

至此,我们已经完整体验了 Dify 的 5 种应用类型(聊天助手、文本生成、Agent、工作流、对话流),每种类型都有其独特的优势和适用场景。接下来,我们将进入 Dify 的源码,看看这些应用的实现原理,同时我们还会学习 Dify 的其他高级特性,比如知识库、工具、插件系统等,这些模块为应用提供了更强大的功能,为我们构建企业级 AI 应用提供技术基础。


学习 Dify 的文本生成和 Agent 应用

在前两篇文章中,我们介绍了 Dify 的基本概念和部署方式,并通过创建一个简单的翻译助手初步体验了聊天助手的构建流程。今天,我们将继续学习 Dify 的另外两种应用类型:文本生成应用Agent 应用,了解它们的用法、特点以及应用场景。

文本生成应用

文本生成应用是 Dify 提供的一种专门用于内容创作的应用类型,它专注于根据用户输入生成特定格式的文本内容。与聊天助手的多轮对话不同,文本生成应用采用 单次输入、单次输出 的交互模式,非常适合批量处理和标准化内容生成场景。比如:

  • 内容创作:文章摘要、产品描述、营销文案
  • 文本处理:翻译、分类、情感分析
  • 模板化生成:邮件回复、报告生成、技术文档
  • 批量处理:大规模内容生成和数据处理等

我们进入 “工作室” 页面,点击 “创建空白应用”,应用类型选择 “文本生成应用”:

text-generator-ui.png

我们继续以之前翻译助手为例,填写助手名称,以及可选的图标和描述,进入应用的配置页面:

text-generator-config.png

文本生成应用和聊天助手的配置页面几乎完全一样,整个配置页面也是分为左侧的 编排 和 右侧的 调试与预览 两大块。在编排区域里,同样可以配置 提示词变量知识库视觉 四个选项。

我们在这里添加两个变量:

  • lang - 目标语言,字段类型为下拉选项,选项配置有中文、英文、日文等
  • query - 翻译文本,字段类型为段落

然后在提示词中以 {{lang}}{{query}} 占位符插入变量,完整的提示词如下:

你是一个翻译助手,你的任务是将用户输入翻译成 {{lang}}
用户输入:{{query}}

这里定义的变量将以表单的形式显现在右侧的调试区域,和聊天助手不同的是,文本生成应用的对话页面就只有一个表单,用户填写的表单内容将自动替换提示词中的变量,然后调用大模型输出结果。

使用变量

Dify 变量支持的字段类型包括:文本(string)、段落(string)、下拉选项(string)、数字(number)和复选框(boolean)。

这里不支持文件上传类型,功能其实是很受限的,如果有上传文件的需求,可以考虑使用工作流。

此外,Dify 还支持以 API 扩展的方式创建 基于 API 的变量

dify-variable-api.png

关于 API 变量,我们后面在学习 API 扩展时再作讨论。

批量生成

文本生成应用的一个强大特性是支持 CSV 批量处理,非常适合批量数据处理或内容生产场景。点击右上角的 “发布” 按钮,可以看到 “批量运行” 的选项:

text-generator-publish.png

点击后进入翻译应用,页面有 “Run Once” 和 “Run Batch” 两个标签页:

text-generator-batch-1.png

要使用批量运行功能,需要准备一个 CSV 格式的文件,包含多条要处理的数据,文件内容要满足一定的格式要求,可以下载模版文件填写:

翻译文本,目标语言
"Hello","中文"
"What's your name?","中文"

批量运行结果如下:

text-generator-batch-2.png

运行的结果可以点击下载按钮导出。

和聊天助手的区别

文本生成应用和聊天助手在架构和使用方式上存在显著差异:

特性文本生成应用聊天应用
WebApp 界面表单 + 结果展示聊天对话框
API 端点completion-messageschat-messages
交互模式单次问答多轮对话
上下文保持仅当前会话跨会话持久化
开场白支持不支持支持
批量处理支持 CSV 批量处理不支持

Agent 应用

Agent(智能体)是 Dify 中最强大的应用类型之一,它利用大语言模型的推理能力,能够 自主规划目标、分解任务、调用工具并迭代执行,无需人工干预即可完成复杂任务。

我们进入 “工作室” 页面,点击 “创建空白应用”,应用类型选择 “Agent”:

agent-ui.png

我们创建一个 “生活小能手”,可以调用各种小工具解答用户的各种生活问题,填写应用名称,以及可选的图标和描述,进入应用的配置页面:

agent-config.png

Agent 应用和聊天助手以及文本生成应用的配置页面也没有太多的区别,总的来说有两点不同:

  • 工具配置:可以使用工具扩展 Agent 应用的能力,比如联网搜索或科学计算等;
  • Agent 设置:可以配置 Agent 的工作模式,默认支持 Function Calling 策略;

Dify 很贴心地内置了几个小工具:

  • 语音转写:支持文本转语音(Text to Speech)和语音转文本(Speech to Text);
  • 代码执行:运行一段代码并返回结果;
  • 时间计算:各种时间小工具,比如获取当前时间、计算星期几、时区转换等;
  • 网页抓取:根据网页链接获取网页内容;

我们不妨全都加上:

agent-tools.png

这样一个 “生活小能手” 智能体就做好了:

agent-chat.png

工具配置

值得注意的是,这里语音转写的两个工具需要配置后才能使用,可以点击工具右侧的 “信息和设置” 图标:

agent-tools-tts-1.png

在工具配置页面选择 TTS 或 ASR 模型即可:

agent-tools-tts-2.png

需要提前在设置中的模型供应商里配置对应的模型。

不过经过我的测试,这两个工具配在智能体里其实有点多余。首先对话窗口不支持上传音频文件,因此调不了语音转文本的工具;其次生成的音频文件直接保存在本地文件,也无法直接播放,也无法下载:

agent-chat-2.png

Agent 设置

在 Agent 设置中,Dify 会根据模型特性自动设置 Agent 的推理模式。Dify 在源码中定义了 9 种不同的模型特性:

class ModelFeature(Enum):
  # 单工具调用,模型能够理解工具描述并生成符合格式的工具调用请求
  TOOL_CALL = "tool-call"
  # 多工具调用,模型能够在一次响应中同时调用多个工具
  MULTI_TOOL_CALL = "multi-tool-call"
  # 流式工具调用,在流式响应中实时输出工具调用信息
  STREAM_TOOL_CALL = "stream-tool-call"
  # 思考能力,模型具备思维链推理能力
  AGENT_THOUGHT = "agent-thought"
  # 视觉理解能力,模型支持图像输入和理解
  VISION = "vision"
  # 文档理解能力,模型支持处理和理解文档格式内容
  DOCUMENT = "document"
  # 视频理解能力,模型支持视频内容的处理和分析
  VIDEO = "video"
  # 音频处理能力,模型支持音频内容的理解和处理
  AUDIO = "audio"
  # 结构化输出,模型能够按照指定的 JSON Schema 输出结果
  STRUCTURED_OUTPUT = "structured-output"

当模型支持工具调用能力时,默认使用 Function Calling 模式,拥有效果更佳、更稳定的表现:

agent-mode-function-call.png

对于不支持工具调用的模型系列,Dify 通过 ReAct 推理框架实现类似的效果:

agent-mode-react.png

我们可以在这里对 ReAct 提示词进行适当调整。此外,我们还可以修改最大迭代次数防止死循环,默认 10 次。

文件上传

当模型具备处理图片、文档、视频或音频时,智能体的对话入口会多一个文件上传按钮,比如 Gemini 2.5 Pro 同时支持视觉、文档和音频:

agent-files.png

开启之后,我们就可以对话时处理上传的文件:

agent-files-chat.png

小结

今天我们探索了 Dify 的文本生成和 Agent 应用:

  • 文本生成应用:专注于单次输入输出的内容生成场景,支持批量处理,它不维护对话状态,特别适合标准化内容生产;
  • Agent 应用:通过 Function Calling 或 ReAct 策略实现,支持工具调用和多步骤执行,利用大模型推理能力自主完成复杂任务;

文本生成、Agent 与之前学习的聊天助手这三种应用很相似,都是围绕大模型的调用来实现的,根据不同的业务场景选择合适的应用类型:

  • 简单对话:选择聊天助手
  • 内容生成:选择文本生成应用
  • 复杂任务:选择 Agent 应用

下一篇文章中,我们继续学习 Dify 的工作流应用和对话流应用,了解如何通过可视化编排构建更加复杂的业务逻辑。


使用 Dify 创建你的第一个 AI 应用

上一篇文章中,我们介绍了 Dify 这个开源 LLM 应用开发平台,并演示了两种主要的部署方式。今天,我们将进入实操阶段,学习如何使用 Dify 来构建我们的 AI 应用,Dify 提供了丰富的应用类型和强大的功能,让我们一步步来探索它的核心使用方法。

模型配置

成功部署 Dify 后,首次访问需要创建管理员账户。登录后,你会看到 Dify 的主界面,这里包含了 探索工作室知识库工具 四个核心功能模块。默认进入的是工作室模块:

apps.png

这里是应用开发的核心区域,用于创建和管理 AI 应用,包括工作流、对话流、聊天助手、Agent 和文本生成应用。而探索模块可以体验自己创建的应用,或浏览社区其他用户创建的应用,并支持复制到自己的工作区:

explore.png

知识库和工具是辅助模块,为你的应用添砖加瓦,在知识库模块你可以导入自己的文本数据,为 RAG 应用提供知识支撑,还支持通过 WebHook 实时写入或连接外部知识库:

datasets.png

在工具模块你可以自定义工具、工作流或 MCP 工具,还支持从 Dify 市场下载并安装第三方工具:

tools.png

但在使用这些功能之前,我们需要先在 Dify 中配置模型。点击右上角头像,选择 “设置” 进入设置页面,在左侧菜单中选择 “模型供应商”:

dify-model-providers.png

这里可以看到所有支持的模型供应商,包括国外的 OpenAI、Anthropic、Google Gemini,国内的智谱、DeepSeek、通义千问、月之暗面等 60 多家供应商。Dify 将模型供应商实现成一种插件,因此需要安装。以 OpenAI 为例,在列表中找到 “OpenAI” 并点击 “安装”,安装完成后,点击 “配置”:

dify-model-openai.png

输入你的 API Key 和其他必要信息并点击 “保存” 即可,我们可以为凭据取一个名字,方便后续使用。配置完成后,你就可以在创建应用时选择相应的模型了。

值得注意的是,Dify 使用 PKCS1_OAEP 加密算法安全存储 API 密钥,每个租户都有独立的密钥对,确保数据安全。

根据使用场景,Dify 将模型分为六大类:

  1. 系统推理模型:应用的核心推理引擎,用于对话生成、文本处理等
  2. Embedding 模型:将文本转换为向量表示,用于知识库检索
  3. Rerank 模型:优化检索结果排序,提升 RAG 应用效果
  4. 语音转文字模型:支持语音输入功能
  5. 文字转语音模型:支持将文本输出转换为语音
  6. Moderation 模型:支持内容审查功能

我们可以在 “系统模型设置” 中为每一类选择一个默认模型:

dify-model-system.png

应用的三种创建方式

配置好模型后,我们来创建第一个 AI 应用。Dify 提供了三种创建方式:

1. 从模板创建

初次使用 Dify 时,你可能对于应用创建比较陌生。为了帮助新手用户快速了解在 Dify 上能够构建哪些类型的应用,Dify 提供了丰富的应用模板,涵盖了智能客服、文案写作、数据分析、代码助手等不同的场景,推荐新手从这里快速上手:

apps-template.png

任意选择某个模板,并将其添加到工作区即可。

2. 创建空白应用

如果需要从零开始创建应用,可以选择这项,适用于对 Dify 有一定了解的用户:

apps-empty.png

Dify 提供了 5 种主要的应用类型,每种都适用于不同的场景:

  • 聊天助手(Chatbot):最常见的 AI 应用类型,适合构建对话式的智能助手。它支持多轮对话、上下文记忆,可以用来创建客服机器人、个人助理等;
  • 文本生成(Text Generator):这种应用专注于根据用户输入生成特定格式的文本内容,比如文章摘要、产品描述、邮件回复等。它通常是单次交互,输入提示词后直接输出结果;
  • 智能体(Agent):Dify 的高级功能之一,它可以使用工具,比如调用外部 API、执行代码、搜索网络信息、处理文件等,这使得它能够处理需要多步骤操作的复杂任务;
  • 工作流(Workflow):提供了可视化的节点编排界面,让你可以构建复杂的业务逻辑;
  • 对话流(Chatflow):结合了聊天助手和工作流的优点,在对话式交互的基础上增加了复杂的流程控制能力。

创建应用时,你需要给应用起一个名字,选择合适的图标,或者上传喜爱的图片用作图标,并使用一段清晰的文字描述此应用的用途,以便后续使用。

3. 通过 DSL 文件创建

这种方式不太常用,一般用于导入别人分享的应用,或者将应用从一个环境导入到另一个环境。DSL(Domain Specific Language) 是 Dify 定义的应用配置标准,采用 YAML 格式,文件内容包括应用的基本描述、模型参数、编排配置等信息。

已经创建好的应用可以导出成 DSL 文件,这样可以在另一个环境导入,支持本地文件导入和 URL 导入:

apps-dsl.png

导入 DSL 文件时将校对文件版本号,如果 DSL 版本号差异较大,有可能会出现兼容性问题。

创建你的第一个应用

让我们以创建一个 “翻译小能手” 为例,简单了解下应用构建的基本流程。点击 “创建空白应用”,应用类型选择 “聊天助手”,并填写基本的名称、图标和描述信息,然后进入应用的配置页面:

apps-config.png

整个配置页面可分为两大块:左侧为 编排 区域,右侧为 调试与预览 区域。在编排区域里,可以对聊天助手进行以下配置:

  • 提示词:用于对聊天助手的回复做出一系列指令和约束,提示词中可插入表单变量;
  • 变量:将以表单形式让用户在对话前填写,用户填写的表单内容将自动替换提示词中的变量;
  • 知识库:为聊天助手提供特定领域的知识背景,让其可以回答领域内的问题;
  • 视觉:开启视觉功能将允许模型输入图片,并根据图像内容的理解回答用户问题;

下面的配置项我们暂时不管,对于 “翻译小能手”,我们只需要配置提示词即可:

你是一个翻译助手,你的任务是将用户输入翻译成其他的语言,
如果用户输入是中文,翻译成英文,如果用户输入是英文,则翻译成中文。

Dify 提供了一个 “提示词生成器” 功能,可以对你的提示词进行优化,生成高质量、结构化的提示词:

prompt-generate.png

然后选择合适的大语言模型,就可以在右侧和其进行聊天了:

apps-chat-debug.png

测试通过后,点击 “发布” 按钮对应用进行发布:

apps-publish.png

至此,我们的第一个 AI 应用就开发好了,可以通过下面几种方式来访问该应用:

  • 通过助手的独立页面访问,可以将链接分享给任何人直接使用;
  • 将助手嵌入到你的网站中,通过 iframe 将其放在你的网站中的任意位置;
  • 在 “探索” 页面中访问;
  • 通过 API 调用,将助手的对话能力接入你的服务中;

高级功能配置

我们在调试聊天助手时,可以切换不同的模型,对比模型之间的效果,Dify 为此提供了一个 “多模型调试” 功能:

apps-chat-multi.png

我们可以同时和最多 4 个模型进行会话,非常方便:

apps-chat-multi-4.png

此外,Dify 还提供了一些高级功能来增强应用体验:

  • 对话开场白:设置应用的欢迎词,在对话型应用中,让 AI 主动说第一段话可以拉近与用户间的距离;同时可以预设最多 10 个常见问题供用户选择;
  • 下一步问题建议:设置下一步问题建议可以在每次回复后,根据对话内容推荐 3 条相关问题;
  • 文字转语音:开启后,回复的内容后面会多一个播放按钮,支持自动语音播放;
  • 语音转文字:开启后,对话框后面会多一个录音按钮,支持语音输入;
  • 引用和归属:显示源文档和生成内容的归属部分;
  • 内容审查:可以调用审查 API 或者维护敏感词库来使模型更安全地输出;
  • 标注回复:启用后,将标注用户的回复,以便在用户重复提问时快速响应;

apps-config-adv.png

小结

今天的内容比较简单,主要是熟悉下 Dify 平台的基本使用流程,涵盖以下关键环节:

  • 模型配置:Dify 支持 60+ 主流模型供应商,采用插件化安装,通过 PKCS1_OAEP 加密确保 API 密钥安全。系统将模型分为六大类型,满足从推理到语音转换的全链路需求。
  • 应用创建:提供模板创建、空白创建和 DSL 导入三种方式,覆盖聊天助手、文本生成、智能体、工作流和对话流五种应用类型,适配不同复杂度的业务场景。
  • 功能特性:内置多模型调试、语音转换、内容审查等高级功能,提供从原型验证到生产部署的完整工具链。

篇幅有限,关于 Dify 的应用创建,还有很多细节没有展开,比如,模型插件是如何加载的,聊天助手的各个功能特性是如何实现的,除聊天助手之外还有另四种应用又是如何使用的,我们下期再见。