Fork me on GitHub

2025年9月

深入 Dify 应用的会话流程之流式处理

在昨天的文章中,我们学习了 Dify 会话处理流程的核心服务 AppGenerateService,并通过分析它的 generate() 方法了解了 Dify 如何通过系统级和应用级两层限流策略来保障服务的稳定性。不过这个方法昨天只分析了一半,今天我们继续来看看剩下的部分。

Dify 会根据不同的应用类型走不同的应用生成器逻辑,通过源码可以发现,无论是文本生成、聊天对话、智能体、工作流还是对话流,都遵循着固定的流式处理流程。比如文本生成:

if app_model.mode == AppMode.COMPLETION.value:
  return rate_limit.generate(                        # 步骤1:限流包装
    CompletionAppGenerator.convert_to_event_stream(  # 步骤2:事件流转换
      CompletionAppGenerator().generate(             # 步骤3:具体应用逻辑
        app_model=app_model, user=user, args=args,
        invoke_from=invoke_from, streaming=streaming
      ),
    ),
    request_id=request_id,
  )

我们先不急着看具体的应用生成器,而是先看看外面的两层方法,学习 Dify 是如何统一处理流式和非流式响应的。

限流生成器

generate() 方法最后的 finally 语句里,有一点特别值得注意,这里有一个判断很奇怪,只有当请求是非流式时才调用 exit() 释放限流:

  finally:
    if not streaming:
      rate_limit.exit(request_id)

在传统的非流式处理中,在 finally 块中释放限流确实是可以的;但在流式处理中,当 generate() 方法返回时,请求实际上还没有结束! 客户端会持续从生成器中读取数据,直到生成器耗尽或发生异常。因此,我们需要一个机制来确保只有当生成器正常结束或异常时才释放限流,并且不影响原有生成器的任何功能。

我们可以看看 Dify 是如何处理流式请求的限流的,核心就在 rate_limit.generate() 方法里:

def generate(self, generator: Union[Generator[str, None, None], Mapping[str, Any]], request_id: str):
  if isinstance(generator, Mapping):
    # 非流式响应直接返回
    return generator
  else:
    # 流式响应使用 RateLimitGenerator 包装
    return RateLimitGenerator(
      rate_limit=self,
      generator=generator,
      request_id=request_id,
    )

这里首先通过 generator 的类型自动区分流式响应(Generator)和非流式响应(Mapping),对于非流式响应直接返回,而对于流式响应,使用 RateLimitGenerator 进行包装而不影响原有接口。它的实现如下:

class RateLimitGenerator:
  def __init__(self, rate_limit: RateLimit, generator: Generator[str, None, None], request_id: str):
    self.rate_limit = rate_limit
    self.generator = generator
    self.request_id = request_id
    self.closed = False

  def __iter__(self):
    return self

  def __next__(self):
    if self.closed:
      raise StopIteration
    try:
      # 正常情况下透传数据
      return next(self.generator)
    except Exception:
      # 异常时自动释放资源
      self.close()
      raise

  def close(self):
    if not self.closed:
      self.closed = True
      # 从 Redis 中移除活跃请求记录
      self.rate_limit.exit(self.request_id)
      if self.generator is not None and hasattr(self.generator, "close"):
        self.generator.close()

可以看到 RateLimitGenerator 是一个实现了 迭代器协议 的包装类,它最精妙的地方在于可以监听到生成器的异常(迭代器结束也会抛出异常),这样就可以在流式响应结束时释放限流。

迭代器和生成器

在 Python 中,迭代器(Iterator)生成器(Generator) 都是用于处理可迭代对象的工具,但它们在实现方式、功能和使用场景上有明显区别。

迭代器 是实现了迭代器协议的对象,需要手动实现 __iter__()__next__() 这两个方法:

  • __iter__() 方法返回迭代器自身
  • __next__() 方法返回下一个元素,没有元素时抛出 StopIteration 异常

生成器 是一种特殊的迭代器,无需手动实现迭代器协议,通过 yield 关键字创建。只要函数中包含 yield 语句即为生成器函数,调用生成器函数时返回生成器对象:

def func():
    yield "a"
    yield "b"
    yield "c"

print(type(func()))
# 输出为 <class 'generator'>

迭代器通常需要在初始化时准备好所有元素或定义元素的生成逻辑,可能占用较多内存(如存储一个大列表);它需要自行维护迭代状态(如当前位置索引),实现较复杂。而生成器采用 惰性计算(Lazy Evaluation),只在需要时生成下一个元素,不提前存储所有元素,内存效率更高,特别适合处理大数据流或无限序列;并且它可以自动维护状态,每次执行到 yield 时暂停并保存当前状态,下次调用时从暂停处继续执行。

从 Dify 的代码可以看出,底层的应用生成器返回的是生成器对象,rate_limit.generate() 方法将其封装成迭代器,供上层使用。当上层消费结束后,释放流式响应的限流。

事件流转换

我们接着看 convert_to_event_stream() 方法,它位于 BaseAppGenerator 类中,所有的应用生成器都继承自它。该方法负责将生成器输出转换为标准的 SSE 格式:

@classmethod
def convert_to_event_stream(cls, generator: Union[Mapping, Generator[Mapping | str, None, None]]):
  if isinstance(generator, dict):
    # 非流式响应直接返回
    return generator
  else:
    # 将流式响应转换为 SSE 格式
    def gen():
      for message in generator:
        if isinstance(message, Mapping | dict):
          # 结构化数据使用 JSON 格式,通过 data 字段传输
          yield f"data: {orjson_dumps(message)}\n\n"
        else:
          # 简单字符串使用 event 字段传输
          yield f"event: {message}\n\n"

    return gen()

这个实现有几个关键之处:

  1. 自动识别流式响应和非流式响应,非流式响应直接返回,流式响应转换为 SSE 格式
  2. 使用高性能的 orjson 库对 JSON 进行序列化
  3. 内部的 gen() 函数是另一个生成器,保持流式特性

SSE 协议介绍

