Fork me on GitHub

2025年9月

盘点 Chat2Graph 中的专家和工具

至此,我们完成了 Chat2Graph 中从用户会话到工具执行的完整链路的学习。让我们先通过一个流程图来回顾 Chat2Graph 的整体运行流程:

chat2graph-flow.png

这个流程处处都体现着图原生的理念,里包含了三个循环:

  1. Chat2Graph 实现了一主动多被动多智能体架构,当用户的问题到来时,Leader 智能体会将任务分配给多个 Expert 智能体执行,这些 Expert 智能体是以图的形式组织的;
  2. 每个智能体由一个工作流实现,也是一个有向无环图,工作流包含多个算子,它们按序执行,等到所有算子都执行完成,整个工作流才算完成;
  3. 算子和推理机相互协作,推理机通过大模型判断下一步的动作和该执行的工具,工具的结果又会返回给推理机判断下一步的动作,直到完成了算子所定义的任务目标为止;

最后,所有智能体全部运行结束,才算完成了一次用户的会话过程。

今天,我们将从专家和工具的角度,详细盘点 Chat2Graph 中都有哪些专家智能体,每个专家都包含哪些算子,每个算子又配置了哪些动作,以及每个动作关联了哪些工具。根据 chat2graph.yml 配置文件,我们可以看出,Chat2Graph 一共内置了 7 个专家,除了我们之前介绍过的 5 大核心专家,还专门为 GAIA 基准测试新增了两个专家。

Design Expert - 建模专家

Design Expert 是知识图谱建模(Schema)专家,专注于根据特定的数据需求设计图的 Schema,清晰定义顶点(Vertices)和边(Edges)的类型、属性及关系。

design-expert.png

它的核心工具包括:

  1. DocumentReader:从指定路径读取并返回文档内容,支持多种文档格式解析,为后续的图建模提供原始数据输入;
  2. SchemaGetter:连接到图数据库并获取其当前的 Schema 信息,用于了解现有的图结构;
  3. VertexLabelAdder:在图数据库中创建新的顶点标签,定义新的实体类型;
  4. EdgeLabelAdder:在图数据库中创建新的边标签,定义新的关系类型;
  5. GraphReachabilityGetter:查询图数据库以获取图的可达性信息,验证图结构的连通性;

Extraction Expert - 导数专家

Extraction Expert 是原始数据提取和数据导入图数据专家,负责根据已定义的 Schema 从原始数据中提取结构化信息,并将其导入到目标图数据库中。

extraction-expert.png

它的核心工具包括:

  1. DataStatusCheck:检查图数据库中当前数据的状态,了解现有数据情况,确保后续数据导入的一致性;
  2. DataImport:将提取的三元组数据导入到图数据库中,支持批量导入和增量更新;

Query Expert - 查询专家

Query Expert 专注于图数据查询,能够理解用户的查询意图,编写精确的图查询语句,并在目标图数据库上执行查询。

query-expert.png

它的核心工具包括:

  1. CypherExecutor:执行 Cypher 查询语句,支持复杂的图遍历和数据检索操作;

Analysis Expert - 分析专家

Analysis Expert 专注于图数据分析和算法应用,能够基于分析目标选择、配置并执行相应的图算法。

analysis-expert.png

这个专家包含大量图分析的工具:

  1. AlgorithmsGetter:获取图数据库支持的算法列表和相关信息;
  2. PageRankExecutor:执行 PageRank 算法,计算图中节点的重要性得分;
  3. BetweennessCentralityExecutor:执行介数中心性算法,识别图中的关键连接节点;
  4. LouvainExecutor:执行 Louvain 社区发现算法,识别图中的社区结构;
  5. LabelPropagationExecutor:执行标签传播算法,进行社区检测;
  6. ShortestPathExecutor:计算图中两点间的最短路径;
  7. NodeSimilarityExecutor:计算节点间的相似性得分;
  8. CommonNeighborsExecutor:分析节点间的共同邻居关系;
  9. KMeansExecutor:执行 K-means 聚类算法;

值得注意的是,这些算法都是基于 Neo4j 的 GDS(Graph Data Science) 库实现的,这是 Neo4j 推出的一套 图数据科学库,专为在 Neo4j 图数据库上执行高效、可扩展的图算法而设计。它允许开发者、数据科学家和分析师通过图结构挖掘隐藏的关系模式(如社区、路径、影响力),广泛应用于推荐系统、欺诈检测、社交网络分析、供应链优化等场景。

因此在部署 Neo4j 时,需要开启 graph-data-science 插件:

$ docker run -d \
  -p 7474:7474 \
  -p 7687:7687 \
  --name neo4j-server \
  --env NEO4J_AUTH=none \
  --env NEO4J_PLUGINS='["apoc", "graph-data-science"]' \
  neo4j:2025.04

Neo4j GDS 内置算法覆盖图分析的主流场景,以下是核心分类及典型应用:

  • 社区检测

    • 典型算法:Louvain、Weakly Connected Components(WCC)
    • 应用场景:社交网络圈子识别、客户分群、欺诈团伙检测
  • 中心性分析

    • 典型算法:PageRank、Betweenness Centrality、Degree Centrality
    • 应用场景:影响力节点识别(如社交网络 KOL)、关键路径分析(如供应链核心节点)
  • 路径分析

    • 典型算法:Shortest Path(最短路径)、All Pairs Shortest Path(APSP)
    • 应用场景:物流路线优化(比如从仓库 A 到门店 B 的最短配送路径)、导航系统
  • 相似性分析

    • 典型算法:Jaccard Similarity、Cosine Similarity
    • 应用场景:推荐系统(比如购买过相似商品的用户)、内容相似度匹配
  • 链接预测

    • 典型算法:Common Neighbors、Preferential Attachment
    • 应用场景:社交网络可能认识的人、客户潜在购买行为预测
  • 图嵌入

    • 典型算法:Node2Vec、FastRP
    • 应用场景:将图节点转换为向量(可输入机器学习模型),用于分类、聚类、推荐的预处理

以社交网络为例,通过 PageRank 算法识别最具影响力的用户:

// 1. 先投影社交网络图(节点:User,关系:FOLLOWS)
CALL gds.graph.project('socialGraph', 'User', 'FOLLOWS');

// 2. 运行 PageRank 算法
CALL gds.pageRank.stream('socialGraph')
YIELD nodeId, score
MATCH (user:User) WHERE id(user) = nodeId
RETURN user.name AS userName, score
ORDER BY score DESC LIMIT 10;

Q&A Expert - 问答专家

Q&A Expert 是通用问答和信息检索专家,具备优先级多源研究能力,优先使用知识库检索作为主要信息源,必要时进行网络研究作为补充。

qa-expert.png

它的核心工具包括:

  1. KnowledgeBaseRetriever:从外部知识库中检索相关文档和信息;
  2. BrowserUsing:使用浏览器进行网络搜索和内容抓取,基于 MCP 协议实现;
  3. FileTool:文件系统操作工具,用于读写本地文件;

其中 BrowserUsing 是一个 MCP 工具,能基于 Playwright 操作浏览器,运行程序时通过 start_mcp_server.sh 脚本启动:

$ npx @playwright/mcp@latest --port 8931

WebSurferExpert - 网络浏览专家

WebSurferExpert 负责所有在线信息获取任务,具备多模态分析能力,可以处理网页中的图像、音频和 PDF 文件。

web-surfer-expert.png

它的核心工具包括:

  1. BrowserTool:基于 browser-use 的浏览器自动化工具,支持网页导航和交互;
  2. UrlDownloaderTool:从 URL 下载文件的工具;
  3. GeminiMultiModalTool:基于 Gemini 的多模态分析工具,支持图像、音频等内容分析;
  4. PageVisionTool:页面视觉分析工具,能够理解网页的视觉布局和内容,它首先通过 browser-use 将网页保存成 PDF,然后调用 Gemini 多模态大模型对其进行分析;
  5. YouTubeTool:YouTube 视频内容分析工具;

DeveloperExpert - 本地开发专家

DeveloperExpert 负责所有本地环境中的操作,包括文件读写、数据处理、代码执行等任务。

developer-expert.png

它的核心工具包括:

  1. CodeExecutorTool:执行 Python 代码的工具,支持动态代码执行;
  2. ShellExecutorTool:执行 Shell 命令的工具;
  3. SpreadsheetTool:处理电子表格文件的工具,比如 Excel、CSV 等;
  4. ZipTool:处理 ZIP 压缩文件的工具,支持压缩和解压操作;

Leader 智能体

除了 Expert 智能体的特定工具之外,Chat2Graph 还有一个 Leader 智能体。

leader-agent.png

它提供了一个系统级的工具:

  1. SystemStatusChecker:查询系统状态的工具,帮助智能体了解当前系统的运行状态,用于更好的推理和决策;

工具的实现