SSE(Server-Sent Events,服务器发送事件) 是一种 基于 HTTP 的轻量级实时通信协议,旨在解决 服务器向客户端单向、持续推送数据 的场景需求。与 WebSocket 的双向通信不同,它专注于服务器到客户端的单向通信,无需客户端频繁轮询,同时保持了 HTTP 协议的简洁性和兼容性,是实时消息通知、数据更新、AI 对话等场景的高效解决方案。它的核心特性如下:

  1. 基于 HTTP 协议:SSE 复用 HTTP 协议的传输层,无需额外创建新协议(如 WebSocket 的 ws:// 协议),可直接使用现有的 HTTP 基础设施(如 Nginx 反向代理、CDN、防火墙规则),部署成本低;
  2. 单向通信:通信方向固定为服务器 → 客户端,仅服务器能主动向客户端推送数据;
  3. 自动重连机制:若 SSE 连接因网络波动断开,浏览器会自动尝试重新连接,无需开发者手动实现重连逻辑;在重新连接时,客户端可以通过 HTTP 请求头 Last-Event-ID 发送最后接收到的事件 ID 给服务器,服务器可以根据这个 ID 来恢复中断处的数据流,避免重复发送;
  4. 支持事件分类:服务器可推送不同类型的事件,客户端可按事件类型分别监听,实现消息的分类处理;
  5. 轻量级数据格式:SSE 推送的数据以 文本流(UTF-8 编码) 传输,格式简单(由 field: value 键值对组成),无需复杂的序列化和反序列化,解析成本低;

SSE 对服务器的响应头有特殊要求,必须通过 Content-Type: text/event-stream 声明响应体是 SSE 格式的事件流,否则客户端无法识别。服务器推送的每一条消息由多个 field: value 键值对组成,field 为字段名,value 为文本格式的数据。SSE 支持的核心字段有:

  • data - 消息的核心数据,为文本内容,可多行,每行以 data: 开头,客户端接收时会自动合并多行 data 为一个字符串;
  • event - 自定义消息的事件类型,客户端可按类型监听,比如 log notification 等,若不指定 event,客户端默认触发 onmessage
  • id - 消息的唯一标识(可选),用于断线重连时恢复数据,客户端会记录最后一条消息的 id,重连时通过 Last-Event-ID 请求头告知服务器,服务器可据此补发未接收的消息;
  • retry - 客户端重连间隔(单位:毫秒),覆盖默认的 3 秒,仅当连接断开时生效,客户端会按此间隔重连;
  • : - 注释行,客户端会忽略,用于保持连接活跃,若服务器长时间无数据推送,可定期发送注释行避免连接被网关或防火墙断开;

下面是一些消息的示例:

: 这是一条测试消息

data: 这是一条消息

data: 这是另一条
data: 多行消息

event: userconnect
data: {"username": "bobby", "time": "02:33:48"}

event: usermessage
data: {"username": "bobby", "time": "02:34:11", "text": "Hi everyone."}

综上,SSE 是一种 轻量、简洁、低成本 的实时单向通信方案,适合无需客户端交互的场景,如果需要双向通信,可选择 WebSocket 协议;另外,SSE 仅能传输 UTF-8 编码的文本数据,若需推送图片、音频等二进制内容,需先将其编码为 Base64 文本(会增加 33% 的带宽开销),此时 WebSocket 更优;还有一点要注意的是,若使用 Nginx 等反向代理 SSE 连接,需配置禁用缓存,避免数据被缓存后延迟推送,并保持连接持久化,避免代理提前关闭 SSE 流。示例 Nginx 配置:

location /api/sse-stream {
  proxy_pass http://localhost:3000;
  proxy_http_version 1.1;

  # 禁用连接关闭
  proxy_set_header Connection '';

  # 禁用缓冲(避免数据被缓存后延迟推送)
  proxy_buffering off; 
  proxy_cache off;

  # 禁用分块编码
  chunked_transfer_encoding off;
}

高性能 JSON 库 orjson

正如上一节所学,SSE 的输出是文本内容,因此我们需要将结构化的对象序列化为 JSON 字符串返回。在流式处理中,JSON 序列化的性能至关重要,因为每个流式消息都需要序列化,高并发场景下的性能差异会被放大。因此 Dify 选择 orjson 而不是 Python 内置的 json 库来做 JSON 的序列化。

orjson 是一个用 Rust 编写的高性能 JSON 库,专为 Python 设计。它是标准库 json 模块的直接替代品,但性能更优秀。根据官方的测试数据,orjson 的序列化速度比 json 快了十几倍:

orjson-vs-json.png

反序列化也有 2 到 6 倍的提升:

orjson-vs-json-2.png

我也写了一个简单的例子来测试它们之间的性能差异:

import json
import orjson
import time

# 测试数据,构造大对象
data = {
  "users": [
    {"id": i, "name": f"user_{i}", "email": f"user_{i}@example.com"}
    for i in range(10000)
  ]
}

# 性能对比
def benchmark_json():
  start = time.time()
  for _ in range(10000):
    json.dumps(data)
  return time.time() - start

def benchmark_orjson():
  start = time.time()
  for _ in range(10000):
    orjson.dumps(data).decode('utf-8')
  return time.time() - start

json_time = benchmark_json()
orjson_time = benchmark_orjson()
print(f"json 耗时:{json_time}")
print(f"orjson 耗时:{orjson_time}")
print(f"orjson 比 json 快了 {json_time / orjson_time:.2f} 倍")

我构造了一个大对象用于测试,测试结果如下:

json 耗时:36.9989538192749
orjson 耗时:3.828395128250122
orjson 比 json 快了 9.66 倍

可以看出,虽然没有官网宣称的十几倍,但 orjson 确实比 json 快了一个数量级。

响应格式化

我们准备好 SSE 的消息格式之后,最后一个环节是响应格式化,位于控制器层的 helper.compact_generate_response() 函数,它负责将前面处理的结果转换为标准的 HTTP 响应,也就是 Flask 的 Response 对象:

def compact_generate_response(response: Union[Mapping, Generator, RateLimitGenerator]) -> Response:
  if isinstance(response, dict):
    # 模式1:非流式响应 - 返回标准 JSON
    return Response(
      response=json.dumps(jsonable_encoder(response)),
      status=200,
      mimetype="application/json"
    )
  else:
    # 模式2:流式响应 - 返回 Server-Sent Events
    def generate() -> Generator:
      yield from response  # 使用 yield from 委托

    return Response(
      stream_with_context(generate()), # 注入请求上下文
      status=200,
      mimetype="text/event-stream"
    )

这个函数虽然简洁,但有两点值得关注:

  • 通过 yield from 实现生成器委托
  • 通过 stream_with_context 保持流式处理时的请求上下文

yield from 生成器委托

在 Python 中,yield from 是 Python 3.3 引入的语法,用于简化生成器中的迭代操作,主要作用是将一个 可迭代对象(iterable) 的元素逐个返回,相当于在生成器内部对这个可迭代对象进行了 for 循环并逐个 yield 其元素。

因此下面两个生成器函数的功能完全相同:

# 使用 for 循环 + yield
def generator1(iterable):
  for item in iterable:
    yield item

# 使用 yield from
def generator2(iterable):
  yield from iterable

yield from 更强大的功能是实现 生成器委托(generator delegation),即允许一个生成器将部分操作委托给另一个生成器或可迭代对象。比如当需要迭代嵌套的可迭代对象时,yield from 可以简化代码:

def flatten(nested_iterable):
  for item in nested_iterable:
    if isinstance(item, list):
      # 委托给 flatten 处理嵌套列表
      yield from flatten(item)
    else:
      yield item

# 测试
nested = [1, [2, 3], [4, [5, 6]]]
for num in flatten(nested):
  print(num, end=' ')  # 输出:1 2 3 4 5 6 

通过前面的章节我们知道,限流返回的 RateLimitGenerator 是一个迭代器,Dify 在这里使用 yield from 主要目的是将迭代器转换为生成器,为后面的 stream_with_context 注入请求上下文。

流式响应的上下文管理

在 Flask 应用中,请求上下文的生命周期管理 非常重要,在流式响应或多线程处理时,我们往往面临着上下文丢失的挑战:

def problematic_stream():
  def generate():
    # 这里可能无法访问 request、session、g 等对象
    user_id = request.user.id  # 可能报错!
    yield f"data: {{\"user_id\": \"{user_id}\"}}\n\n"

  return Response(generate(), mimetype="text/event-stream")

Flask 的 stream_with_context 解决了 Web 框架中流式响应的上下文管理问题:

  • 上下文传递:确保请求上下文在整个流的生命周期中可用
  • 资源访问:数据库、认证、日志等资源持续可用
  • 错误隔离:异常处理机制正常工作
from flask import stream_with_context

def proper_stream():
  def generate():
    # 现在可以安全访问请求上下文
    user_id = request.user.id  # 正常工作
    yield f"data: {{\"user_id\": \"{user_id}\"}}\n\n"

  return Response(
    stream_with_context(generate()),  # 保持上下文
    mimetype="text/event-stream"
  )

小结

至此,我们已经将 Dify 应用的会话接口外围都扫清了,重点剖析了 rate_limit.generate()convert_to_event_stream()compact_generate_response() 三个函数的实现原理。

让我们用一个完整的序列图来总结下整个流式处理的流程:

chat-flow.png

接下来我们将继续深入具体的应用生成器源码,看看五种应用的具体实现。


深入 Dify 应用的会话流程之限流策略

在前面的文章中,我们梳理了 Dify 应用的各种会话接口,了解了不同蓝图下五种应用类型的接口实现,以及各自的认证方式和用户类型。如果我们仔细阅读源码就会发现,所有应用的会话接口背后处理逻辑几乎一样:

def post(self, app_model: App, end_user: EndUser):
  response = AppGenerateService.generate(
    app_model=app_model, user=end_user, args=args, invoke_from=InvokeFrom.SERVICE_API, streaming=streaming
  )
  return helper.compact_generate_response(response)

都是通过统一调用核心服务 AppGenerateServicegenerate() 方法,然后返回响应。今天我们就看看这个核心服务的具体实现,深入探索 Dify 的会话流程,解析其中涉及的关键技术要点。

统一的会话入口

正如上文提到的,无论是 Console、Web 还是 Service API 蓝图下的会话接口,最终都会调用到 AppGenerateServicegenerate() 方法。这个方法是 Dify 会话处理的核心枢纽,负责协调不同类型应用的生成流程。

让我们来看看这个方法的实现逻辑:

@classmethod
def generate(
  cls, 
  app_model: App, 
  user: Union[Account, EndUser], 
  args: Mapping[str, Any], 
  invoke_from: InvokeFrom, 
  streaming: bool = True,
):
  # 如果是 sandbox 订阅计划
  # 开启系统级限流:总请求数限流
  if dify_config.BILLING_ENABLED:
    limit_info = BillingService.get_info(app_model.tenant_id)
    if limit_info["subscription"]["plan"] == "sandbox":
      if cls.system_rate_limiter.is_rate_limited(app_model.tenant_id):
        raise InvokeRateLimitError(
          "Rate limit exceeded, please upgrade your plan "
          f"or your RPD was {dify_config.APP_DAILY_RATE_LIMIT} requests/day"
        )
      cls.system_rate_limiter.increment_rate_limit(app_model.tenant_id)

  # 应用级限流:并发请求数限流
  max_active_request = cls._get_max_active_requests(app_model)
  rate_limit = RateLimit(app_model.id, max_active_request)
  request_id = RateLimit.gen_request_key()
  try:
    request_id = rate_limit.enter(request_id)

    # 调用不同类型的应用生成器
    if app_model.mode == AppMode.COMPLETION.value:
      # 文本生成
    elif app_model.mode == AppMode.AGENT_CHAT.value or app_model.is_agent:
      # 智能体
    elif app_model.mode == AppMode.CHAT.value:
      # 聊天助手
    elif app_model.mode == AppMode.ADVANCED_CHAT.value:
      # 对话流
    elif app_model.mode == AppMode.WORKFLOW.value:
      # 工作流
    else:
      raise ValueError(f"Invalid app mode {app_model.mode}")

  # 退出应用级限流
  except Exception:
    rate_limit.exit(request_id)
    raise
  finally:
    if not streaming:
      rate_limit.exit(request_id)

从这个核心方法可以看出,Dify 的会话处理流程包含以下几个关键步骤:

  1. 系统级限流:基于应用的租户进行全局限流控制,开启账单功能后才有这个功能
  2. 应用级限流:基于应用的并发请求数量限制
  3. 应用类型路由:根据应用模式选择对应的生成器,统一的流式响应处理

系统级限流

Dify 支持开启账单功能,在 .env 文件中添加:

BILLING_ENABLED=true

然后重启 API 服务,用户设置页面就会多一个 “账单” 菜单:

billing.png

用户默认是 sandbox 订阅计划,对话时就会经过系统级限流,用于限制某个租户下的应用一天的总请求数量。

注意 Dify 的限流不是针对请求的用户,而是以应用或应用的租户作为维度。

系统级限流器的定义如下:

system_rate_limiter = RateLimiter(
  "app_daily_rate_limiter", 
  dify_config.APP_DAILY_RATE_LIMIT, 
  86400
)

Dify 的限流基于 Redis 实现,这里的 app_daily_rate_limiter 是 Redis 键的前缀,APP_DAILY_RATE_LIMIT 表示最大请求数,默认是 5000 次,86400 是时间窗口,单位秒,表示一天 24 小时。

系统级限流的核心算法是滑动窗口,通过 Redis 的 Sorted Set 实现。每次用户会话时增加请求计数:

def increment_rate_limit(self, email: str):

  # 生成 Redis 键: "app_daily_rate_limiter:tenant_id"
  key = self._get_key(email)

  # 在 Sorted Set 中添加当前时间戳
  current_time = int(time.time())
  redis_client.zadd(key, {current_time: current_time})

  # 设置 key 过期时间,为时间窗口的 2 倍,防止数据残留
  redis_client.expire(key, self.time_window * 2)

实际上就是使用 ZADD 命令将当前时间戳添加到 Sorted Set 中,key 为 app_daily_rate_limiter:tenant_id,可按租户隔离限流,同时为 key 设置一个过期时间,防止数据残留。限流的校验逻辑如下:

def is_rate_limited(self, email: str) -> bool:

  # 生成 Redis 键: "app_daily_rate_limiter:tenant_id"
  key = self._get_key(email)  
  
  # 清除时间窗口外的记录
  current_time = int(time.time())
  window_start_time = current_time - self.time_window
  redis_client.zremrangebyscore(key, "-inf", window_start_time)

  # 统计当前时间窗口内的请求数
  attempts = redis_client.zcard(key)

  # 判断是否超限
  if attempts and int(attempts) >= self.max_attempts:
    return True
  return False

这里比较巧妙的通过 Sorted Set 的 ZREMRANGEBYSCORE 命令,由于存储的分数都是时间戳,因此很容易根据时间清除时间窗口外的记录,然后再通过 ZCARD 命令获取集合大小,也就是当前时间窗口内的请求数,判断是否超出限制。

这是一个经典的分布式限流实现,使用 Redis 的 Sorted Set 实现了 24 小时的滑动窗口限流,每个请求都有精确的时间戳记录,既保证了精确性又具备良好的性能。

学习 Sorted Set 数据结构

Sorted Set(有序集合) 是 Redis 的核心数据结构之一,结合了 Set 的唯一性和排序功能,具有如下特点:

  • 有序性: 元素按分数(score)排序
  • 唯一性: 成员(member)不能重复
  • 双重索引: 支持按分数和成员查找
  • 高效操作: 大部分操作时间复杂度为 O(log N)

它的常用命令包括:

添加元素

我们创建一个有序集合 leaderboard 表示分数排行榜,并添加三个人的数据:

> ZADD leaderboard 100 "alice" 200 "bob" 150 "charlie"
(integer) 3

获取元素

按排名获取,根据分数从小到大排序:

> ZRANGE leaderboard 0 2 WITHSCORES
1) "alice"
2) "100"
3) "charlie"
4) "150"
5) "bob"
6) "200"

按排名获取,根据分数从大到小排序:

> ZREVRANGE leaderboard 0 2 WITHSCORES
1) "bob"
2) "200"
3) "charlie"
4) "150"
5) "alice"
6) "100"

按分数范围查询

按分数范围获取:

> ZRANGEBYSCORE leaderboard 100 180 WITHSCORES
1) "alice"
2) "100"
3) "charlie"
4) "150"

统计计数

统计总元素数:

> ZCARD leaderboard
(integer) 3

统计分数范围内元素数:

> ZCOUNT leaderboard 100 180
(integer) 2

分数和排名

获取成员分数:

> ZSCORE leaderboard "alice"
"100"

获取成员排名(从0开始,小到大):

> ZRANK leaderboard "alice"
(integer) 0

获取成员排名(从0开始,大到小):

> ZREVRANK leaderboard "alice"
(integer) 2

修改操作

增加分数,下面的命令将 alice 分数增加 50:

> ZINCRBY leaderboard 50 "alice"
"150"

删除元素

删除指定成员:

> ZREM leaderboard "alice"

按排名删除,比如下面的命令表示删除排名最低的:

> ZREMRANGEBYRANK leaderboard 0 0

按分数范围删除:

> ZREMRANGEBYSCORE leaderboard 0 100

应用级限流

通过系统级限流之后,会话流程还会经过一层应用级限流。可以在 .env 文件中通过 APP_MAX_ACTIVE_REQUESTS 变量设置限额,默认是 0 不限制:

APP_MAX_ACTIVE_REQUESTS=0

Dify 为每次会话生成一个请求 ID,然后调用 enter() 方法:

def enter(self, request_id: Optional[str] = None) -> str:

  # 检查当前活跃请求数
  active_requests_count = redis_client.hlen(self.active_requests_key)
  if active_requests_count >= self.max_active_requests:
    raise AppInvokeQuotaExceededError(...)

  # 记录新的活跃请求
  redis_client.hset(self.active_requests_key, request_id, str(time.time()))
  return request_id

这里使用了 Redis 的 Hash(哈希表) 数据结构来记录活跃请求,每次会话时,使用 HSET 命令将请求 ID 和当前时间存入 active_requests_key,这个键的格式是 dify:rate_limit:{}:active_requests,如果这个 Hash 的长度超出 max_active_requests 则抛出额度超限的异常。

当请求结束或异常,通过 exit() 方法将请求从 Redis 中移除:

def exit(self, request_id: str):

  # 从活跃请求集合中移除
  redis_client.hdel(self.active_requests_key, request_id)

可以看出,应用级限流采用并发请求数控制模式,与系统级的滑动窗口不同,它控制的是 同时进行的活跃请求数量,防止某个应用同一时间请求数过大。

Dify 的 ext_redis 扩展

这里还有一个知识点值得一提,在限流的代码里,Dify 通过 redis_client 执行 Redis 操作:

from extensions.ext_redis import redis_client

我们前面介绍过 Dify 的扩展系统,这个变量就是在 ext_redis 扩展中初始化的:

def init_app(app: DifyApp):
  
  global redis_client

  # 哨兵模式
  if dify_config.REDIS_USE_SENTINEL:
    redis_params = _get_base_redis_params()
    client = _create_sentinel_client(redis_params)
  # 集群模式
  elif dify_config.REDIS_USE_CLUSTERS:
    client = _create_cluster_client()
  # 单例模式
  else:
    redis_params = _get_base_redis_params()
    client = _create_standalone_client(redis_params)

  # 初始化 redis_client
  redis_client.initialize(client)
  app.extensions["redis"] = redis_client

从初始化代码可以看出,Dify 支持 哨兵集群单例 三种 Redis 模式,这种统一的处理方式很值得我们学习和借鉴。

客户端缓存

此外,在 Reids 的配置参数中,还有一点比较有意思:

def _get_cache_configuration() -> CacheConfig | None:

  # 是否开启客户端缓存
  if not dify_config.REDIS_ENABLE_CLIENT_SIDE_CACHE:
    return None
  # 必须是 RESP3 协议
  resp_protocol = dify_config.REDIS_SERIALIZATION_PROTOCOL
  if resp_protocol < 3:
    raise ValueError("Client side cache is only supported in RESP3")
  return CacheConfig()

def _get_base_redis_params() -> dict[str, Any]:
  return {
    "username": dify_config.REDIS_USERNAME,
    "password": dify_config.REDIS_PASSWORD or None,
    "db": dify_config.REDIS_DB,
    "encoding": "utf-8",
    "encoding_errors": "strict",
    "decode_responses": False,
    # 协议
    "protocol": dify_config.REDIS_SERIALIZATION_PROTOCOL,
    # 缓存配置
    "cache_config": _get_cache_configuration(),
  }

这里使用了 Redis 的 客户端缓存(Client-side caching) 特性,这是从 Redis 6.0 开始引入的一个功能特性,允许客户端在本地缓存从服务器读取的数据,具有如下特性:

  • 本地缓存: 数据存储在客户端内存中
  • 自动失效: 服务器数据变化时自动通知客户端
  • 透明操作: 对应用代码透明,自动命中缓存
  • 性能提升: 减少网络往返,提高读取性能

可以在 .env 文件中通过 REDIS_ENABLE_CLIENT_SIDE_CACHE 配置开启:

REDIS_ENABLE_CLIENT_SIDE_CACHE=true
REDIS_SERIALIZATION_PROTOCOL=3

注意开启这个功能需要 Redis 支持 RESP3 协议。RESP(REdis Serialization Protocol) 是 Redis 的通信协议,目前有 RESP2RESP3 两个版本:RESP2 协议比较老,后面大概率是要废弃的,它只支持简单的字符串、整数、大字符串、数组这些数据类型;而 RESP3 是新一代 Redis 通信协议,支持更丰富的数据类型,支持发布订阅模式,并支持服务器推送能力,这也是实现客户端缓存的关键。

开启客户端缓存后,工作流程如下:

  1. 客户端首次读取时,从服务器获取,并存储到本地缓存
  2. 后续读取时,直接从本地缓存返回,无网络开销,大大提升程序的读取性能
  3. 当服务器缓存发生变化时,会向客户端发送缓存失效通知,客户端清除本地缓存
  4. 下次读取时,客户端会重新从服务器获取,更新本地缓存

redis-client-side-cache.png

客户端缓存非常适合频繁读取且变化较少的场景,比如应用的配置、用户的权限、模型的参数等。通过该特性,可以大幅减少网络延迟,降低 Redis 服务器的压力,提高应用性能。但由于客户端需要额外的内存存储缓存,因此在使用时,需要特别关注客户端的内存使用情况,

小结

我们今天开始深入分析 Dify 的会话处理流程,了解了所有类型的应用都是通过 AppGenerateService.generate() 方法进行统一处理的,并经过了两层限流策略:

  • 系统级限流:基于 Redis Sorted Set 实现的滑动窗口算法,按租户维度限制每日总请求数
  • 应用级限流:基于 Redis Hash 实现的并发控制,限制应用同时活跃的请求数量

两者对比如下:

特性系统级限流应用级限流
算法滑动窗口并发控制
维度租户ID应用ID
限制24小时请求总数同时活跃请求数
数据结构Redis ZSETRedis Hash
清理机制时间窗口自动清理请求结束时清理

在学习过程中,我们看到 Dify 应用了大量的 Redis 技术:

  • Sorted Set:用于实现精确的滑动窗口限流,巧妙利用时间戳作为分数进行时间窗口管理
  • Hash:用于追踪活跃请求,实现并发数控制
  • 客户端初始化:支持多种 Redis 部署模式(单例、哨兵、集群),使用统一的客户端管理 Redis 连接
  • 客户端缓存:基于 RESP3 协议的服务器推送特性,适合读多写少的场景

这些内容都很值得我们在构建高并发系统时学习和借鉴。在下一篇文章中,我们将继续深入探讨 Dify 应用的会话流程,了解每种应用生成器的执行流程。


梳理 Dify 应用的会话接口

在前面的文章中,我们学习了 Dify 的代码结构和路由系统,了解了 Dify 是如何通过 Flask Blueprint 和 Flask-RESTX 来组织其复杂的 API 架构的。今天,我们将深入探索 Dify 应用的会话 API 接口,分析其具体的实现细节,并结合我们之前学习的内容,看看不同蓝图下的实现以及各自的区别。

五种应用 + 三大蓝图

我们知道,Dify 有五种不同的应用类型:

class AppMode(StrEnum):
  # 聊天助手
  CHAT = "chat"
  # 文本生成
  COMPLETION = "completion"
  # 智能体
  AGENT_CHAT = "agent-chat"
  # 工作流
  WORKFLOW = "workflow"
  # 对话流
  ADVANCED_CHAT = "advanced-chat"

不同的应用类型对应不同的会话接口,而根据应用的调用方式,这些会话接口又会出现在不同的蓝图下。因此 Dify 的会话接口有很多,如果第一次接触源码的话会很懵。会话接口主要分布在三个蓝图下:

  • Console 蓝图:从管理控制台调用,又分为从工作室的预览调试和探索的已发布应用两个地方调用,接口统一以 /console/api 为前缀
  • Web 蓝图:从前端的 Web 应用调用,Dify 会为每个应用生成一个公开可访问的页面,这个页面的调用统一以 /api 为前缀
  • Service API 蓝图:第三方通过 HTTP API 或 SDK 集成调用,接口以 /v1 为前缀

不同蓝图下虽然接口各异,但都实现了相同的核心功能,它们只是在认证方式和用户类型上有所区别,为不同的使用场景提供最合适的调用方式。下面就来对每个蓝图下的会话接口逐一分析。

Console 蓝图

当我们在 “工作室” 的预览页面对应用进行调试时:

apps.png

调用下面这些接口:

# 文本生成
api.add_resource(
  CompletionMessageApi, 
  "/apps/<uuid:app_id>/completion-messages"
)
# 聊天助手 或 Agent 应用
api.add_resource(
  ChatMessageApi, 
  "/apps/<uuid:app_id>/chat-messages"
)
# 对话流
api.add_resource(
  AdvancedChatDraftWorkflowRunApi, 
  "/apps/<uuid:app_id>/advanced-chat/workflows/draft/run"
)
# 工作流
api.add_resource(
  DraftWorkflowRunApi, 
  "/apps/<uuid:app_id>/workflows/draft/run"
)

我们还可以在 “探索” 中打开应用:

explore.png

此时调用接口如下:

# 文本生成
api.add_resource(
  CompletionApi, 
  "/installed-apps/<uuid:installed_app_id>/completion-messages", 
  endpoint="installed_app_completion"
)
# 聊天助手、Agent 应用 或 对话流
api.add_resource(
  ChatApi, 
  "/installed-apps/<uuid:installed_app_id>/chat-messages", 
  endpoint="installed_app_chat_completion"
)
# 工作流
api.add_resource(
  InstalledAppWorkflowRunApi, 
  "/installed-apps/<uuid:installed_app_id>/workflows/run"
)

Console 蓝图下的对话接口最大特点是 URL 上带有 app_id 参数,接口直接对该应用发起会话即可。另外,这个蓝图下的接口主要面向平台用户,需要对用户进行登录认证,所以接口的实现上都带有 @login_required 装饰器:

class ChatMessageApi(Resource):
  @setup_required
  @login_required
  @account_initialization_required
  @get_app_model(mode=[AppMode.CHAT, AppMode.AGENT_CHAT])
  def post(self, app_model):
    # ...

Web 蓝图

在应用的配置页,我们可以将应用发布成 Web 应用:

web.png

该功能默认开启,会自动生成一个能够在互联网上公开访问的网址。Web 应用的页面适配了不同尺寸的设备,包括 PC,平板和手机,使用者无需登陆,即可使用。当用户通过 Web 应用页面和助手对话时:

web-2.png

调用的接口如下:

# 文本生成
api.add_resource(CompletionApi, "/completion-messages")
# 聊天助手、Agent 应用 或 对话流
api.add_resource(ChatApi, "/chat-messages")
# 工作流
api.add_resource(WorkflowRunApi, "/workflows/run")

Dify 还支持将你的 Web 应用嵌入到业务网站中,你可以在几分钟内制作具有业务数据的官网 AI 客服、业务知识问答等应用:

web-iframe.png

Dify 提供了 3 种不同的嵌入方式,分别为:

  • <iframe> 标签方式:将 <iframe> 代码复制到你网站用于显示 AI 应用的标签中,如 <div><section> 等标签
  • <script> 标签方式:将 <script> 代码复制到你网站 <head><body> 标签中,这会在你的网站上显示一个 Dify 聊天机器人气泡按钮
  • 安装 Dify Chrome 浏览器扩展方式:前往 Chrome 应用商店,安装 Dify Chatbot 扩展即可

Web 应用的嵌入有不少高级技巧,比如自定义聊天机器人的气泡按钮,向 Web 应用传值,甚至可以基于前端模板对 Web 应用进行二次开发。

当用户通过这些方式访问我们的应用时,都统一使用 Web 蓝图下的接口。这个蓝图下的会话接口有着完善的接口文档:

class ChatApi(WebApiResource):
  @api.doc("Create Chat Message")
  @api.doc(description="Create a chat message for conversational applications.")
  @api.doc(
    params={
      "inputs": {"description": "Input variables for the chat", "type": "object", "required": True},
      "query": {"description": "User query/message", "type": "string", "required": True},
      "files": {"description": "Files to be processed", "type": "array", "required": False},
      "response_mode": {
        "description": "Response mode: blocking or streaming",
        "type": "string",
        "enum": ["blocking", "streaming"],
        "required": False,
      },
      "conversation_id": {"description": "Conversation UUID", "type": "string", "required": False},
      "parent_message_id": {"description": "Parent message UUID", "type": "string", "required": False},
      "retriever_from": {"description": "Source of retriever", "type": "string", "required": False},
    }
  )
  @api.doc(
    responses={
      200: "Success",
      400: "Bad Request",
      401: "Unauthorized",
      403: "Forbidden",
      404: "App Not Found",
      500: "Internal Server Error",
    }
  )
  def post(self, app_model, end_user):
    # ...

值得注意的是,这个接口没有 app_id 参数,那么 Dify 是怎么区分用户是在和哪个应用进行对话呢?答案在于 WebApiResource 这个父类:

class WebApiResource(Resource):
  method_decorators = [validate_jwt_token]

WebApiResource 继承自 Flask-RESTX 的 Resource 类,method_decorators 是该类的一个属性,用于为资源类中的所有 HTTP 方法自动应用装饰器。Dify 在这里定义了一个 validate_jwt_token 装饰器,对 Web 蓝图下的接口验证 JWT 令牌,从而得到应用信息。

Web 应用的访问地址一般是 http://ip:port/chat/{app_code},当用户第一次访问 Web 应用时,会调用 api/passport 接口,通过 HTTP 请求头 X-App-Code 创建一个 JWT 令牌,后续对话时都会带上这个令牌。

Service API 蓝图

Dify 基于 后端即服务 理念为所有应用提供了 API 接口,为应用开发者带来了诸多便利。可以在应用配置页开启:

api.png

Dify 为每个应用提供了详尽的接口文档:

api-2.png

同时 Dify 还提供了多语言的 SDK,包括 PythonPHPNode.jsJavaGoRuby,方便开发者在不同技术栈中集成 Dify 的能力。

通过 API 访问的接口如下:

# 文本生成
@service_api_ns.route("/completion-messages")
class CompletionApi(Resource):

# 聊天助手、Agent 应用 或 对话流
@service_api_ns.route("/chat-messages")
class ChatApi(Resource):

# 工作流
@service_api_ns.route("/workflows/run")
class WorkflowRunApi(Resource):

不过要访问这些 API 接口,我们得先创建 API 密钥,可以点击文档页面右上角的 “API 密钥” 按钮创建。然后按照接口文档,发送请求如下:

curl -X POST 'http://localhost:5001/v1/chat-messages' \
  --header 'Authorization: Bearer app-cSZlATg5dglJSxVLLaJoXkgX' \
  --header 'Content-Type: application/json' \
  --data-raw '{
    "inputs": {},
    "query": "本是同根生,相煎何太急",
    "response_mode": "blocking",
    "user": "abc-123"
  }'

得到应用的响应如下:

{
  "event": "message",
  "task_id": "e705b358-5a75-4fc4-9dfe-c8bbc8095be7",
  "id": "005f66cf-08eb-4aac-9f19-3a6255ecdb66",
  "message_id": "005f66cf-08eb-4aac-9f19-3a6255ecdb66",
  "conversation_id": "5cd66e81-05fd-4a11-8596-586258653213",
  "mode": "chat",
  "answer": "Being born from the same root, why rush to fry each other to death?",
  "metadata": {
    "annotation_reply": null,
    "retriever_resources": [],
    "usage": {
      "prompt_tokens": 63,
      "prompt_unit_price": "5",
      "prompt_price_unit": "0.000001",
      "prompt_price": "0.000315",
      "completion_tokens": 17,
      "completion_unit_price": "15",
      "completion_price_unit": "0.000001",
      "completion_price": "0.000255",
      "total_tokens": 80,
      "total_price": "0.00057",
      "currency": "USD",
      "latency": 2.1960244579822756
    }
  }
}

很明显,Service API 蓝图下的接口要实现 API 密钥的认证:

class ChatApi(Resource):
  @service_api_ns.expect(chat_parser)
  @service_api_ns.doc("create_chat_message")
  @service_api_ns.doc(description="Send a message in a chat conversation")
  @service_api_ns.doc(
    responses={
      200: "Message sent successfully",
      400: "Bad request - invalid parameters or workflow issues",
      401: "Unauthorized - invalid API token",
      404: "Conversation or workflow not found",
      429: "Rate limit exceeded",
      500: "Internal server error",
    }
  )
  @validate_app_token(fetch_user_arg=FetchUserArg(fetch_from=WhereisUserArg.JSON, required=True))
  def post(self, app_model: App, end_user: EndUser):
    # ...

当我们创建 API 密钥时,Dify 会将 API 密钥和 app_id 保存到数据库中,然后用户调用 API 接口时,必须传入 API 密钥,接口通过 @validate_app_token 从数据库中获取 API 密钥对应的应用信息。

小结

今天我们主要对 Dify 的会话接口做了一个简单的梳理,学习了 Dify 应用的不同访问方式以及各自蓝图下的接口定义,它们之间的差异对比如下:

蓝图认证方式装饰器用户类型
ConsoleSession@login_requiredAccount(开发者)
WebJWT TokenWebApiResource 基类处理EndUser(终端用户),来自浏览器
Service APIAPI Key@validate_app_tokenEndUser(终端用户),来自 API

在下一篇文章中,我们将继续研究会话接口的源码,来具体看下不同应用的会话是如何实现的。


学习 Dify 的路由系统

我们昨天学习了 Dify 的代码架构和三种启动模式,不过在应用启动过程中,我们并没有看到路由注册的相关代码,这里的关键就在于它模块化的扩展系统。Dify 通过 ext_blueprints 模块注册路由,使用 Flask 的 Blueprint 和 Flask-RESTX 的 Namespace 实现模块化的 API 路由管理。我们今天就来学习这部分内容。

Flask 框架

在深入分析 Dify 的路由系统之前,我们先简单了解一下 Flask 框架。Flask 是一个基于 Python 的轻量级 Web 应用框架,它设计简洁、易于扩展,特别适合构建 API 服务。