Chat2Graph 中的工具主要分为两种类型:本地工具(LOCAL_TOOLMCP 工具(MCP_TOOL

本地工具是直接在 Agent 运行的 Python 环境中通过函数调用执行的工具。这些工具通过 module_path 指向包含工具实现的 Python 模块。比如下面是文档读取工具的定义:

tools:
  - &document_reader_tool
    name: "DocumentReader"
    module_path: "app.plugin.neo4j.resource.graph_modeling"

其实现位于 app/plugin/neo4j/resource/graph_modeling.py 文件中的 DocumentReader 类:

class DocumentReader(Tool):
  """Tool for analyzing document content."""

  async def read_document(self, file_service: FileService, file_id: str) -> str:
    """Read the document content given the document name and chapter name.

    Args:
      file_id (str): The ID of the file to be used to fetch the doc content.

    Returns:
      The content of the document.
    """
    return file_service.read_file(file_id=file_id)

Chat2Graph 同时支持同步或异步函数,它会自动检测函数类型,并适配调用方式(基于 inspect 库的 iscoroutinefunction() 函数实现):

# execute function call
if inspect.iscoroutinefunction(func):
  result = await func(**func_args)
else:
  result = func(**func_args)

MCP 工具基于 Model Context Protocol 协议,用于与独立的外部进程或服务进行交互。这对于集成非 Python 实现的或需要隔离环境的复杂工具(如 Playwright 浏览器自动化)至关重要。下面是两个 MCP 工具的示例:

  - &browser_tool
    name: "BrowserUsing"
    type: "MCP"
    mcp_transport_config:
      transport_type: "SSE"
      url: "http://localhost:8931/sse"

  - &file_tool
    name: "FileTool"
    type: "MCP"
    mcp_transport_config:
      transport_type: "STDIO"
      command: "npx"
      args: ["@modelcontextprotocol/server-filesystem", "."]

MCP 工具的配置包括:

  • transport_type:支持 STDIOSSEWEBSOCKETSTREAMABLE_HTTP 四种不同的通信协议;
  • command/args:启动外部进程的命令和参数,适用于 STDIO 模式;
  • url:连接外部服务的网络地址,适用于 SSEWEBSOCKETSTREAMABLE_HTTP 模式;

MCP 工具的实现位于 app/core/toolkit/mcp/mcp_connection.py 文件:

class McpConnection(ToolConnection):

  # 调用 MCP 工具
  async def call(self, tool_name: str, **kwargs) -> List[ContentBlock]:
    with self._lock:
      result = await self._session.call_tool(tool_name, kwargs or {})
      return result.content

  # 根据通信协议创建 MCP 连接
  async def connect(self) -> None:
    with self._lock:
      # 建立连接
      if transport_config.transport_type == McpTransportType.STDIO:
        await self._connect_stdio()
      elif transport_config.transport_type == McpTransportType.SSE:
        await self._connect_sse()
      elif transport_config.transport_type == McpTransportType.WEBSOCKET:
        await self._connect_websocket()
      elif transport_config.transport_type == McpTransportType.STREAMABLE_HTTP:
        await self._connect_streamable_http()
      else:
        raise ValueError(f"Unsupported transport type: {transport_config.transport_type}")

      # 初始化会话
      session: ClientSession = cast(ClientSession, self._session)
      await session.initialize()

这段代码基于 MCP 官方的 Python SDK 实现,其用法可参考文档:

小结

今天我们详细盘点了 Chat2Graph 中的所有专家和工具,通过清晰的职责划分,不同专家可完成从图谱建模、数据提取、查询分析、知识问答等不同的任务。此外,还简单讲解了工具的实现原理,支持本地工具和 MCP 工具两种类型,满足不同场景的需求。

Chat2Graph 通过这种分层的 专家-算子-动作-工具 架构,不仅实现了复杂任务的有序执行,还为图原生智能体系统提供了一个完整的工具生态。每个专家都在自己的专业领域内发挥最大价值,通过丰富的工具集合来完成具体的任务执行。


详解 Chat2Graph 的工具系统实现

在上一篇文章中,我们深入分析了 Chat2Graph 中算子与推理机的协作机制,以及双模推理机的设计原理。今天,我们将继续深入源码,从推理机如何调用大模型开始,详细介绍 Chat2Graph 的工具系统实现,包括 ActionToolToolGroupToolkit 等核心概念,以及它们如何通过有向图来优化工具调用的效果。

模型服务

在 Chat2Graph 的架构中,推理机并不直接与大模型交互,而是通过 模型服务(ModelService 这个中间层来实现。模型服务封装了不同 LLM 平台的调用细节,为推理机提供了统一的接口,它是一个抽象基类,定义在 app/core/reasoner/model_service.py 中:

class ModelService(ABC):

  @abstractmethod
  async def generate(
    self,
    sys_prompt: str,
    messages: List[ModelMessage],
    tools: Optional[List[Tool]] = None,
    tool_call_ctx: Optional[ToolCallContext] = None,
  ) -> ModelMessage:
    """生成模型响应"""

  async def call_function(
    self,
    tools: List[Tool],
    model_response_text: str,
    tool_call_ctx: Optional[ToolCallContext] = None,
  ) -> Optional[List[FunctionCallResult]]:
    """基于模型响应调用工具函数"""

模型服务的核心职责包括:

  1. 统一接口:通过 generate() 方法为推理机提供统一的模型调用接口;
  2. 工具调用:通过 call_function() 方法解析模型响应中的工具调用请求并执行;
  3. 服务注入:自动注入系统内部服务到工具参数中,比如 GraphDbServiceKnowledgeBaseService 等;
  4. 错误处理:处理工具调用过程中的各种异常情况;

Chat2Graph 支持三种模型服务的实现:

  • LiteLlmClient - 使用 LiteLLM 库调用大模型,;
  • AiSuiteLlmClient - 使用 AiSuite 库调用大模型,该项目由大佬吴恩达领导开发,旨在提供统一接口来调用不同的 LLM API,简化 AI 模型的调用过程,不过项目不怎么活跃,已不推荐使用;
  • DbgptLlmClient - 使用 DB-GPT 中的 LLMClient 类调用大模型;

可以在 .env 环境变量中切换:

MODEL_PLATFORM_TYPE="LITELLM"

工具调用原理

Chat2Graph 没有使用目前比较流行的 Function Call 或 Tool Call 技术,而是使用纯 Prompt 方式,通过让大模型生成带 <function_call>...</function_call> 这样的标签,实现工具调用。

工具调用的 Prompt 参考 FUNC_CALLING_PROMPT,我对其进行总结如下:

  1. 所有工具调用都必须包裹在 <action> 部分内的 <function_call>...</function_call> 标签中。如需调用多个函数,可包含多个 <function_call> 代码块。
  2. 每个 <function_call> 内部的内容必须是有效的 JSON 对象,且包含三个必填键:

    • "name":待调用函数的名称(字符串类型);
    • "call_objective":调用此函数的简要目的说明(字符串类型);
    • "args":包含该函数所有参数的对象,若无需参数,使用空对象 {}
  3. 对于标准数据类型(如短字符串、数字、布尔值或简单嵌套对象/数组),使用标准 JSON 格式:

    • 键名和字符串必须使用 双引号")包裹;
    • 不得使用末尾逗号;
  4. 对于复杂的、多行参数(如代码、HTML、Markdown),使用 Payload 包装器

    • Payload 包装器起始标记(Start Marker) __PAYLOAD_START__ 开始,并以 结束标记(End Marker) __PAYLOAD_END__ 结束;
    • 对于多行参数,只需将原始内容放置在这两个标记之间即可,无需手动转义;

下面是一个简单的工具调用示例:

<action>
  <function_call>
  {
    "name": "update_user_profile",
    "call_objective": "更新用户姓名和通知设置。",
    "args": {
      "user_id": 123,
      "profile": {
        "name": "Alice",
        "notifications_enabled": true
      }
    }
  }
  </function_call>
</action>

这里比较有意思的一点是 Payload 包装器(Payload Wrapper) 的设计。当 JSON 字符串中包含多行代码或含特殊字符的文本时,需要进行转义处理,比如让大模型生成下面这样的代码片段,不仅可读性差,而且很容易出错:

{
  "code": "def greet(name):\n\t# 这是代码内部的注释\n\tmessage = f\"Hello, {name}! Welcome.\"\n\tprint(message)"
}

使用 Payload 包装器,无需对特殊符号进行转义,大模型的输出更加可控:

<action>
  <function_call>
  {
    "name": "execute_python_code",
    "call_objective": "定义一个向用户打招呼的函数,并调用该函数。",
    "args": {
      "code": __PAYLOAD_START__
def greet(name):
  # 这是代码内部的注释
  # 注意:此处无需添加任何转义字符
  message = f"Hello, {name}! Welcome."
  print(message)

greet("World")
__PAYLOAD_END__
    }
  }
  </function_call>
</action>

然后,模型服务通过 call_function() 方法解析模型响应中的这些标签,找到对应的工具并执行:

async def call_function(
  self,
  tools: List[Tool],
  model_response_text: str,
  tool_call_ctx: Optional[ToolCallContext] = None,
) -> Optional[List[FunctionCallResult]]:

  # 解析模型响应中的 `<function_call>` 标签
  func_calls = self._parse_function_calls(model_response_text)

  func_call_results: List[FunctionCallResult] = []
  for func_tuple, err in func_calls:

    # 根据名称找到可执行的函数
    func_name, call_objective, func_args = func_tuple
    func = self._find_function(func_name, tools)
    
    # 通过反射获取函数签名,根据参数类型注入一些内置的参数或服务
    # 比如 `ToolCallContext`、`GraphDbService`、`KnowledgeBaseService` 等
    sig = inspect.signature(func)
    for param_name, param in sig.parameters.items():
      injection_type_found: bool = False
      param_type: Any = param.annotation

      # 注入 `ToolCallContext` 参数
      if param_type is ToolCallContext:
        func_args[param_name] = tool_call_ctx
        injection_type_found = True
        continue

      # 注入内置服务
      if not injection_type_found:
        if param_type in injection_services_mapping:
          func_args[param_name] = injection_services_mapping[param_type]

    # 调用函数
    if inspect.iscoroutinefunction(func):
      result = await func(**func_args)
    else:
      result = func(**func_args)

    # 封装函数调用结果
    func_call_results.append(
      FunctionCallResult(
        func_name=func_name,
        call_objective=call_objective,
        func_args=func_args,
        status=FunctionCallStatus.SUCCEEDED,
        output=str(result),
      )
    )

  return func_call_results

整个过程比较简单:首先解析 <function_call> 标签,然后根据名称找到可执行的函数并调用,最后返回封装后的调用结果。

这里值得一提的是 Chat2Graph 的服务注入机制,通过自动检测函数签名中的参数类型,允许工具函数自动获取系统内部服务的实例,比如 ToolCallContextGraphDbServiceKnowledgeBaseService 等。例如,一个图数据库查询工具可以这样定义:

def query_graph_database(
  query: str,
  graph_service: GraphDbService
) -> str:
  """查询图数据库
  
  Args:
    query: Cypher 查询语句
    graph_service: 图数据库服务
  """
  return graph_service.execute_query(query)

当 LLM 调用这个工具时,只需要提供 query 参数,graph_service 参数会由系统自动注入。

最后,工具执行的结果会返回给推理机,推理机基于执行结果判断是否完成了算子所定义的任务目标,推理下一步的动作,如果未完成,可能会继续调用其他工具,直到完成为止。整个流程如下所示:

tool-call-flow.png

基于图的工具库设计

那么这些工具是哪里来的呢?答案是 Chat2Graph 的 工具库(Toolkit 模块。工具库是一个有向图,这是 Chat2Graph 作为图原生智能体系统的又一体现,它通过 动作(Action工具(Tool工具组(ToolGroup 构成的图结构来实现智能化的工具推荐和调用。

tool-kit.png
这里对几个概念稍加解释:

  1. 工具(Tool 是工具系统的基本执行单元,包含其名称、功能描述以及具体的执行逻辑;支持 LOCAL_TOOLMCP_TOOL 两种类型的工具,LOCAL_TOOL 指的是一个本地函数,可以是同步或异步函数,Chat2Graph 会自动检测函数类型并适配调用方式;MCP_TOOL 指的是 MCP Server 中的一个工具,一般通过工具组来配置,不会单独配置;
  2. 动作(Action 代表大模型在执行任务过程中的一个状态或决策点,是工具库中的核心节点,它的描述信息有助于大模型理解其代表的意图和功能;一个动作可以关联一个或多个工具,也可以一个都不关联;
  3. 工具组(ToolGroup 是工具的一个逻辑集合,通常代表一组功能相关的工具,比如 MCP Server 就是一个工具组;另外,看代码中有一个 ToolPackage 类,不过还没实现,应该是想将同一个包下的多个函数打包成一组;总之,工具组使得批量注册和管理工具变得更加便捷;

工具库中存在三种类型的边:

  • Next 边:连接 ActionAction,表示动作间的顺序关系,边上可以带权重,表示两个动作之间的关联强度;
  • Call 边:连接 ActionTool,表示动作可以调用的工具,边上也可以带权重,表示动作调用工具的可能性;
  • Group_Has_Tool 边:连接 ToolGroupTool,表示工具的归属关系;

通过这些点和边,这个图不仅精确地定义了不同工具之间的调用关系,还明确了它们之间潜在的执行顺序,相比于传统的工具列表,图能表达的含义要丰富的多。基于图结构,系统能够更智能地向大模型推荐当前情境下可调用的工具及相关的动作,显著提高了推荐的准确率。同时,图的精确性有效约束了大模型选择工具的范围,从而降低了工具调用的不确定性和潜在的错误率。

基于图的工具推荐

工具库的核心优势在于能够基于图结构进行上下文感知的工具推荐。当算子执行时,ToolkitService 会根据算子配置的 actions 列表,在图中进行广度优先搜索(BFS),找到相关的工具和动作,搜索的深度由图遍历的跳数 hops 来控制,关联的强度由阈值 threshold 来决定。

rec_tools, rec_actions = toolkit_service.recommend_tools_actions(
  actions=self._config.actions,
  threshold=self._config.threshold,
  hops=self._config.hops,
)

下面通过一个简单示例来演示 Chat2Graph 的推荐过程。我们看这样一个工具库,它包含四个动作(A1 ~ A4)和四个工具(T1 ~ T4),各个节点之间的关系和权重如下图所示:

tool-kit-rec-0.png

假设现在 A1 是起始点,并且配置的跳数是 1,阈值是 0.6。首先,A1 有一个工具 T1,且权重为 0.9 大于阈值,因此直接保留。然后,通过 BFS 搜索下一个节点,A1 到 A2 之间权重 0.8 大于阈值,因此可以到达,A1 到 A3 之间权重 0.5 小于阈值,所以被丢弃;然后再看 A2 的工具 T2 和 T4,同样的,大于阈值保留,小于阈值丢弃。搜索过程如下:

tool-kit-rec-1.png

搜索结束后仅保留子图,子图中的工具和动作将作为上下文送给推理机进行推理:

tool-kit-rec-2.png

这种基于图的推荐机制带来了显著优势:

  1. 上下文感知:系统能根据当前 Action 的位置,推荐最相关的工具和后续动作;
  2. 减少搜索空间:通过预定义的图结构,有效缩小了大模型选择工具的范围;
  3. 提高准确性:基于权重的推荐机制,确保优先推荐最匹配的工具;
  4. 便于扩展:新增工具只需要在图中添加相应的节点和边,无需修改现有逻辑;

值得注意的是,这里的跳数 hops 默认是 0,阈值 threshold 默认是 0.5,暂时没看到哪里可以配,所以目前的工具推荐逻辑比较简单,算子里配了哪些动作,就推荐哪些动作以及关联的工具。关于工具库的能力还在持续优化中,后面可能会支持工具集的一键注册,以及基于强化学习的工具图谱优化等,大家可以关注项目的后续发展。

小结

今天我们深入分析了 Chat2Graph 工具系统的实现原理。从推理机如何通过 ModelService 调用大模型开始,我们了解了工具调用的完整流程;然后深入探讨了基于有向图的工具系统设计,包括 ActionToolToolGroupToolkit 等核心概念,以及它们如何协作实现智能化的工具推荐。

这种图结构清晰地定义了任务执行过程中的不同阶段、每个阶段可用的工具,以及它们之间潜在的转换路径,从而为任务执行提供了明确的流程引导。

关于工具系统的原理我们就学到这,不过 Chat2Graph 到底有哪些工具,每个工具又是做什么的,我们目前还不太清楚,明天就来看看这块。


详解 Chat2Graph 的推理机实现

经过昨天的学习,我们了解了智能体的三大核心组件:角色、推理机和工作流。角色用于对智能体的专业能力、任务范围和操作限制进行描述,帮助 Leader 更好地分配任务;工作流则是通过将多个算子按照 DAG 结构组织,形成标准化的任务处理流程;而推理机是整个系统的核心,它通过调用大模型并执行推理任务,承担着理解任务指令、生成响应内容、调用外部工具以及进行复杂推理等职责。

Chat2Graph 的推理机和工作流中的算子是密不可分的,今天,我们将继续深入源码,从算子的配置和运行逻辑开始,详细分析推理机的实现机制。

算子的配置

我们知道,工作流由多个算子组成,每个算子承担着特定阶段的任务处理,Chat2Graph 的算子和 AWEL 中的基础算子不同,Chat2Graph 中的每个算子都是由推理机驱动的。在构建工作流时,我们会对算子执行初始化:

operator = (
  OperatorWrapper()
  .instruction(op_config.instruction)
  .output_schema(op_config.output_schema)
  .actions(operator_actions)
  .build()
)

可以看到算子的配置包括 instructionoutput_schemaactions 三个字段,这些信息都是供推理机使用的:

  • instruction - 给推理机的指令,用于描述算子的角色、需要完成的任务目标、执行任务时的注意事项以及期望的输出风格等;推理机会根据这个指令来理解任务并规划执行步骤;
  • output_schema - 定义算子执行完成后期望输出内容的格式和结构,可以是 YAML、JSON 格式或者自然语言的字符串描述,用于指导推理机生成符合预期的结构化输出;
  • actions - 算子在执行任务时可以调用的一系列动作,算子会基于这些动作,通过 ToolkitService 获取推荐的工具和更具体的动作,供推理机选择和执行;

昨天我们提到,“建模专家” 的工作流包含两个算子:文档分析(analysis_operator)和概念建模(concept_modeling_operator),我们可以看下这两个算子在 chat2graph.yml 文件中的配置:

operators:
  - &analysis_operator
    instruction: |
      你是一名专业的文档分析专家,擅长从文档中提取关键信息,为构建知识图谱奠定坚实基础。
      你需要理解文档内容。请注意,你所分析的文档可能只是完整集合的一部分,这要求你能从局部细节推断出整体情况。
      请注意,你的任务不是操作图数据库。你的任务是对文档进行分析,为后续的知识图谱建模提供重要信息。
      请确保你的分析全面且详细,并为每个结论提供充分的推理依据。
    output_schema: |
      **领域**: 对文档领域的描述,有助于后续的建模和数据提取。
      **数据全景视图**:对文档内整体数据的详细评估,包括数据结构、规模、实体关系等,并提供推理和依据。
      **概念**:已识别的关键概念列表,每个概念包含名称、描述和重要性。
      **属性**:为概念识别出的属性列表,每个属性包括其关联的概念、名称、描述和数据类型。
      **潜在关系**:已识别的潜在关系列表,每个关系包括其类型、涉及的实体、描述和强度。
      **文档洞察**:该文档特有的其他重要信息或发现,用分号分隔。例如,对文档中提到的特定事件或概念的独特解读。
      **文档片段**:文档中用于支持分析结论并提供上下文的关键片段。可以是直接引语或重要段落。
    actions:
      - *content_understanding_action
      - *deep_recognition_action

  - &concept_modeling_operator
    instruction: |
      你是一位知识图谱建模专家,擅长将概念和关系转化为图数据库模式。
      你需要基于文档分析的结果完成概念建模任务,同时确保图模型的正确性和可达性。

      1. 模式生成
      使用 `graph_schema_creator` 函数生成模式,为顶点和边创建特定的模式。你不能直接编写 Cypher 语句,而是使用提供的工具函数与数据库进行交互。
      请注意:模式生成不是关于向数据库中插入特定数据(如节点、关系),而是定义图数据库的结构(模式/标签)。期望是定义实体类型、关系类型、约束等内容。
      该任务的背景是知识图谱,因此应关注相对通用的实体类型,而非特定的个体实体。例如,考虑时间、抽象概念、物理实体和社会实体等主要维度。
      你需要多次阅读现有的 TuGraph 模式,以确保使用该工具创建的模式符合预期。

      2. 验证图的可达性
      可达性是图数据库的核心特性之一,它确保图中的实体和关系之间存在有效的连接路径,以支持复杂的查询需求。这在图建模中很重要,因为如果图不可达,就无法构建完整的知识图谱。
      通过查询图数据库以检索图的结构信息,来验证实体和关系的可达性。
    output_schema: |
      **图模式可达性**:可达性分析结果,描述图中实体与关系之间的连接路径。
      **状态**:模式状态,指示其是否通过验证。
      **实体标签**:成功创建的实体标签列表,例如 “Person(人)”、“Organization(组织)”。
      **关系标签**:成功创建的关系标签列表,例如 “WorksAt(就职于)”、“LocatedIn(位于)”。
    actions:
      - *content_understanding_action
      - *deep_recognition_action
      - *entity_type_definition_action
      - *relation_type_definition_action
      - *schema_design_and_import_action
      - *graph_validation_action

注意这里的 YAML 语法,使用 & 表示引用,在定义专家时,可以使用 * 表示引用的内容:

experts:
  - profile:
      name: "Design Expert"
    workflow:
      - [*analysis_operator, *concept_modeling_operator]

算子的运行逻辑

让我们深入 app/core/workflow/operator.py 文件,了解算子的执行逻辑:

class Operator:

  async def execute(
    self,
    reasoner: Reasoner,
    job: Job,
    workflow_messages: Optional[List[WorkflowMessage]] = None,
    previous_expert_outputs: Optional[List[WorkflowMessage]] = None,
    lesson: Optional[str] = None,
  ) -> WorkflowMessage:
    
    # 1. 构建 Task 对象
    task = self._build_task(
      job=job,
      workflow_messages=workflow_messages,
      previous_expert_outputs=previous_expert_outputs,
      lesson=lesson,
    )

    # 2. 通过推理机进行推理
    result = await reasoner.infer(task=task)

    # 3. 返回工作流消息
    return WorkflowMessage(payload={"scratchpad": result}, job_id=job.id)

算子的执行过程比较简单,可以分为三个核心步骤:

  1. 构建任务对象:将原始的 Job 信息、前置算子的输出、相关工具和动作等整合成一个完整的 Task 对象,具体的整合步骤包括:

    • 获取推荐工具和动作:通过 ToolkitService 根据算子配置中的 actions、相似度阈值和图中的跳数,推荐相关的工具和动作;
    • 整合上下文信息:合并来自前置算子和专家的输出消息,形成完整的上下文;
    • 检索知识信息:从知识库中获取与任务目标相关的知识;
    • 获取文件资源:提取任务相关的文件描述符,便于推理机访问文件内容;
    • 包含经验教训:如果存在来自智能体的反馈,也会包含在任务中指导执行;
  2. 调用推理机推理:将构建好的 Task 对象交给推理机处理,这是算子执行的核心环节;
  3. 返回工作流消息:将推理机的执行结果封装为 WorkflowMessage 返回,供后续算子使用;

算子和推理机的关系

从算子的执行流程可以看出,算子和推理机之间是一种明确的协作关系:

  • 算子是调度者:负责收集和整理任务执行所需的所有资源和上下文信息,包括工具、动作、知识、文件、经验教训等;
  • 推理机是执行者:接收算子构建的 Task 对象,基于大语言模型的推理能力,理解任务指令、选择合适的工具、执行相应的动作,并生成最终结果。

operator-execute.png

这种分工让系统具备了良好的可扩展性。算子专注于任务的资源准备和流程编排,而推理机专注于智能推理和工具调用。两者通过标准化的 Task 对象进行交互,实现了松耦合的设计。

推理机的定义

Chat2Graph 的推理机模块是与大语言模型交互的核心,其职责在于处理提示词,执行推理任务,并提供工具调用的能力。它封装了底层大模型的调用细节,为算子提供统一的推理接口。推理机的基类定义在 app/core/reasoner/reasoner.py 中:

class Reasoner(ABC):
  
  @abstractmethod
  async def infer(self, task: Task) -> str:
    # 通过推理机进行推理

  @abstractmethod
  async def conclude(self, reasoner_memory: ReasonerMemory) -> str:
    # 总结推理结果

推理机的核心职责包括:

  1. 任务理解:解析算子传递的 Task 对象,理解任务目标、上下文信息和可用资源;
  2. 推理规划:基于任务要求制定执行计划,选择合适的工具和动作;
  3. 工具调用:与外部工具进行交互,获取必要的信息或执行特定操作;
  4. 结果生成:综合推理过程和工具调用结果,生成最终的任务输出;

Chat2Graph 提供了两种推理机实现:单模推理机(MonoModelReasoner)双模推理机(DualModelReasoner),以适应不同场景下的推理需求。我们可以在 chat2graph.yml 配置文件中切换:

reasoner:
  type: "DUAL" # MONO 单模,DUAL 双模

单模推理机

单模推理机依赖于单个大模型实例来完成所有任务处理阶段,包括理解用户指令、进行思考、选择必要的工具,并最终生成回复或执行动作。它的实现如下:

class MonoModelReasoner(Reasoner):

  async def infer(self, task: Task) -> str:
    
    # 设置系统提示词
    sys_prompt = self._format_system_prompt(task=task)
    
    # 初始消息,触发推理过程
    init_message = ModelMessage(
      source_type=MessageSourceType.MODEL,
      payload=(
        "<deep_thinking>\nLet's start to complete the task.\n</deep_thinking>\n"
        "<action>\nEmpty\n</action>\n"
      ),
      job_id=task.job.id,
      step=1,
    )
    reasoner_memory.add_message(init_message)

    # 不断调用大模型生成响应
    for _ in range(max_reasoning_rounds):
      response = await self._model.generate(
        sys_prompt=sys_prompt,
        messages=reasoner_memory.get_messages(),
        tools=task.tools,
        tool_call_ctx=task.get_tool_call_ctx(),
      )
      response.set_source_type(MessageSourceType.MODEL)
      reasoner_memory.add_message(response)

      # 判断任务是否完成
      if self.stopped(response):
        break

    # 提取最终的推理结果
    return await self.conclude(reasoner_memory=reasoner_memory)

单模推理机的工作方式相对直观:

  1. 系统提示词:结合算子的配置和任务上下文等信息构建系统提示词;
  2. 初始化推理:创建一个固定的初始消息,包含基本的思考和行动框架;
  3. 循环推理:在最大推理轮数内不断调用模型生成响应;
  4. 结果判断:检查模型是否完成任务,通过 stopped() 方法检查是否包含 <deliverable> 标签;
  5. 结论生成:从 <deliverable> 标签中提取最终的推理结果;

单模推理机的核心在于 MONO_PROMPT_TEMPLATE 这个提示词模板,这个提示词比较长,我们主要关注几点:

  • 元认知框架:要求大模型使用元认知框架和自己对话,进行深度思考 (<deep_thinking>),展示其推理过程和深度;
  • 思考与行动一体:在深度思考之后,在 <action> 部分执行具体的行动,包括文本生成、分析或调用外部工具;
  • 终止条件与交付:当大模型判断任务已解决时,必须使用 TASK_DONE<deliverable> 标签来标记任务完成;

元认知(Metacognition) 是心理学和教育学领域的一个概念,简单来说,它指的是 对认知的认知,也就是我们主动监控、评估、调节自己思考过程的能力。

单模推理机旨在使单个大模型能够以自给自足的方式处理复杂任务,它的主要优点在于配置简单直观,推理链路相对简短。但在处理需要强大推理能力或复杂能力组合的任务时,可能会表现出性能不足。

双模推理机

心理学家卡尼曼提出,人类大脑中存在两套系统:系统1系统2。系统1是无意识的、快速的、直观的,而系统2则是有意识的、缓慢的、需要付出心理努力的,这两套系统在我们日常生活中相互作用,共同影响着我们的思考、决策和行为。

system1-system2.png

双模推理机模拟了人类的快思考和慢思考系统,采用两个大模型实例协同工作的模式。让我们看看它的实现:

class DualModelReasoner(Reasoner):

  async def infer(self, task: Task) -> str:
    
    # 设置 Thinker 和 Actor 的系统提示词
    actor_sys_prompt = self._format_actor_sys_prompt(task=task)
    thinker_sys_prompt = self._format_thinker_sys_prompt(task=task)
    
    # 交替调用大模型生成响应
    for _ in range(max_reasoning_rounds):
      # Thinker 思考
      response = await self._thinker_model.generate(
        sys_prompt=thinker_sys_prompt,
        messages=reasoner_memory.get_messages(),
        tool_call_ctx=task.get_tool_call_ctx(),
      )
      response.set_source_type(MessageSourceType.THINKER)
      reasoner_memory.add_message(response)

      # Actor 执行
      response = await self._actor_model.generate(
        sys_prompt=actor_sys_prompt,
        messages=reasoner_memory.get_messages(),
        tools=task.tools,
        tool_call_ctx=task.get_tool_call_ctx(),
      )
      response.set_source_type(MessageSourceType.ACTOR)
      reasoner_memory.add_message(response)

      # 判断任务是否完成
      if self.stopped(response):
        break

    # 提取最终的推理结果
    return await self.conclude(reasoner_memory=reasoner_memory)

双模推理机和单模推理机的代码结构几乎是一样的,只是在每轮推理时调用了两次大模型,分别扮演两个不同的角色:

  • Thinker(思考者):负责理解复杂的用户意图、分解任务、制定计划,并在需要时决定调用哪个工具或子任务;
  • Actor(执行者):负责接收并执行 Thinker 的指令,擅长快速思考,专注于工具调用和格式化输出;

dual-reasoner.png

双模推理机为 Thinker 和 Actor 分别设计了不同的提示词模板。

Thinker 提示词(THINKER_PROMPT_TEMPLATE)的要点如下:

  • 元认知框架:同样要求大模型使用元认知框架,使用 <deep_thinking> 标签进行深度思考,结合思考模式标记来展现其推理过程;
  • 指令生成:主要输出是为 Actor 生成清晰、具体的指令 (<instruction>) 和必要的输入 (<input>);
  • 结果评估:负责评估 Actor 的执行结果,包括工具调用的结果,基于此进行下一步规划;
  • 禁止行为:不应自己执行工具调用或生成最终交付成果;

Actor 提示词(ACTOR_PROMPT_TEMPLATE)的要点如下:

  • 指令执行:接收并执行 Thinker 提供的指令和输入;
  • 浅层思考:使用 <shallow_thinking> 进行浅层思考,主要聚焦于如何准确执行当前指令;
  • 动作执行:在 <action> 部分执行具体操作,包括生成文本、分析或调用工具;
  • 最终交付:当 Thinker 发出 TASK_DONE 指令后,负责整合信息并生成 <deliverable> 最终交付物;

下面是我在任务执行时抓的一次请求:

dual-reasoner-log.png

可以看到,Thinker 和 Actor 就像人一样,你一言我一语地交叉式对话,直到 Thinker 认为任务已经完成,发出 TASK_DONE 指令。当 Actor 收到该指令后,就会通过 <deliverable> 标签生成最终结果,完成推理。

CoA 多智能体范式

Chat2Graph 的双模推理机设计受到了 OPPO 提出的 Chain-of-Agents(CoA) 范式的启发。CoA 是一种将多智能体协作能力融入单一模型的创新方法,通过 单一模型模拟多智能体协作 来解决传统多智能体系统效率低、泛化弱、数据驱动学习缺失等问题。论文地址:

这是一种端到端的训练方法,在单一模型内动态激活,模拟多智能体协作,通过 智能体监督微调(Agentic SFT)智能体强化学习(Agentic RL) 两个阶段,训练出 智能体基础模型(Agent Foundation Models, AFMs),在网页搜索、代码生成、数学推理三大领域的近 20 个基准测试中刷新了 SOTA 水平。

与传统的智能体范式相比,CoA 的优势如下表所示:

agent-methods.png

前两种范式用于单智能体:ReAct 基于 Prompt 实现,是最早也是最简单的智能体实现,但是这种方式不是端到端的,没办法训练,于是提出了 TIR 工具集成推理。

后两种范式用于多智能体:传统的 MAS 框架和 ReAct 一样,也很难训练,无法通过数据驱动学习,CoA 方法的提出就是弥补这块的缺口。

CoA 将智能体划分为 角色智能体工具智能体,这和之前介绍的 LLM-Agent-UMF 框架将智能体划分为 主动核心智能体被动核心智能体 有着异曲同工之妙:

coa-agents.png

CoA 和 TIR 很像,只不过两者推理的轨迹不同,TIR 推理轨迹是工具的调用,而 CoA 的轨迹是不同智能体的激活:

tir-vs-coa.png

Chat2Graph 的双模推理机借鉴了 CoA 的核心思想,但做了适合系统架构的简化:比如 Thinker 承担了 CoA 中思考智能体、规划智能体和反思智能体的职责,负责深度分析、任务规划和结果评估;而 Actor 类似于 CoA 中的各种工具智能体,专注于执行具体的工具调用和操作;通过 Thinker 和 Actor 之间的交替对话,实现了推理状态的动态转移和智能体角色的切换;但是与 CoA 端到端优化的训练方法不同,Chat2Graph 通过提示词工程和推理机架构设计,在推理时实现了多智能体协作的效果。

小结

今天,我们深入分析了 Chat2Graph 中算子与推理机的协作机制,算子作为调度者负责收集和整理任务资源,推理机作为执行者基于大语言模型的能力完成智能推理和工具调用,两者通过标准化的 Task 对象实现了松耦合的协作。

特别值得关注的是 Chat2Graph 的双模推理机设计,它借鉴了 OPPO 提出的 Chain-of-Agents 范式,通过 Thinker 和 Actor 两个角色的协作,模拟了人类的快慢思考系统。Thinker 负责深度分析和规划,Actor 专注于执行和工具调用,这种分工让系统在处理复杂任务时展现出了更强的推理能力和更好的可解释性。

相比单模推理机的简洁直观,双模推理机虽然增加了一定的复杂度,但通过角色专业化和任务分工,能够在复杂场景下提供更优的推理质量。这种设计为智能体系统的推理能力提升提供了新的思路和实践路径。

在下一篇文章中,我们将继续深入 Chat2Graph 的工具系统,了解它如何实现智能体与外部世界的交互,以及如何通过有向图来优化工具调用的效果。


详解 Chat2Graph 的工作流实现

在前一篇文章中,我们深入分析了 Chat2Graph 中 Leader 智能体的任务分解与执行机制,了解了基于 DAG 的子任务图规划器和状态驱动的容错机制。今天,我们将继续深入源码,从 Expert 接受子任务开始,详细剖析智能体的初始化过程和工作流的实现原理。

Expert 智能体的实现

我们昨天学到,当 Leader 完成任务分解后,子任务会被分配给对应的 Expert 智能体执行。Expert 智能体的实现位于 app/core/agent/expert.py 文件:

class Expert(Agent):
  
  # 调用 Expert 智能体执行子任务
  def execute(self, agent_message: AgentMessage, retry_count: int = 0) -> AgentMessage:
    
    # 更新任务状态为 `RUNNING`
    job_result.status = JobStatus.RUNNING
    self._job_service.save_job_result(job_result=job_result)

    # 获取前置 Expert 的输出和经验教训
    workflow_messages: List[WorkflowMessage] = agent_message.get_workflow_messages()

    # 执行 Expert 智能体的工作流
    workflow_message: WorkflowMessage = self._workflow.execute(
      job=job,
      reasoner=self._reasoner,
      workflow_messages=workflow_messages,
      lesson=agent_message.get_lesson(),
    )

    # 检查执行状态
    self._message_service.save_message(message=workflow_message)
    if workflow_message.status == WorkflowStatus.SUCCESS:
      # 执行成功,保存结果,更新任务状态
      return expert_message
    if workflow_message.status == WorkflowStatus.EXECUTION_ERROR:
      # 执行出错,基于失败经验重试一次
      lesson = workflow_message.evaluation + "\n" + workflow_message.lesson
      agent_message.add_lesson(lesson)
      return self.execute(agent_message=agent_message, retry_count=retry_count + 1)
    if workflow_message.status == WorkflowStatus.INPUT_DATA_ERROR:
      # 输入数据错误,通知 Leader 重新执行前置任务
      lesson = "The output data is not valid"
      expert_message = self.save_output_agent_message(
        job=job, workflow_message=workflow_message, lesson=lesson
      )
      return expert_message
    if workflow_message.status == WorkflowStatus.JOB_TOO_COMPLICATED_ERROR:
      # 子任务过于复杂,通知 Leader 重新分解任务
      lesson = "The job is too complicated to be executed by the expert"
      expert_message = self.save_output_agent_message(
        job=job, workflow_message=workflow_message, lesson=lesson
      )
      return expert_message
    raise Exception("The workflow status is not defined.")

Expert 的执行和 Leader 的执行非常类似,都是先将状态置为 RUNNING,然后获取前置 Expert 的输出和经验教训,调用该智能体的工作流,最后根据不同的返回状态进行相应的处理。当返回 EXECUTION_ERROR 时,Expert 智能体会基于失败经验重试;当返回 INPUT_DATA_ERROR 时,通知 Leader 重新执行前置任务;当返回 JOB_TOO_COMPLICATED_ERROR 时,通知 Leader 重新分解任务。我们昨天已经详细学习过这些状态,此处不再赘述。

智能体的核心组成

从 Leader 智能体和 Expert 智能体的执行过程可以看出,其本质都是调用其内置的工作流实现的。在学习 chat2graph.yml 配置文件时,我们曾介绍过,Chat2Graph 的智能体由 角色(Profile)推理机(Reasoner)工作流(Workflow) 三个核心组件构成。下面是 5 位专家的详细配置:

experts:
  - profile:
      name: "Design Expert"
      desc: |
        他是一位知识图谱建模(模式)专家。
        他的任务是根据特定的数据需求设计图谱的模式,清晰定义顶点(Vertices)和边(Edges)的类型、属性及关系。同时,他负责在图谱数据中创建/更新模式。
        他只能为特定的图数据库实例创建或修改数据结构(模式)。
        他的输出是清晰的模式定义,供后续数据导入使用。**他本身不处理具体数据(增删改查),也从不回答关于图数据库产品或技术本身的一般性介绍或询问。**
    reasoner:
      actor_name: "Design Expert"
      thinker_name: "Design Expert"
    workflow:
      - [*analysis_operator, *concept_modeling_operator]

  - profile:
      name: "Extraction Expert"
      desc: |
        他是一位原始数据提取与图数据导入专家。
        其前提条件是:目标图数据库中必须已存在图模式,且该模式已定义好节点和边的标签(无论其是否采用弱模式;否则,该专家无法执行任务),同时必须指定明确的原始数据源(例如用户提供的文档、文件、数据库表、待处理文本等),以便进行处理并导入到特定的图数据库实例中。
        他的任务是:1. 根据已定义的模式从原始数据中提取结构化信息。2. 将提取到的信息导入到目标图数据库中。
        他会输出数据导入过程的摘要或状态报告。**他不负责设计模式或执行查询分析,也绝不会提供有关图数据库技术或产品的一般性介绍。**
    reasoner:
      actor_name: "Extraction Expert"
      thinker_name: "Extraction Expert"
    workflow:
      - [*data_importation_operator]

  - profile:
      name: "Query Expert"
      desc: |
        他是一位图数据查询专家。
        假设有一个具有现有数据和已知结构的特定图数据库实例,需要执行精确的查询来检索特定的数据点或关系。
        他的任务是:1. 理解用户的具体查询意图。2. 编写精确的图查询语句。3. 在目标图数据库上执行查询。
        他将返回从查询中获得的具体数据结果。**他不进行复杂的图算法分析,不负责数据建模或导入,并且绝对不回答关于图数据库概念、产品或技术本身的一般性问题。**
    reasoner:
      actor_name: "Query Expert"
      thinker_name: "Query Expert"
    workflow:
      - [*query_design_operator]

  - profile:
      name: "Analysis Expert"
      desc: |
        他是一位图数据分析与算法应用专家。
        假设有一个特定的图数据库实例,其中包含现有的结构化数据,需要进行超出简单查询的复杂网络分析(如社区检测、中心性计算等)。
        他的任务是:根据分析目标,在目标图数据库上选择、配置并执行相应的图算法。
        他会返回算法的执行结果及其解释。**他不负责数据建模、导入、简单的节点/关系查询,也从不提供关于图数据库技术或产品的一般性介绍。**
    reasoner:
      actor_name: "Analysis Expert"
      thinker_name: "Analysis Expert"
    workflow:
      - [*algorithms_execute_operator]

  - profile:
      name: "Q&A Expert"
      desc: |
        他是一位通用问答与信息检索专家,具备不同优先级的多源研究能力。
        **当任务是要求提供关于某个概念、技术或产品的一般信息、定义、解释、比较或总结(例如,“介绍图”)时,他是首选专家,通常也是唯一的专家,** 尤其是当问题不涉及使用现有数据操作或查询特定的图数据库实例时。
        他的任务是:1. 理解问题。2. **优先将知识库检索作为主要信息来源。** 3. **仅在知识库结果不足或不完整时才进行网络研究。** 4. 综合信息时明确优先考虑知识库内容,将网络研究作为战略性补充。
        他会输出包含分层引用的全面答案,且优先引用知识库来源。** 他绝对不与任何特定项目的图数据库进行交互,不执行图查询或图算法,也不进行数据建模或导入。他专注于通过多源研究提供全面信息:知识库优先,网络研究作为智能备用。**
    reasoner:
      actor_name: "Q&A Expert"
      thinker_name: "Q&A Expert"
    workflow:
      - [*retrieval_operator, *summary_operator]

其中角色比较好理解,它是智能体的身份标识和能力定义,包含名称和描述两个字段。在 Expert 中,角色名称是被 Leader 识别的唯一标识,用于任务分配和结果追溯;描述部分则详细说明了智能体的专业能力、任务范围和操作限制,帮助 Leader 更好地根据专家配置信息来分配任务。在定义角色描述时,必须明确说明其能够处理的任务类型、执行所需的前置条件以及明确不应承担的职责范围。下面是定义角色描述的一些指导原则,供开发者参考:

  • 能力边界明确定义:应明确列出智能体的核心能力和操作限制。比如在 “建模专家” 的描述中,明确说明了 “他只能为特定的图数据库实例创建或修改数据结构” 以及 “他本身不处理具体数据(增删改查)”,这种明确的边界定义避免了任务分配时的歧义。
  • 前置条件和依赖关系声明:对于有特定前置要求的智能体,必须在描述中明确声明。比如在 “导数专家” 的描述中,要求 “目标图数据库中必须已存在图模式,且该模式已定义好节点和边的标签”,确保 Leader 在分配任务时能够正确评估任务的可执行性。
  • 职责范围的精确表述:使用否定描述来明确智能体不应承担的职责。比如在 “导数专家” 的描述中,要求 “他不负责设计模式或执行查询分析,也绝不会提供有关图数据库技术或产品的一般性介绍”,帮助 Leader 避免将不合适的任务分配给该智能体。
  • 输出预期的具体化:角色描述应说明智能体的典型输出形式。比如在 “建模专家” 的描述中,说明了 “他的输出是清晰的模式定义,供后续数据导入使用”,这有助于后续工作流的设计和任务链的构建。

通过这些指导原则,开发者能够创建具有清晰职责边界和明确能力定位的智能体,确保多智能体系统的高效协作。

第二个核心组件是推理机,它提供了基于大语言模型的推理能力。在智能体的工作流执行过程中,推理机承担着理解任务指令、生成响应内容、调用外部工具以及进行复杂推理等职责。

第三个核心组件是工作流,它是智能体执行任务的核心机制。它编排了任务执行的具体流程,定义了从任务接收到结果输出的完整处理步骤。

今天我们先来了解下工作流的实现原理,推理机的内容我们放到后面再看。

工作流的定义

工作流是由多个 算子(Operator) 按照特定的依赖关系组织而成的有向无环图(DAG),这是 Chat2Graph 作为图原生智能体系统的又一体现。在这个图中,每个节点代表一个算子,边代表算子之间的数据流向和执行顺序。

workflow.png

每个智能体(包括 Leader 和 Expert)都必须内置一个工作流,该工作流规定了智能体为完成特定类型任务所应遵循的标准化流程(SOP)。比如上面的 “建模专家”,它的工作流配置如下:

experts:
  - profile:
      name: "Design Expert"
    workflow:
      - [*analysis_operator, *concept_modeling_operator]

这表示 “建模专家” 包含两个算子,它的工作流程是先进行文档分析(analysis_operator),然后再进行概念建模(concept_modeling_operator)。

每个算子负责特定阶段的任务处理,算子之间通过 WorkflowMessage 进行数据传递,形成了一个有序的处理链路。此外,除了主要的执行算子外,工作流还可以在末尾集成一个可选的评估算子,负责对整个工作流的输出结果进行评估,并可能生成反馈,供后续的算子或智能体参考和使用。

Chat2Graph 默认并没有开启评估算子,如果要开启,需要修改 AgenticService.load() 的代码,在构建智能体的地方加上 evaluator() 方法。

工作流的实现

接下来我们来看下调用工作流的具体实现,其代码逻辑位于 app/core/workflow/workflow.py 文件:

class Workflow(ABC):

  def execute(
    self,
    job: Job,
    reasoner: Reasoner,
    workflow_messages: Optional[List[WorkflowMessage]] = None,
    lesson: Optional[str] = None,
  ) -> WorkflowMessage:

    # 构建工作流
    def build_workflow():
      with self.__lock:
        if self.__workflow is None:
          self.__workflow = self._build_workflow(reasoner)
        return self.__workflow
    built_workflow = build_workflow()

    # 执行工作流
    workflow_message = self._execute_workflow(
      built_workflow, job, workflow_messages, lesson
    )
    return workflow_message

当工作流首次被调用时,它会触发一个构建工作流的方法(由线程锁保护,确保只构建一次),此方法负责将配置的所有算子转换成一个具体的可执行工作流实例。构建完成后,调用工作流,传入当前任务(job)、来自先前智能体的输出(workflow_messages)以及可能的经验教训(lesson)。

这里的 self.__lock 是通过 threading.Lock() 创建的,是一个线程锁对象,结合 with 语句确保在进入代码块时自动获取锁,在退出代码块时自动释放锁。这是 Python 中线程同步的标准做法,用于保护临界区代码,防止多个线程同时访问共享资源。

值得注意的是,这里的 _build_workflow_execute_workflow 都是抽象方法,Chat2Graph 通过插件系统支持不同的工作流实现:

plugin:
  workflow_platform: "DBGPT"

不过目前暂时只支持一种,即 DB-GPT 工作流,由 DbgptWorkflow 类实现:

class DbgptWorkflow(Workflow):
  
  # 构建 DB-GPT 工作流
  def _build_workflow(self, reasoner: Reasoner) -> DbgptMapOperator:
    
    # 创建 DAG
    with DAG("dbgpt_workflow"):

      # 创建 InputOperator
      input_op = InputOperator(input_source=SimpleCallDataInputSource())

      # 第一步:将所有的算子转换为 MapOperator
      map_ops: Dict[str, DbgptMapOperator] = {}  # op_id -> map_op
      for op_id in self._operator_graph.nodes():
        base_op = self._operator_graph.nodes[op_id]["operator"]
        map_ops[op_id] = DbgptMapOperator(operator=base_op, reasoner=reasoner)

      # 第二步:插入 JoinOperator 连接 MapOperator
      for op_id in nx.topological_sort(self._operator_graph):
        current_op: DbgptMapOperator = map_ops[op_id]
        in_edges = list(self._operator_graph.in_edges(op_id))
        # 如果有前驱节点,连接前驱 MapOperator 到 JoinOperator,再连接到当前 MapOperator
        # JoinOperator 用于合并 InputOperator 和所有前驱 MapOperator 的数据
        if in_edges:
          join_op = JoinOperator(combine_function=_merge_workflow_messages)
          for src_id, _ in in_edges:
            map_ops[src_id] >> join_op
          input_op >> join_op
          join_op >> current_op
        else:
          # 如果没有前置节点,直接连接 InputOperator 和当前节点
          input_op >> current_op

      # 第三步:找到工作流的尾部,出度为 0 的算子(应该只有一个)
      tail_map_op_ids = [
        n for n in self._operator_graph.nodes() if self._operator_graph.out_degree(n) == 0
      ]
      _tail_map_op: DbgptMapOperator = map_ops[tail_map_op_ids[0]]

      # 第四步:如果存在评估器,添加到工作流末尾
      if self._evaluator:
        eval_map_op = DbgptMapOperator(operator=self._evaluator, reasoner=reasoner)
        join_op = JoinOperator(combine_function=_merge_workflow_messages)
        _tail_map_op >> join_op
        input_op >> join_op
        join_op >> eval_map_op
        self._tail_map_op = eval_map_op
      else:
        self._tail_map_op = _tail_map_op
      
      # 返回尾部算子
      return self._tail_map_op

  # 执行 DB-GPT 工作流
  def _execute_workflow(
    self,
    workflow: DbgptMapOperator,
    job: Job,
    workflow_messages: Optional[List[WorkflowMessage]] = None,
    lesson: Optional[str] = None,
  ) -> WorkflowMessage:
    return run_async_function(workflow.call, call_data=(job, workflow_messages, [], lesson))

这里的代码比较晦涩难懂,主要是因为这里使用了 DB-GPT 的 AWEL 语法,为了搞清楚这里的逻辑,我们不妨先了解下 DB-GPT 以及它的 AWEL 技术。

DB-GPT 和 AWEL

DB-GPT 是一个开源的 AI 原生数据应用开发框架。2023 年 6 月由蚂蚁集团发起,通过 多模型管理Text2SQL 效果优化RAG 框架多智能体框架协作基于 AWEL 的工作流编排 等多项技术能力,让以数据库为基础的大模型应用变得更简单、更便捷。

db-gpt.jpg

DB-GPT 的主要功能特性包括:

  • 检索增强生成:实现了一套基于 RAG 的框架,用户可基于此构建知识类应用,支持自定义构建知识库,对海量结构化和非结构化数据进行统一向量存储与检索;
  • 生成式商业智能:支持自然语言与 Excel、数据库、数据仓库等多种数据源交互,并能进行分析报告,为企业报表分析、业务洞察提供数智化技术保障;
  • 微调框架:提供 Text2SQL 的自动化微调框架,支持 LoRA/QLoRA/Pturning 等微调方法,与 DB-GPT 项目无缝集成,在 Spider 数据集的准确率达到了 82.5%;
  • 多智能体框架:提供了一个数据驱动的自进化多智能体框架,旨在基于数据持续进行决策和执行,且原生支持 Auto-GPT 插件模型,智能体协议遵循 Agent Protocol 标准;
  • 数据工厂:主要用于在大模型时代对可信的知识和数据进行清洗与处理;
  • 数据源:整合各种数据源,将生产业务数据与 DB-GPT 的核心能力无缝连接;

以下为它的架构图:

db-gpt-arch.png

在架构图的下方,可以看到一块绿色,这就是 DB-GPT 独创的智能工作流表达语言 AWEL (Agentic Workflow Expression Language),Chat2Graph 的工作流就是基于它实现的。

AWEL 中的一个核心概念是 算子(Operator),Chat2Graph 中的算子就是来源于他,下面是 AWEL 的入门示例:

import asyncio

from dbgpt.core.awel import DAG, MapOperator

with DAG("awel_hello_world") as dag:
  task = MapOperator(map_function=lambda x: print(f"Hello, {x}!"))

asyncio.run(task.call(call_data="world"))

这段代码创建了一个名为 awel_hello_world 的 DAG,该 DAG 仅包含一个简单的 MapOperator 算子,它是 AWEL 内置的基础算子,接收一个 map_function 函数并使用传递给它的数据来调用该函数。所有的算子都是异步执行的,因此我们用 asyncio.run() 来执行工作流。

上面这个 DAG 只有一个节点,让我们看一个包含多节点的例子:

import asyncio

from dbgpt.core.awel import DAG, MapOperator, InputOperator, SimpleCallDataInputSource

with DAG("awel_hello_world") as dag:
  input_task = InputOperator(input_source=SimpleCallDataInputSource())
  task = MapOperator(map_function=lambda x: print(f"Hello, {x}!"))
  input_task >> task

asyncio.run(task.call(call_data="world"))

在这段代码中,我们新加了一个 InputOperator 算子,这也是 AWEL 的基础算子之一,该算子接受工作流的输入,并传递给下一个算子。我们使用 >> 运算符来连接两个算子,该运算符用于定义算子之间的父子关系,也称为任务依赖。你也可以使用 set_downstream() 方法来定义任务依赖,比如:

  input_task.set_downstream(task)

了解了 AWEL 中的这些概念,再回过头看 DbgptWorkflow 的实现就简单多了。其中 DbgptMapOperator 就是 MapOperator 的一个实现,它是对 Chat2Graph 算子的封装,每个 Chat2Graph 算子都是通过推理机调用大模型完成某项具体的任务;另外,前后 MapOperator 算子之间使用 JoinOperator 算子连接,它接受一个 combine_function 函数将来自多个输入的数据合并为一个数据;最后将评估器算子 EvalOperator 添加到整个工作流的尾部。

值得留意的一点是,在 AWEL 定义的工作流中,可以有多个起始节点,但只有一个结束节点,因此在调用工作流时是通过尾节点进行调用的。看 _build_workflow() 的代码,返回尾部算子即可,在 _execute_workflow() 中,直接调用这个尾部算子的 call() 方法。

更多关于 AWEL 的内容,可以参考 DB-GPT 的官方文档:

小结

今天,我们深入分析了 Chat2Graph 中工作流的实现原理,从 Expert 智能体的执行逻辑开始,详细了解了智能体的三大核心组件:角色、推理机和工作流。工作流作为智能体执行任务的核心机制,通过多个算子按照 DAG 结构组织,形成了标准化的任务处理流程,同时也体现了 Chat2Graph 作为图原生智能体系统的特色。

此外,通过对 DB-GPT 和 AWEL 技术的学习,我们理解了 Chat2Graph 工作流的底层实现机制。AWEL 作为智能工作流表达语言,提供了基于算子和有向无环图的编程模型,使得复杂的智能体工作流能够以声明式的方式进行定义和执行。

在下一篇文章中,我们将继续深入 Chat2Graph 的推理机制,了解它如何实现快思考与慢思考的双 LLM 推理,以及这种设计如何提升智能体的推理质量和可解释性。


学习 Chat2Graph 的任务分解与执行

在前一篇文章中,我们学习了 Chat2Graph 一主动多被动的多智能体混合架构,了解了 Leader 智能体作为主动核心,负责统一决策和任务协调,而 Expert 智能体作为被动核心,专注于特定领域的任务执行。今天,我们将继续深入源码,详细分析 Leader 接受到任务之后的具体执行逻辑。

Leader 智能体的实现

我们接着昨天的流程往下走,当通过 JobWrapper 提交任务后,系统调用 AgentService 的 Leader 智能体开始处理。让我们深入 leader.py 文件的 execute_original_job() 方法,了解 Leader 的完整工作流程:

class Leader(Agent):
  
  # 执行原始任务
  def execute_original_job(self, original_job: Job) -> None:
    
    # 更新任务状态为 `RUNNING`
    original_job_result.status = JobStatus.RUNNING
    self._job_service.save_job_result(job_result=original_job_result)

    # 将任务分解为子任务图
    decomposed_job_graph: JobGraph = self.execute(
      agent_message=AgentMessage(
        job_id=original_job.id,
      )
    )

    # 将子任务图更新到 JobService
    self._job_service.replace_subgraph(
      original_job_id=original_job.id, new_subgraph=decomposed_job_graph
    )

    # 执行子任务图
    self.execute_job_graph(original_job_id=original_job.id)

该方法接收一个原始任务,首先将其状态更新为 RUNNING,然后调用 execute() 方法将其分解为子任务图,并将此图存入 JobService,最后调用 execute_job_graph() 来执行这张图。

任务分解和分配

我们先看一下任务分解的逻辑:

class Leader(Agent):

  # 将任务分解为子任务图
  def execute(self, agent_message: AgentMessage, retry_count: int = 0) -> JobGraph:

    # 如果已经预设了 Expert,那么 Leader 将会跳过任务分解,直接将该任务分配给预设的 Expert
    # 此时,子任务图只有一个节点,即原始任务
    assigned_expert_name: Optional[str] = job.assigned_expert_name
    if assigned_expert_name:
      subjob = SubJob(
        original_job_id=original_job_id,
        goal=job.goal,
        assigned_expert_name=assigned_expert_name,
      )
      self._job_service.save_job(job=subjob)
      job_graph: JobGraph = JobGraph()
      job_graph.add_vertex(subjob.id)
      return job_graph

    # 获取专家列表
    # 组装分解提示词

    try:
      # 调用 Leader 智能体分解任务
      workflow_message = self._workflow.execute(job=decomp_job, reasoner=self._reasoner)

      # 解析子任务
      results: List[Union[Dict[str, Dict[str, str]], json.JSONDecodeError]] = parse_jsons(
        text=workflow_message.scratchpad,
        start_marker=r"^\s*<decomposition>\s*",
        end_marker="</decomposition>",
      )

    except (ValueError, json.JSONDecodeError, Exception) as e:
      # 出错后,提供异常信息再重试一次
      print("\033[38;5;208m[INFO]: Retrying decomposition with lesson...\033[0m")
      lesson = "<异常信息>"
      workflow_message = self._workflow.execute(
        job=decomp_job,
        reasoner=self._reasoner,
        lesson=lesson,
      )

    # 根据模型返回,创建子任务图
    job_graph = JobGraph()
    for subjob_id, subjob_dict in job_dict.items():
      # 每个子任务分配一个专家,对应图中的节点
      expert_name = subjob_dict["assigned_expert"]
      subjob = SubJob(
        original_job_id=original_job_id,
        goal=subjob_dict["goal"],
        assigned_expert_name=expert_name,
      )
      self._job_service.save_job(job=subjob)
      job_graph.add_vertex(subjob.id)

    for subjob_id, subjob_dict in job_dict.items():
      # 根据任务之间的依赖关系构建边
      current_unique_id = temp_to_unique_id_map[subjob_id]
      for dep_id in subjob_dict.get("dependencies", []):
        job_graph.add_edge(dep_unique_id, current_unique_id)

    # 确保子任务图为 DAG
    if not nx.is_directed_acyclic_graph(job_graph.get_graph()):
      self.fail_job_graph(...)
      return JobGraph()

    return job_graph

代码首先判断是否已经预设了 Expert,也就是用户对话时手动选定了某个专家,那么 Leader 将会跳过任务分解,直接将该任务分配给预设的 Expert,此时,子任务图只会有一个节点,即原始任务。

如果没有预设 Expert,Leader 则将用户的原始任务分解为一张子任务图。不同于传统智能体系统的线性规划器,Chat2Graph 采用了基于图结构的规划器,将原始任务拆分为可执行单元的同时,还保留了子任务间的依赖关系,以更好地应对任务执行的不确定性。

这是一个有向无环图(DAG),其中节点代表子任务,边代表子任务间的依赖关系:

plan.png

任务分解的核心在于一个精心设计的 Prompt,即 JOB_DECOMPOSITION_PROMPT,它指导 Leader 将一个主任务分解为一系列可执行的子任务并分配给对应的 Expert:

## 任务范围与大语言模型能力

### 角色

将核心任务拆解为多个子任务(分配给多个领域专家)或单个子任务(分配给单个领域专家)。有时,一名专家可承担多个子任务。需确保下游专家获取完成任务所需的全部信息。构建上下文字段时,不得因总结或简化而丢失用户原始请求中的任何约束条件、数据或上下文信息。数学公式、代码片段、特定名称、假设前提、限制条件及指定要求等关键信息,必须完整保留原始形式。

### 能力

1. **主动推断意图(强制第一步)**:至关重要的是,你必须结合给定任务、完整对话历史及当前系统状态,推断用户的**真实潜在意图及下一步合理操作**。需自问:“结合当前系统状态、过往交互记录及用户最新输入(即便非指令形式),用户在整体任务中真正希望下一步实现什么目标?”  
2. **确定目标专家与行动**:分析当前系统状态(若提供相关工具/函数,可调用以获取信息),理解系统环境。随后,基于第一步推断的意图、当前系统状态、专家资质及其他潜在信息,确定具备完成下一步合理操作所需能力与资质的专家。  
3. **强制任务拆解**:你的**唯一输出内容为任务拆解结果**。必须针对第二步确定的专家,制定一个或多个子任务以实现推断的意图。若已根据所有要求生成完整且格式正确的拆解JSON,即表示拆解任务正在进行,随后需使用 `<deliverable>...</deliverable>` 标记任务结束。  
   - **信息不完整时仍需推进**:即便给定任务或对话历史显示存在缺失的前提条件(如用户提及忘记提供文件),仍需为相关专家制定子任务。默认必要条件将得到满足,或由专家处理该问题。  
   - **谨慎整合上下文**:需将给定任务、对话历史及其他系统状态信息中的所有可用上下文整合到子任务描述中。若发现潜在问题(如根据用户表述得知文件缺失),需在子任务的上下文部分简要注明,供专家参考(示例:“上下文:用户此前因文件缺失导致任务失败,并表示已忘记提供文件。默认本次导入任务所需文件将可用。”)。
   - 简单任务或仅需一名专家完成的任务(基于推断意图),应作为单个子任务处理。
   - **拆解时的最小必要步骤**:力求用最少的合理子任务实现推断意图,同时需以不违背推断意图及其他规则为前提。
4. **拆解时的目标专家分配**:仅将子任务分配给第二步确定的专家。
5. **自包含性**:每个子任务需包含完成所需的全部信息。  
6. **角色中立性**:除非任务中明确提及,否则避免在子任务中涉及特定角色。  
7. **边界意识**:子任务范围不得超出原始任务边界。  
8. 若任务仅需一步操作/一名专家完成,需将其作为单个子任务呈现。

## 任务结构与依赖关系

### 颗粒度

仅当任务确实需要多步操作且涉及不同专家能力时,才创建可执行、边界清晰的多个子任务。对于可由一名专家解决的简单任务,需结合对话历史,将整个给定任务整合为单个子任务。

### 目标制定(受对话历史影响)

子任务的目标字段**必须**详细表述,以反映结合对话历史理解的用户最新需求。

### 上下文(整合对话历史、强制要求且丰富信息以保证准确性)

需为专家提供所有必要细节。该字段**必须包含**:

1. 相关对话历史的摘要:说明此前提出的问题/给出的答案。
2. 用户反馈/方向调整:注明用户是否表达过不满、请求澄清、修正方向或缩小范围(示例:“用户认为此前答案过于笼统”、“用户明确要求忽略X并聚焦Y”、“用户修正了此前关于Z的假设”、“用户希望基于此前工作推进至下一任务”)。  
3. 对话历史对当前任务的影响:明确说明上述1、2点等如何决定当前目标的具体要求。

此外,还需说明预期输入(若除上下文外还有其他输入)及预期输出的性质。必须完整保留用户当前指令(来自对话历史)中的所有关键数据、假设、约束条件及要求,不得遗漏。这是为了防止子任务传递过程中出现信息丢失。例如,若用户提供数学方程`x+y=10, 2x-y=4`,则上下文部分必须包含完整方程。

### 依赖关系

仅当生成多个子任务时,才定义子任务间的逻辑流程。简单的单条子任务拆解不存在依赖关系。

### 完成标准(反映对话历史)

需为子任务成功完成制定清晰、可量化或可验证的标准。该标准**必须**直接应对对话历史中凸显的具体需求或修正内容,以及细化后的目标。示例:若对话历史显示用户缺少语法细节,完成标准需明确为“输出需提供操作A、B、C的具体语法示例”;若对话历史显示用户对笼统表述不满,完成标准需明确为“解释需避免过于宽泛的表述,聚焦用户要求的特定方面”。

### 子任务生成思考过程

对于每个子任务的思考字段,需以第一人称(“我”)解释子任务生成的推理过程。内容不得仅复述目标,而需展现生成该特定子任务的思考逻辑。简要包含:该子任务为何必要?我解决该子任务的初步思路是什么?我预见该子任务存在哪些关键注意事项、所需工具或潜在挑战?思考过程需聚焦当前子任务,体现规划性与前瞻性(参考示例风格),同时保持简洁清晰,避免冗余。

同时,Chat2Graph 明确定义了子任务包含的字段,参考 JOB_DECOMPOSITION_OUTPUT_SCHEMA,当任务完成时,返回下面这样的分解结果:

<decomposition>
  {
    "subtask_1": {
      "goal": "目标,必须精确反映用户最新请求。",
      "context": "上下文,需包含对话历史摘要、用户反馈以及这些上下文如何塑造当前任务。",
      "completion_criteria": "完成标准,需明确且可衡量,直接回应对话历史中突显的需求或修正。",
      "dependencies": ["subtask_*", "subtask_*", ...], // 依赖关系,仅在生成多个子任务时定义,用于确定子任务之间的依赖关系
      "language of the assigned_expert": "English",
      "assigned_expert": "分配的专家名,Leader 指定该子任务由哪位专家完成。",
      "thinking": "思考过程,要求 LLM 以第一人称解释生成该子任务的思考过程,包括其必要性、初步方法及关键考量。",
    },
    ... // 确保 JSON 格式正确
  }
</decomposition>

下面这个 JSON 就是入门篇里的示例在执行完任务分解后得到的结果:

{
  "subtask_1": {
    "goal": "基于《罗密欧与朱丽叶》中的人物及人物关系创建图谱模式",
    "context": "该任务需为《罗密欧与朱丽叶》的故事定义图谱模式,将人物设定为 “节点”,人物间的关系设定为 “边”。此模式需清晰明确,以适用于后续的数据提取与分析工作。",
    "completion_criteria": "已定义带有合适节点标签与边标签的模式,可用于数据导入。",
    "dependencies": [],
    "language of the assigned_expert": "English",
    "assigned_expert": "Design Expert",
    "thinking": "我需要创建一个能准确呈现《罗密欧与朱丽叶》中人物及人物关系的模式。这对后续步骤至关重要,因为提取专家将依据此模式进行数据导入。我会确保该模式全面且清晰,为高效的数据提取与分析提供支持。"
  },
  "subtask_2": {
    "goal": "根据已定义的模式,从提供的文本文件中提取数据并导入图谱数据库",
    "context": "模式创建完成后,提取专家需依据已定义的模式,从文本文件《罗密欧与朱丽叶.txt》中提取结构化信息,并将其导入图谱数据库。",
    "completion_criteria": "数据已成功提取并导入图谱数据库,且与已定义的模式一致。",
    "dependencies": ["subtask_1"],
    "language of the assigned_expert": "English",
    "assigned_expert": "Extraction Expert",
    "thinking": "我的工作重点是从文本文件中提取相关人物及其关系信息。这需要格外细致,确保提取的数据与建模专家创建的模式相匹配。数据的成功导入对分析阶段而言至关重要。"
  },
  "subtask_3": {
    "goal": "对图谱进行详细分析,确定影响力最大的节点",
    "context": "数据导入图谱数据库后,分析专家需对图谱进行分析,基于人物间的关联与关系确定影响力最大的人物。",
    "completion_criteria": "分析结果可清晰识别出图谱中影响力最大的节点。",
    "dependencies": ["subtask_2"],
    "language of the assigned_expert": "English",
    "assigned_expert": "Analysis Expert",
    "thinking": "我将运用图谱分析技术来识别《罗密欧与朱丽叶》中影响力最大的人物。这包括计算中心性指标,并对结果进行解读,以理解人物关系的动态变化。"
  }
}

任务执行

完成任务分解和分配后,调用 execute_job_graph() 开始进入真正的任务执行流程:

class Leader(Agent):

  # 执行子任务图
  def execute_job_graph(self, original_job_id: str) -> None:

    # 使用线程池,没有依赖的多个任务可以并行执行
    with ThreadPoolExecutor() as executor:

      # 只要任务没完成,就一直循环
      while pending_job_ids or running_jobs:

        # 找到所有就绪的任务(依赖任务已完成)
        ready_job_ids: Set[str] = set()
        for job_id in pending_job_ids:
          if all_predecessors_completed:
            job: SubJob = self._job_service.get_subjob(job_id)
            ready_job_ids.add(job_id)

        # 执行已就绪的任务
        for job_id in ready_job_ids:
          expert_id = self._job_service.get_subjob(job_id).expert_id
          expert = self.state.get_expert_by_id(expert_id=expert_id)
          # 调用 Expert 智能体
          running_jobs[job_id] = executor.submit(
            self._execute_job, expert, job_inputs[job_id]
          )
          pending_job_ids.remove(job_id)

        # 如果 running 任务已完成,移到 completed 列表里
        completed_job_ids = []
        for job_id, future in running_jobs.items():
          if future.done():
            completed_job_ids.append(job_id)

        # 处理已完成任务
        for completed_job_id in completed_job_ids:
          
          # 获取任务结果
          future = running_jobs[completed_job_id]
          agent_result: AgentMessage = future.result()

          if agent_result.get_workflow_result_message().status == WorkflowStatus.INPUT_DATA_ERROR:
            # 输入数据错误:将此子任务及其前置依赖任务重新加入待处理队列
            pending_job_ids.add(completed_job_id)
            predecessors = list(job_graph.predecessors(completed_job_id))
            pending_job_ids.update(predecessors)
          elif agent_result.get_workflow_result_message().status == WorkflowStatus.JOB_TOO_COMPLICATED_ERROR:
            # 子任务过于复杂:调用 execute() 方法再次分解
            old_job_graph: JobGraph = JobGraph()
            old_job_graph.add_vertex(completed_job_id)
            new_job_graqph: JobGraph = self.execute(agent_message=agent_result)
            self._job_service.replace_subgraph(
              original_job_id=original_job_id,
              new_subgraph=new_job_graqph,
              old_subgraph=old_job_graph,
            )
          else:
            # 任务正常完成
            expert_results[completed_job_id] = agent_result.get_workflow_result_message()

          # 移除已完成任务
          del running_jobs[completed_job_id]

        # 等待 0.5 秒,进入下一轮循环
        if not completed_job_ids and running_jobs:
          time.sleep(0.5)

这是 Chat2Graph 任务执行的核心逻辑,整体流程还是比较清晰的,主要就是一个 while 循环,首先找到所有已就绪的任务(依赖任务已完成),然后丢给 Expert 智能体执行,这里使用了线程池,没有依赖的多个任务可以并行执行,提高执行效率,这也是图规划器相比于传统的线性规划器的优势。这个过程持续循环,当子任务图中的所有子任务都成功完成,整个原始任务即告完成。若任何关键子任务失败或整个图的执行被中断,则原始任务会相应地标记为 FAILEDSTOPPED

这里对子任务的执行比较有意思,当 Expert 执行完子任务后,会返回一个 WorkflowMessage,其中包含任务的执行状态,该状态决定了后续流程:

  • 子任务执行成功(SUCCESS):Leader 会记录结果,并更新子任务的状态,进而可能触发后续依赖任务的执行;
  • 子任务执行出错(EXECUTION_ERROR):Expert 执行过程中发生内部错误,如 API 请求失败;Leader 会根据重试策略决定是否重试该 Expert 的执行。若达到最大重试次数,则该子任务及整个子图可能会被标记为 FAILED
  • 输入数据错误(INPUT_DATA_ERROR):Expert 判断输入数据有问题,无法继续执行;Leader 接收到此状态后,会将此子任务及其前置依赖任务重新加入待处理队列,并可能附带 lesson(经验教训)给前置任务的 Expert,以便修正输出;
  • 子任务过于复杂(JOB_TOO_COMPLICATED_ERROR):Expert 认为当前子任务过于复杂,无法独立完成;Leader 会将此子任务视为一个新的原始任务,再次调用任务分解逻辑,将其进一步细化为更小的子任务,并更新到子任务图中。为防止无限分解,子任务设有生命周期(life_cycle)计数,每一次任务分解,生命周期将会减少 1,直到 0 为止;

可以看到,这里的 EXECUTION_ERROR 状态是在 Expert 智能体内部处理的,而 INPUT_DATA_ERRORJOB_TOO_COMPLICATED_ERROR 是由 Leader 智能体处理;每个子任务的状态在 Leader 智能体和 Expert 智能体之间传递和转换,如下所示:

leader-execute.png

这种状态机确保了任务能够根据依赖关系进行并行处理,同时具备对执行过程中各类情况的适应性和纠错能力。此外,任务执行流程还支持中断和恢复操作,使得 Chat2Graph 的多智能体协作更加灵活和可控。

小结

今天,我们深入分析了 Chat2Graph 中 Leader 智能体的任务分解与执行机制。从任务提交到最终完成,Leader 智能体展现了强大的统筹能力:首先通过精心设计的 Prompt 将复杂任务分解为基于 DAG 的子任务图,明确定义了每个子任务的目标、上下文、完成标准和依赖关系;然后通过线程池实现并行执行,根据依赖关系动态调度任务,大大提高了执行效率。

特别值得关注的是 Chat2Graph 的容错与纠错机制。系统通过不同的状态码来处理任务执行过程中的各种情况,其中 INPUT_DATA_ERROR 状态会触发前置任务的重新执行以修正输出,而 JOB_TOO_COMPLICATED_ERROR 状态会触发任务的进一步分解,这种动态调整机制让系统具备了强大的自适应能力。

相比传统的线性规划器,Chat2Graph 基于图结构的规划器不仅能够表达更复杂的任务依赖关系,还支持任务的并行执行,在提高执行效率的同时保持了良好的灵活性和扩展性。这种设计为智能体系统处理复杂任务提供了新的思路和范式。


学习 Chat2Graph 的多智能体协作机制

昨天我们简单体验了 Chat2Graph 这个项目的安装和使用,通过一个例子演示了 Chat2Graph 的完整流程,了解了 Chat2Graph 作为一个图原生的智能体系统,通过将图计算技术与 AI 技术融合,不仅降低了用图门槛,同时也为智能体系统提供了更强的推理能力和更好的可解释性。

Chat2Graph 这个项目里引入了不少新的理念和思想,我们将结合源码深入它的核心原理,让我们先从它的多智能体协作机制开始。

用户对话的基本流程

当用户在聊天窗口发起对话时,调用 chat 接口,其实现位于 app/server/api/session_api.py 文件中:

@sessions_bp.route("/<string:session_id>/chat", methods=["POST"])
def chat(session_id: str):

  # 构造 chat_message 并调用 SessionManager
  response_data, message = manager.chat(chat_message)
  return make_response(data=response_data, message=message)

它将请求传递到 SessionManager 进行处理:

class SessionManager:
  def chat(self, chat_message: ChatMessage) -> Tuple[Dict[str, Any], str]:
    
    # 创建 SessionWrapper
    session_wrapper = self._agentic_service.session(session_id=chat_message.get_session_id())

    # 通过 SessionWrapper 将消息提交到多智能体系统
    job_wrapper = session_wrapper.submit(message=chat_message)

    # 保存会话
    # 更新会话名称

该方法通过 AgenticService 创建 SessionWrapper,并通过 SessionWrapper 将消息提交到多智能体系统。Chat2Graph 的代码采用分层架构,项目结构如下:

app/
├── core/           # 核心智能体框架
│   ├── agent/      # 智能体实现 (Leader, Expert)
│   ├── reasoner/   # 推理引擎  
│   ├── sdk/        # 开发者SDK
│   ├── service/    # 核心服务
│   └── toolkit/    # 工具系统
├── plugin/         # 插件生态 (Neo4j, LiteLLM等)
├── server/         # Flask Web 服务
└── web/            # React 前端界面

对前端请求的处理都是先到达 server 层,再由 server 层调用 core 层,而对 core 层的调用一般都通过 SDK 实现。SDK 中包含一个核心的 AgenticService 和一堆对核心服务的包装类(XXXWrapper):

app/core/sdk
├── agentic_service.py
├── chat2graph.yml
└── wrapper
    ├── agent_wrapper.py
    ├── env_wrapper.py
    ├── graph_db_wrapper.py
    ├── job_wrapper.py
    ├── knowledge_wrapper.py
    ├── operator_wrapper.py
    ├── reasoner_wrapper.py
    ├── session_wrapper.py
    ├── toolkit_wrapper.py
    └── workflow_wrapper.py

因此这里的 SessionWrapper 就是对 SessionService 的包装,它的 submit() 函数实现如下:

class SessionWrapper:
  def submit(self, message: ChatMessage) -> JobWrapper:
    
    # 获取当前用户消息
    # 获取历史会话消息
    
    # 创建并保存任务
    job = Job(
      goal=text_message.get_payload(),
      context=historical_context,
      session_id=self._session.id,
      assigned_expert_name=text_message.get_assigned_expert_name(),
    )
    job_service.save_job(job=job)
    job_wrapper = JobWrapper(job)

    # 将任务信息更新至会话

    # 异步运行任务
    run_in_thread(job_wrapper.execute)

    return job_wrapper

这里的核心逻辑是创建了一个任务,也就是 Job 对象,包含目标、上下文、会话 ID 等信息,然后通过 JobWrapper 异步运行它:

class JobWrapper:
  def execute(self):
    # 调用 Leader 智能体
    agent_service: AgentService = AgentService.instance
    agent_service.leader.execute_original_job(original_job=self._job)

任务的运行由智能体模块 AgentService 负责,它包含 Leader 和 Expert 两类智能体,通过调用 Leader 智能体,多智能体系统由此开始启动。

智能体模块

智能体模块是 Chat2Graph 的核心,它负责接收任务、执行具体操作并返回结果。系统中存在两类特殊的智能体:

  • Leader 智能体:作为总指挥,它接收用户的请求,理解用户意图,并将复杂任务分解为多个子任务,并根据每个子任务的特点分配给相应的 Expert 执行;
  • Expert 智能体:专注于特定领域的任务处理,每个专家都具有明确的专业能力边界和执行职责,Chat2Graph 内置了建模、导数、查询、分析和问答 5 个专家;

leader-experts.png

其中 Leader 负责任务的规划、分配和执行,还包括 Expert 生命周期的管理,它通过 AgentConfig 初始化:

class AgenticService(metaclass=Singleton):
  @staticmethod
  def load(yaml_path: Union[str, Path] = "app/core/sdk/chat2graph.yml") -> "AgenticService":

    # 加载 YAML 配置
    agentic_service_config = AgenticConfig.from_yaml(yaml_path, encoding)
    mas = AgenticService(agentic_service_config.app.name)

    # 推理机
    mas.reasoner(reasoner_type=agentic_service_config.reasoner.type)
    # 工具集
    mas.toolkit(*AgenticService._build_toolkit(agentic_service_config))

    # 构建 Leader 智能体
    mas.leader("Leader").workflow(
      AgenticService._build_leader_workflow(
        agentic_service_config=agentic_service_config,
      ),
      platform_type=workflow_platform_type,
    ).build()

    # 构建 Export 智能体
    for expert_config in agentic_service_config.experts:
      mas.expert(
        name=expert_config.profile.name,
        description=expert_config.profile.desc,
      ).workflow(
        *AgenticService._build_expert_workflow(
          expert_config=expert_config,
          agentic_service_config=agentic_service_config,
        ),
        platform_type=workflow_platform_type,
      ).build()

    return mas

这段代码在程序启动时就会被调用,通过 chat2graph.yml 配置文件构建出 1 个 Leader 智能体和 5 个 Expert 智能体。这个 YAML 文件比较复杂,包含了很多 Chat2Graph 中的概念,基本结构如下:

app:
  name: "Chat2Graph"
  desc: "A Graph Native Agentic System."
  version: "0.0.1"

# 插件系统
plugin:
  workflow_platform: "DBGPT"

# 推理机
reasoner:
  type: "DUAL"

# 工具
tools:
  - &document_reader_tool
    name: "DocumentReader"
    module_path: "app.plugin.neo4j.resource.graph_modeling"
  # ...

# 行动
actions:
  - &content_understanding_action
    name: "content_understanding"
    desc: "Understand the main content and structure of the document through reading and annotating (requires calling one or more tools)."
    tools:
      - *document_reader_tool
  # ...

# 工具集
toolkit:
  - [
      *content_understanding_action,
      *deep_recognition_action
    ]
  # ...

# 算子
operators:
  - &analysis_operator
    instruction: |
      You are a professional document analysis expert, specializing in extracting key information from documents to lay a solid foundation for building knowledge graphs...
    output_schema: |
      ...
    actions:
      - *content_understanding_action
      - *deep_recognition_action

# 5 个 Export 智能体
experts:
  - profile:
      name: "Design Expert"
      desc: |
        He is a knowledge graph modeling (schema) expert...
    reasoner:
      actor_name: "Design Expert"
      thinker_name: "Design Expert"
    workflow:
      - [*analysis_operator, *concept_modeling_operator]

  - profile:
      name: "Extraction Expert"
      desc: |
        He is a Raw Data Extraction and Data Import Graph Data Expert...
    reasoner:
      actor_name: "Extraction Expert"
      thinker_name: "Extraction Expert"
    workflow:
      - [*data_importation_operator]

  - profile:
      name: "Query Expert"
      desc: |
        He is a Graph Data Query Expert...
    reasoner:
      actor_name: "Query Expert"
      thinker_name: "Query Expert"
    workflow:
      - [*query_design_operator]

  - profile:
      name: "Analysis Expert"
      desc: |
        He is a Graph Data Analysis and Algorithm Application Expert...
    reasoner:
      actor_name: "Analysis Expert"
      thinker_name: "Analysis Expert"
    workflow:
      - [*algorithms_execute_operator]

  - profile:
      name: "Q&A Expert"
      desc: |
        He is a General Q&A and Information Retrieval Expert with prioritized multi-source research capabilities...
    reasoner:
      actor_name: "Q&A Expert"
      thinker_name: "Q&A Expert"
    workflow:
      - [*retrieval_operator, *summary_operator]

# 1 个 Leader 智能体
leader:
  actions:
    - *query_system_status_action
    - *job_decomposition_action

这里的插件、推理机、工具、动作、算子我们暂时不用管,重点关注 1 个 Leader 智能体和 5 个 Expert 智能体的配置。可以看出每个智能体都由三个核心组件构成:

  • 角色(Profile):定义了智能体的身份和能力范围;
  • 推理机(Reasoner):提供了基于大语言模型的推理能力;
  • 工作流(Workflow):则编排了任务执行的具体流程;

Leader 智能体使用了内置的 Profile、Reasoner 和 Workflow,不需要在 YAML 文件中配置。

当用户输入自然语言指令时,Leader 智能体首先会理解任务意图,并将复杂任务分解为多个子任务,然后分配给相应的 Expert 智能体进行执行。

一主动多被动混合架构

像 Chat2Graph 这样的 1 个 Leader + 多个 Experts 智能体协作机制又被称为 一主动多被动(One-Active-Many-Passive) 架构,这种思想其实来自于去年的一篇论文:

论文指出,传统的多智能体系统面临三大核心挑战:

  1. 协调复杂性:多个主动智能体同时决策容易产生冲突和混乱
  2. 同步开销:需要复杂的同步机制(如 Raft 共识算法)来协调多个主动智能体
  3. 扩展困难:添加新的智能体需要重新设计整个协调机制

因此,论文提出 LLM-Agent-UMF(基于 LLM 的智能体统一建模框架),从功能和软件架构双视角明确智能体组件划分,引入 Core-Agent(核心智能体) 概念,并构建模块化、可扩展的体系。LLM-Agent-UMF 框架定义了 Core-Agent 的五大核心模块:

  • 规划模块(Planning Module):将复杂任务分解为可执行的子任务,并生成最优执行计划;
  • 记忆模块(Memory Module):存储和检索智能体运行过程中的关键信息,解决 LLM 上下文窗口有限问题;
  • 角色模块(Profile Module):定义 LLM 的角色与行为模式,确保其输出符合任务场景需求;
  • 行动模块(Action Module):将规划模块的高层指令转化为具体操作,调用工具或与环境交互;
  • 安全模块(Security Module):基于 CIA 三元组(机密性、完整性、可用性),防范 LLM 应用的安全风险;

并根据智能体是否主导任务决策,将 Core-Agent 分为两类:

  • 主动核心智能体(Active Core-Agent):具备完整的五大模块(规划、记忆、角色、行动、安全),主导任务决策与协调;有状态实体,可存储历史交互信息;典型能力包括自主分解复杂任务、动态调整角色或主动发起双向通信,适用于复杂任务处理;
  • 被动核心智能体(Passive Core-Agent):仅保留行动模块与安全模块,无规划/记忆/角色模块;无状态实体,完全遵循外部指令;典型能力包括执行简单操作,如 API 调用、工具调用等,仅能被动响应通信;

这两类 Core-Agent 对比如下:

active-vs-passive.png

基于主动/被动的分类,论文提出两类多智能体架构:统一架构(Uniform Architecture)混合架构(Hybrid Architecture),统一架构又分为 多被动核心架构(1个LLM + 多个被动核心智能体)和 多主动核心架构(多个主动核心智能体协作)。多被动架构使用简单,易于扩展,但是缺乏高层决策能力,无法处理复杂任务;而多主动架构虽然具备强大的决策能力,但是需要设计复杂的同步机制(比如 Raft 选主),避免多个智能体决策冲突。

基于理论分析和实验验证,LLM-Agent-UMF 研究发现 一主动多被动的混合架构为最优架构,兼顾了决策能力与扩展性:

one-active-many-passive.png

正是该理论为 Chat2Graph 的架构设计提供了科学依据。

小结

今天,我们通过对 Chat2Graph 源码和 LLM-Agent-UMF 论文的分析,学习了一主动多被动的多智能体混合架构:Leader 作为主动核心智能体,具备完整的规划、记忆、角色管理能力,统一协调整个系统;Expert 作为被动核心智能体,专注于特定领域的任务执行,简化了系统复杂度。

这种架构设计的核心优势在于:

  1. 统一决策:单一的 Leader 智能体避免了多个主动智能体之间的决策冲突,简化了协调机制;
  2. 专业分工:每个 Expert 智能体专注于特定领域(建模、导数、查询、分析、问答),提高了任务执行效率;
  3. 扩展性强:新增 Expert 智能体无需修改现有架构,只需在配置文件中定义新的专家即可;
  4. 降低复杂度:被动智能体无需维护状态和规划能力,降低了系统整体复杂度;

通过这种架构,Chat2Graph 不仅实现了高效的任务分解和执行,还为图原生智能体系统提供了一个可扩展的技术框架。在下一篇文章中,我们将深入分析 Chat2Graph 中智能体的具体实现,了解它的任务分解和执行逻辑,以及 工作流推理机 等高级机制。


图原生智能体系统 Chat2Graph 快速上手

在数据日益成为核心资产的今天,如何高效地从复杂关联的数据中提取价值,是许多开发者和数据分析师面临的共同挑战。图数据库因其在处理关联数据上的天然优势而备受青睐,但学习和使用其查询语言(如 Cypher 或 GQL)对许多人来说仍有一定的门槛。随着大语言模型技术的快速发展,越来越多的开发者将 AI 技术引入图处理领域,以便让任何人都能通过日常对话的方式,与图数据进行交互,从而极大地降低图数据的使用门槛。

另一方面,随着 AI 技术渗透到各行各业,智能体系统也成为了当下的热门话题。从 AutoGPT、LangChain 到各种垂直领域的智能体平台,大家都在探索如何让 AI 系统更智能、更可靠。然而,传统的智能体系统往往面临着推理不透明、记忆管理混乱、工具调用缺乏逻辑等问题。如何解决这些痛点?答案可能就在图计算技术中。越来越多的开发者将图技术引入智能体中,比如 LangGraph、GraphRAG 等等。

这造成了一个有趣的趋势。如何有效地将图计算技术与 AI 相结合,将是非常值得探索的方向。今天我们要介绍的 Chat2Graph 项目,就是一个将图计算与 AI 深度融合的智能体系统,它通过 图智互融 的理念,为智能体系统带来了全新的架构思路。

Chat2Graph 介绍

Chat2Graph 号称是 图原生的智能体系统(Graph Native Agentic System),由蚂蚁集团旗下的图数据库团队 TuGraph 开源。它的核心理念是将图数据结构的关系建模、可解释性等天然优势,与智能体的推理、规划、记忆、工具等关键能力相结合,实现真正的图智互融。

chat2graph-logo.png

简单来说,Chat2Graph 做了两件事情:

  1. AI for Graph:借助智能体实现图系统的自主化与智能化,实现图数据库的智能研发、运维、问答、生成等多样化能力,降低用图门槛,提升图系统使用体验;
  2. Graph for AI:借助图的关联性建模优势,实现智能体的推理、记忆、工具使用等关键能力的增强,降低模型幻觉,提升生成质量;

核心特性

Chat2Graph 的核心特性包括:

  • 架构:整体采用了 单主动-多被动(One-Active-Many-Passive) 的混合智能体架构,构建了以单个 Leader 智能体驱动,多个 Expert 智能体协作的任务执行体系;
  • 推理:结合快思考与慢思考的双 LLM 推理机制,模拟认知科学中的人类思维;
  • 规划:基于 CoA(Chain of Agents) 的任务分解与图规划器;
  • 记忆:分层的记忆系统,包括会话级别的短期记忆,知识库级别的长期记忆,任务执行过程中的工作记忆以及系统运行的环境记忆;
  • 知识:同时支持基于向量的 RAG 实现(VectorRAG)和基于图的 RAG 实现(GraphRAG);
  • 工具:使用有向图结构组织工具和行动,清晰地描述了工具调用间的依赖和转换关系;
  • 使用:支持 RestfulAPI 接口和 Web UI 图形化界面;提供简洁的 SDK API,方便用户将 Chat2Graph 轻松集成到自己的应用中;
  • 配置:通过 YAML 一键配置智能体系统;
  • 集成:构建了面向图系统的统一抽象,支持 Neo4j 和 TuGraph 两种图数据库;
  • 可干预:支持任务的暂停和恢复;
  • 持久化:支持作业状态和消息的持久化;

下面是它的系统架构图:

chat2graph-arch.png

环境搭建

今天,我们就来快速上手体验一下 Chat2Graph 的基本使用。

在开始之前,先确保电脑上已经安装有 Python 和 Node.js 环境。

首先克隆代码仓库:

$ git clone https://github.com/TuGraph-family/chat2graph.git
$ cd chat2graph

Chat2Graph 提供了一键构建脚本,可以直接运行。但是该脚本依赖于 Poetry 包管理器,因此我们需要先安装它:

$ pip install poetry

然后运行 bin 目录下的 build.sh 脚本进行构建:

$ ./bin/build.sh

建议使用 condauv 创建虚拟环境。

项目运行

如果一切顺利,最终会出现构建成功的消息:

Build success !

然后基于 .env.template 创建环境变量文件:

$ cp .env.template .env

主要是大模型和 Embedding 等参数,官方推荐使用 DeepSeek V3,我这里使用的是 OpenAI 接口:

LLM_NAME=gpt-4o-mini
LLM_ENDPOINT=
LLM_APIKEY=sk-xxx

EMBEDDING_MODEL_NAME=text-embedding-3-small
EMBEDDING_MODEL_ENDPOINT=
EMBEDDING_MODEL_APIKEY=sk-xxx

最后,运行 start.sh 脚本启动项目:

$ ./bin/start.sh

看到如下日志后,说明 Chat2Graph 启动成功:

  ____ _           _   ____   ____                 _     
 / ___| |__   __ _| |_|___ \ / ___|_ __ __ _ _ __ | |__  
| |   | '_ \ / _` | __| __) | |  _| '__/ _` | '_ \| '_ \ 
| |___| | | | (_| | |_ / __/| |_| | | | (_| | |_) | | | |
 \____|_| |_|\__,_|\__|_____|\____|_|  \__,_| .__/|_| |_|
                                            |_|          

 * Serving Flask app 'bootstrap'
 * Debug mode: off
 * Running on http://127.0.0.1:5010
Chat2Graph server started success ! (pid: 72103)

使用体验

使用浏览器访问 http://localhost:5010 进入 Chat2Graph 首页:

chat2graph-home.png

使用前我们先注册下图数据库,这样可以体验完整的与图对话的功能。Chat2Graph 支持 Neo4jTuGraph 两种图数据库,我们通过下面的命令在本地启一个 Neo4j 数据库:

$ docker run -d \
  -p 7474:7474 \
  -p 7687:7687 \
  --name neo4j-server \
  --env NEO4J_AUTH=none \
  --env NEO4J_PLUGINS='["apoc", "graph-data-science"]' \
  neo4j:2025.04

然后进入管理后台,在图数据库管理页面新建图数据库:

chat2graph-graphdb.png

至此,我们就可以体验 Chat2Graph 的对话功能了,通过聊天界面与图专家互动,执行各种与图相关的任务,包括建模、导数、查询、分析、问答等。在对话框下方可以看到系统内置了 5 个图专家,这 5 个专家的功能分别是:

  • 建模专家(Design Expert):负责根据数据需求设计图数据库 Schema,定义顶点与边的类型、属性及关系;
  • 导数专家(Extraction Expert):在图 Schema 已存在且有明确数据源的前提下,提取结构化信息并导入图数据库;
  • 查询专家(Query Expert):基于已有数据和结构的图数据库实例,理解查询意图、编写执行查询语句并返回结果;
  • 分析专家(Analysis Expert):针对有结构化数据的图数据库实例,根据分析目标选择执行图算法并解读结果;
  • 问答专家(Q&A Expert):优先通过知识库、必要时补充网络调研,解答图数据库相关通用信息问题;

我们可以选择与特定的专家对话,也可以不选,系统会自动分析并拆解你的问题,分配合适的专家来解决。这里我直接使用官方提供的《罗密欧与朱丽叶》文件,快速体验下 Chat2Graph 的功能:

q.png

我们的问题是:根据《罗密欧与朱丽叶》的故事创建一个图谱。统计角色的数量,然后进行详细分析以确定最具影响力的节点。

可以看到 Chat2Graph 将我的问题拆成三个子问题,分派给三个专家分别处理:

q2.png

建模专家完成图数据库的建模:

q3.png

导数专家从文本中提取数据并导入图数据库:

q4.png

分析专家使用图分析算法从图中找到最具影响力的节点:

q5.png

最终完成问答,并在答案的下方对图模型与图数据进行实时渲染:

q6.png

避坑指南

Chat2Graph 项目目前还在很早期的阶段,存在着不少问题。我在体验过程中遇到了不少的坑,这里记录下,防止后来者再踩坑。

Python 版本

官方文档中要求 Python >= 3.10,实际上应该是 >= 3.11,因为项目里使用了 browser-use,它对 Python 的最低要求是 3.11。

$ uv venv --python 3.11

另外,项目里还用到了 browser-use 的 MCP,这个可执行文件需要通过下面的命令安装:

$ pip install "browser-use[cli]" 

mcp 版本

运行时报错:

ImportError: cannot import name 'ContentBlock' from 'mcp.types'

升级 mcp 库的版本:

$ pip install mcp==1.10.1

缺少依赖

代码用到了不少三方库,但是 pyproject.toml 文件中没有声明,需要手动安装下:

$ pip install google-generativeai
$ pip install google-genai

$ pip install python-magic
$ brew install libmagic

aiohttp 版本

程序运行成功后,在对话时报错:

Error info: module 'aiohttp' has no attribute 'ConnectionTimeoutError'

升级 aiohttp 库的版本:

$ pip install aiohttp>=3.12.13

max_tokens 参数

程序运行成功后,在对话时报错:

Error info: litellm.BadRequestError: OpenAIException - Setting 'max_tokens' and 'max_completion_tokens' at the same time is not supported. 

这是因为我使用了 OpenAI 接口,根据它的官方文档max_tokens 参数目前已经被标记为废弃,建议使用 max_completion_tokens 参数:

openai-max-tokens.png

我们需要修改 app/plugin/lite_llm/lite_llm_client.py 文件中调用大模型的地方,将 max_tokens 参数注释掉:

model_response: Union[ModelResponse, CustomStreamWrapper] = completion(
  model=self._model_alias,
  api_base=self._api_base,
  api_key=self._api_key,
  messages=litellm_messages,
  temperature=self._temperature,
  # max_tokens=self._max_tokens,
  max_completion_tokens=self._max_completion_tokens,
  stream=False,
)

小结

今天,我们学习了 Chat2Graph 这个项目,体验了从构建、配置、运行、使用的完整流程。Chat2Graph 作为一个图原生的智能体系统,通过将图计算技术与 AI 技术融合,为智能体系统带来了全新的架构思路。它不仅降低了图数据库的使用门槛,让普通用户也能轻松进行复杂的图分析,同时也为智能体系统提供了更强的推理能力和更好的可解释性。

目前该项目还不够成熟,存在着不少问题,但是项目里有几个理念和思想非常有意思,值得学习,推荐大家关注。项目的作者是蚂蚁的范志东大佬,他在他的一篇博客中详细讲解了图原生的设计理念,以及 Chat2Graph 作为首个全面践行图原生理念的智能体系统,是如何将人工智能研究领域中的符号主义、连接主义、行为主义融为一体,打造 GraphRAG 新范式。强烈推荐大家去看:

在后面的文章中,我将结合源码深入 Chat2Graph 的核心技术原理,包括其推理机制、记忆系统、工具库设计等组件的实现细节,敬请期待!


学习 GraphRAG 四大搜索策略

在前面的系列文章中,我们深入学习了 GraphRAG 索引构建的完整流程,从文档加载、文本分片,到实体关系提取、社区检测,最终生成了包括实体表、关系表、社区报告等在内的结构化输出文件。这些文件构成了 GraphRAG 的知识库基础,为查询阶段提供了丰富的数据源。现在,让我们转入查询阶段的学习,探索 GraphRAG 如何基于这些数据实现智能问答。

在之前的入门篇中,我们已经体验了通过 query 命令来查询,并通过 --method 参数指定搜索策略:

$ uv run poe query \
    --root ./ragtest \
    --method global \
    --query "What are the top themes in this story?"

GraphRAG 提供了四种不同的搜索策略,按照复杂程度递增的顺序,它们分别是:基础搜索(Basic Search)本地搜索(Local Search)全局搜索(Global Search)漂移搜索(DRIFT Search)。本文将由浅入深地剖析这四种搜索策略的工作原理和使用场景。

基础搜索(Basic Search)

基础搜索是 GraphRAG 中最简单的查询方式,本质上是传统向量 RAG 的实现。它主要用于与其他搜索策略进行对比,帮助用户了解 GraphRAG 相对于传统方法的优势。

basic-search.png

基础搜索的核心思想非常直接:对用户查询进行向量化,然后在文本单元的向量数据库中进行相似性搜索,找到最相关的前 K 个文本片段,最后使用这些片段作为上下文,连同原始问题一起提交给大语言模型,生成最终答案。

基础搜索适用于简单的事实性问题,特别是答案可以直接从单个或少数几个文档片段中获得的情况,比如:

  • 什么是机器学习?
  • Python 中如何定义函数?
  • 苹果公司的总部在哪里?

由于基础搜索仅依赖向量相似性,它无法处理需要多步推理或全局理解的复杂问题。这正是 GraphRAG 引入图结构和社区检测的动机。

本地搜索(Local Search)

本地搜索,或者叫 局部搜索,是 GraphRAG 的核心创新之一,它利用知识图谱的实体关系结构来增强传统 RAG 的检索能力。相比基础搜索,本地搜索能够理解查询中的实体,并利用这些实体在知识图谱中的连接关系来查找更丰富、更相关的上下文信息。

本地搜索的流程分为三个主要阶段:实体识别、上下文扩展和答案生成。

local-search.png

这个过程的详细步骤如下:

  1. 实体识别阶段:对用户查询进行向量化,在实体描述向量数据库中进行相似性搜索,获取语义相关的候选实体集合;
  2. 上下文扩展阶段

    • 文本单元扩展:根据实体和文本单元映射关系,找到包含这些实体的原始文本片段;
    • 社区报告扩展:根据实体和社区映射关系,获取相关的社区摘要报告;
    • 实体关系扩展:使用实体名称和描述等信息构建实体上下文,然后再依次查找与实体相连的关系构建关系上下文;
    • 协变量扩展:如果配置了协变量提取,还会包含相关的声明信息;
  3. 排序过滤阶段

    • 排序:对每类候选数据进行重要性排序,比如对于文本单元来说,关系数量更多优先级更高,对于社区来说,包含的选中实体数量更多的优先级更高;
    • 过滤:根据 token 预算进行过滤,确保上下文适合大语言模型的输入窗口;
  4. 答案生成阶段:将过滤后的多源信息整合成结构化上下文,调用大模型生成最终答案;

本地搜索的过程就像是一次从图谱中的特定节点出发,向外探索和收集信息的侦察任务,它的关键优势在于 多源信息融合 能力:通过文本单元保留原始文档的详细信息,通过知识图谱揭示实体间的复杂关系,通过社区报告提供结构化的主题总结。它特别适合需要理解特定实体及其关系的问题,比如:

  • 洋甘菊有什么治疗功效?(需要理解洋甘菊这个实体的属性)
  • 苹果公司和微软公司有什么合作关系?(需要理解两个实体间的关系)
  • 机器学习领域的主要算法有哪些?(需要找到与机器学习相关的算法实体)

全局搜索(Global Search)

全局搜索解决了传统 RAG 的一个根本性问题:无法处理需要理解整个数据集的查询。当用户问 数据中的主要主题是什么?最重要的趋势有哪些? 这种需要全局视角和高度概括性的问题时,传统的向量检索往往无法给出满意的答案,因为这类问题需要对整个数据集进行宏观理解和总结。

全局搜索采用了 Map-Reduce 架构,分为两个核心阶段:

global-search.png

首先是 Map 阶段,收集指定层级的社区报告,并行处理:

  1. 报告收集:从指定层级的社区层次结构中收集所有的社区报告;如果开启了动态社区选择,则使用大模型根据用户问题对社区报告进行打分,选择得分大于阈值的社区报告;
  2. 分批处理:将社区报告按照 token 预算随机分成多个批次;
  3. 并行分析:对于每个批次,将其中的社区报告作为上下文,连同用户查询一起发送给大模型,使用 Map 系统提示词生成一个中间响应;这个响应包含一系列观点,每个观点都有一个重要性评分(1-100 分);
{
  "points": [
    { "description": "观点1的描述 [数据:报告(报告ID)]", "score": "评分值" },
    { "description": "观点2的描述 [数据:报告(报告ID)]", "score": "评分值" }
  ]
}

第一阶段并不回答用户的原始问题,只是生成中间响应,然后再通过 Reduce 阶段,对其聚合生成最终答案:

  1. 重要性排序:收集所有中间响应的观点并按重要性评分排序;
  2. 智能过滤:按评分选出最重要的观点,并确保在 token 预算内;
  3. 最终聚合:使用 Reduce 系统提示将选中的观点整合起来,作为最终的上下文,再次提交给大模型,生成一个全面连贯的最终答案;

可以看出,全局搜索通过直接利用最高度浓缩的知识(社区报告),避免了在海量细碎文本块中进行检索,特别适合需要宏观理解和数据集级别分析的问题:

  • 这个数据集中讨论的主要主题有哪些?
  • 文档中提到的最重要的趋势是什么?
  • 数据中的关键人物和组织有哪些?
  • 这些文档反映了什么样的总体情况?

它的效果很大程度上取决于选择的社区层级:

  • 底层社区:包含更详细的信息,答案更全面,但需要更多的计算资源和时间;
  • 顶层社区:处理速度更快,成本更低,但答案可能较为宏观;

用户可以根据问题的复杂程度和计算预算选择合适的社区层级。

漂移搜索(DRIFT Search)

我们看到,全局搜索通过检索社区报告来回答宏观问题,而本地搜索则通过图遍历和多源信息融合来回答关于具体实体的微观问题,用户在使用时,必须先判断出问题的类型,然后再明确指定对应的策略,使用起来非常费脑子,如果能将两种搜索策略融合起来那就完美了。

因此,GraphRAG 推出了一种更高级的混合搜索策略 —— 漂移搜索(DRIFT Search),这里的 DRIFT 其实是首字母缩写,全称为 Dynamic Reasoning and Inference with Flexible Traversal,它巧妙地结合了全局搜索和本地搜索的优势,通过动态推理和灵活遍历实现了更全面、更深入的查询能力。

漂移搜索的设计理念是渐进式查询精化,首先进行全局层面的主题探索,建立宏观认知,然后基于初步发现生成深入的后续问题,最后通过本地搜索验证和细化这些问题的答案。这种方式避免了全局搜索过于宏观和本地搜索过于狭隘的问题,实现了广度和深度的平衡。它的运行流程包含三个核心阶段:

drift-search.png

阶段一被称为 Primer,用于建立初步的、较宽泛的全局认知,分为两步:

  1. 报告选择:将用户查询与所有社区报告进行语义相似性比较,选择最相关的 Top-K 个社区报告作为初始上下文;这里使用了 HyDE(Hypothetical Document Embeddings) 策略,首先根据用户查询生成和社区报告结构类似的假设性文档,然后使用这个假设性文档的嵌入来选择社区报告,这通常比原始的用户查询嵌入效果更好;
  2. 初步回答:将这些报告拆成 N 个批次,针对每个批次,生成一个宏观层面的初步答案,分析初步答案和原始查询的相关性,并识别需要进一步探索的方向,生成具体的后续问题;
{
  "intermediate_answer": "<初步答案>",
  "score": "<和原始查询的相关性>",
  "follow_up_queries": [
    "<后续问题1>",
    "<后续问题2>"
  ]
}

阶段二叫做 Follow-Up,对生成的后续问题进一步的深入探索:

  1. 问题选择:从后续问题列表中选择评分最高的 Top-K 个问题,如果没有评分则随机选择;
  2. 局部问答:使用本地搜索(Local Search)对选定的问题进行详细回答,这个过程会产生更精确的中间答案,并可能生成新一轮的、更具针对性的后续问题;
{
  "response": "<对后续问题进行回答,注意不要回答原始查询>",
  "score": "<和原始查询的相关性>",
  "follow_up_queries": [
    "<后续问题1>",
    "<后续问题2>"
  ]
}

阶段二是一个 while 循环,这个循环过程会持续进行,直到满足停止条件,当达到预设的深度限制(默认 3 层)或所有后续问题均已回答时。然后进入阶段三 Reduce,系统将所有的中间答案汇聚起来,为用户提供一个既有全局概览又有局部细节的综合性答案。

漂移搜索通过动态地在全局和局部信息之间“漂移”,模拟了人类分析师的探索过程,这和最近流行的 Deep Research 技术非常类似,能够更智能地应对复杂的、探索性的查询需求,这类需求通常会从一个宽泛的起点开始逐步深入到具体细节,比如:

  • 这个数据集中的关键问题是什么,它们是如何相互关联的?
  • 分析一下某个领域的发展趋势及其影响因素
  • 深入探讨某个复杂事件的原因、过程和影响
  • 比较分析多个概念或实体的异同及其关系

综合对比

为了更好地理解这四种搜索策略的特点和适用场景,我们来做一个综合对比:

search-diff.png

只有根据问题场景选择最适合的搜索策略,才能达到最好的效果。

技术实现细节

最后,我们来看下 GraphRAG 搜索策略的技术实现,源码位于 query/structured_search 目录下:

├── base.py
├── basic_search
│   ├── basic_context.py
│   └── search.py
├── drift_search
│   ├── action.py
│   ├── drift_context.py
│   ├── primer.py
│   ├── search.py
│   └── state.py
├── global_search
│   ├── community_context.py
│   └── search.py
└── local_search
    ├── mixed_context.py
    └── search.py

GraphRAG 采用了一致的架构设计,所有的搜索策略都统一继承自 BaseSearch 基类,提供 search()stream_search() 两个方法:

class BasicSearch(BaseSearch[BasicContextBuilder]):
  '''基础搜索'''
class LocalSearch(BaseSearch[LocalContextBuilder]):
  '''本地搜索'''
class GlobalSearch(BaseSearch[GlobalContextBuilder]):
  '''全局搜索'''
class DRIFTSearch(BaseSearch[DRIFTSearchContextBuilder]):
  '''漂移搜索'''

每个搜索策略都对应一个 上下文构建器(Context Builder),负责从索引数据中构建查询上下文。不同的搜索策略使用专门优化的系统提示:

  • 基础搜索:

    • BASIC_SEARCH_SYSTEM_PROMPT:基于文本片段回答用户问题
  • 本地搜索:

    • LOCAL_SEARCH_SYSTEM_PROMPT:基于实体关系、社区报告、文本片段等回答用户问题
  • 全局搜索:

    • RATE_QUERY:根据用户问题对社区报告进行打分,用于选择最合适的社区报告;
    • MAP_SYSTEM_PROMPT:根据社区报告,让大模型生成一个中间响应,包含一系列关于用户查询的观点及重要性评分;
    • REDUCE_SYSTEM_PROMPT:聚合中间观点,生成最终答案;
  • 漂移搜索:

    • HyDE:根据用户问题生成假设性文档;
    • DRIFT_PRIMER_PROMPT:基于社区报告对用户问题作出初步回答,生成后续问题;
    • DRIFT_LOCAL_SYSTEM_PROMPT:针对后续问题调用本地搜索后,生成中间答案,并生成新一轮的、更具针对性的后续问题;
    • DRIFT_REDUCE_PROMPT:基于所有的中间答案,回答用户的原始问题;

具体的源码解读就不再展开了,有兴趣的朋友可以对照着上面各节的讲解去看下对应的代码。

小结

今天,我们详细剖析了 GraphRAG 的四大搜索策略,从简单到复杂,从局部到全局,从静态到动态,每种策略都有其独特的设计理念,通过合理选择和组合这些策略,GraphRAG 能够处理从简单事实查询到复杂推理问题的各种场景。

至此,我们对 GraphRAG 的探索之旅也告一段落。从最初的快速上手,到深入索引构建的三个核心阶段(文档处理、知识提取、图谱增强),再到今天的搜索策略,我们系统地学习了 GraphRAG 如何将扁平的非结构化文本,一步步转化为一个结构丰富、可深度推理的知识库,并最终利用它来赋能更智能的问答系统。

希望这个系列的文章能帮助你全面理解 GraphRAG 的核心思想与实现原理,并为你构建自己的高级 RAG 应用带来启发。


详解 GraphRAG 索引构建的输出文件

在前面的系列文章中,我们深入学习了 GraphRAG 索引构建的完整流程,从文档加载、文本分片,到实体关系提取、社区检测和向量化。经过这些复杂的处理步骤,GraphRAG 最终生成了一系列结构化的输出文件,这些文件就是整个知识图谱的数字化资产。通过这些文件,GraphRAG 实现了不同的检索策略,可以回答用户的各种问题。但是在正式进入检索阶段之前,让我们先来详细剖析下这些输出文件,了解每个文件的格式、字段信息和存储机制。

输出文件概览

当 GraphRAG 索引构建完成后,所有结果都保存在 output 目录中。让我们先来看看完整的文件结构:

output/
├── documents.parquet              # 文档表
├── text_units.parquet             # 文本单元表
├── entities.parquet               # 实体表
├── relationships.parquet          # 关系表
├── communities.parquet            # 社区表
├── community_reports.parquet      # 社区报告表
├── embeddings.entity.description.parquet       # 实体描述向量
├── embeddings.community.full_content.parquet   # 社区内容向量
├── embeddings.text_unit.text.parquet          # 文本单元向量
├── graph.graphml                  # 图结构文件
├── lancedb/                       # LanceDB 向量数据库
├── stats.json                     # 统计信息
└── context.json                   # 上下文信息

这些文件可以分为三类:

  1. 核心数据表:存储结构化数据,包括文档、文本单元、实体、关系、社区、社区报告等,使用 Parquet 格式;
  2. 向量数据:存储嵌入向量,包括文本单元、实体描述、社区内容等,使用 Parquet 和 LanceDB 双重存储;
  3. 元数据文件:包含图结构、统计信息等辅助数据;其中 graph.graphml 文件在之前学习可视化的时候已经详细介绍过,此处不再赘述;

核心数据表详解

这一节对所有的核心数据表做个总结。

1. documents.parquet - 文档表

文档表存储了原始文档的基础信息和预处理后的文本内容:

字段描述
id文档唯一标识符(SHA-256 哈希)
human_readable_id人类可读的递增 ID
title文档标题(通常是文件名,或 CSV 中的 title 列)
text文档的完整文本内容
text_unit_ids关联的文本单元 ID 列表
creation_date创建时间戳
metadata额外的元数据信息

文档表是整个知识图谱的源头,每个文档都会被分解成多个文本单元进行后续处理。

2. text_units.parquet - 文本单元表

文本单元是 GraphRAG 处理的基本单位,通过 tokens 策略将长文档分解成固定长度的片段,或者通过 sentence 策略将其分解成自然语言的句子:

字段描述
id文本单元唯一标识符
human_readable_id人类可读的递增 ID
text文本单元的具体内容
n_tokens文本的 token 长度,应该和配置文件中的 chunk_size 相等,除了最后一个分块
document_ids所属文档 ID 列表,通常情况下只有一个,但是也可以修改分组策略让一个分块跨多个文档
entity_ids包含的实体 ID 列表
relationship_ids包含的关系 ID 列表
covariate_ids关联的协变量 ID 列表(如果存在)

文本单元表是连接文档和知识图谱的桥梁,每个文本单元都详细记录了从中提取的结构化信息。

3. entities.parquet - 实体表

实体表存储了从文本中识别出的所有命名实体及其详细信息:

字段描述
id实体唯一标识符
human_readable_id人类可读的递增 ID
title实体名称
type实体类型,默认支持 "organization", "person", "geo" 和 "event" 四种
description实体详细描述,一个实体可能出现在多个文本单元中,使用大模型生成总结摘要
text_unit_ids出现的文本单元 ID 列表
frequency出现频次
degree图中的度数(连接的边数量)
x图布局中的 x 坐标,必须开启图嵌入和 UMAP 配置
y图布局中的 y 坐标,必须开启图嵌入和 UMAP 配置

实体表是知识图谱的节点集合,每个实体都包含了丰富的语义信息和图结构属性。

4. relationships.parquet - 关系表

关系表记录了实体之间的连接关系:

字段描述
id关系唯一标识符
human_readable_id人类可读的递增 ID
source源实体名称
target目标实体名称
description关系的详细描述,一个关系也可能出现在多个文本单元中,使用大模型生成总结摘要
weight关系的权重,如果是基于大模型的提取,让大模型评估关系的强度,如果是基于 NLP 的提取,则通过共现频次计算
combined_degree源和目标实体的度数总和
text_unit_ids支持该关系的文本单元 ID 列表

关系表定义了知识图谱的边集合,捕捉了实体间的语义关联。

5. communities.parquet - 社区表

社区表存储了通过 Leiden 算法识别出的图社区结构:

字段描述
id社区唯一标识符
human_readable_id人类可读的递增 ID
level社区层级(支持层次化聚类)
community社区 ID,通过 Leiden 算法生成的唯一标识
parent父社区 ID
children子社区 ID 列表
title社区标题
entity_ids包含的实体 ID 列表
relationship_ids包含的关系 ID 列表
text_unit_ids关联的文本单元 ID 列表
period创建时间,用于增量更新合并
size社区大小(实体数量),用于增量更新合并

社区表将知识图谱组织成层次化的主题集群,为生成高质量摘要提供了基础。

6. community_reports.parquet - 社区报告表

社区报告是通过大模型对每个社区生成的结构化摘要:

字段描述
id报告唯一标识符
human_readable_id人类可读的递增 ID
community关联的社区 ID
parent父社区 ID
children子社区 ID 列表
level社区层级
title报告标题
summary执行摘要
full_content完整报告内容
rank重要性排名
rank_explanation排名解释
findings关于社区的前 5-10 条关键发现(JSON 格式)
full_content_json大模型返回的完整内容,大多数内容已经被提取到对应列中
period创建时间,用于增量更新合并
size社区大小(实体数量),用于增量更新合并

社区报告表是知识图谱的智能摘要,将复杂的图结构转化为人类可理解的洞察。

Parquet:高效的列式存储

上述这些核心数据表最终都使用 Parquet 格式进行存储。Apache Parquet 是一种开源的列式数据文件格式,专为大数据场景设计,旨在优化数据存储效率和查询性能。它由 Twitter 和 Cloudera 联合开发,现为 Apache 顶级开源项目,广泛应用于 Hadoop 生态系统及各类大数据处理框架。

apache-parquet.png

传统数据库(如 MySQL)采用行式存储(一行数据连续存储),适合 整行读取 场景(如事务处理);而 Parquet 的列式存储(一列数据连续存储),更适合大数据 多列筛选、聚合 场景(如数据分析)。它的核心特性包括:

  • 列式存储:按列而非按行存储数据,查询时仅读取目标列(而非整行),减少 I/O 开销;
  • 高效压缩:基于列的相似数据特征(如同一列数据类型一致、重复值多),支持多种高效的编码和压缩算法,存储成本降低 3-5 倍;
  • 谓词下推(Predicate Pushdown):配合 Spark 等查询引擎,将过滤条件下推到存储层,提前过滤无效数据,减少数据传输量;
  • Schema 演进:支持 Schema 的动态修改,比如新增列或修改列类型,无需重写历史数据,兼容新旧数据查询;
  • 平台无关性:不依赖特定计算框架或存储系统,可在 Spark、Flink、Hive、Presto 等工具中通用;
  • 支持嵌套数据:原生支持数组、字典、结构体等复杂数据类型,无需将嵌套数据扁平化,适配 JSON、Avro 等半结构化数据;

在 GraphRAG 中,使用了 Pandas 库对 Parquet 文件进行读取和写入,具体的逻辑位于 utils/storage.py 中:

async def load_table_from_storage(name: str, storage: PipelineStorage) -> pd.DataFrame:
  """从存储中读取表格数据"""
  filename = f"{name}.parquet"
  return pd.read_parquet(BytesIO(await storage.get(filename, as_bytes=True)))

async def write_table_to_storage(
  table: pd.DataFrame, name: str, storage: PipelineStorage
) -> None:
  """向存储中写入表格数据"""
  await storage.set(f"{name}.parquet", table.to_parquet())

其中 pd.read_parquet() 用于读取 Parquet 文件,table.to_parquet() 用于将 DataFrame 转换为 Parquet 格式。下面是一个完整示例,演示了如何使用 Pandas 写入和读取 Parquet 文件:

import pandas as pd
from pathlib import Path

def write_parquet_example(filepath):
  """写入示例数据"""
  
  # 创建示例数据
  data = {
    'id': [1, 2, 3, 4, 5],
    'name': ['Alice', 'Bob', 'Charlie', 'David', 'Eve'],
    'age': [25, 30, 35, 28, 32],
    'city': ['New York', 'London', 'Paris', 'Tokyo', 'Berlin'],
    'salary': [50000.0, 60000.0, 70000.0, 55000.0, 65000.0]
  }
  
  # 创建 DataFrame
  df = pd.DataFrame(data)
  
  # 写入 Parquet 文件
  output_path = Path(filepath)
  df.to_parquet(output_path, engine='pyarrow')
  
  print(f"数据已写入到: {output_path}")
  print(f"数据形状: {df.shape}")
  print(f"数据预览:\n{df.head()}")
  
  return output_path

def read_parquet_example(file_path: str):
  """读取示例文件"""
  
  # 读取完整数据
  df = pd.read_parquet(file_path, engine='pyarrow')
  print(f"\n读取完整数据:\n{df}")
  
  # 读取特定列
  df = pd.read_parquet(file_path, columns=['name', 'age', 'city'])
  print(f"\n读取特定列:\n{df}")
  
  return df

if __name__ == "__main__":
  
  # 1. 写入 Parquet 文件
  parquet_file = write_parquet_example('sample_data.parquet')
  
  # 2. 读取 Parquet 文件
  df = read_parquet_example('sample_data.parquet')

有兴趣的同学可以将 filepath 改成 GraphRAG 的 output 目录,研究下每个 Parquet 文件的内容。

向量数据存储

我们昨天学过,GraphRAG 支持对多种文本字段进行向量化,默认包括文本单元、实体描述和社区完整内容。根据配置,这些向量既保存在 Parquet 文件中(快照存储),也存储在 LanceDB 向量数据库中(用于高效检索):

├── lancedb
│   ├── default-community-full_content.lance
│   ├── default-entity-description.lance
│   └── default-text_unit-text.lance
├── embeddings.community.full_content.parquet
├── embeddings.entity.description.parquet
└── embeddings.text_unit.text.parquet

GraphRAG 的输出文件形成了完整的数据血缘链路,每一级的输出都基于上一级的结果:

data-flow.png

在 LanceDB 中,每条记录都包含以下字段:

columns = [
  'id',         # 源数据的唯一标识符
  'text',       # 原始文本内容
  'vector',     # 嵌入向量(浮点数数组)
  'attributes'  # 额外属性,比如 title 文档标题
]

LanceDB:面向 AI 的向量数据库

LanceDB 是一款基于 Lance 存储格式(一种高性能列式存储格式)构建的开源向量数据库,主打 低延迟查询高吞吐量易用性 三大核心优势,专为大规模向量数据的存储、检索与管理设计。它由 Lance 生态团队开发,旨在解决传统向量数据库在性能、成本、易用性之间的平衡问题,尤其适配机器学习和人工智能场景的需求,比如语义搜索、推荐系统、多模态检索、特征存储等。

lancedb.png

LanceDB 的核心定位是 为 AI 原生应用打造的向量数据库,其设计理念围绕以下几点展开:

  • 存储与计算融合:基于 Lance 格式的底层优化,将向量存储与向量索引、过滤计算深度整合,避免传统数据库 存算分离 带来的性能损耗;
  • 极简易用性:提供 Python、Rust 等简洁 API,支持本地文件级存储,同时兼容 S3、GCS 等云存储,降低开发者上手门槛;
  • 多模态数据支持:原生支持多模态数据管理,不仅存储向量和元数据,还能存储原始数据(文本、图像、视频等);
  • 混合检索能力:支持 向量检索多向量检索全文检索混合检索元数据过滤SQL 检索 等多种检索策略;

在 GraphRAG 中,操作 LanceDB 的代码位于 vector_stores/lancedb.py 文件中的 LanceDBVectorStore 类:

import lancedb

class LanceDBVectorStore(BaseVectorStore):

  def connect(self, **kwargs: Any) -> Any:
    """连接 LanceDB 数据库"""
    self.db_connection = lancedb.connect(kwargs["db_uri"])

  def load_documents(
    self, documents: list[VectorStoreDocument], overwrite: bool = True
  ) -> None:
    """将文档存入 LanceDB 数据库"""
    if overwrite:
      self.document_collection = self.db_connection.create_table(self.collection_name, data=data, mode="overwrite")
    else:
      self.document_collection = self.db_connection.open_table(self.collection_name)
      self.document_collection.add(data)

  def similarity_search_by_vector(
    self, query_embedding: list[float], k: int = 10, **kwargs: Any
  ) -> list[VectorStoreSearchResult]:
    """使用向量,执行向量相似性检索"""
    docs = (
      self.document_collection.search(
        query=query_embedding, vector_column_name="vector"
      )
      .limit(k)
      .to_list()
    )

  def similarity_search_by_text(
    self, text: str, text_embedder: TextEmbedder, k: int = 10, **kwargs: Any
  ) -> list[VectorStoreSearchResult]:
    """使用文本,执行向量相似性检索"""
    query_embedding = text_embedder(text)
    if query_embedding:
      return self.similarity_search_by_vector(query_embedding, k)
    return []

  def search_by_id(self, id: str) -> VectorStoreDocument:
    """根据 ID 搜索文档"""
    doc = (
      self.document_collection.search()
        .where(f"id == '{id}'", prefilter=True)
        .to_list()
    )

我们可以针对 LanceDBVectorStore 类写一个简单的测试脚本:

import openai

# 使用 OpenAI Embedding
def embedder(text: str) -> list[float]:
  return openai.embeddings.create(
    input=text, model="text-embedding-3-small"
  ).data[0].embedding

# 连接 LanceDB 数据库
from graphrag.vector_stores.lancedb import LanceDBVectorStore
vector_store = LanceDBVectorStore(collection_name="default-entity-description")
vector_store.connect(db_uri="./ragtest/output/lancedb")

# 向量相似性检索
results = vector_store.similarity_search_by_text(
  "Who is Scrooge?", embedder, k=2
)
print(results)

感兴趣的同学可以用它来查询 GraphRAG 生成的这几个 LanceDB 数据库文件,研究其数据结构和内容。

小结

今天我们深入学习了 GraphRAG 索引构建的输出文件,这些结构化的输出文件是 GraphRAG 查询阶段的基础数据源。通过逐一分析 6 个核心数据表和 3 个向量数据库的字段结构和存储内容,了解 Parquet 列式存储格式以及专为 AI 优化的 LanceDB 向量数据库,我们对 GraphRAG 如何组织输出数据有了更清晰的认知。

在下一篇文章中,我们将转入查询阶段的学习,探索 GraphRAG 如何基于这些数据文件实现智能问答,包括全局查询、局部查询和混合查询等不同的检索策略。


GraphRAG 索引构建之图谱增强

在前面的文章中,我们已经深入学习了 GraphRAG 索引构建的前两个阶段,从原始文档到文本单元,再从文本单元提取出结构化的知识图谱。现在,我们已经拥有了包含实体和关系的网络图,接下来,我们将进入索引构建的第三阶段 —— 图谱增强,探索 GraphRAG 如何从这张零散的网络图中挖掘出更高层次的结构和洞见。

图谱增强阶段包含四个核心工作流:

  • 创建社区(create_communities - 使用 Leiden 算法对知识图谱进行层次化社区检测,返回多层次的社区结构;
  • 创建最终文本单元(create_final_text_units - 负责将基础文本单元与实体、关系和协变量信息进行关联,生成最终的文本单元数据;
  • 生成社区报告(create_community_reports - 为每个社区生成高质量的摘要报告;
  • 生成文本嵌入(generate_text_embeddings - 将各层次文本信息向量化,并存储到外部的向量数据库;

其中,创建最终文本单元按理应该放在第二阶段的末尾,当图谱提取结束后就可以创建了,它的作用是将文本单元与提取出来的实体、关系和协变量进行关联并重新保存。这里按照源码中顺序,将其放在第三阶段,这一步也比较简单,本篇不做过多赘述,主要关注另三个工作流的实现。

创建社区

创建社区是图谱增强阶段的核心步骤,它通过图聚类算法将零散的实体组织成有意义的主题集群。该工作流的实现位于 index/workflows/create_communities.py 文件中:

async def run_workflow(
  config: GraphRagConfig,
  context: PipelineRunContext,
) -> WorkflowFunctionOutput:

  # 1. 加载实体和关系数据
  entities = await load_table_from_storage("entities", context.output_storage)
  relationships = await load_table_from_storage("relationships", context.output_storage)

  # 2. 获取聚类配置
  max_cluster_size = config.cluster_graph.max_cluster_size
  use_lcc = config.cluster_graph.use_lcc
  seed = config.cluster_graph.seed

  # 3. 执行社区创建
  output = create_communities(
    entities,
    relationships,
    max_cluster_size=max_cluster_size,
    use_lcc=use_lcc,
    seed=seed,
  )

  # 4. 保存结果
  await write_table_to_storage(output, "communities", context.output_storage)

创建社区使用的配置如下:

cluster_graph:
  max_cluster_size: 10 # 最大社区数量
  use_lcc: true        # 是否使用最大连通分量
  seed: 0xDEADBEEF     # 随机种子

其中 max_cluster_size 表示最多生成 10 个社区;use_lcc 表示是否使用最大连通分量,我们在之前计算图嵌入时也见过这个参数,意味着只有图中最大的连通子图会被用于计算,孤立的节点或较小的连通分量会被过滤掉;seed 表示随机种子,一般保持默认即可,主要用于在多次运行聚类算法时获得一致的结果。

核心处理逻辑位于 create_communities() 函数中:

def create_communities(
  entities: pd.DataFrame,
  relationships: pd.DataFrame,
  max_cluster_size: int,
  use_lcc: bool,
  seed: int | None = None,
) -> pd.DataFrame:

  # 1. 构建图结构
  graph = create_graph(relationships, edge_attr=["weight"])

  # 2. 执行层次化聚类
  clusters = cluster_graph(
    graph,
    max_cluster_size,
    use_lcc,
    seed=seed,
  )

  # 3. 格式化聚类结果
  communities = pd.DataFrame(
    clusters, columns=pd.Index(["level", "community", "parent", "title"])
  ).explode("title")

它首先将关系数据转换为图结构,这里使用的是 NetworkX 库的 nx.from_pandas_edgelist() 方法,根据边创建加权无向图,然后调用 Leiden 聚类算法 进行社区检测,返回多层次的社区结构。

使用 Leiden 进行社区检测

在网络科学或图论中,社区(Community) 是指网络中的一组节点,其核心特征是:社区内部的节点之间连接紧密,而与社区外部节点的连接相对稀疏,这种 “内密外疏” 的结构是社区的核心标志,反映了网络中节点的聚类性和关联性。Leiden 算法是一种在图数据中识别社区结构的高效算法,由 Traag 等人在莱顿大学于 2018 年提出。它在经典的 Louvain 算法 基础上进行了改进,解决了 Louvain 算法中可能出现的 “分辨率限制” 和社区划分不精确的问题,因此在复杂网络分析中被广泛应用。

和之前学习的 RAGFlow 一样,GraphRAG 也是使用 graspologic 库的 hierarchical_leiden() 方法进行社区检测,为了搞懂 cluster_graph() 函数的具体逻辑,我们不妨先快速上手 graspologic 库,通过一个简单的示例 ,看看如何使用它对图中的节点执行层次化聚类:

import json
import networkx as nx
from graspologic.partition import hierarchical_leiden

def create_random_graph(num_nodes, edge_probability, seed):
  """生成一个随机图"""
  
  G = nx.erdos_renyi_graph(num_nodes, edge_probability, seed=seed)
  
  # 给节点加上 label 属性
  for node in G.nodes():
    G.nodes[node]['label'] = f"Node_{node}"
  
  return G

def apply_hierarchical_leiden_clustering(graph, max_cluster_size=10, seed=42):
  """调用 Leiden 层次化聚类算法"""
  
  # 层次化聚类
  community_mapping = hierarchical_leiden(
    graph, 
    max_cluster_size=max_cluster_size, 
    random_seed=seed
  )
  
  # 组织聚类结果
  results = {}
  hierarchy = {}
  
  for partition in community_mapping:
    level = partition.level
    if level not in results:
      results[level] = {}
    results[level][partition.node] = partition.cluster
    hierarchy[partition.cluster] = partition.parent_cluster if partition.parent_cluster is not None else -1
  
  return results, hierarchy

if __name__ == "__main__":
  
  # 生成一个随机图
  G = create_random_graph(num_nodes=25, edge_probability=0.2, seed=42)
  
  # 调用 Leiden 层次化聚类算法
  clustering_results, hierarchy = apply_hierarchical_leiden_clustering(
    G, max_cluster_size=8, seed=42
  )

这里先用 nx.erdos_renyi_graph() 创建一个符合 Erdős-Rényi 模型(ER 模型) 的随机图,图中有 25 个节点,然后使用 graspologic 库的 hierarchical_leiden() 方法对其进行聚类,聚类返回的结果是一个列表:

cluster-results.png

列表中的对象结构如下:

class HierarchicalCluster(NamedTuple):
  # 节点 ID
  node: Any
  # 聚类 ID
  cluster: int
  # 父聚类 ID
  parent_cluster: Optional[int]
  # 聚类层次
  level: int
  # 是否为最终聚类
  is_final_cluster: bool

尽管返回的是列表结构,实际上,它是一个层次化的树状结构。每当某个聚类的数量超过某个限制时,Leiden 算法就会为该聚类划分子聚类,每划分一次,聚类层次 level 就会加 1,并将 parent_cluster 设置为父聚类的 ID;如果聚类不会进一步划分,就将 is_final_cluster 设置为 true;下面是聚类返回结果的示例:

[
  {
    "node": 0,
    "cluster": 0,
    "parent_cluster": null,
    "level": 0,
    "is_final_cluster": true
  },
  // 省略 5 个 cluster == 0 的节点
  {
    "node": 24,
    "cluster": 1,
    "parent_cluster": null,
    "level": 0,
    "is_final_cluster": false
  },
  // 省略 7 个 cluster == 1 的节点
  {
    "node": 13,
    "cluster": 2,
    "parent_cluster": null,
    "level": 0,
    "is_final_cluster": true
  },
  // 省略 5 个 cluster == 2 的节点
  {
    "node": 21,
    "cluster": 3,
    "parent_cluster": null,
    "level": 0,
    "is_final_cluster": true
  },
  // 省略 4 个 cluster == 3 的节点
  {
    "node": 24,
    "cluster": 4,
    "parent_cluster": 1,
    "level": 1,
    "is_final_cluster": true
  },
  // 省略 1 个 cluster == 4 的节点
  {
    "node": 22,
    "cluster": 5,
    "parent_cluster": 1,
    "level": 1,
    "is_final_cluster": true
  },
  // 省略 3 个 cluster == 5 的节点
  {
    "node": 3,
    "cluster": 6,
    "parent_cluster": 1,
    "level": 1,
    "is_final_cluster": true
  },
  // 省略 1 个 cluster == 6 的节点
]

这个图共有 25 个节点,通过聚类算法检测出了 6 个聚类(即社区):

clusters-1.png

可以看到 1 号聚类共 8 个节点,被进一步划分成了 4、5、6 三个聚类,它们的 parent_cluster 是 1,表示由 1 号聚类派生而来,level 是 1,表示聚类的层级:

clusters-2.png

接着 GraphRAG 将聚类结果组织成层次关系,results 是每个层级下的节点和节点所属的聚类:

{
  "0": {
    "0": 0,
    "1": 1,
    "2": 0,
    "3": 1,
    "4": 2,
    // ...
    "21": 3,
    "22": 1,
    "23": 1,
    "24": 1
  },
  "1": {
    "1": 4,
    "3": 6,
    "12": 5,
    "16": 5,
    "19": 5,
    "22": 5,
    "23": 6,
    "24": 4
  }
}

hierarchy 是每个聚类对应的父聚类:

{
  "0": -1,
  "1": -1,
  "2": -1,
  "3": -1,
  "4": 1,
  "5": 1,
  "6": 1
}

然后进一步将其格式化成最终的社区数据,包含以下字段:

  • id: 社区唯一标识符
  • human_readable_id: 人类可读的递增ID
  • level: 社区层级
  • community: 社区ID
  • parent: 父社区ID
  • children: 子社区ID列表
  • title: 社区标题,字符串 Community N
  • entity_ids: 包含的实体ID列表
  • relationship_ids: 包含的关系ID列表
  • text_unit_ids: 关联的文本单元ID列表
  • period: 创建时间
  • size: 社区大小(实体数量)

生成社区报告

识别出社区结构后,下一步通过 LLM 对社区内的信息进行深度分析和总结,为每个社区生成摘要报告,代码如下:

async def create_community_reports(
  edges_input: pd.DataFrame,
  entities: pd.DataFrame,
  communities: pd.DataFrame,
  claims_input: pd.DataFrame | None
) -> pd.DataFrame:
  
  # 1. 分解社区数据,建立社区和实体映射
  nodes = explode_communities(communities, entities)

  # 2. 准备节点、边和声明数据
  nodes = _prep_nodes(nodes)
  edges = _prep_edges(edges_input)
  claims = _prep_claims(claims_input)

  # 3. 构建本地上下文
  local_contexts = build_local_context(
    nodes,
    edges,
    claims,
  )

  # 4. 生成社区摘要
  community_reports = await summarize_communities(
    nodes,
    communities,
    local_contexts,
    build_level_context,
  )

  # 5. 最终化报告
  return finalize_community_reports(community_reports, communities)

其中 local_contexts 被称为本地上下文,包含所有社区的详细信息,build_local_context() 函数将社区内的所有节点、边和声明汇总成一个字符串 CONTEXT_STRING,同时计算上下文长度是否超过 max_context_tokens 配置,如果超过则打上 CONTEXT_EXCEED_FLAG 标记。

生成社区摘要的核心逻辑位于 summarize_communities() 函数,它首先获取所有的社区层级,并从高到低排序,这里的倒序是关键,先生成最底层(层级最大)的社区报告,因为越底层包含的节点越少,不容易超出 token 限制;然后通过 build_level_context() 构建层级上下文,只包含特定层级的社区信息,针对每一个社区,有两种情况:

  • 如果上下文未超出 token 限制,则直接使用该社区的上下文;
  • 如果上下文超出 token 限制,则判断是否存在子社区报告;

    • 不存在子社区报告,一般出现在最底层的社区,则对该社区的上下文进行裁剪;
    • 存在子社区报告,则使用子社区报告作为该社区的上下文,如果超出 token 限制,也需要裁剪;

就这样从最底层的社区开始,一层层往上遍历,生成每个社区的摘要,使用的提示词如下:

你是一个帮助人类分析师进行一般信息挖掘的人工智能助手。信息挖掘是在一个网络中识别和评估与特定实体(如组织和个人)相关的信息的过程。

# 目标
根据属于某个社区的实体列表及其关系和可选的相关声明,撰写一份关于该社区的综合报告。这份报告将用于向决策者告知与该社区相关的信息及其潜在影响。报告内容包括社区主要实体的概述、它们的合规性、技术能力、声誉以及值得关注的声明。

# 报告结构

报告应包含以下部分:
- 标题:代表社区主要实体的名称——标题应简短但具体。可能的话,在标题中包含有代表性的命名实体。
- 摘要:对社区整体结构、其内部实体之间的关系以及与这些实体相关的重要信息的执行摘要。
- 影响严重程度评级:一个介于0-10之间的浮点分数,代表社区内实体所带来的影响的严重程度。影响是对一个社区重要性的评分。
- 评级解释:用一句话解释影响严重程度评级。
- 详细发现:关于该社区的 5-10 个关键见解。每个见解都应有一个简短摘要,随后是根据以下依据规则展开的多个段落的解释性文本。内容需全面详实。

这里和 RAGFlow 是一模一样的,因为 RAGFlow 就是参考了 GraphRAG 的实现。

最终的报告输出为 JSON 格式:

{
  "title": "<报告标题>",
  "summary": "<执行摘要>",
  "rating": "<影响严重程度评级>",
  "rating_explanation": "<评级解释>",
  "findings": [
    {
      "summary":"<见解1摘要>",
      "explanation": "<见解1解释>"
    },
    {
      "summary":"<见解2摘要>",
      "explanation": "<见解2解释>"
    }
  ]
}

生成文本嵌入

社区报告生成后,索引构建就基本上结束了。最后一步是将各个工作流中生成的文本信息向量化,为了在查询阶段能够快速地进行语义检索。该步骤位于 generate_text_embeddings 工作流:

async def run_workflow(
  config: GraphRagConfig,
  context: PipelineRunContext,
) -> WorkflowFunctionOutput:
  
  # 加载所有含文本信息的表
  documents = await load_table_from_storage("documents", context.output_storage)
  relationships = await load_table_from_storage("relationships", context.output_storage)
  text_units = await load_table_from_storage("text_units", context.output_storage)
  entities = await load_table_from_storage("entities", context.output_storage)
  community_reports = await load_table_from_storage("community_reports", context.output_storage)
  
  # 获取配置
  embedded_fields = config.embed_text.names
  text_embed = get_embedding_settings(config)

  # 根据配置,只计算特定文本字段的向量
  output = await generate_text_embeddings(
    documents=documents,
    relationships=relationships,
    text_units=text_units,
    entities=entities,
    community_reports=community_reports,
    text_embed_config=text_embed,
    embedded_fields=embedded_fields,
  )

  # 持久化存储向量数据
  if config.snapshots.embeddings:
    for name, table in output.items():
      await write_table_to_storage(table, f"embeddings.{name}", context.output_storage)

  return WorkflowFunctionOutput(result=output)

值得注意的一点是,GraphRAG 支持 8 个字段的向量计算:

  • entity.title - 实体标题
  • entity.description - 实体描述
  • relationship.description - 关系描述
  • document.text - 文档文本
  • community.title - 社区标题
  • community.summary - 社区摘要
  • community.full_content - 社区完整内容
  • text_unit.text - 文本单元

但是默认只计算实体描述、社区内容和文本单元的向量,可以在 embed_text 配置进行调整:

embed_text:
  model: text-embedding-3-small
  batch_size: 16
  batch_max_tokens: 8191
  model_id: default_embedding_model
  vector_store_id: default_vector_store
  names:
    - entity.description
    - community.full_content
    - text_unit.text

GraphRAG 使用这里配置的嵌入模型(如 text-embedding-3-small)来执行向量化。这里的处理比较有意思,GraphRAG 首先使用 TokenTextSplitter 将长文本按 batch_max_tokens 分割为符合 token 限制的片段;如果只有一个片段,则返回该片段的向量,如果文本被分成了多个片段,那么计算平均向量并归一化:

if size == 0:
  embeddings.append(None)
elif size == 1:
  embedding = raw_embeddings[cursor]
  embeddings.append(embedding)
else:
  chunk = raw_embeddings[cursor : cursor + size]
  average = np.average(chunk, axis=0)
  normalized = average / np.linalg.norm(average)
  embeddings.append(normalized.tolist())

生成的向量会与对应的文本和 ID 一同存储在向量数据库中,支持批量处理,默认使用 LanceDB,这是一个现代化的、为 AI 优化的高性能向量数据库,它将索引文件直接存储在本地文件系统中。除此之外,GraphRAG 还支持 Azure 认知搜索服务 Azure AI Search 以及 Azure 的 NoSQL 数据库 CosmosDB 等向量存储。可以通过 vector_store 配置进行修改:

vector_store:
  default_vector_store:
    type: lancedb
    db_uri: output/lancedb
    container_name: default
    overwrite: True

另外,如果我们配置了 snapshots.embeddings,向量数据还会被存到独立的 Parquet 文件中:

  • embeddings.entity.description.parquet
  • embeddings.community.full_content.parquet
  • embeddings.text_unit.text.parquet

小结

今天我们深入学习了 GraphRAG 索引构建流程中的最后一个阶段,将零散的知识组织成层次化结构的社区。主要内容总结如下:

  • Leiden 聚类:详细了解了 GraphRAG 如何使用 Leiden 算法对知识图谱进行社区检测,构建层次化的社区结构;
  • 层次化报告生成:探讨了如何从底层社区开始一层一层的往上为每个社区生成摘要报告,以及高层社区上下文超出 token 限制的解决方法;
  • 文本列向量化:了解了如何将文本单元、实体描述和社区报告等不同层次的文本信息转化为向量嵌入,为语义检索提供支持。

至此,GraphRAG 的索引构建阶段就完成了,索引构建的产物位于 output 目录中,包括一系列 Parquet 格式的文件和 LanceDB 数据:

  • 文档表(documents.parquet
  • 文本单元表(text_units.parquet
  • 实体表(entities.parquet
  • 关系表(relationships.parquet
  • 社区表(communities.parquet
  • 社区报告(community_reports.parquet
  • 向量数据库(lancedb

接下来,我们就可以通过 query 命令来查询这些数据,正式进入检索和问答环节。