flask.png

Flask 的主要优势包括:

  • 简单易用:代码简洁,学习曲线平缓
  • 灵活扩展:可以根据需要选择和集成各种扩展
  • 成熟稳定:经过多年发展,社区生态丰富
  • 适合 API:非常适合构建 RESTful API 服务

Flask 的设计哲学是 微框架,它只提供 Web 开发的核心功能,其他功能通过扩展来实现。在 Dify 项目中采用了大量的 Flask 扩展,比如:

  • Flask-RESTX:用于构建 RESTful API 和自动生成文档
  • Flask-SQLAlchemy:ORM 数据库操作
  • Flask-Migrate:数据库迁移管理
  • Flask-Login:用户认证和会话管理
  • Flask-CORS:跨域资源共享支持
  • Flask-Compress:自动压缩 HTTP 响应内容
  • Flask-orjson:使用 orjson(一个高性能的 JSON 库)替换 Flask 默认的 JSON 编码器和解码器

Flask 基本使用

下面是 Flask 框架的基本用法:

# 创建应用
from flask import Flask
app = Flask(__name__)

# 注册路由
@app.route('/user/profile')
def user_profile():
  return 'user profile'

@app.route('/user/settings')
def user_settings():
  return 'user settings'

if __name__ == "__main__":
  app.run(host="0.0.0.0", port=6001)

寥寥几行代码就可以创建一个 RESTful API 服务。可以看到,这里使用了 Flask 最原始的装饰器方式 @app.route 来注册路由,Flask 也支持手动注册路由,这种方式更灵活:

# 手动注册路由
app.add_url_rule('/user/prifile', 'user_profile', user_profile)
app.add_url_rule('/user/settings', 'user_settings', user_settings)

使用 Blueprint 注册路由

不过上述两种方式都存在着明显的缺点,所有路由都注册到同一个 app 对象上,难以按功能模块分离,导致代码组织混乱;如果是大型项目,路由会分散在各处,而且无法统一为一组相关路由设置 URL 前缀,维护起来很困难。

于是 Flask 引入了 Blueprint 功能,也被称为 蓝图,允许你将相关的路由、视图函数、模板和静态文件组织在一起,形成一个可重用的组件。通过模块化和代码隔离,很好地解决了这些问题。

下面演示下蓝图的基本用法,首先我们创建一个独立的 user.py 文件,内容如下:

# 创建蓝图
from flask import Blueprint
user_bp = Blueprint('user', __name__, url_prefix='/user')

# 定义路由
@user_bp.route('/profile')
def profile():
  return 'user profile'

@user_bp.route('/settings')
def settings():
  return 'user settings'

在这个文件中,我们创建了一个名为 user 的蓝图,并约定了统一的 URL 前缀,然后通过装饰器 @user_bp.route 定义路由。接着我们创建主程序:

# 创建应用
from flask import Flask
app = Flask(__name__)

# 注册蓝图
from user import user_bp
app.register_blueprint(user_bp)

if __name__ == "__main__":
  app.run(host="0.0.0.0", port=6001)

在主程序中,我们导入刚刚的蓝图,并通过 app.register_blueprint() 注册蓝图,这样蓝图中定义的路由就注册好了。

Flask 的 Blueprint 功能允许我们将大型应用拆分成多个模块,每个模块负责处理特定的功能,优势如下:

  • 模块化:将大型应用拆分成多个模块
  • URL 前缀管理:为一组路由添加统一的 URL 前缀
  • 代码复用:相同功能可以在不同应用中重复使用
  • 团队协作:不同开发者可以独立开发不同的 Blueprint

Dify 充分利用了这一特性来组织其复杂的 API 结构。

Dify 的 ext_blueprints 扩展

在 Dify 中,所有的蓝图都在 api/extensions/ext_blueprints.py 文件中统一注册:

def init_app(app: DifyApp):
    
  # 导入所有蓝图
  from controllers.console import bp as console_app_bp     # 管理控制台
  from controllers.web import bp as web_bp                 # Web 应用
  from controllers.service_api import bp as service_api_bp # 服务 API
  from controllers.files import bp as files_bp             # 文件操作
  from controllers.inner_api import bp as inner_api_bp     # 内部 API
  from controllers.mcp import bp as mcp_bp                 # MCP 协议

  # 为不同蓝图配置 CORS 策略
  from flask_cors import CORS
  CORS(service_api_bp,
     allow_headers=["Content-Type", "Authorization", "X-App-Code"])
  CORS(web_bp,
     resources={r"/*": {"origins": dify_config.WEB_API_CORS_ALLOW_ORIGINS}},
     supports_credentials=True)
  CORS(console_app_bp,
     resources={r"/*": {"origins": dify_config.CONSOLE_CORS_ALLOW_ORIGINS}},
     supports_credentials=True)
  
  # 注册所有蓝图到 Flask 应用
  app.register_blueprint(service_api_bp) # /v1/*
  app.register_blueprint(web_bp)         # /api/*
  app.register_blueprint(console_app_bp) # /console/api/*
  app.register_blueprint(files_bp)       # /files/*
  app.register_blueprint(inner_api_bp)   # /inner/api/*
  app.register_blueprint(mcp_bp)         # /mcp/*

可以看出,Dify 将 API 按照使用场景分成了几个主要的蓝图:

  • console:管理控制台的 API,用于应用管理、配置等
  • web:Web 端使用的 API,用于应用展示和交互
  • service_api:对外提供的服务 API,供第三方集成使用
  • files:文件上传下载相关的 API
  • inner_api:内部服务间通信的 API
  • mcp:MCP(Model Context Protocol)相关的 API

每个蓝图都对应自己的 URL 前缀:

dify-blueprints.png

而且 Dify 为不同蓝图配置了不同的 CORS 策略,比如 console 蓝图用于管理控制台 API,只允许管理控制台域名访问;web 蓝图用于前端应用 API,只允许配置的前端域名访问;service_api 蓝图用于第三方开发者 API,无 origins 限制,允许任何域名访问,并支持 X-App-Code 头,用于应用身份验证。这种分层的接口安全设计,同样得益于 Flask Blueprint 的模块化特性。

此外,不同的蓝图还使用了不同的认证策略:

Blueprint认证方式用途示例端点
consoleJWT管理后台操作/console/api/apps
webToken/Session前端应用调用/api/chat-messages
service_apiAPI Key第三方服务调用/v1/chat-messages
files多重认证文件上传下载/files/upload
inner_api内网限制服务间通信/inner/api/health

使用 Flask-RESTX 增强 API 开发体验

虽然 Flask 本身已经很适合构建 API,但 Dify 选择了 Flask-RESTX 这个扩展来进一步增强 API 开发体验。Flask-RESTX 是 Flask-RESTPlus 的社区维护版本,它在 Flask 的基础上提供了更多 RESTful API 开发的便利功能。

Flask-RESTX 的主要特性包括:

  • 自动 API 文档生成:通过装饰器自动生成 Swagger/OpenAPI 文档
  • 请求参数验证:内置参数验证和序列化功能
  • 命名空间管理:支持 API 版本控制和模块化组织
  • 响应模型定义:可以定义标准化的响应格式

使用 Api 注册路由

Flask-RESTX 在 Flask 的基础上,引入了 ResourceApi 的概念:Resource 用于定义 RESTful 资源,可以更方便地创建 RESTful 接口;而 Api 则是对 Flask 的 Blueprint 的扩展,专门用于添加 RESTful 资源类,它能够自动处理 HTTP 方法映射(GET、POST、PUT、DELETE 等),支持自动生成 API 文档和 Swagger UI,支持参数的解析和验证。下面是使用 Api 注册路由的示例代码:

from flask import Blueprint
from flask_restx import Api, Resource

# 创建 Blueprint 和 API
bp = Blueprint('api', __name__, url_prefix='/api')
api = Api(bp, doc='/docs')  # 启用 Swagger UI

# 定义 API 资源
class UserResource(Resource):
  
  def get(self):
    # 获取用户列表
    return []

  def post(self):
    # 创建新用户
    return {"message": "用户创建成功"}, 201

# 注册资源
api.add_resource(UserResource, '/users')

和直接在蓝图中添加路由不同,这里需要先定义 API 资源,它会自动生成资源对应的增删改查 RESTful 接口,然后通过 api.add_resource() 添加资源。而本质上 Api 还是对蓝图进行操作的,因此我们只要在主程序中通过 app.register_blueprint() 注册蓝图即可,和普通的蓝图用法一样。

使用 Namespace 管理路由

Flask-RESTX 的另一大特点是增加了 Namespace 的功能,可以对蓝图下的 API 端点进一步组织和分组。下面的示例演示了 Namespace 的基本用法:

from flask import Blueprint
from flask_restx import Api, Namespace, Resource, fields

# 创建 Blueprint 和 API
bp = Blueprint('api', __name__, url_prefix='/api')
api = Api(bp, doc='/docs')  # 启用 Swagger UI

# 创建命名空间
user_ns = Namespace('users', description='用户管理相关操作')

# 定义数据模型
user_model = user_ns.model('User', {
    'id': fields.Integer(description='用户ID'),
    'name': fields.String(required=True, description='用户名')
})

# 定义 API 资源
@user_ns.route('/')
class UserList(Resource):
    @user_ns.doc('get_users')
    @user_ns.marshal_list_with(user_model)
    def get(self):
        """获取用户列表"""
        return []

    @user_ns.doc('create_user')
    @user_ns.expect(user_model)
    def post(self):
        """创建新用户"""
        return {"message": "用户创建成功"}, 201

# 注册命名空间
api.add_namespace(user_ns, path='/users')

和上面 api.add_resource() 添加资源不同的是,这里是通过 api.add_namespace() 注册命名空间,Api 会将命名空间下的所有路由注册到蓝图中。主程序还是一样,通过 app.register_blueprint() 注册蓝图即可。

此外,我们还可以通过 @user_ns 对接口进行增强,比如定义数据模型,增加参数验证。访问 /api/docs 地址,可以查看 Swagger 页面:

api-docs.png

Dify 如何组织路由

我们上面已经了解到,Dify 将 API 分成了 consolewebservice_apifilesinner_apimcp 六个蓝图,所有的蓝图注册逻辑都位于 api/controllers 目录。让我们以 console 蓝图为例,看看它是如何组织的:

from flask import Blueprint
from libs.external_api import ExternalApi

# 创建 console Blueprint
bp = Blueprint("console", __name__, url_prefix="/console/api")
api = ExternalApi(bp)

# 文件相关 API
api.add_resource(FileApi, "/files/upload")
api.add_resource(FilePreviewApi, "/files/<uuid:file_id>/preview")

# 应用导入相关 API
api.add_resource(AppImportApi, "/apps/imports")
api.add_resource(AppImportConfirmApi, "/apps/imports/<string:import_id>/confirm")

# 导入各个子模块的控制器
from . import admin, apikey, extension, feature, ping, setup, version
# 导入应用相关的控制器
from .app import app, workflow, completion, conversation, message, model_config, statistic
# 导入认证相关的控制器
from .auth import login, oauth, forgot_password
# 导入数据集相关的控制器
from .datasets import datasets, datasets_document, hit_testing

从代码可以看出,console 蓝图都是通过 api.add_resource() 的方式注册路由的。我们再看看 service_api 蓝图:

from flask import Blueprint
from flask_restx import Namespace
from libs.external_api import ExternalApi

# 创建 service_api Blueprint
bp = Blueprint("service_api", __name__, url_prefix="/v1")

api = ExternalApi(
  bp,
  version="1.0",
  title="Service API",
  description="API for application services",
  doc="/docs",  # Enable Swagger UI at /v1/docs
)

# 创建 service_api Namespace
service_api_ns = Namespace("service_api", description="Service operations", path="/")

from . import index
# 导入应用相关的控制器
from .app import annotation, app, audio, completion, conversation, file, file_preview, message, site, workflow
# 导入数据集相关的控制器
from .dataset import dataset, document, hit_testing, metadata, segment, upload_file
# 导入模型相关的控制器
from .workspace import models

# 添加 Namespace
api.add_namespace(service_api_ns)

这里有点奇怪的是,service_apiconsole 不一样,它是通过 api.add_namespace() 方式注册路由的,而且如果你仔细阅读其他几个蓝图的代码会发现,所有的蓝图都使用了 Namespace 特性,唯独 console 是个例外,不清楚是历史遗留问题,还是设计如此,如果是设计如此,又是基于什么考虑的呢?有清楚的朋友,欢迎评论区交流~

小结

今天我们深入学习了 Dify 的路由系统。Dify 基于 Flask 的 Blueprint 功能,将复杂的 API 按照 consolewebservice_api 等不同场景进行模块化拆分。这种设计不仅使代码结构更加清晰,还允许为不同模块配置独立的 CORS 和认证策略,增强了系统的安全性。此外,它还借助 Flask-RESTX 扩展,通过 ApiResourceNamespace 等概念进一步优化了 API 的组织方式,并实现了 API 文档的自动生成和参数验证等功能。

明天我们将接着研究 Dify 的源码,来看看具体的接口实现,先从用户和助手对话时的后端逻辑开始。


学习 Dify 的代码结构

在前面的系列文章中,我们从实用的角度学习了 Dify 的部署方式、应用创建和各种应用类型的使用方法。今天,我们将深入 Dify 的源码,从技术架构的角度来理解这个 LLM 应用开发平台是如何构建的。

目录结构

首先,让我们从 Dify 的源码目录结构开始,了解整个项目的组织方式:

dify/
├── api/                    # 后端 API 服务 (Python Flask)
├── web/                    # 前端 Web 应用 (Next.js 15 + React 19)
├── sdks/                   # 多语言 SDK
│   ├── python-client/      # Python SDK
│   ├── nodejs-client/      # Node.js SDK
│   └── php-client/         # PHP SDK
├── dev/                    # 开发工具脚本
└── docker/                 # Docker 部署配置

从这个目录结构可以看出,Dify 采用了经典的前后端分离架构。后端 API 使用 Python Flask 框架提供 RESTful API 服务,前端 Web 使用 Next.js 构建现代化的用户界面,两者通过 HTTP API 进行通信。此外,Dify 还提供了多语言的 SDK,方便开发者在不同技术栈中集成 Dify 的能力。

后端架构

让我们继续深入后端 api/ 目录,这里是 Dify 的核心逻辑所在:

api/
├── app.py                 # 应用入口文件
├── app_factory.py         # Flask 应用工厂
├── dify_app.py            # 自定义 Flask 应用类
├── controllers/           # 控制器层 (路由处理)
├── services/              # 服务层 (业务逻辑)
├── models/                # 模型层 (数据模型)
├── repositories/          # 仓储层 (数据访问)
├── core/                  # 核心功能模块
├── extensions/            # Flask 扩展
├── libs/                  # 工具库
├── configs/               # 配置管理
├── tasks/                 # 异步任务
├── migrations/            # 数据库迁移
└── tests/                 # 测试代码

很显然 Dify 采用了分层的设计思想,将代码按照功能职责进行了清晰的分层:

  • 控制器层(Controllers):处理 HTTP 请求和响应,负责参数验证和路由分发;
  • 服务层(Services):包含核心业务逻辑,协调各个组件完成复杂的业务操作;
  • 仓储层(Repositories):封装数据访问逻辑,提供统一的数据操作接口;
  • 模型层(Models):定义数据模型和实体关系;

应用启动入口

其中 app.py 文件是 Dify 的启动入口,核心代码如下:

if is_db_command():
  from app_factory import create_migrations_app
  app = create_migrations_app()
else:
  from app_factory import create_app
  app = create_app()
  celery = app.extensions["celery"]

if __name__ == "__main__":
  app.run(host="0.0.0.0", port=5001)

Dify 应用有三种启动模式:

数据库迁移模式

在入门篇里我们学习过,首次运行 Dify 之前,需要运行下面的命令初始化数据库:

$ flask db upgrade

Dify 通过判断命令行参数是否包含 flask db 子命令,如果包含则调用 create_migrations_app() 执行数据库迁移逻辑。

这里使用了 Flask-SQLAlchemyFlask-Migrate 库,其核心是通过 SQLAlchemy 的迁移工具 Alembic 管理数据库结构的版本化变更。在软件开发过程中,我们经常会对数据模型进行修改,这时如果手动写 SQL 同步表结构,极有可能会导致不一致或误操作。而 Flask-Migrate 会自动跟踪数据库模型的变更、生成版本化迁移脚本、执行脚本更新数据库表结构,让数据迁移变得非常容易。

除了 flask db upgrade 命令,Flask-Migrate 还支持很多其他的 flask db 子命令,比如 initmigratedowngrade 等。

API 服务模式

正常情况下我们通过 flask run 运行程序时,Dify 会调用 create_app() 函数来创建 DifyApp 实例(其实就是 Flask 应用实例):

def create_app() -> DifyApp:

  # 1. 创建 Flask 应用并加载配置
  dify_app = DifyApp(__name__)
  dify_app.config.from_mapping(dify_config.model_dump())

  # 2. 初始化扩展系统
  initialize_extensions(app)

  return app

应用创建流程分为两个关键步骤:

  1. 创建 Flask 应用:初始化 DifyApp 实例,并加载配置
  2. 初始化扩展系统:按顺序加载所有功能扩展,这里是程序启动的关键

要注意的是 flask run 一般用于开发调试,在生产环境部署 Dify 时,建议使用 Gunicorn

$ gunicorn \
    --bind "0.0.0.0:5001" \
    --workers 1 \
    --worker-class gevent \
    --worker-connections 10 \
    --timeout 200 \
    app:app

flask run 是为开发阶段设计的轻量级工具,而 gunicorn 是为生产环境优化的专业 WSGI 应用服务器,专门解决生产场景下的高并发、稳定性和资源管理等问题。

gunicorn.png

Celery 任务模式

此外,Dify 支持通过 Celery 处理异步任务和定时任务,它利用 Redis 作为消息中间件,把一些耗时的操作(比如发送邮件、处理图片、数据分析等)放到后台去执行,而不会阻塞主程序的运行。

celery.png

我们可以执行下面的命令启动 Celery 的任务处理服务:

$ celery -A app.celery worker <其他参数>

或执行下面的命令启动 Celery 的定时任务调度器:

$ celery -A app.celery beat <其他参数>

这里的 -A 参数使用 Python 的模块导入机制,app.celery 指向 app.py 模块的 celery 变量,它其实是一个 Celery 应用,通过 ext_celery 扩展系统初始化的。

模块化的扩展系统

Dify 的扩展系统是其架构的一大亮点,通过模块化的方式组织各种功能:

def initialize_extensions(app: DifyApp):

  # 按顺序初始化所有扩展
  extensions = [
    ext_timezone,        # 时区设置
    ext_logging,         # 日志系统
    ext_warnings,        # 警告处理
    ext_import_modules,  # 模块导入
    ext_orjson,         # JSON 序列化
    ext_set_secretkey,   # 密钥设置
    ext_compress,        # 响应压缩
    ext_code_based_extension,  # 代码扩展
    ext_database,        # 数据库连接
    ext_app_metrics,     # 应用指标
    ext_migrate,         # 数据库迁移
    ext_redis,          # Redis 缓存
    ext_storage,        # 文件存储
    ext_celery,         # 异步任务队列
    ext_login,          # 登录认证
    ext_mail,           # 邮件服务
    ext_hosting_provider, # 托管提供商
    ext_sentry,         # 错误监控
    ext_proxy_fix,      # 代理修复
    ext_blueprints,     # 路由蓝图注册
    ext_commands,       # CLI 命令
    ext_otel,           # OpenTelemetry
    ext_request_logging, # 请求日志
  ]

  for ext in extensions:
    is_enabled = ext.is_enabled() if hasattr(ext, "is_enabled") else True
    if not is_enabled:
      continue  # 跳过禁用的扩展
    ext.init_app(app)  # 初始化扩展

扩展系统按照扩展之间的依赖关系顺序执行,确保正确初始化,并支持动态地启用或禁用扩展。

小结

通过对 Dify 代码结构的分析,我们可以看到:

  1. 前后端分离架构:后端使用 Python Flask 提供 API 服务,前端使用 Next.js 构建用户界面
  2. 分层设计思想:控制器、服务、仓储、模型层职责清晰,代码组织良好
  3. 灵活的启动模式:支持数据库迁移、API 服务和 Celery 任务三种运行模式
  4. 模块化扩展系统:通过扩展机制组织各种功能,支持按需加载和禁用

不过细心的读者可能会发现一个问题:在应用启动过程中,我们并没有看到路由注册的相关代码。那么 Dify 是如何处理 HTTP 请求路由的呢?

答案就在扩展系统中的 ext_blueprints 模块。Dify 通过 Flask 的 Blueprint 机制和 Flask-RESTX 的 Namespace 来组织和注册所有的 API 路由,我们明天就来看看这部分内容。


学习 Dify 的工作流和对话流应用

在前面的文章中,我们学习了 Dify 的聊天助手、文本生成和 Agent 应用,这些应用虽然功能强大,但在处理一些复杂业务逻辑时仍有局限性。今天,我们将学习 Dify 的 工作流(Workflow)对话流(Chatflow) 应用,了解如何通过可视化编排构建更加复杂和灵活的 AI 应用。

工作流基本概念

工作流是 Dify 提供的一种可视化应用构建方式,它通过将复杂的任务分解成较小的步骤(节点)来降低系统复杂度,减少了对提示词技术和模型推理能力的依赖,提高了 LLM 应用面向复杂任务的性能,同时提升了系统的可解释性、稳定性和容错性。

Dify 将工作流分为两种类型,每种都有不同的适用场景:

  • Workflow(工作流):面向自动化和批处理情景,比如高质量翻译、数据分析、内容生成、电子邮件自动化等,它的交互特点是单次输入输出,无法进行多轮对话;
  • Chatflow(对话流):面向对话类情景,比如客户服务、语义搜索、需要多步逻辑的对话式应用,它的交互特点是支持多轮对话交互,可以调整生成结果;

Workflow 这个单词翻译成中文是工作流,但是在 Dify 的概念中,工作流指的是 Workflow 和 Chatflow 两种。不过我感觉这种分类容易让人混淆,所以我倾向于把 Workflow 就叫做工作流,Chatflow 叫做对话流。

创建工作流

我们进入 “工作室” 页面,点击 “创建空白应用”,应用类型选择 “工作流”:

workflow.png

我们拿之前学习 Coze Studio 时用过的天气查询的例子,来体验下 Dify 的工作流。填写助手名称,以及可选的图标和描述,进入应用的配置页面:

workflow-nodes.png

可以看出 Dify 和 Coze Studio 工作流差不多,也提供了大量的节点可供使用,包括:

基础节点

  • LLM:调用大语言模型的能力,根据给定的提示词处理广泛的任务类型,比如意图识别、文本生成、内容分类、文本转换、代码生成、RAG、图片理解等;
  • 知识检索:从知识库中检索与用户问题相关的文本内容,可作为下游 LLM 节点的上下文来使用,实现 RAG 问答;
  • 结束:每一个工作流在完整执行后都需要至少一个结束节点,用于输出完整执行的最终结果;若流程中出现条件分叉,则需要定义多个结束节点;
  • Agent:通过集成不同的 Agent 推理策略,使大语言模型能够在运行时动态选择并执行工具,从而实现多步推理;

问题理解

  • 问题分类器:通过大模型对用户输入进行分类,类似于条件分支,只不过是用自然语言来定义分支;

逻辑

  • 条件分支:根据 if 条件表达式将工作流程拆分成多个分支;
  • 迭代:对数组中的元素依次执行相同的操作步骤,直至输出所有结果,可以理解为任务批处理器,支持并行模式;
  • 循环:用于执行依赖前一轮结果的重复任务,直到满足退出条件或达到最大循环次数;

转换

  • 代码执行:支持执行 Python 或 JavaScript 代码,对输入变量进行处理;
  • 模版转换:借助 Jinja2 模板引擎灵活地进行数据转换、文本处理等;
  • 变量聚合器:将多路分支的变量聚合为一个变量,确保无论哪个分支被执行,其结果都能通过一个统一的变量来引用和访问;
  • 文档提取器:解析并读取文件,返回文件内容,支持 TXT、Markdown、PDF、HTML、DOCX 等格式;
  • 变量赋值:将工作流内的变量赋值到会话变量中用于临时存储,并可以在后续对话中持续引用;
  • 参数提取器:利用 LLM 从自然语言推理并提取结构化参数,用于后置的工具调用或 HTTP 请求;

工具

  • HTTP 请求:向指定的网络地址发送定制化的 HTTP 请求,实现与各种外部服务的互联互通;
  • 列表操作:对列表进行过滤、排序、取第 N 项或取前 N 项等操作;
  • Dify 工具:支持调用平台上所有的工具,包括插件工具、自定义工具、工作流工具、MCP 工具等;

相比于 Coze Studio 的工作流,Dify 的工作流少了文本处理(字符串拼接和分割)、知识库写入以及一些数据库相关的组件。

编排工作流

接下来我们开始创建 “天气小助手” 工作流,我们首先在 “开始节点” 上定义输入字段 city 表示要查询的城市名称:

workflow-node-start.png

在 “开始节点” 中,除了我们定义的输入字段外,还有很多内置的系统变量,都以 sys 开头:

  • sys.files:用户上传的文件
  • sys.user_id:用户唯一标识
  • sys.app_id:应用唯一标识
  • sys.workflow_id:工作流标识
  • sys.workflow_run_id:工作流运行标识

这些参数面向具备开发能力的用户,以区分调用工作流的用户和应用或查询工作流或工作流的运行情况。

假设我们要对接 高德的天气查询接口,这个接口通过城市编码查询该城市的天气详情。因此第二步是想办法将用户输入的城市名称转换为城市编码,这可以通过 “代码执行” 节点实现:

workflow-node-code.png

这个节点的输入参数为 city_name 引用自 “开始节点”,输出参数为 city_id,转换的代码如下:

# 城市编码表,可以从高德官网下载
city_list = [
  "北京市,110000",
  "天津市,120000",
  "石家庄市,130100",
  # ...
]
def main(city_name: str) -> dict:
  city_id = '110000'
  for city_info in city_list:
    city_info = city_info.split(',')
    if city_name in city_info[0]:
      city_id = city_info[1]
      break
  return {
    'city_id': city_id
  }

代码写好后,可以点击上面的小三角测试该节点,验证没问题后,接着再在代码节点后面加一个 “HTTP 请求” 节点,调用高德的天气查询接口:

workflow-node-http.png

该节点配置如下:

  • 接口方式 - GET
  • API 地址 - https://restapi.amap.com/v3/weather/weatherInfo
  • 请求参数

    • city - 引用自代码节点的 city_id 参数;
    • extensions - 填写 all,表示返回的天气类型,base 返回实况天气,all 返回预报天气;
    • output - 填写 json 表示希望接口返回 JSON 格式;
    • key - 填写高德 API KEY,可以从高德开放平台免费申请;

这里的 key 我并没有直接填写 API KEY,而是使用了环境变量,可以点击上面的 “ENV” 按钮创建,类型选择 “Secret”:

workflow-env.png

使用环境变量的好处是,可以保护工作流内所涉及的敏感信息,例如运行工作流时所涉及的 API 密钥、数据库密码等。导出工作流时会提示用户,可以剔除掉 “Secret” 类型的变量值。

同样点击小三角验证,测试通过后,在 “HTTP 请求” 节点后再接一个 “LLM” 节点:

workflow-node-llm.png

使用大模型将 HTTP 返回的 JSON 结果转换为自然语言,方便用户查看。最后将 “LLM” 节点和 “结束” 节点连接起来,在结束节点上添加 text 参数,并引用 “LLM” 节点的出参:

workflow-node-end.png

至此,整个工作流搭建完成:

workflow-export.png

可以点击上方的 “运行” 按钮,对整个工作流进行测试:

workflow-output.png

在输出面板上,我们还可以点击 “详情” 查看工作流完整的入参和出参:

workflow-output-detail.png

以及工作流中每个节点的执行情况:

workflow-output-trace.png

调试完成后,点击右上角的发布按钮:

workflow-publish.png

可以将工作流发布为不同形式:

  • 直接运行:通过 Web 界面直接使用
  • 批量运行:支持 CSV 文件批量处理
  • 在 “探索” 中打开:在 “探索” 页面使用
  • API 访问:通过 REST API 调用
  • 发布为工具:这是工作流和其他应用的一大区别,可以将其发布成工具,可以在其他应用中复用

创建对话流

接下来我们再看看对话流。我们进入 “工作室” 页面,点击 “创建空白应用”,应用类型选择 “Chatflow”:

chatflow.png

还是以天气助手为例,填写助手名称,以及可选的图标和描述,进入应用的配置页面:

chatflow-nodes.png

可以看出,对话流和工作流的节点几乎一样,只不过工作流的 “结束” 节点在这里变成了 “直接回复” 节点,它和 “结束” 节点的最大区别在于,“直接回复” 节点可以不作为最终的输出节点,作为流程过程节点时,可以在中间步骤流式输出结果。

编排对话流

由于对话流直接是以对话框的形式交互,因此我们无需在 “开始节点” 定义输入字段:

chatflow-start.png

其实,对话流的 “开始节点” 也可以添加输入字段,如果添加的话,就会在对话框的上方多出一个表单,供用户填写。

和工作流一样,对话流的 “开始节点” 也有很多以 sys 开头的系统变量,相比于工作流,它还多了几个和会话相关的变量:

  • sys.query:用户输入内容
  • sys.dialogue_count:对话的轮次,每轮对话后自动加 1
  • sys.conversation_id:会话唯一标识

除此之外,对话流还多了一个 “会话变量” 的功能,用于临时存储一些信息,比如上下文、用户偏好等,确保在多轮对话中都能够引用该信息。会话变量为可读写变量,我们可以通过 “变量赋值” 节点修改会话变量的内容。

然后再 “开始节点” 后加入一个 “问题分类” 节点:

chatflow-classifier.png

我们添加两个分类,分类 1 为天气查询,分类 2 为其他。天气查询分类后接一个参数提取器:

chatflow-extract.png

该节点的作用是从用户问题中提取出城市名称,这样后面的流程就和之前的工作流一样了,通过代码节点转换城市编码,再通过高德接口查询天气,最后通过大模型输出润色结果,这里不再赘述。

其他分类后接 “LLM” 节点:

chatflow-llm.png

对话流的 “LLM” 节点和工作流也有一点区别,多了一个 “记忆” 选项,可以开启,这样在多轮对话时,大模型就能记住之前的对话内容。

至此,对话版的天气小助手就开发好了:

chatflow-export.png

点击上面的 “预览” 按钮,和小助手进行对话:

chatflow-output.png

调试完成后,点击右上角的发布按钮,可以将对话流发布为不同形式:

  • 直接运行:通过 Web 界面直接使用
  • 嵌入网站:通过 iframe 嵌入到其他网站
  • 在 “探索” 中打开:在 “探索” 页面使用
  • API 访问:通过 REST API 调用

小结

我们今天学习了 Dify 的工作流和对话流应用,通过可视化拖拽的方式连接各个节点,实现复杂的业务逻辑:

  • Workflow(工作流):单次输入输出,适用于自动化和批处理场景;
  • Chatflow(对话流):支持会话变量、对话记忆等功能,更适合多轮交互场景;

至此,我们已经完整体验了 Dify 的 5 种应用类型(聊天助手、文本生成、Agent、工作流、对话流),每种类型都有其独特的优势和适用场景。接下来,我们将进入 Dify 的源码,看看这些应用的实现原理,同时我们还会学习 Dify 的其他高级特性,比如知识库、工具、插件系统等,这些模块为应用提供了更强大的功能,为我们构建企业级 AI 应用提供技术基础。


学习 Dify 的文本生成和 Agent 应用

在前两篇文章中,我们介绍了 Dify 的基本概念和部署方式,并通过创建一个简单的翻译助手初步体验了聊天助手的构建流程。今天,我们将继续学习 Dify 的另外两种应用类型:文本生成应用Agent 应用,了解它们的用法、特点以及应用场景。

文本生成应用

文本生成应用是 Dify 提供的一种专门用于内容创作的应用类型,它专注于根据用户输入生成特定格式的文本内容。与聊天助手的多轮对话不同,文本生成应用采用 单次输入、单次输出 的交互模式,非常适合批量处理和标准化内容生成场景。比如:

  • 内容创作:文章摘要、产品描述、营销文案
  • 文本处理:翻译、分类、情感分析
  • 模板化生成:邮件回复、报告生成、技术文档
  • 批量处理:大规模内容生成和数据处理等

我们进入 “工作室” 页面,点击 “创建空白应用”,应用类型选择 “文本生成应用”:

text-generator-ui.png

我们继续以之前翻译助手为例,填写助手名称,以及可选的图标和描述,进入应用的配置页面:

text-generator-config.png

文本生成应用和聊天助手的配置页面几乎完全一样,整个配置页面也是分为左侧的 编排 和 右侧的 调试与预览 两大块。在编排区域里,同样可以配置 提示词变量知识库视觉 四个选项。

我们在这里添加两个变量:

  • lang - 目标语言,字段类型为下拉选项,选项配置有中文、英文、日文等
  • query - 翻译文本,字段类型为段落

然后在提示词中以 {{lang}}{{query}} 占位符插入变量,完整的提示词如下:

你是一个翻译助手,你的任务是将用户输入翻译成 {{lang}}
用户输入:{{query}}

这里定义的变量将以表单的形式显现在右侧的调试区域,和聊天助手不同的是,文本生成应用的对话页面就只有一个表单,用户填写的表单内容将自动替换提示词中的变量,然后调用大模型输出结果。

使用变量

Dify 变量支持的字段类型包括:文本(string)、段落(string)、下拉选项(string)、数字(number)和复选框(boolean)。

这里不支持文件上传类型,功能其实是很受限的,如果有上传文件的需求,可以考虑使用工作流。

此外,Dify 还支持以 API 扩展的方式创建 基于 API 的变量

dify-variable-api.png

关于 API 变量,我们后面在学习 API 扩展时再作讨论。

批量生成

文本生成应用的一个强大特性是支持 CSV 批量处理,非常适合批量数据处理或内容生产场景。点击右上角的 “发布” 按钮,可以看到 “批量运行” 的选项:

text-generator-publish.png

点击后进入翻译应用,页面有 “Run Once” 和 “Run Batch” 两个标签页:

text-generator-batch-1.png

要使用批量运行功能,需要准备一个 CSV 格式的文件,包含多条要处理的数据,文件内容要满足一定的格式要求,可以下载模版文件填写:

翻译文本,目标语言
"Hello","中文"
"What's your name?","中文"

批量运行结果如下:

text-generator-batch-2.png

运行的结果可以点击下载按钮导出。

和聊天助手的区别

文本生成应用和聊天助手在架构和使用方式上存在显著差异:

特性文本生成应用聊天应用
WebApp 界面表单 + 结果展示聊天对话框
API 端点completion-messageschat-messages
交互模式单次问答多轮对话
上下文保持仅当前会话跨会话持久化
开场白支持不支持支持
批量处理支持 CSV 批量处理不支持

Agent 应用

Agent(智能体)是 Dify 中最强大的应用类型之一,它利用大语言模型的推理能力,能够 自主规划目标、分解任务、调用工具并迭代执行,无需人工干预即可完成复杂任务。

我们进入 “工作室” 页面,点击 “创建空白应用”,应用类型选择 “Agent”:

agent-ui.png

我们创建一个 “生活小能手”,可以调用各种小工具解答用户的各种生活问题,填写应用名称,以及可选的图标和描述,进入应用的配置页面:

agent-config.png

Agent 应用和聊天助手以及文本生成应用的配置页面也没有太多的区别,总的来说有两点不同:

  • 工具配置:可以使用工具扩展 Agent 应用的能力,比如联网搜索或科学计算等;
  • Agent 设置:可以配置 Agent 的工作模式,默认支持 Function Calling 策略;

Dify 很贴心地内置了几个小工具:

  • 语音转写:支持文本转语音(Text to Speech)和语音转文本(Speech to Text);
  • 代码执行:运行一段代码并返回结果;
  • 时间计算:各种时间小工具,比如获取当前时间、计算星期几、时区转换等;
  • 网页抓取:根据网页链接获取网页内容;

我们不妨全都加上:

agent-tools.png

这样一个 “生活小能手” 智能体就做好了:

agent-chat.png

工具配置

值得注意的是,这里语音转写的两个工具需要配置后才能使用,可以点击工具右侧的 “信息和设置” 图标:

agent-tools-tts-1.png

在工具配置页面选择 TTS 或 ASR 模型即可:

agent-tools-tts-2.png

需要提前在设置中的模型供应商里配置对应的模型。

不过经过我的测试,这两个工具配在智能体里其实有点多余。首先对话窗口不支持上传音频文件,因此调不了语音转文本的工具;其次生成的音频文件直接保存在本地文件,也无法直接播放,也无法下载:

agent-chat-2.png

Agent 设置

在 Agent 设置中,Dify 会根据模型特性自动设置 Agent 的推理模式。Dify 在源码中定义了 9 种不同的模型特性:

class ModelFeature(Enum):
  # 单工具调用,模型能够理解工具描述并生成符合格式的工具调用请求
  TOOL_CALL = "tool-call"
  # 多工具调用,模型能够在一次响应中同时调用多个工具
  MULTI_TOOL_CALL = "multi-tool-call"
  # 流式工具调用,在流式响应中实时输出工具调用信息
  STREAM_TOOL_CALL = "stream-tool-call"
  # 思考能力,模型具备思维链推理能力
  AGENT_THOUGHT = "agent-thought"
  # 视觉理解能力,模型支持图像输入和理解
  VISION = "vision"
  # 文档理解能力,模型支持处理和理解文档格式内容
  DOCUMENT = "document"
  # 视频理解能力,模型支持视频内容的处理和分析
  VIDEO = "video"
  # 音频处理能力,模型支持音频内容的理解和处理
  AUDIO = "audio"
  # 结构化输出,模型能够按照指定的 JSON Schema 输出结果
  STRUCTURED_OUTPUT = "structured-output"

当模型支持工具调用能力时,默认使用 Function Calling 模式,拥有效果更佳、更稳定的表现:

agent-mode-function-call.png

对于不支持工具调用的模型系列,Dify 通过 ReAct 推理框架实现类似的效果:

agent-mode-react.png

我们可以在这里对 ReAct 提示词进行适当调整。此外,我们还可以修改最大迭代次数防止死循环,默认 10 次。

文件上传

当模型具备处理图片、文档、视频或音频时,智能体的对话入口会多一个文件上传按钮,比如 Gemini 2.5 Pro 同时支持视觉、文档和音频:

agent-files.png

开启之后,我们就可以对话时处理上传的文件:

agent-files-chat.png

小结

今天我们探索了 Dify 的文本生成和 Agent 应用:

  • 文本生成应用:专注于单次输入输出的内容生成场景,支持批量处理,它不维护对话状态,特别适合标准化内容生产;
  • Agent 应用:通过 Function Calling 或 ReAct 策略实现,支持工具调用和多步骤执行,利用大模型推理能力自主完成复杂任务;

文本生成、Agent 与之前学习的聊天助手这三种应用很相似,都是围绕大模型的调用来实现的,根据不同的业务场景选择合适的应用类型:

  • 简单对话:选择聊天助手
  • 内容生成:选择文本生成应用
  • 复杂任务:选择 Agent 应用

下一篇文章中,我们继续学习 Dify 的工作流应用和对话流应用,了解如何通过可视化编排构建更加复杂的业务逻辑。


使用 Dify 创建你的第一个 AI 应用

上一篇文章中,我们介绍了 Dify 这个开源 LLM 应用开发平台,并演示了两种主要的部署方式。今天,我们将进入实操阶段,学习如何使用 Dify 来构建我们的 AI 应用,Dify 提供了丰富的应用类型和强大的功能,让我们一步步来探索它的核心使用方法。

模型配置

成功部署 Dify 后,首次访问需要创建管理员账户。登录后,你会看到 Dify 的主界面,这里包含了 探索工作室知识库工具 四个核心功能模块。默认进入的是工作室模块:

apps.png

这里是应用开发的核心区域,用于创建和管理 AI 应用,包括工作流、对话流、聊天助手、Agent 和文本生成应用。而探索模块可以体验自己创建的应用,或浏览社区其他用户创建的应用,并支持复制到自己的工作区:

explore.png

知识库和工具是辅助模块,为你的应用添砖加瓦,在知识库模块你可以导入自己的文本数据,为 RAG 应用提供知识支撑,还支持通过 WebHook 实时写入或连接外部知识库:

datasets.png

在工具模块你可以自定义工具、工作流或 MCP 工具,还支持从 Dify 市场下载并安装第三方工具:

tools.png

但在使用这些功能之前,我们需要先在 Dify 中配置模型。点击右上角头像,选择 “设置” 进入设置页面,在左侧菜单中选择 “模型供应商”:

dify-model-providers.png

这里可以看到所有支持的模型供应商,包括国外的 OpenAI、Anthropic、Google Gemini,国内的智谱、DeepSeek、通义千问、月之暗面等 60 多家供应商。Dify 将模型供应商实现成一种插件,因此需要安装。以 OpenAI 为例,在列表中找到 “OpenAI” 并点击 “安装”,安装完成后,点击 “配置”:

dify-model-openai.png

输入你的 API Key 和其他必要信息并点击 “保存” 即可,我们可以为凭据取一个名字,方便后续使用。配置完成后,你就可以在创建应用时选择相应的模型了。

值得注意的是,Dify 使用 PKCS1_OAEP 加密算法安全存储 API 密钥,每个租户都有独立的密钥对,确保数据安全。

根据使用场景,Dify 将模型分为六大类:

  1. 系统推理模型:应用的核心推理引擎,用于对话生成、文本处理等
  2. Embedding 模型:将文本转换为向量表示,用于知识库检索
  3. Rerank 模型:优化检索结果排序,提升 RAG 应用效果
  4. 语音转文字模型:支持语音输入功能
  5. 文字转语音模型:支持将文本输出转换为语音
  6. Moderation 模型:支持内容审查功能

我们可以在 “系统模型设置” 中为每一类选择一个默认模型:

dify-model-system.png

应用的三种创建方式

配置好模型后,我们来创建第一个 AI 应用。Dify 提供了三种创建方式:

1. 从模板创建

初次使用 Dify 时,你可能对于应用创建比较陌生。为了帮助新手用户快速了解在 Dify 上能够构建哪些类型的应用,Dify 提供了丰富的应用模板,涵盖了智能客服、文案写作、数据分析、代码助手等不同的场景,推荐新手从这里快速上手:

apps-template.png

任意选择某个模板,并将其添加到工作区即可。

2. 创建空白应用

如果需要从零开始创建应用,可以选择这项,适用于对 Dify 有一定了解的用户:

apps-empty.png

Dify 提供了 5 种主要的应用类型,每种都适用于不同的场景:

  • 聊天助手(Chatbot):最常见的 AI 应用类型,适合构建对话式的智能助手。它支持多轮对话、上下文记忆,可以用来创建客服机器人、个人助理等;
  • 文本生成(Text Generator):这种应用专注于根据用户输入生成特定格式的文本内容,比如文章摘要、产品描述、邮件回复等。它通常是单次交互,输入提示词后直接输出结果;
  • 智能体(Agent):Dify 的高级功能之一,它可以使用工具,比如调用外部 API、执行代码、搜索网络信息、处理文件等,这使得它能够处理需要多步骤操作的复杂任务;
  • 工作流(Workflow):提供了可视化的节点编排界面,让你可以构建复杂的业务逻辑;
  • 对话流(Chatflow):结合了聊天助手和工作流的优点,在对话式交互的基础上增加了复杂的流程控制能力。

创建应用时,你需要给应用起一个名字,选择合适的图标,或者上传喜爱的图片用作图标,并使用一段清晰的文字描述此应用的用途,以便后续使用。

3. 通过 DSL 文件创建

这种方式不太常用,一般用于导入别人分享的应用,或者将应用从一个环境导入到另一个环境。DSL(Domain Specific Language) 是 Dify 定义的应用配置标准,采用 YAML 格式,文件内容包括应用的基本描述、模型参数、编排配置等信息。

已经创建好的应用可以导出成 DSL 文件,这样可以在另一个环境导入,支持本地文件导入和 URL 导入:

apps-dsl.png

导入 DSL 文件时将校对文件版本号,如果 DSL 版本号差异较大,有可能会出现兼容性问题。

创建你的第一个应用

让我们以创建一个 “翻译小能手” 为例,简单了解下应用构建的基本流程。点击 “创建空白应用”,应用类型选择 “聊天助手”,并填写基本的名称、图标和描述信息,然后进入应用的配置页面:

apps-config.png

整个配置页面可分为两大块:左侧为 编排 区域,右侧为 调试与预览 区域。在编排区域里,可以对聊天助手进行以下配置:

  • 提示词:用于对聊天助手的回复做出一系列指令和约束,提示词中可插入表单变量;
  • 变量:将以表单形式让用户在对话前填写,用户填写的表单内容将自动替换提示词中的变量;
  • 知识库:为聊天助手提供特定领域的知识背景,让其可以回答领域内的问题;
  • 视觉:开启视觉功能将允许模型输入图片,并根据图像内容的理解回答用户问题;

下面的配置项我们暂时不管,对于 “翻译小能手”,我们只需要配置提示词即可:

你是一个翻译助手,你的任务是将用户输入翻译成其他的语言,
如果用户输入是中文,翻译成英文,如果用户输入是英文,则翻译成中文。

Dify 提供了一个 “提示词生成器” 功能,可以对你的提示词进行优化,生成高质量、结构化的提示词:

prompt-generate.png

然后选择合适的大语言模型,就可以在右侧和其进行聊天了:

apps-chat-debug.png

测试通过后,点击 “发布” 按钮对应用进行发布:

apps-publish.png

至此,我们的第一个 AI 应用就开发好了,可以通过下面几种方式来访问该应用:

  • 通过助手的独立页面访问,可以将链接分享给任何人直接使用;
  • 将助手嵌入到你的网站中,通过 iframe 将其放在你的网站中的任意位置;
  • 在 “探索” 页面中访问;
  • 通过 API 调用,将助手的对话能力接入你的服务中;

高级功能配置

我们在调试聊天助手时,可以切换不同的模型,对比模型之间的效果,Dify 为此提供了一个 “多模型调试” 功能:

apps-chat-multi.png

我们可以同时和最多 4 个模型进行会话,非常方便:

apps-chat-multi-4.png

此外,Dify 还提供了一些高级功能来增强应用体验:

  • 对话开场白:设置应用的欢迎词,在对话型应用中,让 AI 主动说第一段话可以拉近与用户间的距离;同时可以预设最多 10 个常见问题供用户选择;
  • 下一步问题建议:设置下一步问题建议可以在每次回复后,根据对话内容推荐 3 条相关问题;
  • 文字转语音:开启后,回复的内容后面会多一个播放按钮,支持自动语音播放;
  • 语音转文字:开启后,对话框后面会多一个录音按钮,支持语音输入;
  • 引用和归属:显示源文档和生成内容的归属部分;
  • 内容审查:可以调用审查 API 或者维护敏感词库来使模型更安全地输出;
  • 标注回复:启用后,将标注用户的回复,以便在用户重复提问时快速响应;

apps-config-adv.png

小结

今天的内容比较简单,主要是熟悉下 Dify 平台的基本使用流程,涵盖以下关键环节:

  • 模型配置:Dify 支持 60+ 主流模型供应商,采用插件化安装,通过 PKCS1_OAEP 加密确保 API 密钥安全。系统将模型分为六大类型,满足从推理到语音转换的全链路需求。
  • 应用创建:提供模板创建、空白创建和 DSL 导入三种方式,覆盖聊天助手、文本生成、智能体、工作流和对话流五种应用类型,适配不同复杂度的业务场景。
  • 功能特性:内置多模型调试、语音转换、内容审查等高级功能,提供从原型验证到生产部署的完整工具链。

篇幅有限,关于 Dify 的应用创建,还有很多细节没有展开,比如,模型插件是如何加载的,聊天助手的各个功能特性是如何实现的,除聊天助手之外还有另四种应用又是如何使用的,我们下期再见。


Dify 快速上手指南

随着 AI 技术的快速发展,构建大语言模型应用已经成为了许多开发者和企业的重要需求。而 Dify 作为一个开源的 LLM 应用开发平台,凭借其直观的界面和强大的功能,正在成为这个领域的佼佼者。最近,Dify 发布了 v2.0.0 beta 版本,这个版本带来了许多重要的改进和新特性,正好趁着这个契机,我们来深入了解下这个平台。

Dify 介绍

Dify 是一个开源的 LLM 应用开发平台,其核心理念是让 AI 应用开发变得更加简单和高效。它不仅提供了可视化的工作流编排,还集成了 RAG 管道、智能体能力、模型管理和可观测性等功能,让开发者能够快速从原型迁移到生产环境。

Dify 一词源自 Define + Modify,指的是定义并持续改进你的 AI 应用,它也可以当做 Do it for you 的缩写。

dify-logo.png

它的核心功能包括:

  • 全面的模型支持:无缝集成包括 OpenAI 与 Anthropic 等数十个商业模型,支持 Hugging Face、OpenRouter 等 MaaS 供应商接口以及任意 OpenAI 兼容的接口,并提供了本地模型推理 Runtime 的支持;
  • 灵活的工作流:在可视化画布上构建和测试强大的 AI 工作流,分为 Chatflow 和 Workflow 两种类型;
  • 直观的提示词编排:用于制作提示词的直观界面,比较模型性能,并为基于聊天的应用添加文本转语音等附加功能;
  • 高质量的 RAG 管道:广泛的 RAG 功能,涵盖从文档摄取到检索的所有内容,支持从 PDF、PPT 和其他常见文档格式中提取文本;
  • 稳健的智能体框架:可以基于 LLM 的 Function Calling 或 ReAct 定义智能体,并为智能体添加自定义工具;
  • 丰富的工具系统:Dify 提供了 50 多个内置工具,如 Google Search、WolframAlpha 等;并支持轻松导入自定义的 API 工具或 MCP 工具;
  • 强大的插件生态:支持模型、工具、Agent 策略、扩展等插件类型,开发者可以根据需要方便的扩展 Dify 的功能;
  • LLMOps:随时间监控和分析应用日志和性能,你可以基于生产数据和注释持续改进提示词、数据集和模型;
  • 后端即服务:Dify 的所有功能都提供相应的 API,因此你可以轻松地将 Dify 集成到你自己的业务逻辑中;

本地部署准备工作

Dify 提供了多种部署方式,最简单的方式是使用 Docker Compose。在开始之前,请确保你的机器满足以下最低系统要求:

  • CPU >= 2 Core
  • RAM >= 4 GiB

首先,我们需要克隆 Dify 的源代码:

$ LATEST_TAG=$(curl -s https://api.github.com/repos/langgenius/dify/releases/latest | jq -r .tag_name)
$ git clone --branch "$LATEST_TAG" https://github.com/langgenius/dify.git

上面的命令会自动获取最新的稳定版本并克隆到本地,我在写这篇文章时最新版本是 v1.8.1。

然后进入 docker 目录:

$ cd dify/docker

这个目录包含了 Dify 部署所需的所有配置文件,其中 docker-compose.yml 文件定义了 Dify 的完整架构,包括以下核心组件:

  • api - Dify API 服务
  • worker - Celery 后台任务处理器
  • worker_beat - Celery 定时任务调度器
  • web - 前端 Web 应用
  • db - PostgreSQL 主数据库
  • redis - Redis 缓存和消息队列
  • sandbox - Dify 代码执行沙盒
  • plugin_daemon - 插件守护进程
  • ssrf_proxy - SSRF 代理 (Squid)
  • nginx - 反向代理和负载均衡器
  • weaviate - Weaviate 向量数据库

这个文件还内置了大量的其他向量数据库组件配置,可以通过不同的 Profile 来切换:

  • qdrant - Qdrant 向量数据库
  • pgvector - pgvector 扩展的 PostgreSQL
  • pgvecto-rs - pgvecto-rs 向量数据库
  • chroma - Chroma 向量数据库
  • milvus - 包括 etcd、minio、milvus 三个组件

    • etcd - Milvus 的 etcd 配置存储
    • minio - Milvus 的 MinIO 对象存储
    • milvus-standalone - Milvus 向量数据库
  • opensearch - 包括 opensearch 和 dashboards 两个组件

    • opensearch - OpenSearch 向量数据库
    • opensearch-dashboards - OpenSearch 仪表板
  • elasticsearch - 包括 elasticsearch 和 kibana 两个组件

    • elasticsearch - Elasticsearch 向量数据库
    • kibana - Kibana 可视化界面

还有不少比较小众的向量数据库,比如 Couchbase、Vastbase、OceanBase、Oracle、OpenGauss、MyScale、Matrixone 等等。除了向量数据库,文件中还有另外两个额外的组件,用户可以择需使用:

  • certbot - SSL 证书自动管理
  • unstructured - 非结构化文档处理

接下来,复制环境配置文件:

$ cp .env.example .env

这个 .env 文件包含了 Dify 运行所需的所有环境变量,包括数据库连接信息、加密密钥、模型配置等。默认配置已经可以正常运行,我们暂时不用改。

Docker Compose 本地部署

然后使用 Docker Compose 启动所有核心服务:

$ docker compose up -d

或者指定特定的 Profile 启动:

$ docker compose --profile qdrant up -d

这个命令会下载所有必要的镜像并启动服务,首次运行可能需要一些时间,因为需要下载各种 Docker 镜像。等待所有容器启动完毕后,如果一切正常,各服务状态如下所示:

docker-status.png

这些服务之间的依赖关系如下图所示:

docker-compose.png

源码安装

除了使用 Dify 官方构建好的 Docker 镜像启动服务之外,我们也可以直接通过源码来启动,不过还是需要 Docker Compose 来安装中间件:

$ cp middleware.env.example middleware.env
$ docker compose -f docker-compose.middleware.yaml up -d

启动成功后各服务状态如下:

docker-status-middleware.png

相比于之前的 Docker Compose 部署,这里少了 nginxwebapiworkerworker-beat 这 5 个服务,因此需要通过源码启动,包括两个后端服务和一个前端服务:

  • API 服务:为前端服务和 API 访问提供 API 请求服务
  • Worker 服务:为数据集处理、工作区、清理等异步任务提供服务
  • Web 服务:启动用于前端页面的 Web 服务

后端服务需要 Python 3.12 版本,可以使用 uv 来创建 Python 虚拟环境:

$ uv venv --python 3.12

前端服务需要 Node.js v22 (LTS) 和 PNPM v10 版本,安装方法参考官方文档:

启动 API 服务

我们首先进入 api 目录:

$ cd api

复制环境变量文件:

$ cp .env.example .env

然后使用 openssl 命令生成一个随机密钥:

$ openssl rand -base64 42

该命令的意思是生成 42 字节的随机数据并用 Base64 进行编码。

将该值替换 .env 文件中的 SECRET_KEY 值:

SECRET_KEY=xxx

也可以使用下面的 awk 命令一句话搞定:

$ awk -v key="$(openssl rand -base64 42)" \
    '/^SECRET_KEY=/ {sub(/=.*/, "=" key)} 1' \
    .env > temp_env && mv temp_env .env

这里的 awk -v key="$(...)" 表示将生成的密钥存储在 key 这个变量中,/^SECRET_KEY=/ 表示匹配以 SECRET_KEY= 开头的行,{sub(/=.*/, "=" key)} 表示将等号后的所有内容替换为新密钥,后面的 1 是 awk 的默认动作,表示打印所有行,然后将输出重定向到临时文件 temp_env,最后成功后再将临时文件重命名为原文件。注意这里不能直接输出到原文件,会导致原文件被清空!

然后运行 uv 命令安装所需依赖:

$ uv sync

在启动前,我们还需要初始化数据库:

$ uv run flask db upgrade

这个命令会按顺序执行 migrations/versions/ 目录下的迁移脚本,更新数据库结构,包括创建或修改表、列、索引等。

最后启动 API 服务:

$ uv run flask run --host 0.0.0.0 --port=5001 --debug

启动 Worker 服务

如果要从队列中消费异步任务,我们还需要启动 Worker 服务:

$ uv run celery -A app.celery worker \
    -P gevent \
    -c 1 \
    --loglevel INFO \
    -Q dataset,generation,mail,ops_trace

Celery 是一个基于 Python 的 分布式任务队列(Distributed Task Queue),主要用于处理异步任务和定时任务。它可以让你把一些耗时的操作(比如发送邮件、处理图片、数据分析等)放到后台去执行,而不会阻塞主程序的运行。Celery 通过 消息中间件(Broker) 来传递任务,常用的有 Redis、RabbitMQ、Amazon SQS 等。

这里的 -Q dataset,generation,mail,ops_trace 表示监听的队列:

  • dataset: 数据集相关任务(数据处理、索引等)
  • generation: 内容生成任务(大模型调用等)
  • mail: 邮件发送任务
  • ops_trace: 操作追踪任务

启动 Web 服务

后端服务启动好之后,我们最后再来启动前端 Web 服务。首先进入 web 目录:

$ cd web

安装依赖:

$ pnpm install --frozen-lockfile

再准备环境变量文件:

$ cp .env.example .env.local

构建 Web 服务:

$ pnpm build

启动 Web 服务:

$ pnpm start

成功启动后,在浏览器中访问 http://localhost:3000/install 设置管理员账户:

dify-install.png

设置管理员账户后,你就可以访问 http://localhost:3000 进入 Dify 的主界面了:

dify-apps.png

小结

从今天起,我们开始学习 Dify 这个开源的 LLM 应用开发平台。今天主要介绍了它的核心功能,并演示了两种主要的部署方式:

  1. Docker Compose 部署:这是最简单快捷的方式,适合快速体验和小规模部署,只需几条命令就能启动包含所有组件的完整 Dify 环境;
  2. 源码安装:提供了更大的灵活性和控制权,适合需要深度定制或开发调试的场景,也方便我们更好地理解 Dify 的架构;

Dify v2 版本发布在即,标志着这个平台正在朝着更加成熟和功能完备的方向发展,非常值得研究。接下来,就让我带领大家一起好好探索下 Dify 的各个功能特性,从源码的角度深入剖析其实现原理。


学习 Chat2Graph 的知识库服务

在前面的系列文章中,我们已经深入学习了 Chat2Graph 的多个核心模块:从智能体的协作机制,到任务规划与执行,再到工作流引擎,以及推理机和工具系统的实现。今天,我们将继续学习 Chat2Graph 的另一个关键组件——知识库服务,探索它如何为智能体系统提供强大的知识管理和检索能力。

知识库为智能体提供持久化的知识存储和检索服务,使得智能体能够基于领域知识回答专业问题,而不仅仅依赖于大模型的训练数据。在 Chat2Graph 中,知识库的设计兼顾了向量检索和图检索两种方式,形成了一套完整的知识管理体系。

知识库基本使用

Chat2Graph 的知识库在使用上分为 全局知识库会话知识库 两个层次:

  • 全局知识库:存储整个智能体系统的基础知识,任何会话都可以获取全局知识库中的知识;
  • 会话知识库:存储与当前会话相关的私有知识,与会话一一对应,只在当前会话中生效,不会被其他会话的知识干扰;

可以在 “知识库管理” 页面对这两种知识库进行管理:

kb-type.png

另外,Chat2Graph 支持两种不同的知识存储和检索方式:

  • 向量知识库:基于 Chroma 设计的向量知识库,检索时根据向量相似度匹配与问题最接近的文档片段;
  • 图知识库:基于 TuGraph 设计的图知识库,检索时在知识图谱中匹配与问题相关的子图与社区摘要;

尽管 Chat2Graph 支持 TuGraph 和 Neo4j 两种图数据库的管理,但是知识库这里暂时只支持 TuGraph。

用户可以通过 .env 环境变量配置来选择使用哪种知识库类型。

配置向量知识库:

KNOWLEDGE_STORE_TYPE=VECTOR

向量知识库默认存储在本地 Chroma 文件中,位于 ~/.chat2graph/knowledge_bases 目录。

配置图知识库:

KNOWLEDGE_STORE_TYPE=GRAPH
# TuGraph 配置
GRAPH_KNOWLEDGE_STORE_HOST=127.0.0.1
GRAPH_KNOWLEDGE_STORE_PORT=17687
GRAPH_KNOWLEDGE_STORE_USERNAME=admin
GRAPH_KNOWLEDGE_STORE_PASSWORD=password

配置好知识库类型后,点击 “全局知识库” 或 “会话知识库”,上传文件:

kb-upload.png

支持 PDF、TXT、XLSX、DOC、DOCX、MD 等文件格式。然后点击 “下一步” 进入数据处理的配置,这块目前很粗糙,就一个分块大小参数:

kb-setting.png

然后点击 “确定” 完成知识的添加。

知识库接口

Chat2Graph 的知识库服务提供了一套完整的 RESTful API:

# 获取所有知识库
GET /knowledgebases/

# 获取指定知识库
GET /knowledgebases/{kb_id}

# 更新知识库信息
PUT /knowledgebases/{kb_id}

# 清空/删除知识库
DELETE /knowledgebases/{kb_id}?drop=true

# 向知识库添加文档
POST /knowledgebases/{kb_id}/files/{file_id}

# 从知识库删除文档
DELETE /knowledgebases/{kb_id}/files/{file_id}

上一节点击 “确实” 后实际上调用的就是 “向知识库添加文档” 这个接口:

@knowledgebases_bp.route("/<string:knowledge_base_id>/files/<string:file_id>", methods=["POST"])
def load_knowledge_with_file_id(knowledge_base_id: str, file_id: str):
  
  manager = KnowledgeBaseManager()
  data: Dict[str, Any] = cast(Dict[str, Any], request.json)

  # 加载文档
  result, message = manager.load_knowledge(
    kb_id=knowledge_base_id,
    file_id=file_id,
    knowledge_config=KnowledgeBaseViewTransformer.deserialize_knowledge_config(
      json.loads(data.get("config", "{}"))
    ),
  )
  return make_response(data=result, message=message)

KnowledgeBaseManager 通过核心服务 KnowledgeBaseService 实现知识的管理。它采用单例模式,负责:

  • 知识库的创建、管理和删除
  • 文件与知识库的映射关系管理
  • 知识的加载、检索和删除
  • 全局知识库和会话知识库的统一管理

知识库服务实现

KnowledgeStore 是一个抽象基类,定义了知识存储的标准接口:

class KnowledgeStore(ABC):

  @abstractmethod
  def load_document(self, file_path: str, config: Optional[KnowledgeConfig]) -> str:
    """加载文档"""

  @abstractmethod
  def delete_document(self, chunk_ids: str) -> None:
    """删除文档"""

  @abstractmethod
  def retrieve(self, query: str) -> List[KnowledgeChunk]:
    """检索知识"""

  @abstractmethod
  def drop(self) -> None:
    """删除整个知识库"""

Chat2Graph 通过 KnowledgeStoreFactory 工厂模式提供了两种实现:

class KnowledgeStoreFactory:

  @classmethod
  def get_or_create(cls, name: str) -> KnowledgeStore:
    if SystemEnv.KNOWLEDGE_STORE_TYPE == KnowledgeStoreType.VECTOR:
      return VectorKnowledgeStore(name)
    elif SystemEnv.KNOWLEDGE_STORE_TYPE == KnowledgeStoreType.GRAPH:
      return GraphKnowledgeStore(name)

有意思的是,这两种都是基于 DB-GPT 框架实现的:

  • 向量知识库:使用 ChromaStore 定义向量存储,使用 EmbeddingAssembler 加载知识,使用 EmbeddingRetriever 检索知识;
  • 图知识库:使用 CommunitySummaryKnowledgeGraph 定义图存储,使用 EmbeddingAssembler 加载知识,使用图存储的 asimilar_search_with_scores() 方法检索知识;

实现都比较简单,这里就不赘述了。具体内容可参考 DB-GPT 的 RAG 文档:

与推理机的集成

知识库服务与推理机系统紧密集成,在算子的执行过程中,会通过知识库服务获取相关知识,注入到推理上下文中:

task = Task(
  job=job,
  operator_config=self._config,
  workflow_messages=merged_workflow_messages,
  tools=rec_tools,
  actions=rec_actions,
  knowledge=self.get_knowledge(job), # 注入知识
  insights=self.get_env_insights(),
  lesson=lesson,
  file_descriptors=file_descriptors,
)

result = await reasoner.infer(task=task)

推理机会将检索到的知识作为上下文信息,帮助大模型生成更准确、更符合领域知识的回答。

与记忆系统的关系

20 世纪 70 年代,管理学家 罗素・艾可夫(Russell L. Ackoff) 等人提出 DIKW 金字塔模型,它将人类对世界的认知过程拆解为四个层层递进的层级,清晰展现了原始素材如何转化为决策能力的逻辑链条。

dikw.png

  1. 数据(Data) 是未经处理的、客观存在的原始符号或事实,不包含任何上下文或意义,仅代表 “是什么”。它是认知的起点,没有价值判断,也无法直接指导决策。
  2. 信息(Information) 是经过加工、赋予上下文和意义的数据,通过 “数据 + 背景” 的组合,回答 “这是什么意思”。它解决了数据的无意义性,让原始素材具备了初步价值。
  3. 知识(Knowledge) 是信息之间建立关联、经过验证并可用于指导实践的结构化体系,它回答 “为什么会这样”“如何应用”。知识源于对信息的归纳、总结和验证,是可复用的经验或规律。
  4. 智慧(Wisdom) 是基于知识进行价值判断、权衡利弊后做出最优决策的能力,它回答 “这样做是否明智”“如何实现长期目标”。智慧是认知的最高层级,需要结合价值观、伦理观和长期视角。

DIKW 模型是信息科学与知识管理领域的核心框架,如今已广泛应用于企业管理、信息技术、教育、科研等多个领域。

Chat2Graph 参考 DIKW 模型,设计了一个分层的记忆架构:

memory-level.png

每一层和 DIKW 的对应关系如下:

dikw-memory.png

注意,目前 Chat2Graph 的记忆系统功能还在建设中,文档中只是介绍了它的设计理念,我们在设计记忆系统时也可以参考之。

分层记忆系统引入了多级的知识抽象,包含三个关键能力:

  1. 知识精练(Knowledge Refinement):原始知识经过逐级的处理、分析、抽象、压缩,形成更高层次的知识;
  2. 知识下钻(Knowledge Drilling):在使用高维知识的同时,还可以按需下钻低维知识,让推理上下文粗中有细;
  3. 知识延拓(Knowledge Expansion):同层级知识关联的构建和召回,通过特定方法丰富知识上下文;

不难看出,知识库服务可以视为记忆系统 L2 层(Lesson)的初步实现,主要存储领域知识和最佳实践。当前 Chat2Graph 只是初步地将 RAG 作为知识库的实现形式,未来随着分层记忆架构的完善,知识库将进一步整合到完整的记忆体系中,形成更加智能的知识管理和利用机制。

此外,除了知识库,环境也可以当做记忆的一部分。环境指的是智能体执行过程中可交互的外部空间,智能体可以通过工具操作感知环境变化,影响环境状态。Chat2Graph 还提出了一个有趣的观点:环境可以被视为当前时刻的外部记忆,而记忆则是历史时刻的环境快照。这种同质性使得环境可以无缝地融入分层记忆模型中,从技术实现角度来看,记忆系统、知识库和环境的架构可以统一。通过工具这座桥梁,打通记忆系统与环境状态,构建智能体的精神世界与外部环境的物理世界的映射关系,即世界知识模型。

小结

通过这篇文章,我们学习了 Chat2Graph 知识库服务的设计与实现,包括全局知识库与会话知识库的分离,基于 DB-GPT 的向量知识库和图知识库实现,以及与记忆系统的关系。知识库服务作为智能体系统的重要组成部分,为智能体提供了强大的知识管理和检索能力。同时,它也是更大的记忆系统的一部分,通过多层级记忆系统的设计,从记忆存储、信息评估、经验提取到洞察生成,记忆系统从原始数据逐步提炼到高层次的智慧洞察,为 Chat2Graph 提供持久化的学习和适应能力,提升系统的整体智能水平。