Fork me on GitHub

分类 Chat2Graph 下的文章

学习 Chat2Graph 的知识库服务

在前面的系列文章中,我们已经深入学习了 Chat2Graph 的多个核心模块:从智能体的协作机制,到任务规划与执行,再到工作流引擎,以及推理机和工具系统的实现。今天,我们将继续学习 Chat2Graph 的另一个关键组件——知识库服务,探索它如何为智能体系统提供强大的知识管理和检索能力。

知识库为智能体提供持久化的知识存储和检索服务,使得智能体能够基于领域知识回答专业问题,而不仅仅依赖于大模型的训练数据。在 Chat2Graph 中,知识库的设计兼顾了向量检索和图检索两种方式,形成了一套完整的知识管理体系。

知识库基本使用

Chat2Graph 的知识库在使用上分为 全局知识库会话知识库 两个层次:

  • 全局知识库:存储整个智能体系统的基础知识,任何会话都可以获取全局知识库中的知识;
  • 会话知识库:存储与当前会话相关的私有知识,与会话一一对应,只在当前会话中生效,不会被其他会话的知识干扰;

可以在 “知识库管理” 页面对这两种知识库进行管理:

kb-type.png

另外,Chat2Graph 支持两种不同的知识存储和检索方式:

  • 向量知识库:基于 Chroma 设计的向量知识库,检索时根据向量相似度匹配与问题最接近的文档片段;
  • 图知识库:基于 TuGraph 设计的图知识库,检索时在知识图谱中匹配与问题相关的子图与社区摘要;

尽管 Chat2Graph 支持 TuGraph 和 Neo4j 两种图数据库的管理,但是知识库这里暂时只支持 TuGraph。

用户可以通过 .env 环境变量配置来选择使用哪种知识库类型。

配置向量知识库:

KNOWLEDGE_STORE_TYPE=VECTOR

向量知识库默认存储在本地 Chroma 文件中,位于 ~/.chat2graph/knowledge_bases 目录。

配置图知识库:

KNOWLEDGE_STORE_TYPE=GRAPH
# TuGraph 配置
GRAPH_KNOWLEDGE_STORE_HOST=127.0.0.1
GRAPH_KNOWLEDGE_STORE_PORT=17687
GRAPH_KNOWLEDGE_STORE_USERNAME=admin
GRAPH_KNOWLEDGE_STORE_PASSWORD=password

配置好知识库类型后,点击 “全局知识库” 或 “会话知识库”,上传文件:

kb-upload.png

支持 PDF、TXT、XLSX、DOC、DOCX、MD 等文件格式。然后点击 “下一步” 进入数据处理的配置,这块目前很粗糙,就一个分块大小参数:

kb-setting.png

然后点击 “确定” 完成知识的添加。

知识库接口

Chat2Graph 的知识库服务提供了一套完整的 RESTful API:

# 获取所有知识库
GET /knowledgebases/

# 获取指定知识库
GET /knowledgebases/{kb_id}

# 更新知识库信息
PUT /knowledgebases/{kb_id}

# 清空/删除知识库
DELETE /knowledgebases/{kb_id}?drop=true

# 向知识库添加文档
POST /knowledgebases/{kb_id}/files/{file_id}

# 从知识库删除文档
DELETE /knowledgebases/{kb_id}/files/{file_id}

上一节点击 “确实” 后实际上调用的就是 “向知识库添加文档” 这个接口:

@knowledgebases_bp.route("/<string:knowledge_base_id>/files/<string:file_id>", methods=["POST"])
def load_knowledge_with_file_id(knowledge_base_id: str, file_id: str):
  
  manager = KnowledgeBaseManager()
  data: Dict[str, Any] = cast(Dict[str, Any], request.json)

  # 加载文档
  result, message = manager.load_knowledge(
    kb_id=knowledge_base_id,
    file_id=file_id,
    knowledge_config=KnowledgeBaseViewTransformer.deserialize_knowledge_config(
      json.loads(data.get("config", "{}"))
    ),
  )
  return make_response(data=result, message=message)

KnowledgeBaseManager 通过核心服务 KnowledgeBaseService 实现知识的管理。它采用单例模式,负责:

  • 知识库的创建、管理和删除
  • 文件与知识库的映射关系管理
  • 知识的加载、检索和删除
  • 全局知识库和会话知识库的统一管理

知识库服务实现

KnowledgeStore 是一个抽象基类,定义了知识存储的标准接口:

class KnowledgeStore(ABC):

  @abstractmethod
  def load_document(self, file_path: str, config: Optional[KnowledgeConfig]) -> str:
    """加载文档"""

  @abstractmethod
  def delete_document(self, chunk_ids: str) -> None:
    """删除文档"""

  @abstractmethod
  def retrieve(self, query: str) -> List[KnowledgeChunk]:
    """检索知识"""

  @abstractmethod
  def drop(self) -> None:
    """删除整个知识库"""

Chat2Graph 通过 KnowledgeStoreFactory 工厂模式提供了两种实现:

class KnowledgeStoreFactory:

  @classmethod
  def get_or_create(cls, name: str) -> KnowledgeStore:
    if SystemEnv.KNOWLEDGE_STORE_TYPE == KnowledgeStoreType.VECTOR:
      return VectorKnowledgeStore(name)
    elif SystemEnv.KNOWLEDGE_STORE_TYPE == KnowledgeStoreType.GRAPH:
      return GraphKnowledgeStore(name)

有意思的是,这两种都是基于 DB-GPT 框架实现的:

  • 向量知识库:使用 ChromaStore 定义向量存储,使用 EmbeddingAssembler 加载知识,使用 EmbeddingRetriever 检索知识;
  • 图知识库:使用 CommunitySummaryKnowledgeGraph 定义图存储,使用 EmbeddingAssembler 加载知识,使用图存储的 asimilar_search_with_scores() 方法检索知识;

实现都比较简单,这里就不赘述了。具体内容可参考 DB-GPT 的 RAG 文档:

与推理机的集成

知识库服务与推理机系统紧密集成,在算子的执行过程中,会通过知识库服务获取相关知识,注入到推理上下文中:

task = Task(
  job=job,
  operator_config=self._config,
  workflow_messages=merged_workflow_messages,
  tools=rec_tools,
  actions=rec_actions,
  knowledge=self.get_knowledge(job), # 注入知识
  insights=self.get_env_insights(),
  lesson=lesson,
  file_descriptors=file_descriptors,
)

result = await reasoner.infer(task=task)

推理机会将检索到的知识作为上下文信息,帮助大模型生成更准确、更符合领域知识的回答。

与记忆系统的关系

20 世纪 70 年代,管理学家 罗素・艾可夫(Russell L. Ackoff) 等人提出 DIKW 金字塔模型,它将人类对世界的认知过程拆解为四个层层递进的层级,清晰展现了原始素材如何转化为决策能力的逻辑链条。

dikw.png

  1. 数据(Data) 是未经处理的、客观存在的原始符号或事实,不包含任何上下文或意义,仅代表 “是什么”。它是认知的起点,没有价值判断,也无法直接指导决策。
  2. 信息(Information) 是经过加工、赋予上下文和意义的数据,通过 “数据 + 背景” 的组合,回答 “这是什么意思”。它解决了数据的无意义性,让原始素材具备了初步价值。
  3. 知识(Knowledge) 是信息之间建立关联、经过验证并可用于指导实践的结构化体系,它回答 “为什么会这样”“如何应用”。知识源于对信息的归纳、总结和验证,是可复用的经验或规律。
  4. 智慧(Wisdom) 是基于知识进行价值判断、权衡利弊后做出最优决策的能力,它回答 “这样做是否明智”“如何实现长期目标”。智慧是认知的最高层级,需要结合价值观、伦理观和长期视角。

DIKW 模型是信息科学与知识管理领域的核心框架,如今已广泛应用于企业管理、信息技术、教育、科研等多个领域。

Chat2Graph 参考 DIKW 模型,设计了一个分层的记忆架构:

memory-level.png

每一层和 DIKW 的对应关系如下:

dikw-memory.png

注意,目前 Chat2Graph 的记忆系统功能还在建设中,文档中只是介绍了它的设计理念,我们在设计记忆系统时也可以参考之。

分层记忆系统引入了多级的知识抽象,包含三个关键能力:

  1. 知识精练(Knowledge Refinement):原始知识经过逐级的处理、分析、抽象、压缩,形成更高层次的知识;
  2. 知识下钻(Knowledge Drilling):在使用高维知识的同时,还可以按需下钻低维知识,让推理上下文粗中有细;
  3. 知识延拓(Knowledge Expansion):同层级知识关联的构建和召回,通过特定方法丰富知识上下文;

不难看出,知识库服务可以视为记忆系统 L2 层(Lesson)的初步实现,主要存储领域知识和最佳实践。当前 Chat2Graph 只是初步地将 RAG 作为知识库的实现形式,未来随着分层记忆架构的完善,知识库将进一步整合到完整的记忆体系中,形成更加智能的知识管理和利用机制。

此外,除了知识库,环境也可以当做记忆的一部分。环境指的是智能体执行过程中可交互的外部空间,智能体可以通过工具操作感知环境变化,影响环境状态。Chat2Graph 还提出了一个有趣的观点:环境可以被视为当前时刻的外部记忆,而记忆则是历史时刻的环境快照。这种同质性使得环境可以无缝地融入分层记忆模型中,从技术实现角度来看,记忆系统、知识库和环境的架构可以统一。通过工具这座桥梁,打通记忆系统与环境状态,构建智能体的精神世界与外部环境的物理世界的映射关系,即世界知识模型。

小结

通过这篇文章,我们学习了 Chat2Graph 知识库服务的设计与实现,包括全局知识库与会话知识库的分离,基于 DB-GPT 的向量知识库和图知识库实现,以及与记忆系统的关系。知识库服务作为智能体系统的重要组成部分,为智能体提供了强大的知识管理和检索能力。同时,它也是更大的记忆系统的一部分,通过多层级记忆系统的设计,从记忆存储、信息评估、经验提取到洞察生成,记忆系统从原始数据逐步提炼到高层次的智慧洞察,为 Chat2Graph 提供持久化的学习和适应能力,提升系统的整体智能水平。


盘点 Chat2Graph 中的专家和工具

至此,我们完成了 Chat2Graph 中从用户会话到工具执行的完整链路的学习。让我们先通过一个流程图来回顾 Chat2Graph 的整体运行流程:

chat2graph-flow.png

这个流程处处都体现着图原生的理念,里包含了三个循环:

  1. Chat2Graph 实现了一主动多被动多智能体架构,当用户的问题到来时,Leader 智能体会将任务分配给多个 Expert 智能体执行,这些 Expert 智能体是以图的形式组织的;
  2. 每个智能体由一个工作流实现,也是一个有向无环图,工作流包含多个算子,它们按序执行,等到所有算子都执行完成,整个工作流才算完成;
  3. 算子和推理机相互协作,推理机通过大模型判断下一步的动作和该执行的工具,工具的结果又会返回给推理机判断下一步的动作,直到完成了算子所定义的任务目标为止;

最后,所有智能体全部运行结束,才算完成了一次用户的会话过程。

今天,我们将从专家和工具的角度,详细盘点 Chat2Graph 中都有哪些专家智能体,每个专家都包含哪些算子,每个算子又配置了哪些动作,以及每个动作关联了哪些工具。根据 chat2graph.yml 配置文件,我们可以看出,Chat2Graph 一共内置了 7 个专家,除了我们之前介绍过的 5 大核心专家,还专门为 GAIA 基准测试新增了两个专家。

Design Expert - 建模专家

Design Expert 是知识图谱建模(Schema)专家,专注于根据特定的数据需求设计图的 Schema,清晰定义顶点(Vertices)和边(Edges)的类型、属性及关系。

design-expert.png

它的核心工具包括:

  1. DocumentReader:从指定路径读取并返回文档内容,支持多种文档格式解析,为后续的图建模提供原始数据输入;
  2. SchemaGetter:连接到图数据库并获取其当前的 Schema 信息,用于了解现有的图结构;
  3. VertexLabelAdder:在图数据库中创建新的顶点标签,定义新的实体类型;
  4. EdgeLabelAdder:在图数据库中创建新的边标签,定义新的关系类型;
  5. GraphReachabilityGetter:查询图数据库以获取图的可达性信息,验证图结构的连通性;

Extraction Expert - 导数专家

Extraction Expert 是原始数据提取和数据导入图数据专家,负责根据已定义的 Schema 从原始数据中提取结构化信息,并将其导入到目标图数据库中。

extraction-expert.png

它的核心工具包括:

  1. DataStatusCheck:检查图数据库中当前数据的状态,了解现有数据情况,确保后续数据导入的一致性;
  2. DataImport:将提取的三元组数据导入到图数据库中,支持批量导入和增量更新;

Query Expert - 查询专家

Query Expert 专注于图数据查询,能够理解用户的查询意图,编写精确的图查询语句,并在目标图数据库上执行查询。

query-expert.png

它的核心工具包括:

  1. CypherExecutor:执行 Cypher 查询语句,支持复杂的图遍历和数据检索操作;

Analysis Expert - 分析专家

Analysis Expert 专注于图数据分析和算法应用,能够基于分析目标选择、配置并执行相应的图算法。

analysis-expert.png

这个专家包含大量图分析的工具:

  1. AlgorithmsGetter:获取图数据库支持的算法列表和相关信息;
  2. PageRankExecutor:执行 PageRank 算法,计算图中节点的重要性得分;
  3. BetweennessCentralityExecutor:执行介数中心性算法,识别图中的关键连接节点;
  4. LouvainExecutor:执行 Louvain 社区发现算法,识别图中的社区结构;
  5. LabelPropagationExecutor:执行标签传播算法,进行社区检测;
  6. ShortestPathExecutor:计算图中两点间的最短路径;
  7. NodeSimilarityExecutor:计算节点间的相似性得分;
  8. CommonNeighborsExecutor:分析节点间的共同邻居关系;
  9. KMeansExecutor:执行 K-means 聚类算法;

值得注意的是,这些算法都是基于 Neo4j 的 GDS(Graph Data Science) 库实现的,这是 Neo4j 推出的一套 图数据科学库,专为在 Neo4j 图数据库上执行高效、可扩展的图算法而设计。它允许开发者、数据科学家和分析师通过图结构挖掘隐藏的关系模式(如社区、路径、影响力),广泛应用于推荐系统、欺诈检测、社交网络分析、供应链优化等场景。

因此在部署 Neo4j 时,需要开启 graph-data-science 插件:

$ docker run -d \
  -p 7474:7474 \
  -p 7687:7687 \
  --name neo4j-server \
  --env NEO4J_AUTH=none \
  --env NEO4J_PLUGINS='["apoc", "graph-data-science"]' \
  neo4j:2025.04

Neo4j GDS 内置算法覆盖图分析的主流场景,以下是核心分类及典型应用:

  • 社区检测

    • 典型算法:Louvain、Weakly Connected Components(WCC)
    • 应用场景:社交网络圈子识别、客户分群、欺诈团伙检测
  • 中心性分析

    • 典型算法:PageRank、Betweenness Centrality、Degree Centrality
    • 应用场景:影响力节点识别(如社交网络 KOL)、关键路径分析(如供应链核心节点)
  • 路径分析

    • 典型算法:Shortest Path(最短路径)、All Pairs Shortest Path(APSP)
    • 应用场景:物流路线优化(比如从仓库 A 到门店 B 的最短配送路径)、导航系统
  • 相似性分析

    • 典型算法:Jaccard Similarity、Cosine Similarity
    • 应用场景:推荐系统(比如购买过相似商品的用户)、内容相似度匹配
  • 链接预测

    • 典型算法:Common Neighbors、Preferential Attachment
    • 应用场景:社交网络可能认识的人、客户潜在购买行为预测
  • 图嵌入

    • 典型算法:Node2Vec、FastRP
    • 应用场景:将图节点转换为向量(可输入机器学习模型),用于分类、聚类、推荐的预处理

以社交网络为例,通过 PageRank 算法识别最具影响力的用户:

// 1. 先投影社交网络图(节点:User,关系:FOLLOWS)
CALL gds.graph.project('socialGraph', 'User', 'FOLLOWS');

// 2. 运行 PageRank 算法
CALL gds.pageRank.stream('socialGraph')
YIELD nodeId, score
MATCH (user:User) WHERE id(user) = nodeId
RETURN user.name AS userName, score
ORDER BY score DESC LIMIT 10;

Q&A Expert - 问答专家

Q&A Expert 是通用问答和信息检索专家,具备优先级多源研究能力,优先使用知识库检索作为主要信息源,必要时进行网络研究作为补充。

qa-expert.png

它的核心工具包括:

  1. KnowledgeBaseRetriever:从外部知识库中检索相关文档和信息;
  2. BrowserUsing:使用浏览器进行网络搜索和内容抓取,基于 MCP 协议实现;
  3. FileTool:文件系统操作工具,用于读写本地文件;

其中 BrowserUsing 是一个 MCP 工具,能基于 Playwright 操作浏览器,运行程序时通过 start_mcp_server.sh 脚本启动:

$ npx @playwright/mcp@latest --port 8931

WebSurferExpert - 网络浏览专家

WebSurferExpert 负责所有在线信息获取任务,具备多模态分析能力,可以处理网页中的图像、音频和 PDF 文件。

web-surfer-expert.png

它的核心工具包括:

  1. BrowserTool:基于 browser-use 的浏览器自动化工具,支持网页导航和交互;
  2. UrlDownloaderTool:从 URL 下载文件的工具;
  3. GeminiMultiModalTool:基于 Gemini 的多模态分析工具,支持图像、音频等内容分析;
  4. PageVisionTool:页面视觉分析工具,能够理解网页的视觉布局和内容,它首先通过 browser-use 将网页保存成 PDF,然后调用 Gemini 多模态大模型对其进行分析;
  5. YouTubeTool:YouTube 视频内容分析工具;

DeveloperExpert - 本地开发专家

DeveloperExpert 负责所有本地环境中的操作,包括文件读写、数据处理、代码执行等任务。

developer-expert.png

它的核心工具包括:

  1. CodeExecutorTool:执行 Python 代码的工具,支持动态代码执行;
  2. ShellExecutorTool:执行 Shell 命令的工具;
  3. SpreadsheetTool:处理电子表格文件的工具,比如 Excel、CSV 等;
  4. ZipTool:处理 ZIP 压缩文件的工具,支持压缩和解压操作;

Leader 智能体

除了 Expert 智能体的特定工具之外,Chat2Graph 还有一个 Leader 智能体。

leader-agent.png

它提供了一个系统级的工具:

  1. SystemStatusChecker:查询系统状态的工具,帮助智能体了解当前系统的运行状态,用于更好的推理和决策;

工具的实现

Chat2Graph 中的工具主要分为两种类型:本地工具(LOCAL_TOOLMCP 工具(MCP_TOOL

本地工具是直接在 Agent 运行的 Python 环境中通过函数调用执行的工具。这些工具通过 module_path 指向包含工具实现的 Python 模块。比如下面是文档读取工具的定义:

tools:
  - &document_reader_tool
    name: "DocumentReader"
    module_path: "app.plugin.neo4j.resource.graph_modeling"

其实现位于 app/plugin/neo4j/resource/graph_modeling.py 文件中的 DocumentReader 类:

class DocumentReader(Tool):
  """Tool for analyzing document content."""

  async def read_document(self, file_service: FileService, file_id: str) -> str:
    """Read the document content given the document name and chapter name.

    Args:
      file_id (str): The ID of the file to be used to fetch the doc content.

    Returns:
      The content of the document.
    """
    return file_service.read_file(file_id=file_id)

Chat2Graph 同时支持同步或异步函数,它会自动检测函数类型,并适配调用方式(基于 inspect 库的 iscoroutinefunction() 函数实现):

# execute function call
if inspect.iscoroutinefunction(func):
  result = await func(**func_args)
else:
  result = func(**func_args)

MCP 工具基于 Model Context Protocol 协议,用于与独立的外部进程或服务进行交互。这对于集成非 Python 实现的或需要隔离环境的复杂工具(如 Playwright 浏览器自动化)至关重要。下面是两个 MCP 工具的示例:

  - &browser_tool
    name: "BrowserUsing"
    type: "MCP"
    mcp_transport_config:
      transport_type: "SSE"
      url: "http://localhost:8931/sse"

  - &file_tool
    name: "FileTool"
    type: "MCP"
    mcp_transport_config:
      transport_type: "STDIO"
      command: "npx"
      args: ["@modelcontextprotocol/server-filesystem", "."]

MCP 工具的配置包括:

  • transport_type:支持 STDIOSSEWEBSOCKETSTREAMABLE_HTTP 四种不同的通信协议;
  • command/args:启动外部进程的命令和参数,适用于 STDIO 模式;
  • url:连接外部服务的网络地址,适用于 SSEWEBSOCKETSTREAMABLE_HTTP 模式;

MCP 工具的实现位于 app/core/toolkit/mcp/mcp_connection.py 文件:

class McpConnection(ToolConnection):

  # 调用 MCP 工具
  async def call(self, tool_name: str, **kwargs) -> List[ContentBlock]:
    with self._lock:
      result = await self._session.call_tool(tool_name, kwargs or {})
      return result.content

  # 根据通信协议创建 MCP 连接
  async def connect(self) -> None:
    with self._lock:
      # 建立连接
      if transport_config.transport_type == McpTransportType.STDIO:
        await self._connect_stdio()
      elif transport_config.transport_type == McpTransportType.SSE:
        await self._connect_sse()
      elif transport_config.transport_type == McpTransportType.WEBSOCKET:
        await self._connect_websocket()
      elif transport_config.transport_type == McpTransportType.STREAMABLE_HTTP:
        await self._connect_streamable_http()
      else:
        raise ValueError(f"Unsupported transport type: {transport_config.transport_type}")

      # 初始化会话
      session: ClientSession = cast(ClientSession, self._session)
      await session.initialize()

这段代码基于 MCP 官方的 Python SDK 实现,其用法可参考文档:

小结

今天我们详细盘点了 Chat2Graph 中的所有专家和工具,通过清晰的职责划分,不同专家可完成从图谱建模、数据提取、查询分析、知识问答等不同的任务。此外,还简单讲解了工具的实现原理,支持本地工具和 MCP 工具两种类型,满足不同场景的需求。

Chat2Graph 通过这种分层的 专家-算子-动作-工具 架构,不仅实现了复杂任务的有序执行,还为图原生智能体系统提供了一个完整的工具生态。每个专家都在自己的专业领域内发挥最大价值,通过丰富的工具集合来完成具体的任务执行。


详解 Chat2Graph 的工具系统实现

在上一篇文章中,我们深入分析了 Chat2Graph 中算子与推理机的协作机制,以及双模推理机的设计原理。今天,我们将继续深入源码,从推理机如何调用大模型开始,详细介绍 Chat2Graph 的工具系统实现,包括 ActionToolToolGroupToolkit 等核心概念,以及它们如何通过有向图来优化工具调用的效果。

模型服务

在 Chat2Graph 的架构中,推理机并不直接与大模型交互,而是通过 模型服务(ModelService 这个中间层来实现。模型服务封装了不同 LLM 平台的调用细节,为推理机提供了统一的接口,它是一个抽象基类,定义在 app/core/reasoner/model_service.py 中:

class ModelService(ABC):

  @abstractmethod
  async def generate(
    self,
    sys_prompt: str,
    messages: List[ModelMessage],
    tools: Optional[List[Tool]] = None,
    tool_call_ctx: Optional[ToolCallContext] = None,
  ) -> ModelMessage:
    """生成模型响应"""

  async def call_function(
    self,
    tools: List[Tool],
    model_response_text: str,
    tool_call_ctx: Optional[ToolCallContext] = None,
  ) -> Optional[List[FunctionCallResult]]:
    """基于模型响应调用工具函数"""

模型服务的核心职责包括:

  1. 统一接口:通过 generate() 方法为推理机提供统一的模型调用接口;
  2. 工具调用:通过 call_function() 方法解析模型响应中的工具调用请求并执行;
  3. 服务注入:自动注入系统内部服务到工具参数中,比如 GraphDbServiceKnowledgeBaseService 等;
  4. 错误处理:处理工具调用过程中的各种异常情况;

Chat2Graph 支持三种模型服务的实现:

  • LiteLlmClient - 使用 LiteLLM 库调用大模型,;
  • AiSuiteLlmClient - 使用 AiSuite 库调用大模型,该项目由大佬吴恩达领导开发,旨在提供统一接口来调用不同的 LLM API,简化 AI 模型的调用过程,不过项目不怎么活跃,已不推荐使用;
  • DbgptLlmClient - 使用 DB-GPT 中的 LLMClient 类调用大模型;

可以在 .env 环境变量中切换:

MODEL_PLATFORM_TYPE="LITELLM"

工具调用原理

Chat2Graph 没有使用目前比较流行的 Function Call 或 Tool Call 技术,而是使用纯 Prompt 方式,通过让大模型生成带 <function_call>...</function_call> 这样的标签,实现工具调用。

工具调用的 Prompt 参考 FUNC_CALLING_PROMPT,我对其进行总结如下:

  1. 所有工具调用都必须包裹在 <action> 部分内的 <function_call>...</function_call> 标签中。如需调用多个函数,可包含多个 <function_call> 代码块。
  2. 每个 <function_call> 内部的内容必须是有效的 JSON 对象,且包含三个必填键:

    • "name":待调用函数的名称(字符串类型);
    • "call_objective":调用此函数的简要目的说明(字符串类型);
    • "args":包含该函数所有参数的对象,若无需参数,使用空对象 {}
  3. 对于标准数据类型(如短字符串、数字、布尔值或简单嵌套对象/数组),使用标准 JSON 格式:

    • 键名和字符串必须使用 双引号")包裹;
    • 不得使用末尾逗号;
  4. 对于复杂的、多行参数(如代码、HTML、Markdown),使用 Payload 包装器

    • Payload 包装器起始标记(Start Marker) __PAYLOAD_START__ 开始,并以 结束标记(End Marker) __PAYLOAD_END__ 结束;
    • 对于多行参数,只需将原始内容放置在这两个标记之间即可,无需手动转义;

下面是一个简单的工具调用示例:

<action>
  <function_call>
  {
    "name": "update_user_profile",
    "call_objective": "更新用户姓名和通知设置。",
    "args": {
      "user_id": 123,
      "profile": {
        "name": "Alice",
        "notifications_enabled": true
      }
    }
  }
  </function_call>
</action>

这里比较有意思的一点是 Payload 包装器(Payload Wrapper) 的设计。当 JSON 字符串中包含多行代码或含特殊字符的文本时,需要进行转义处理,比如让大模型生成下面这样的代码片段,不仅可读性差,而且很容易出错:

{
  "code": "def greet(name):\n\t# 这是代码内部的注释\n\tmessage = f\"Hello, {name}! Welcome.\"\n\tprint(message)"
}

使用 Payload 包装器,无需对特殊符号进行转义,大模型的输出更加可控:

<action>
  <function_call>
  {
    "name": "execute_python_code",
    "call_objective": "定义一个向用户打招呼的函数,并调用该函数。",
    "args": {
      "code": __PAYLOAD_START__
def greet(name):
  # 这是代码内部的注释
  # 注意:此处无需添加任何转义字符
  message = f"Hello, {name}! Welcome."
  print(message)

greet("World")
__PAYLOAD_END__
    }
  }
  </function_call>
</action>

然后,模型服务通过 call_function() 方法解析模型响应中的这些标签,找到对应的工具并执行:

async def call_function(
  self,
  tools: List[Tool],
  model_response_text: str,
  tool_call_ctx: Optional[ToolCallContext] = None,
) -> Optional[List[FunctionCallResult]]:

  # 解析模型响应中的 `<function_call>` 标签
  func_calls = self._parse_function_calls(model_response_text)

  func_call_results: List[FunctionCallResult] = []
  for func_tuple, err in func_calls:

    # 根据名称找到可执行的函数
    func_name, call_objective, func_args = func_tuple
    func = self._find_function(func_name, tools)
    
    # 通过反射获取函数签名,根据参数类型注入一些内置的参数或服务
    # 比如 `ToolCallContext`、`GraphDbService`、`KnowledgeBaseService` 等
    sig = inspect.signature(func)
    for param_name, param in sig.parameters.items():
      injection_type_found: bool = False
      param_type: Any = param.annotation

      # 注入 `ToolCallContext` 参数
      if param_type is ToolCallContext:
        func_args[param_name] = tool_call_ctx
        injection_type_found = True
        continue

      # 注入内置服务
      if not injection_type_found:
        if param_type in injection_services_mapping:
          func_args[param_name] = injection_services_mapping[param_type]

    # 调用函数
    if inspect.iscoroutinefunction(func):
      result = await func(**func_args)
    else:
      result = func(**func_args)

    # 封装函数调用结果
    func_call_results.append(
      FunctionCallResult(
        func_name=func_name,
        call_objective=call_objective,
        func_args=func_args,
        status=FunctionCallStatus.SUCCEEDED,
        output=str(result),
      )
    )

  return func_call_results

整个过程比较简单:首先解析 <function_call> 标签,然后根据名称找到可执行的函数并调用,最后返回封装后的调用结果。

这里值得一提的是 Chat2Graph 的服务注入机制,通过自动检测函数签名中的参数类型,允许工具函数自动获取系统内部服务的实例,比如 ToolCallContextGraphDbServiceKnowledgeBaseService 等。例如,一个图数据库查询工具可以这样定义:

def query_graph_database(
  query: str,
  graph_service: GraphDbService
) -> str:
  """查询图数据库
  
  Args:
    query: Cypher 查询语句
    graph_service: 图数据库服务
  """
  return graph_service.execute_query(query)

当 LLM 调用这个工具时,只需要提供 query 参数,graph_service 参数会由系统自动注入。

最后,工具执行的结果会返回给推理机,推理机基于执行结果判断是否完成了算子所定义的任务目标,推理下一步的动作,如果未完成,可能会继续调用其他工具,直到完成为止。整个流程如下所示:

tool-call-flow.png

基于图的工具库设计

那么这些工具是哪里来的呢?答案是 Chat2Graph 的 工具库(Toolkit 模块。工具库是一个有向图,这是 Chat2Graph 作为图原生智能体系统的又一体现,它通过 动作(Action工具(Tool工具组(ToolGroup 构成的图结构来实现智能化的工具推荐和调用。

tool-kit.png
这里对几个概念稍加解释:

  1. 工具(Tool 是工具系统的基本执行单元,包含其名称、功能描述以及具体的执行逻辑;支持 LOCAL_TOOLMCP_TOOL 两种类型的工具,LOCAL_TOOL 指的是一个本地函数,可以是同步或异步函数,Chat2Graph 会自动检测函数类型并适配调用方式;MCP_TOOL 指的是 MCP Server 中的一个工具,一般通过工具组来配置,不会单独配置;
  2. 动作(Action 代表大模型在执行任务过程中的一个状态或决策点,是工具库中的核心节点,它的描述信息有助于大模型理解其代表的意图和功能;一个动作可以关联一个或多个工具,也可以一个都不关联;
  3. 工具组(ToolGroup 是工具的一个逻辑集合,通常代表一组功能相关的工具,比如 MCP Server 就是一个工具组;另外,看代码中有一个 ToolPackage 类,不过还没实现,应该是想将同一个包下的多个函数打包成一组;总之,工具组使得批量注册和管理工具变得更加便捷;

工具库中存在三种类型的边:

  • Next 边:连接 ActionAction,表示动作间的顺序关系,边上可以带权重,表示两个动作之间的关联强度;
  • Call 边:连接 ActionTool,表示动作可以调用的工具,边上也可以带权重,表示动作调用工具的可能性;
  • Group_Has_Tool 边:连接 ToolGroupTool,表示工具的归属关系;

通过这些点和边,这个图不仅精确地定义了不同工具之间的调用关系,还明确了它们之间潜在的执行顺序,相比于传统的工具列表,图能表达的含义要丰富的多。基于图结构,系统能够更智能地向大模型推荐当前情境下可调用的工具及相关的动作,显著提高了推荐的准确率。同时,图的精确性有效约束了大模型选择工具的范围,从而降低了工具调用的不确定性和潜在的错误率。

基于图的工具推荐

工具库的核心优势在于能够基于图结构进行上下文感知的工具推荐。当算子执行时,ToolkitService 会根据算子配置的 actions 列表,在图中进行广度优先搜索(BFS),找到相关的工具和动作,搜索的深度由图遍历的跳数 hops 来控制,关联的强度由阈值 threshold 来决定。

rec_tools, rec_actions = toolkit_service.recommend_tools_actions(
  actions=self._config.actions,
  threshold=self._config.threshold,
  hops=self._config.hops,
)

下面通过一个简单示例来演示 Chat2Graph 的推荐过程。我们看这样一个工具库,它包含四个动作(A1 ~ A4)和四个工具(T1 ~ T4),各个节点之间的关系和权重如下图所示:

tool-kit-rec-0.png

假设现在 A1 是起始点,并且配置的跳数是 1,阈值是 0.6。首先,A1 有一个工具 T1,且权重为 0.9 大于阈值,因此直接保留。然后,通过 BFS 搜索下一个节点,A1 到 A2 之间权重 0.8 大于阈值,因此可以到达,A1 到 A3 之间权重 0.5 小于阈值,所以被丢弃;然后再看 A2 的工具 T2 和 T4,同样的,大于阈值保留,小于阈值丢弃。搜索过程如下:

tool-kit-rec-1.png

搜索结束后仅保留子图,子图中的工具和动作将作为上下文送给推理机进行推理:

tool-kit-rec-2.png

这种基于图的推荐机制带来了显著优势:

  1. 上下文感知:系统能根据当前 Action 的位置,推荐最相关的工具和后续动作;
  2. 减少搜索空间:通过预定义的图结构,有效缩小了大模型选择工具的范围;
  3. 提高准确性:基于权重的推荐机制,确保优先推荐最匹配的工具;
  4. 便于扩展:新增工具只需要在图中添加相应的节点和边,无需修改现有逻辑;

值得注意的是,这里的跳数 hops 默认是 0,阈值 threshold 默认是 0.5,暂时没看到哪里可以配,所以目前的工具推荐逻辑比较简单,算子里配了哪些动作,就推荐哪些动作以及关联的工具。关于工具库的能力还在持续优化中,后面可能会支持工具集的一键注册,以及基于强化学习的工具图谱优化等,大家可以关注项目的后续发展。

小结

今天我们深入分析了 Chat2Graph 工具系统的实现原理。从推理机如何通过 ModelService 调用大模型开始,我们了解了工具调用的完整流程;然后深入探讨了基于有向图的工具系统设计,包括 ActionToolToolGroupToolkit 等核心概念,以及它们如何协作实现智能化的工具推荐。

这种图结构清晰地定义了任务执行过程中的不同阶段、每个阶段可用的工具,以及它们之间潜在的转换路径,从而为任务执行提供了明确的流程引导。

关于工具系统的原理我们就学到这,不过 Chat2Graph 到底有哪些工具,每个工具又是做什么的,我们目前还不太清楚,明天就来看看这块。


详解 Chat2Graph 的推理机实现

经过昨天的学习,我们了解了智能体的三大核心组件:角色、推理机和工作流。角色用于对智能体的专业能力、任务范围和操作限制进行描述,帮助 Leader 更好地分配任务;工作流则是通过将多个算子按照 DAG 结构组织,形成标准化的任务处理流程;而推理机是整个系统的核心,它通过调用大模型并执行推理任务,承担着理解任务指令、生成响应内容、调用外部工具以及进行复杂推理等职责。

Chat2Graph 的推理机和工作流中的算子是密不可分的,今天,我们将继续深入源码,从算子的配置和运行逻辑开始,详细分析推理机的实现机制。

算子的配置

我们知道,工作流由多个算子组成,每个算子承担着特定阶段的任务处理,Chat2Graph 的算子和 AWEL 中的基础算子不同,Chat2Graph 中的每个算子都是由推理机驱动的。在构建工作流时,我们会对算子执行初始化:

operator = (
  OperatorWrapper()
  .instruction(op_config.instruction)
  .output_schema(op_config.output_schema)
  .actions(operator_actions)
  .build()
)

可以看到算子的配置包括 instructionoutput_schemaactions 三个字段,这些信息都是供推理机使用的:

  • instruction - 给推理机的指令,用于描述算子的角色、需要完成的任务目标、执行任务时的注意事项以及期望的输出风格等;推理机会根据这个指令来理解任务并规划执行步骤;
  • output_schema - 定义算子执行完成后期望输出内容的格式和结构,可以是 YAML、JSON 格式或者自然语言的字符串描述,用于指导推理机生成符合预期的结构化输出;
  • actions - 算子在执行任务时可以调用的一系列动作,算子会基于这些动作,通过 ToolkitService 获取推荐的工具和更具体的动作,供推理机选择和执行;

昨天我们提到,“建模专家” 的工作流包含两个算子:文档分析(analysis_operator)和概念建模(concept_modeling_operator),我们可以看下这两个算子在 chat2graph.yml 文件中的配置:

operators:
  - &analysis_operator
    instruction: |
      你是一名专业的文档分析专家,擅长从文档中提取关键信息,为构建知识图谱奠定坚实基础。
      你需要理解文档内容。请注意,你所分析的文档可能只是完整集合的一部分,这要求你能从局部细节推断出整体情况。
      请注意,你的任务不是操作图数据库。你的任务是对文档进行分析,为后续的知识图谱建模提供重要信息。
      请确保你的分析全面且详细,并为每个结论提供充分的推理依据。
    output_schema: |
      **领域**: 对文档领域的描述,有助于后续的建模和数据提取。
      **数据全景视图**:对文档内整体数据的详细评估,包括数据结构、规模、实体关系等,并提供推理和依据。
      **概念**:已识别的关键概念列表,每个概念包含名称、描述和重要性。
      **属性**:为概念识别出的属性列表,每个属性包括其关联的概念、名称、描述和数据类型。
      **潜在关系**:已识别的潜在关系列表,每个关系包括其类型、涉及的实体、描述和强度。
      **文档洞察**:该文档特有的其他重要信息或发现,用分号分隔。例如,对文档中提到的特定事件或概念的独特解读。
      **文档片段**:文档中用于支持分析结论并提供上下文的关键片段。可以是直接引语或重要段落。
    actions:
      - *content_understanding_action
      - *deep_recognition_action

  - &concept_modeling_operator
    instruction: |
      你是一位知识图谱建模专家,擅长将概念和关系转化为图数据库模式。
      你需要基于文档分析的结果完成概念建模任务,同时确保图模型的正确性和可达性。

      1. 模式生成
      使用 `graph_schema_creator` 函数生成模式,为顶点和边创建特定的模式。你不能直接编写 Cypher 语句,而是使用提供的工具函数与数据库进行交互。
      请注意:模式生成不是关于向数据库中插入特定数据(如节点、关系),而是定义图数据库的结构(模式/标签)。期望是定义实体类型、关系类型、约束等内容。
      该任务的背景是知识图谱,因此应关注相对通用的实体类型,而非特定的个体实体。例如,考虑时间、抽象概念、物理实体和社会实体等主要维度。
      你需要多次阅读现有的 TuGraph 模式,以确保使用该工具创建的模式符合预期。

      2. 验证图的可达性
      可达性是图数据库的核心特性之一,它确保图中的实体和关系之间存在有效的连接路径,以支持复杂的查询需求。这在图建模中很重要,因为如果图不可达,就无法构建完整的知识图谱。
      通过查询图数据库以检索图的结构信息,来验证实体和关系的可达性。
    output_schema: |
      **图模式可达性**:可达性分析结果,描述图中实体与关系之间的连接路径。
      **状态**:模式状态,指示其是否通过验证。
      **实体标签**:成功创建的实体标签列表,例如 “Person(人)”、“Organization(组织)”。
      **关系标签**:成功创建的关系标签列表,例如 “WorksAt(就职于)”、“LocatedIn(位于)”。
    actions:
      - *content_understanding_action
      - *deep_recognition_action
      - *entity_type_definition_action
      - *relation_type_definition_action
      - *schema_design_and_import_action
      - *graph_validation_action

注意这里的 YAML 语法,使用 & 表示引用,在定义专家时,可以使用 * 表示引用的内容:

experts:
  - profile:
      name: "Design Expert"
    workflow:
      - [*analysis_operator, *concept_modeling_operator]

算子的运行逻辑

让我们深入 app/core/workflow/operator.py 文件,了解算子的执行逻辑:

class Operator:

  async def execute(
    self,
    reasoner: Reasoner,
    job: Job,
    workflow_messages: Optional[List[WorkflowMessage]] = None,
    previous_expert_outputs: Optional[List[WorkflowMessage]] = None,
    lesson: Optional[str] = None,
  ) -> WorkflowMessage:
    
    # 1. 构建 Task 对象
    task = self._build_task(
      job=job,
      workflow_messages=workflow_messages,
      previous_expert_outputs=previous_expert_outputs,
      lesson=lesson,
    )

    # 2. 通过推理机进行推理
    result = await reasoner.infer(task=task)

    # 3. 返回工作流消息
    return WorkflowMessage(payload={"scratchpad": result}, job_id=job.id)

算子的执行过程比较简单,可以分为三个核心步骤:

  1. 构建任务对象:将原始的 Job 信息、前置算子的输出、相关工具和动作等整合成一个完整的 Task 对象,具体的整合步骤包括:

    • 获取推荐工具和动作:通过 ToolkitService 根据算子配置中的 actions、相似度阈值和图中的跳数,推荐相关的工具和动作;
    • 整合上下文信息:合并来自前置算子和专家的输出消息,形成完整的上下文;
    • 检索知识信息:从知识库中获取与任务目标相关的知识;
    • 获取文件资源:提取任务相关的文件描述符,便于推理机访问文件内容;
    • 包含经验教训:如果存在来自智能体的反馈,也会包含在任务中指导执行;
  2. 调用推理机推理:将构建好的 Task 对象交给推理机处理,这是算子执行的核心环节;
  3. 返回工作流消息:将推理机的执行结果封装为 WorkflowMessage 返回,供后续算子使用;

算子和推理机的关系

从算子的执行流程可以看出,算子和推理机之间是一种明确的协作关系:

  • 算子是调度者:负责收集和整理任务执行所需的所有资源和上下文信息,包括工具、动作、知识、文件、经验教训等;
  • 推理机是执行者:接收算子构建的 Task 对象,基于大语言模型的推理能力,理解任务指令、选择合适的工具、执行相应的动作,并生成最终结果。

operator-execute.png

这种分工让系统具备了良好的可扩展性。算子专注于任务的资源准备和流程编排,而推理机专注于智能推理和工具调用。两者通过标准化的 Task 对象进行交互,实现了松耦合的设计。

推理机的定义

Chat2Graph 的推理机模块是与大语言模型交互的核心,其职责在于处理提示词,执行推理任务,并提供工具调用的能力。它封装了底层大模型的调用细节,为算子提供统一的推理接口。推理机的基类定义在 app/core/reasoner/reasoner.py 中:

class Reasoner(ABC):
  
  @abstractmethod
  async def infer(self, task: Task) -> str:
    # 通过推理机进行推理

  @abstractmethod
  async def conclude(self, reasoner_memory: ReasonerMemory) -> str:
    # 总结推理结果

推理机的核心职责包括:

  1. 任务理解:解析算子传递的 Task 对象,理解任务目标、上下文信息和可用资源;
  2. 推理规划:基于任务要求制定执行计划,选择合适的工具和动作;
  3. 工具调用:与外部工具进行交互,获取必要的信息或执行特定操作;
  4. 结果生成:综合推理过程和工具调用结果,生成最终的任务输出;

Chat2Graph 提供了两种推理机实现:单模推理机(MonoModelReasoner)双模推理机(DualModelReasoner),以适应不同场景下的推理需求。我们可以在 chat2graph.yml 配置文件中切换:

reasoner:
  type: "DUAL" # MONO 单模,DUAL 双模

单模推理机

单模推理机依赖于单个大模型实例来完成所有任务处理阶段,包括理解用户指令、进行思考、选择必要的工具,并最终生成回复或执行动作。它的实现如下:

class MonoModelReasoner(Reasoner):

  async def infer(self, task: Task) -> str:
    
    # 设置系统提示词
    sys_prompt = self._format_system_prompt(task=task)
    
    # 初始消息,触发推理过程
    init_message = ModelMessage(
      source_type=MessageSourceType.MODEL,
      payload=(
        "<deep_thinking>\nLet's start to complete the task.\n</deep_thinking>\n"
        "<action>\nEmpty\n</action>\n"
      ),
      job_id=task.job.id,
      step=1,
    )
    reasoner_memory.add_message(init_message)

    # 不断调用大模型生成响应
    for _ in range(max_reasoning_rounds):
      response = await self._model.generate(
        sys_prompt=sys_prompt,
        messages=reasoner_memory.get_messages(),
        tools=task.tools,
        tool_call_ctx=task.get_tool_call_ctx(),
      )
      response.set_source_type(MessageSourceType.MODEL)
      reasoner_memory.add_message(response)

      # 判断任务是否完成
      if self.stopped(response):
        break

    # 提取最终的推理结果
    return await self.conclude(reasoner_memory=reasoner_memory)

单模推理机的工作方式相对直观:

  1. 系统提示词:结合算子的配置和任务上下文等信息构建系统提示词;
  2. 初始化推理:创建一个固定的初始消息,包含基本的思考和行动框架;
  3. 循环推理:在最大推理轮数内不断调用模型生成响应;
  4. 结果判断:检查模型是否完成任务,通过 stopped() 方法检查是否包含 <deliverable> 标签;
  5. 结论生成:从 <deliverable> 标签中提取最终的推理结果;

单模推理机的核心在于 MONO_PROMPT_TEMPLATE 这个提示词模板,这个提示词比较长,我们主要关注几点:

  • 元认知框架:要求大模型使用元认知框架和自己对话,进行深度思考 (<deep_thinking>),展示其推理过程和深度;
  • 思考与行动一体:在深度思考之后,在 <action> 部分执行具体的行动,包括文本生成、分析或调用外部工具;
  • 终止条件与交付:当大模型判断任务已解决时,必须使用 TASK_DONE<deliverable> 标签来标记任务完成;

元认知(Metacognition) 是心理学和教育学领域的一个概念,简单来说,它指的是 对认知的认知,也就是我们主动监控、评估、调节自己思考过程的能力。

单模推理机旨在使单个大模型能够以自给自足的方式处理复杂任务,它的主要优点在于配置简单直观,推理链路相对简短。但在处理需要强大推理能力或复杂能力组合的任务时,可能会表现出性能不足。

双模推理机

心理学家卡尼曼提出,人类大脑中存在两套系统:系统1系统2。系统1是无意识的、快速的、直观的,而系统2则是有意识的、缓慢的、需要付出心理努力的,这两套系统在我们日常生活中相互作用,共同影响着我们的思考、决策和行为。

system1-system2.png

双模推理机模拟了人类的快思考和慢思考系统,采用两个大模型实例协同工作的模式。让我们看看它的实现:

class DualModelReasoner(Reasoner):

  async def infer(self, task: Task) -> str:
    
    # 设置 Thinker 和 Actor 的系统提示词
    actor_sys_prompt = self._format_actor_sys_prompt(task=task)
    thinker_sys_prompt = self._format_thinker_sys_prompt(task=task)
    
    # 交替调用大模型生成响应
    for _ in range(max_reasoning_rounds):
      # Thinker 思考
      response = await self._thinker_model.generate(
        sys_prompt=thinker_sys_prompt,
        messages=reasoner_memory.get_messages(),
        tool_call_ctx=task.get_tool_call_ctx(),
      )
      response.set_source_type(MessageSourceType.THINKER)
      reasoner_memory.add_message(response)

      # Actor 执行
      response = await self._actor_model.generate(
        sys_prompt=actor_sys_prompt,
        messages=reasoner_memory.get_messages(),
        tools=task.tools,
        tool_call_ctx=task.get_tool_call_ctx(),
      )
      response.set_source_type(MessageSourceType.ACTOR)
      reasoner_memory.add_message(response)

      # 判断任务是否完成
      if self.stopped(response):
        break

    # 提取最终的推理结果
    return await self.conclude(reasoner_memory=reasoner_memory)

双模推理机和单模推理机的代码结构几乎是一样的,只是在每轮推理时调用了两次大模型,分别扮演两个不同的角色:

  • Thinker(思考者):负责理解复杂的用户意图、分解任务、制定计划,并在需要时决定调用哪个工具或子任务;
  • Actor(执行者):负责接收并执行 Thinker 的指令,擅长快速思考,专注于工具调用和格式化输出;

dual-reasoner.png

双模推理机为 Thinker 和 Actor 分别设计了不同的提示词模板。

Thinker 提示词(THINKER_PROMPT_TEMPLATE)的要点如下:

  • 元认知框架:同样要求大模型使用元认知框架,使用 <deep_thinking> 标签进行深度思考,结合思考模式标记来展现其推理过程;
  • 指令生成:主要输出是为 Actor 生成清晰、具体的指令 (<instruction>) 和必要的输入 (<input>);
  • 结果评估:负责评估 Actor 的执行结果,包括工具调用的结果,基于此进行下一步规划;
  • 禁止行为:不应自己执行工具调用或生成最终交付成果;

Actor 提示词(ACTOR_PROMPT_TEMPLATE)的要点如下:

  • 指令执行:接收并执行 Thinker 提供的指令和输入;
  • 浅层思考:使用 <shallow_thinking> 进行浅层思考,主要聚焦于如何准确执行当前指令;
  • 动作执行:在 <action> 部分执行具体操作,包括生成文本、分析或调用工具;
  • 最终交付:当 Thinker 发出 TASK_DONE 指令后,负责整合信息并生成 <deliverable> 最终交付物;

下面是我在任务执行时抓的一次请求:

dual-reasoner-log.png

可以看到,Thinker 和 Actor 就像人一样,你一言我一语地交叉式对话,直到 Thinker 认为任务已经完成,发出 TASK_DONE 指令。当 Actor 收到该指令后,就会通过 <deliverable> 标签生成最终结果,完成推理。

CoA 多智能体范式

Chat2Graph 的双模推理机设计受到了 OPPO 提出的 Chain-of-Agents(CoA) 范式的启发。CoA 是一种将多智能体协作能力融入单一模型的创新方法,通过 单一模型模拟多智能体协作 来解决传统多智能体系统效率低、泛化弱、数据驱动学习缺失等问题。论文地址:

这是一种端到端的训练方法,在单一模型内动态激活,模拟多智能体协作,通过 智能体监督微调(Agentic SFT)智能体强化学习(Agentic RL) 两个阶段,训练出 智能体基础模型(Agent Foundation Models, AFMs),在网页搜索、代码生成、数学推理三大领域的近 20 个基准测试中刷新了 SOTA 水平。

与传统的智能体范式相比,CoA 的优势如下表所示:

agent-methods.png

前两种范式用于单智能体:ReAct 基于 Prompt 实现,是最早也是最简单的智能体实现,但是这种方式不是端到端的,没办法训练,于是提出了 TIR 工具集成推理。

后两种范式用于多智能体:传统的 MAS 框架和 ReAct 一样,也很难训练,无法通过数据驱动学习,CoA 方法的提出就是弥补这块的缺口。

CoA 将智能体划分为 角色智能体工具智能体,这和之前介绍的 LLM-Agent-UMF 框架将智能体划分为 主动核心智能体被动核心智能体 有着异曲同工之妙:

coa-agents.png

CoA 和 TIR 很像,只不过两者推理的轨迹不同,TIR 推理轨迹是工具的调用,而 CoA 的轨迹是不同智能体的激活:

tir-vs-coa.png

Chat2Graph 的双模推理机借鉴了 CoA 的核心思想,但做了适合系统架构的简化:比如 Thinker 承担了 CoA 中思考智能体、规划智能体和反思智能体的职责,负责深度分析、任务规划和结果评估;而 Actor 类似于 CoA 中的各种工具智能体,专注于执行具体的工具调用和操作;通过 Thinker 和 Actor 之间的交替对话,实现了推理状态的动态转移和智能体角色的切换;但是与 CoA 端到端优化的训练方法不同,Chat2Graph 通过提示词工程和推理机架构设计,在推理时实现了多智能体协作的效果。

小结

今天,我们深入分析了 Chat2Graph 中算子与推理机的协作机制,算子作为调度者负责收集和整理任务资源,推理机作为执行者基于大语言模型的能力完成智能推理和工具调用,两者通过标准化的 Task 对象实现了松耦合的协作。

特别值得关注的是 Chat2Graph 的双模推理机设计,它借鉴了 OPPO 提出的 Chain-of-Agents 范式,通过 Thinker 和 Actor 两个角色的协作,模拟了人类的快慢思考系统。Thinker 负责深度分析和规划,Actor 专注于执行和工具调用,这种分工让系统在处理复杂任务时展现出了更强的推理能力和更好的可解释性。

相比单模推理机的简洁直观,双模推理机虽然增加了一定的复杂度,但通过角色专业化和任务分工,能够在复杂场景下提供更优的推理质量。这种设计为智能体系统的推理能力提升提供了新的思路和实践路径。

在下一篇文章中,我们将继续深入 Chat2Graph 的工具系统,了解它如何实现智能体与外部世界的交互,以及如何通过有向图来优化工具调用的效果。


详解 Chat2Graph 的工作流实现

在前一篇文章中,我们深入分析了 Chat2Graph 中 Leader 智能体的任务分解与执行机制,了解了基于 DAG 的子任务图规划器和状态驱动的容错机制。今天,我们将继续深入源码,从 Expert 接受子任务开始,详细剖析智能体的初始化过程和工作流的实现原理。

Expert 智能体的实现

我们昨天学到,当 Leader 完成任务分解后,子任务会被分配给对应的 Expert 智能体执行。Expert 智能体的实现位于 app/core/agent/expert.py 文件:

class Expert(Agent):
  
  # 调用 Expert 智能体执行子任务
  def execute(self, agent_message: AgentMessage, retry_count: int = 0) -> AgentMessage:
    
    # 更新任务状态为 `RUNNING`
    job_result.status = JobStatus.RUNNING
    self._job_service.save_job_result(job_result=job_result)

    # 获取前置 Expert 的输出和经验教训
    workflow_messages: List[WorkflowMessage] = agent_message.get_workflow_messages()

    # 执行 Expert 智能体的工作流
    workflow_message: WorkflowMessage = self._workflow.execute(
      job=job,
      reasoner=self._reasoner,
      workflow_messages=workflow_messages,
      lesson=agent_message.get_lesson(),
    )

    # 检查执行状态
    self._message_service.save_message(message=workflow_message)
    if workflow_message.status == WorkflowStatus.SUCCESS:
      # 执行成功,保存结果,更新任务状态
      return expert_message
    if workflow_message.status == WorkflowStatus.EXECUTION_ERROR:
      # 执行出错,基于失败经验重试一次
      lesson = workflow_message.evaluation + "\n" + workflow_message.lesson
      agent_message.add_lesson(lesson)
      return self.execute(agent_message=agent_message, retry_count=retry_count + 1)
    if workflow_message.status == WorkflowStatus.INPUT_DATA_ERROR:
      # 输入数据错误,通知 Leader 重新执行前置任务
      lesson = "The output data is not valid"
      expert_message = self.save_output_agent_message(
        job=job, workflow_message=workflow_message, lesson=lesson
      )
      return expert_message
    if workflow_message.status == WorkflowStatus.JOB_TOO_COMPLICATED_ERROR:
      # 子任务过于复杂,通知 Leader 重新分解任务
      lesson = "The job is too complicated to be executed by the expert"
      expert_message = self.save_output_agent_message(
        job=job, workflow_message=workflow_message, lesson=lesson
      )
      return expert_message
    raise Exception("The workflow status is not defined.")

Expert 的执行和 Leader 的执行非常类似,都是先将状态置为 RUNNING,然后获取前置 Expert 的输出和经验教训,调用该智能体的工作流,最后根据不同的返回状态进行相应的处理。当返回 EXECUTION_ERROR 时,Expert 智能体会基于失败经验重试;当返回 INPUT_DATA_ERROR 时,通知 Leader 重新执行前置任务;当返回 JOB_TOO_COMPLICATED_ERROR 时,通知 Leader 重新分解任务。我们昨天已经详细学习过这些状态,此处不再赘述。

智能体的核心组成

从 Leader 智能体和 Expert 智能体的执行过程可以看出,其本质都是调用其内置的工作流实现的。在学习 chat2graph.yml 配置文件时,我们曾介绍过,Chat2Graph 的智能体由 角色(Profile)推理机(Reasoner)工作流(Workflow) 三个核心组件构成。下面是 5 位专家的详细配置:

experts:
  - profile:
      name: "Design Expert"
      desc: |
        他是一位知识图谱建模(模式)专家。
        他的任务是根据特定的数据需求设计图谱的模式,清晰定义顶点(Vertices)和边(Edges)的类型、属性及关系。同时,他负责在图谱数据中创建/更新模式。
        他只能为特定的图数据库实例创建或修改数据结构(模式)。
        他的输出是清晰的模式定义,供后续数据导入使用。**他本身不处理具体数据(增删改查),也从不回答关于图数据库产品或技术本身的一般性介绍或询问。**
    reasoner:
      actor_name: "Design Expert"
      thinker_name: "Design Expert"
    workflow:
      - [*analysis_operator, *concept_modeling_operator]

  - profile:
      name: "Extraction Expert"
      desc: |
        他是一位原始数据提取与图数据导入专家。
        其前提条件是:目标图数据库中必须已存在图模式,且该模式已定义好节点和边的标签(无论其是否采用弱模式;否则,该专家无法执行任务),同时必须指定明确的原始数据源(例如用户提供的文档、文件、数据库表、待处理文本等),以便进行处理并导入到特定的图数据库实例中。
        他的任务是:1. 根据已定义的模式从原始数据中提取结构化信息。2. 将提取到的信息导入到目标图数据库中。
        他会输出数据导入过程的摘要或状态报告。**他不负责设计模式或执行查询分析,也绝不会提供有关图数据库技术或产品的一般性介绍。**
    reasoner:
      actor_name: "Extraction Expert"
      thinker_name: "Extraction Expert"
    workflow:
      - [*data_importation_operator]

  - profile:
      name: "Query Expert"
      desc: |
        他是一位图数据查询专家。
        假设有一个具有现有数据和已知结构的特定图数据库实例,需要执行精确的查询来检索特定的数据点或关系。
        他的任务是:1. 理解用户的具体查询意图。2. 编写精确的图查询语句。3. 在目标图数据库上执行查询。
        他将返回从查询中获得的具体数据结果。**他不进行复杂的图算法分析,不负责数据建模或导入,并且绝对不回答关于图数据库概念、产品或技术本身的一般性问题。**
    reasoner:
      actor_name: "Query Expert"
      thinker_name: "Query Expert"
    workflow:
      - [*query_design_operator]

  - profile:
      name: "Analysis Expert"
      desc: |
        他是一位图数据分析与算法应用专家。
        假设有一个特定的图数据库实例,其中包含现有的结构化数据,需要进行超出简单查询的复杂网络分析(如社区检测、中心性计算等)。
        他的任务是:根据分析目标,在目标图数据库上选择、配置并执行相应的图算法。
        他会返回算法的执行结果及其解释。**他不负责数据建模、导入、简单的节点/关系查询,也从不提供关于图数据库技术或产品的一般性介绍。**
    reasoner:
      actor_name: "Analysis Expert"
      thinker_name: "Analysis Expert"
    workflow:
      - [*algorithms_execute_operator]

  - profile:
      name: "Q&A Expert"
      desc: |
        他是一位通用问答与信息检索专家,具备不同优先级的多源研究能力。
        **当任务是要求提供关于某个概念、技术或产品的一般信息、定义、解释、比较或总结(例如,“介绍图”)时,他是首选专家,通常也是唯一的专家,** 尤其是当问题不涉及使用现有数据操作或查询特定的图数据库实例时。
        他的任务是:1. 理解问题。2. **优先将知识库检索作为主要信息来源。** 3. **仅在知识库结果不足或不完整时才进行网络研究。** 4. 综合信息时明确优先考虑知识库内容,将网络研究作为战略性补充。
        他会输出包含分层引用的全面答案,且优先引用知识库来源。** 他绝对不与任何特定项目的图数据库进行交互,不执行图查询或图算法,也不进行数据建模或导入。他专注于通过多源研究提供全面信息:知识库优先,网络研究作为智能备用。**
    reasoner:
      actor_name: "Q&A Expert"
      thinker_name: "Q&A Expert"
    workflow:
      - [*retrieval_operator, *summary_operator]

其中角色比较好理解,它是智能体的身份标识和能力定义,包含名称和描述两个字段。在 Expert 中,角色名称是被 Leader 识别的唯一标识,用于任务分配和结果追溯;描述部分则详细说明了智能体的专业能力、任务范围和操作限制,帮助 Leader 更好地根据专家配置信息来分配任务。在定义角色描述时,必须明确说明其能够处理的任务类型、执行所需的前置条件以及明确不应承担的职责范围。下面是定义角色描述的一些指导原则,供开发者参考:

  • 能力边界明确定义:应明确列出智能体的核心能力和操作限制。比如在 “建模专家” 的描述中,明确说明了 “他只能为特定的图数据库实例创建或修改数据结构” 以及 “他本身不处理具体数据(增删改查)”,这种明确的边界定义避免了任务分配时的歧义。
  • 前置条件和依赖关系声明:对于有特定前置要求的智能体,必须在描述中明确声明。比如在 “导数专家” 的描述中,要求 “目标图数据库中必须已存在图模式,且该模式已定义好节点和边的标签”,确保 Leader 在分配任务时能够正确评估任务的可执行性。
  • 职责范围的精确表述:使用否定描述来明确智能体不应承担的职责。比如在 “导数专家” 的描述中,要求 “他不负责设计模式或执行查询分析,也绝不会提供有关图数据库技术或产品的一般性介绍”,帮助 Leader 避免将不合适的任务分配给该智能体。
  • 输出预期的具体化:角色描述应说明智能体的典型输出形式。比如在 “建模专家” 的描述中,说明了 “他的输出是清晰的模式定义,供后续数据导入使用”,这有助于后续工作流的设计和任务链的构建。

通过这些指导原则,开发者能够创建具有清晰职责边界和明确能力定位的智能体,确保多智能体系统的高效协作。

第二个核心组件是推理机,它提供了基于大语言模型的推理能力。在智能体的工作流执行过程中,推理机承担着理解任务指令、生成响应内容、调用外部工具以及进行复杂推理等职责。

第三个核心组件是工作流,它是智能体执行任务的核心机制。它编排了任务执行的具体流程,定义了从任务接收到结果输出的完整处理步骤。

今天我们先来了解下工作流的实现原理,推理机的内容我们放到后面再看。

工作流的定义

工作流是由多个 算子(Operator) 按照特定的依赖关系组织而成的有向无环图(DAG),这是 Chat2Graph 作为图原生智能体系统的又一体现。在这个图中,每个节点代表一个算子,边代表算子之间的数据流向和执行顺序。

workflow.png

每个智能体(包括 Leader 和 Expert)都必须内置一个工作流,该工作流规定了智能体为完成特定类型任务所应遵循的标准化流程(SOP)。比如上面的 “建模专家”,它的工作流配置如下:

experts:
  - profile:
      name: "Design Expert"
    workflow:
      - [*analysis_operator, *concept_modeling_operator]

这表示 “建模专家” 包含两个算子,它的工作流程是先进行文档分析(analysis_operator),然后再进行概念建模(concept_modeling_operator)。

每个算子负责特定阶段的任务处理,算子之间通过 WorkflowMessage 进行数据传递,形成了一个有序的处理链路。此外,除了主要的执行算子外,工作流还可以在末尾集成一个可选的评估算子,负责对整个工作流的输出结果进行评估,并可能生成反馈,供后续的算子或智能体参考和使用。

Chat2Graph 默认并没有开启评估算子,如果要开启,需要修改 AgenticService.load() 的代码,在构建智能体的地方加上 evaluator() 方法。

工作流的实现

接下来我们来看下调用工作流的具体实现,其代码逻辑位于 app/core/workflow/workflow.py 文件:

class Workflow(ABC):

  def execute(
    self,
    job: Job,
    reasoner: Reasoner,
    workflow_messages: Optional[List[WorkflowMessage]] = None,
    lesson: Optional[str] = None,
  ) -> WorkflowMessage:

    # 构建工作流
    def build_workflow():
      with self.__lock:
        if self.__workflow is None:
          self.__workflow = self._build_workflow(reasoner)
        return self.__workflow
    built_workflow = build_workflow()

    # 执行工作流
    workflow_message = self._execute_workflow(
      built_workflow, job, workflow_messages, lesson
    )
    return workflow_message

当工作流首次被调用时,它会触发一个构建工作流的方法(由线程锁保护,确保只构建一次),此方法负责将配置的所有算子转换成一个具体的可执行工作流实例。构建完成后,调用工作流,传入当前任务(job)、来自先前智能体的输出(workflow_messages)以及可能的经验教训(lesson)。

这里的 self.__lock 是通过 threading.Lock() 创建的,是一个线程锁对象,结合 with 语句确保在进入代码块时自动获取锁,在退出代码块时自动释放锁。这是 Python 中线程同步的标准做法,用于保护临界区代码,防止多个线程同时访问共享资源。

值得注意的是,这里的 _build_workflow_execute_workflow 都是抽象方法,Chat2Graph 通过插件系统支持不同的工作流实现:

plugin:
  workflow_platform: "DBGPT"

不过目前暂时只支持一种,即 DB-GPT 工作流,由 DbgptWorkflow 类实现:

class DbgptWorkflow(Workflow):
  
  # 构建 DB-GPT 工作流
  def _build_workflow(self, reasoner: Reasoner) -> DbgptMapOperator:
    
    # 创建 DAG
    with DAG("dbgpt_workflow"):

      # 创建 InputOperator
      input_op = InputOperator(input_source=SimpleCallDataInputSource())

      # 第一步:将所有的算子转换为 MapOperator
      map_ops: Dict[str, DbgptMapOperator] = {}  # op_id -> map_op
      for op_id in self._operator_graph.nodes():
        base_op = self._operator_graph.nodes[op_id]["operator"]
        map_ops[op_id] = DbgptMapOperator(operator=base_op, reasoner=reasoner)

      # 第二步:插入 JoinOperator 连接 MapOperator
      for op_id in nx.topological_sort(self._operator_graph):
        current_op: DbgptMapOperator = map_ops[op_id]
        in_edges = list(self._operator_graph.in_edges(op_id))
        # 如果有前驱节点,连接前驱 MapOperator 到 JoinOperator,再连接到当前 MapOperator
        # JoinOperator 用于合并 InputOperator 和所有前驱 MapOperator 的数据
        if in_edges:
          join_op = JoinOperator(combine_function=_merge_workflow_messages)
          for src_id, _ in in_edges:
            map_ops[src_id] >> join_op
          input_op >> join_op
          join_op >> current_op
        else:
          # 如果没有前置节点,直接连接 InputOperator 和当前节点
          input_op >> current_op

      # 第三步:找到工作流的尾部,出度为 0 的算子(应该只有一个)
      tail_map_op_ids = [
        n for n in self._operator_graph.nodes() if self._operator_graph.out_degree(n) == 0
      ]
      _tail_map_op: DbgptMapOperator = map_ops[tail_map_op_ids[0]]

      # 第四步:如果存在评估器,添加到工作流末尾
      if self._evaluator:
        eval_map_op = DbgptMapOperator(operator=self._evaluator, reasoner=reasoner)
        join_op = JoinOperator(combine_function=_merge_workflow_messages)
        _tail_map_op >> join_op
        input_op >> join_op
        join_op >> eval_map_op
        self._tail_map_op = eval_map_op
      else:
        self._tail_map_op = _tail_map_op
      
      # 返回尾部算子
      return self._tail_map_op

  # 执行 DB-GPT 工作流
  def _execute_workflow(
    self,
    workflow: DbgptMapOperator,
    job: Job,
    workflow_messages: Optional[List[WorkflowMessage]] = None,
    lesson: Optional[str] = None,
  ) -> WorkflowMessage:
    return run_async_function(workflow.call, call_data=(job, workflow_messages, [], lesson))

这里的代码比较晦涩难懂,主要是因为这里使用了 DB-GPT 的 AWEL 语法,为了搞清楚这里的逻辑,我们不妨先了解下 DB-GPT 以及它的 AWEL 技术。

DB-GPT 和 AWEL

DB-GPT 是一个开源的 AI 原生数据应用开发框架。2023 年 6 月由蚂蚁集团发起,通过 多模型管理Text2SQL 效果优化RAG 框架多智能体框架协作基于 AWEL 的工作流编排 等多项技术能力,让以数据库为基础的大模型应用变得更简单、更便捷。

db-gpt.jpg

DB-GPT 的主要功能特性包括:

  • 检索增强生成:实现了一套基于 RAG 的框架,用户可基于此构建知识类应用,支持自定义构建知识库,对海量结构化和非结构化数据进行统一向量存储与检索;
  • 生成式商业智能:支持自然语言与 Excel、数据库、数据仓库等多种数据源交互,并能进行分析报告,为企业报表分析、业务洞察提供数智化技术保障;
  • 微调框架:提供 Text2SQL 的自动化微调框架,支持 LoRA/QLoRA/Pturning 等微调方法,与 DB-GPT 项目无缝集成,在 Spider 数据集的准确率达到了 82.5%;
  • 多智能体框架:提供了一个数据驱动的自进化多智能体框架,旨在基于数据持续进行决策和执行,且原生支持 Auto-GPT 插件模型,智能体协议遵循 Agent Protocol 标准;
  • 数据工厂:主要用于在大模型时代对可信的知识和数据进行清洗与处理;
  • 数据源:整合各种数据源,将生产业务数据与 DB-GPT 的核心能力无缝连接;

以下为它的架构图:

db-gpt-arch.png

在架构图的下方,可以看到一块绿色,这就是 DB-GPT 独创的智能工作流表达语言 AWEL (Agentic Workflow Expression Language),Chat2Graph 的工作流就是基于它实现的。

AWEL 中的一个核心概念是 算子(Operator),Chat2Graph 中的算子就是来源于他,下面是 AWEL 的入门示例:

import asyncio

from dbgpt.core.awel import DAG, MapOperator

with DAG("awel_hello_world") as dag:
  task = MapOperator(map_function=lambda x: print(f"Hello, {x}!"))

asyncio.run(task.call(call_data="world"))

这段代码创建了一个名为 awel_hello_world 的 DAG,该 DAG 仅包含一个简单的 MapOperator 算子,它是 AWEL 内置的基础算子,接收一个 map_function 函数并使用传递给它的数据来调用该函数。所有的算子都是异步执行的,因此我们用 asyncio.run() 来执行工作流。

上面这个 DAG 只有一个节点,让我们看一个包含多节点的例子:

import asyncio

from dbgpt.core.awel import DAG, MapOperator, InputOperator, SimpleCallDataInputSource

with DAG("awel_hello_world") as dag:
  input_task = InputOperator(input_source=SimpleCallDataInputSource())
  task = MapOperator(map_function=lambda x: print(f"Hello, {x}!"))
  input_task >> task

asyncio.run(task.call(call_data="world"))

在这段代码中,我们新加了一个 InputOperator 算子,这也是 AWEL 的基础算子之一,该算子接受工作流的输入,并传递给下一个算子。我们使用 >> 运算符来连接两个算子,该运算符用于定义算子之间的父子关系,也称为任务依赖。你也可以使用 set_downstream() 方法来定义任务依赖,比如:

  input_task.set_downstream(task)

了解了 AWEL 中的这些概念,再回过头看 DbgptWorkflow 的实现就简单多了。其中 DbgptMapOperator 就是 MapOperator 的一个实现,它是对 Chat2Graph 算子的封装,每个 Chat2Graph 算子都是通过推理机调用大模型完成某项具体的任务;另外,前后 MapOperator 算子之间使用 JoinOperator 算子连接,它接受一个 combine_function 函数将来自多个输入的数据合并为一个数据;最后将评估器算子 EvalOperator 添加到整个工作流的尾部。

值得留意的一点是,在 AWEL 定义的工作流中,可以有多个起始节点,但只有一个结束节点,因此在调用工作流时是通过尾节点进行调用的。看 _build_workflow() 的代码,返回尾部算子即可,在 _execute_workflow() 中,直接调用这个尾部算子的 call() 方法。

更多关于 AWEL 的内容,可以参考 DB-GPT 的官方文档:

小结

今天,我们深入分析了 Chat2Graph 中工作流的实现原理,从 Expert 智能体的执行逻辑开始,详细了解了智能体的三大核心组件:角色、推理机和工作流。工作流作为智能体执行任务的核心机制,通过多个算子按照 DAG 结构组织,形成了标准化的任务处理流程,同时也体现了 Chat2Graph 作为图原生智能体系统的特色。

此外,通过对 DB-GPT 和 AWEL 技术的学习,我们理解了 Chat2Graph 工作流的底层实现机制。AWEL 作为智能工作流表达语言,提供了基于算子和有向无环图的编程模型,使得复杂的智能体工作流能够以声明式的方式进行定义和执行。

在下一篇文章中,我们将继续深入 Chat2Graph 的推理机制,了解它如何实现快思考与慢思考的双 LLM 推理,以及这种设计如何提升智能体的推理质量和可解释性。


学习 Chat2Graph 的任务分解与执行

在前一篇文章中,我们学习了 Chat2Graph 一主动多被动的多智能体混合架构,了解了 Leader 智能体作为主动核心,负责统一决策和任务协调,而 Expert 智能体作为被动核心,专注于特定领域的任务执行。今天,我们将继续深入源码,详细分析 Leader 接受到任务之后的具体执行逻辑。

Leader 智能体的实现

我们接着昨天的流程往下走,当通过 JobWrapper 提交任务后,系统调用 AgentService 的 Leader 智能体开始处理。让我们深入 leader.py 文件的 execute_original_job() 方法,了解 Leader 的完整工作流程:

class Leader(Agent):
  
  # 执行原始任务
  def execute_original_job(self, original_job: Job) -> None:
    
    # 更新任务状态为 `RUNNING`
    original_job_result.status = JobStatus.RUNNING
    self._job_service.save_job_result(job_result=original_job_result)

    # 将任务分解为子任务图
    decomposed_job_graph: JobGraph = self.execute(
      agent_message=AgentMessage(
        job_id=original_job.id,
      )
    )

    # 将子任务图更新到 JobService
    self._job_service.replace_subgraph(
      original_job_id=original_job.id, new_subgraph=decomposed_job_graph
    )

    # 执行子任务图
    self.execute_job_graph(original_job_id=original_job.id)

该方法接收一个原始任务,首先将其状态更新为 RUNNING,然后调用 execute() 方法将其分解为子任务图,并将此图存入 JobService,最后调用 execute_job_graph() 来执行这张图。

任务分解和分配

我们先看一下任务分解的逻辑:

class Leader(Agent):

  # 将任务分解为子任务图
  def execute(self, agent_message: AgentMessage, retry_count: int = 0) -> JobGraph:

    # 如果已经预设了 Expert,那么 Leader 将会跳过任务分解,直接将该任务分配给预设的 Expert
    # 此时,子任务图只有一个节点,即原始任务
    assigned_expert_name: Optional[str] = job.assigned_expert_name
    if assigned_expert_name:
      subjob = SubJob(
        original_job_id=original_job_id,
        goal=job.goal,
        assigned_expert_name=assigned_expert_name,
      )
      self._job_service.save_job(job=subjob)
      job_graph: JobGraph = JobGraph()
      job_graph.add_vertex(subjob.id)
      return job_graph

    # 获取专家列表
    # 组装分解提示词

    try:
      # 调用 Leader 智能体分解任务
      workflow_message = self._workflow.execute(job=decomp_job, reasoner=self._reasoner)

      # 解析子任务
      results: List[Union[Dict[str, Dict[str, str]], json.JSONDecodeError]] = parse_jsons(
        text=workflow_message.scratchpad,
        start_marker=r"^\s*<decomposition>\s*",
        end_marker="</decomposition>",
      )

    except (ValueError, json.JSONDecodeError, Exception) as e:
      # 出错后,提供异常信息再重试一次
      print("\033[38;5;208m[INFO]: Retrying decomposition with lesson...\033[0m")
      lesson = "<异常信息>"
      workflow_message = self._workflow.execute(
        job=decomp_job,
        reasoner=self._reasoner,
        lesson=lesson,
      )

    # 根据模型返回,创建子任务图
    job_graph = JobGraph()
    for subjob_id, subjob_dict in job_dict.items():
      # 每个子任务分配一个专家,对应图中的节点
      expert_name = subjob_dict["assigned_expert"]
      subjob = SubJob(
        original_job_id=original_job_id,
        goal=subjob_dict["goal"],
        assigned_expert_name=expert_name,
      )
      self._job_service.save_job(job=subjob)
      job_graph.add_vertex(subjob.id)

    for subjob_id, subjob_dict in job_dict.items():
      # 根据任务之间的依赖关系构建边
      current_unique_id = temp_to_unique_id_map[subjob_id]
      for dep_id in subjob_dict.get("dependencies", []):
        job_graph.add_edge(dep_unique_id, current_unique_id)

    # 确保子任务图为 DAG
    if not nx.is_directed_acyclic_graph(job_graph.get_graph()):
      self.fail_job_graph(...)
      return JobGraph()

    return job_graph

代码首先判断是否已经预设了 Expert,也就是用户对话时手动选定了某个专家,那么 Leader 将会跳过任务分解,直接将该任务分配给预设的 Expert,此时,子任务图只会有一个节点,即原始任务。

如果没有预设 Expert,Leader 则将用户的原始任务分解为一张子任务图。不同于传统智能体系统的线性规划器,Chat2Graph 采用了基于图结构的规划器,将原始任务拆分为可执行单元的同时,还保留了子任务间的依赖关系,以更好地应对任务执行的不确定性。

这是一个有向无环图(DAG),其中节点代表子任务,边代表子任务间的依赖关系:

plan.png

任务分解的核心在于一个精心设计的 Prompt,即 JOB_DECOMPOSITION_PROMPT,它指导 Leader 将一个主任务分解为一系列可执行的子任务并分配给对应的 Expert:

## 任务范围与大语言模型能力

### 角色

将核心任务拆解为多个子任务(分配给多个领域专家)或单个子任务(分配给单个领域专家)。有时,一名专家可承担多个子任务。需确保下游专家获取完成任务所需的全部信息。构建上下文字段时,不得因总结或简化而丢失用户原始请求中的任何约束条件、数据或上下文信息。数学公式、代码片段、特定名称、假设前提、限制条件及指定要求等关键信息,必须完整保留原始形式。

### 能力

1. **主动推断意图(强制第一步)**:至关重要的是,你必须结合给定任务、完整对话历史及当前系统状态,推断用户的**真实潜在意图及下一步合理操作**。需自问:“结合当前系统状态、过往交互记录及用户最新输入(即便非指令形式),用户在整体任务中真正希望下一步实现什么目标?”  
2. **确定目标专家与行动**:分析当前系统状态(若提供相关工具/函数,可调用以获取信息),理解系统环境。随后,基于第一步推断的意图、当前系统状态、专家资质及其他潜在信息,确定具备完成下一步合理操作所需能力与资质的专家。  
3. **强制任务拆解**:你的**唯一输出内容为任务拆解结果**。必须针对第二步确定的专家,制定一个或多个子任务以实现推断的意图。若已根据所有要求生成完整且格式正确的拆解JSON,即表示拆解任务正在进行,随后需使用 `<deliverable>...</deliverable>` 标记任务结束。  
   - **信息不完整时仍需推进**:即便给定任务或对话历史显示存在缺失的前提条件(如用户提及忘记提供文件),仍需为相关专家制定子任务。默认必要条件将得到满足,或由专家处理该问题。  
   - **谨慎整合上下文**:需将给定任务、对话历史及其他系统状态信息中的所有可用上下文整合到子任务描述中。若发现潜在问题(如根据用户表述得知文件缺失),需在子任务的上下文部分简要注明,供专家参考(示例:“上下文:用户此前因文件缺失导致任务失败,并表示已忘记提供文件。默认本次导入任务所需文件将可用。”)。
   - 简单任务或仅需一名专家完成的任务(基于推断意图),应作为单个子任务处理。
   - **拆解时的最小必要步骤**:力求用最少的合理子任务实现推断意图,同时需以不违背推断意图及其他规则为前提。
4. **拆解时的目标专家分配**:仅将子任务分配给第二步确定的专家。
5. **自包含性**:每个子任务需包含完成所需的全部信息。  
6. **角色中立性**:除非任务中明确提及,否则避免在子任务中涉及特定角色。  
7. **边界意识**:子任务范围不得超出原始任务边界。  
8. 若任务仅需一步操作/一名专家完成,需将其作为单个子任务呈现。

## 任务结构与依赖关系

### 颗粒度

仅当任务确实需要多步操作且涉及不同专家能力时,才创建可执行、边界清晰的多个子任务。对于可由一名专家解决的简单任务,需结合对话历史,将整个给定任务整合为单个子任务。

### 目标制定(受对话历史影响)

子任务的目标字段**必须**详细表述,以反映结合对话历史理解的用户最新需求。

### 上下文(整合对话历史、强制要求且丰富信息以保证准确性)

需为专家提供所有必要细节。该字段**必须包含**:

1. 相关对话历史的摘要:说明此前提出的问题/给出的答案。
2. 用户反馈/方向调整:注明用户是否表达过不满、请求澄清、修正方向或缩小范围(示例:“用户认为此前答案过于笼统”、“用户明确要求忽略X并聚焦Y”、“用户修正了此前关于Z的假设”、“用户希望基于此前工作推进至下一任务”)。  
3. 对话历史对当前任务的影响:明确说明上述1、2点等如何决定当前目标的具体要求。

此外,还需说明预期输入(若除上下文外还有其他输入)及预期输出的性质。必须完整保留用户当前指令(来自对话历史)中的所有关键数据、假设、约束条件及要求,不得遗漏。这是为了防止子任务传递过程中出现信息丢失。例如,若用户提供数学方程`x+y=10, 2x-y=4`,则上下文部分必须包含完整方程。

### 依赖关系

仅当生成多个子任务时,才定义子任务间的逻辑流程。简单的单条子任务拆解不存在依赖关系。

### 完成标准(反映对话历史)

需为子任务成功完成制定清晰、可量化或可验证的标准。该标准**必须**直接应对对话历史中凸显的具体需求或修正内容,以及细化后的目标。示例:若对话历史显示用户缺少语法细节,完成标准需明确为“输出需提供操作A、B、C的具体语法示例”;若对话历史显示用户对笼统表述不满,完成标准需明确为“解释需避免过于宽泛的表述,聚焦用户要求的特定方面”。

### 子任务生成思考过程

对于每个子任务的思考字段,需以第一人称(“我”)解释子任务生成的推理过程。内容不得仅复述目标,而需展现生成该特定子任务的思考逻辑。简要包含:该子任务为何必要?我解决该子任务的初步思路是什么?我预见该子任务存在哪些关键注意事项、所需工具或潜在挑战?思考过程需聚焦当前子任务,体现规划性与前瞻性(参考示例风格),同时保持简洁清晰,避免冗余。

同时,Chat2Graph 明确定义了子任务包含的字段,参考 JOB_DECOMPOSITION_OUTPUT_SCHEMA,当任务完成时,返回下面这样的分解结果:

<decomposition>
  {
    "subtask_1": {
      "goal": "目标,必须精确反映用户最新请求。",
      "context": "上下文,需包含对话历史摘要、用户反馈以及这些上下文如何塑造当前任务。",
      "completion_criteria": "完成标准,需明确且可衡量,直接回应对话历史中突显的需求或修正。",
      "dependencies": ["subtask_*", "subtask_*", ...], // 依赖关系,仅在生成多个子任务时定义,用于确定子任务之间的依赖关系
      "language of the assigned_expert": "English",
      "assigned_expert": "分配的专家名,Leader 指定该子任务由哪位专家完成。",
      "thinking": "思考过程,要求 LLM 以第一人称解释生成该子任务的思考过程,包括其必要性、初步方法及关键考量。",
    },
    ... // 确保 JSON 格式正确
  }
</decomposition>

下面这个 JSON 就是入门篇里的示例在执行完任务分解后得到的结果:

{
  "subtask_1": {
    "goal": "基于《罗密欧与朱丽叶》中的人物及人物关系创建图谱模式",
    "context": "该任务需为《罗密欧与朱丽叶》的故事定义图谱模式,将人物设定为 “节点”,人物间的关系设定为 “边”。此模式需清晰明确,以适用于后续的数据提取与分析工作。",
    "completion_criteria": "已定义带有合适节点标签与边标签的模式,可用于数据导入。",
    "dependencies": [],
    "language of the assigned_expert": "English",
    "assigned_expert": "Design Expert",
    "thinking": "我需要创建一个能准确呈现《罗密欧与朱丽叶》中人物及人物关系的模式。这对后续步骤至关重要,因为提取专家将依据此模式进行数据导入。我会确保该模式全面且清晰,为高效的数据提取与分析提供支持。"
  },
  "subtask_2": {
    "goal": "根据已定义的模式,从提供的文本文件中提取数据并导入图谱数据库",
    "context": "模式创建完成后,提取专家需依据已定义的模式,从文本文件《罗密欧与朱丽叶.txt》中提取结构化信息,并将其导入图谱数据库。",
    "completion_criteria": "数据已成功提取并导入图谱数据库,且与已定义的模式一致。",
    "dependencies": ["subtask_1"],
    "language of the assigned_expert": "English",
    "assigned_expert": "Extraction Expert",
    "thinking": "我的工作重点是从文本文件中提取相关人物及其关系信息。这需要格外细致,确保提取的数据与建模专家创建的模式相匹配。数据的成功导入对分析阶段而言至关重要。"
  },
  "subtask_3": {
    "goal": "对图谱进行详细分析,确定影响力最大的节点",
    "context": "数据导入图谱数据库后,分析专家需对图谱进行分析,基于人物间的关联与关系确定影响力最大的人物。",
    "completion_criteria": "分析结果可清晰识别出图谱中影响力最大的节点。",
    "dependencies": ["subtask_2"],
    "language of the assigned_expert": "English",
    "assigned_expert": "Analysis Expert",
    "thinking": "我将运用图谱分析技术来识别《罗密欧与朱丽叶》中影响力最大的人物。这包括计算中心性指标,并对结果进行解读,以理解人物关系的动态变化。"
  }
}

任务执行

完成任务分解和分配后,调用 execute_job_graph() 开始进入真正的任务执行流程:

class Leader(Agent):

  # 执行子任务图
  def execute_job_graph(self, original_job_id: str) -> None:

    # 使用线程池,没有依赖的多个任务可以并行执行
    with ThreadPoolExecutor() as executor:

      # 只要任务没完成,就一直循环
      while pending_job_ids or running_jobs:

        # 找到所有就绪的任务(依赖任务已完成)
        ready_job_ids: Set[str] = set()
        for job_id in pending_job_ids:
          if all_predecessors_completed:
            job: SubJob = self._job_service.get_subjob(job_id)
            ready_job_ids.add(job_id)

        # 执行已就绪的任务
        for job_id in ready_job_ids:
          expert_id = self._job_service.get_subjob(job_id).expert_id
          expert = self.state.get_expert_by_id(expert_id=expert_id)
          # 调用 Expert 智能体
          running_jobs[job_id] = executor.submit(
            self._execute_job, expert, job_inputs[job_id]
          )
          pending_job_ids.remove(job_id)

        # 如果 running 任务已完成,移到 completed 列表里
        completed_job_ids = []
        for job_id, future in running_jobs.items():
          if future.done():
            completed_job_ids.append(job_id)

        # 处理已完成任务
        for completed_job_id in completed_job_ids:
          
          # 获取任务结果
          future = running_jobs[completed_job_id]
          agent_result: AgentMessage = future.result()

          if agent_result.get_workflow_result_message().status == WorkflowStatus.INPUT_DATA_ERROR:
            # 输入数据错误:将此子任务及其前置依赖任务重新加入待处理队列
            pending_job_ids.add(completed_job_id)
            predecessors = list(job_graph.predecessors(completed_job_id))
            pending_job_ids.update(predecessors)
          elif agent_result.get_workflow_result_message().status == WorkflowStatus.JOB_TOO_COMPLICATED_ERROR:
            # 子任务过于复杂:调用 execute() 方法再次分解
            old_job_graph: JobGraph = JobGraph()
            old_job_graph.add_vertex(completed_job_id)
            new_job_graqph: JobGraph = self.execute(agent_message=agent_result)
            self._job_service.replace_subgraph(
              original_job_id=original_job_id,
              new_subgraph=new_job_graqph,
              old_subgraph=old_job_graph,
            )
          else:
            # 任务正常完成
            expert_results[completed_job_id] = agent_result.get_workflow_result_message()

          # 移除已完成任务
          del running_jobs[completed_job_id]

        # 等待 0.5 秒,进入下一轮循环
        if not completed_job_ids and running_jobs:
          time.sleep(0.5)

这是 Chat2Graph 任务执行的核心逻辑,整体流程还是比较清晰的,主要就是一个 while 循环,首先找到所有已就绪的任务(依赖任务已完成),然后丢给 Expert 智能体执行,这里使用了线程池,没有依赖的多个任务可以并行执行,提高执行效率,这也是图规划器相比于传统的线性规划器的优势。这个过程持续循环,当子任务图中的所有子任务都成功完成,整个原始任务即告完成。若任何关键子任务失败或整个图的执行被中断,则原始任务会相应地标记为 FAILEDSTOPPED

这里对子任务的执行比较有意思,当 Expert 执行完子任务后,会返回一个 WorkflowMessage,其中包含任务的执行状态,该状态决定了后续流程:

  • 子任务执行成功(SUCCESS):Leader 会记录结果,并更新子任务的状态,进而可能触发后续依赖任务的执行;
  • 子任务执行出错(EXECUTION_ERROR):Expert 执行过程中发生内部错误,如 API 请求失败;Leader 会根据重试策略决定是否重试该 Expert 的执行。若达到最大重试次数,则该子任务及整个子图可能会被标记为 FAILED
  • 输入数据错误(INPUT_DATA_ERROR):Expert 判断输入数据有问题,无法继续执行;Leader 接收到此状态后,会将此子任务及其前置依赖任务重新加入待处理队列,并可能附带 lesson(经验教训)给前置任务的 Expert,以便修正输出;
  • 子任务过于复杂(JOB_TOO_COMPLICATED_ERROR):Expert 认为当前子任务过于复杂,无法独立完成;Leader 会将此子任务视为一个新的原始任务,再次调用任务分解逻辑,将其进一步细化为更小的子任务,并更新到子任务图中。为防止无限分解,子任务设有生命周期(life_cycle)计数,每一次任务分解,生命周期将会减少 1,直到 0 为止;

可以看到,这里的 EXECUTION_ERROR 状态是在 Expert 智能体内部处理的,而 INPUT_DATA_ERRORJOB_TOO_COMPLICATED_ERROR 是由 Leader 智能体处理;每个子任务的状态在 Leader 智能体和 Expert 智能体之间传递和转换,如下所示:

leader-execute.png

这种状态机确保了任务能够根据依赖关系进行并行处理,同时具备对执行过程中各类情况的适应性和纠错能力。此外,任务执行流程还支持中断和恢复操作,使得 Chat2Graph 的多智能体协作更加灵活和可控。

小结

今天,我们深入分析了 Chat2Graph 中 Leader 智能体的任务分解与执行机制。从任务提交到最终完成,Leader 智能体展现了强大的统筹能力:首先通过精心设计的 Prompt 将复杂任务分解为基于 DAG 的子任务图,明确定义了每个子任务的目标、上下文、完成标准和依赖关系;然后通过线程池实现并行执行,根据依赖关系动态调度任务,大大提高了执行效率。

特别值得关注的是 Chat2Graph 的容错与纠错机制。系统通过不同的状态码来处理任务执行过程中的各种情况,其中 INPUT_DATA_ERROR 状态会触发前置任务的重新执行以修正输出,而 JOB_TOO_COMPLICATED_ERROR 状态会触发任务的进一步分解,这种动态调整机制让系统具备了强大的自适应能力。

相比传统的线性规划器,Chat2Graph 基于图结构的规划器不仅能够表达更复杂的任务依赖关系,还支持任务的并行执行,在提高执行效率的同时保持了良好的灵活性和扩展性。这种设计为智能体系统处理复杂任务提供了新的思路和范式。


学习 Chat2Graph 的多智能体协作机制

昨天我们简单体验了 Chat2Graph 这个项目的安装和使用,通过一个例子演示了 Chat2Graph 的完整流程,了解了 Chat2Graph 作为一个图原生的智能体系统,通过将图计算技术与 AI 技术融合,不仅降低了用图门槛,同时也为智能体系统提供了更强的推理能力和更好的可解释性。

Chat2Graph 这个项目里引入了不少新的理念和思想,我们将结合源码深入它的核心原理,让我们先从它的多智能体协作机制开始。

用户对话的基本流程

当用户在聊天窗口发起对话时,调用 chat 接口,其实现位于 app/server/api/session_api.py 文件中:

@sessions_bp.route("/<string:session_id>/chat", methods=["POST"])
def chat(session_id: str):

  # 构造 chat_message 并调用 SessionManager
  response_data, message = manager.chat(chat_message)
  return make_response(data=response_data, message=message)

它将请求传递到 SessionManager 进行处理:

class SessionManager:
  def chat(self, chat_message: ChatMessage) -> Tuple[Dict[str, Any], str]:
    
    # 创建 SessionWrapper
    session_wrapper = self._agentic_service.session(session_id=chat_message.get_session_id())

    # 通过 SessionWrapper 将消息提交到多智能体系统
    job_wrapper = session_wrapper.submit(message=chat_message)

    # 保存会话
    # 更新会话名称

该方法通过 AgenticService 创建 SessionWrapper,并通过 SessionWrapper 将消息提交到多智能体系统。Chat2Graph 的代码采用分层架构,项目结构如下:

app/
├── core/           # 核心智能体框架
│   ├── agent/      # 智能体实现 (Leader, Expert)
│   ├── reasoner/   # 推理引擎  
│   ├── sdk/        # 开发者SDK
│   ├── service/    # 核心服务
│   └── toolkit/    # 工具系统
├── plugin/         # 插件生态 (Neo4j, LiteLLM等)
├── server/         # Flask Web 服务
└── web/            # React 前端界面

对前端请求的处理都是先到达 server 层,再由 server 层调用 core 层,而对 core 层的调用一般都通过 SDK 实现。SDK 中包含一个核心的 AgenticService 和一堆对核心服务的包装类(XXXWrapper):

app/core/sdk
├── agentic_service.py
├── chat2graph.yml
└── wrapper
    ├── agent_wrapper.py
    ├── env_wrapper.py
    ├── graph_db_wrapper.py
    ├── job_wrapper.py
    ├── knowledge_wrapper.py
    ├── operator_wrapper.py
    ├── reasoner_wrapper.py
    ├── session_wrapper.py
    ├── toolkit_wrapper.py
    └── workflow_wrapper.py

因此这里的 SessionWrapper 就是对 SessionService 的包装,它的 submit() 函数实现如下:

class SessionWrapper:
  def submit(self, message: ChatMessage) -> JobWrapper:
    
    # 获取当前用户消息
    # 获取历史会话消息
    
    # 创建并保存任务
    job = Job(
      goal=text_message.get_payload(),
      context=historical_context,
      session_id=self._session.id,
      assigned_expert_name=text_message.get_assigned_expert_name(),
    )
    job_service.save_job(job=job)
    job_wrapper = JobWrapper(job)

    # 将任务信息更新至会话

    # 异步运行任务
    run_in_thread(job_wrapper.execute)

    return job_wrapper

这里的核心逻辑是创建了一个任务,也就是 Job 对象,包含目标、上下文、会话 ID 等信息,然后通过 JobWrapper 异步运行它:

class JobWrapper:
  def execute(self):
    # 调用 Leader 智能体
    agent_service: AgentService = AgentService.instance
    agent_service.leader.execute_original_job(original_job=self._job)

任务的运行由智能体模块 AgentService 负责,它包含 Leader 和 Expert 两类智能体,通过调用 Leader 智能体,多智能体系统由此开始启动。

智能体模块

智能体模块是 Chat2Graph 的核心,它负责接收任务、执行具体操作并返回结果。系统中存在两类特殊的智能体:

  • Leader 智能体:作为总指挥,它接收用户的请求,理解用户意图,并将复杂任务分解为多个子任务,并根据每个子任务的特点分配给相应的 Expert 执行;
  • Expert 智能体:专注于特定领域的任务处理,每个专家都具有明确的专业能力边界和执行职责,Chat2Graph 内置了建模、导数、查询、分析和问答 5 个专家;

leader-experts.png

其中 Leader 负责任务的规划、分配和执行,还包括 Expert 生命周期的管理,它通过 AgentConfig 初始化:

class AgenticService(metaclass=Singleton):
  @staticmethod
  def load(yaml_path: Union[str, Path] = "app/core/sdk/chat2graph.yml") -> "AgenticService":

    # 加载 YAML 配置
    agentic_service_config = AgenticConfig.from_yaml(yaml_path, encoding)
    mas = AgenticService(agentic_service_config.app.name)

    # 推理机
    mas.reasoner(reasoner_type=agentic_service_config.reasoner.type)
    # 工具集
    mas.toolkit(*AgenticService._build_toolkit(agentic_service_config))

    # 构建 Leader 智能体
    mas.leader("Leader").workflow(
      AgenticService._build_leader_workflow(
        agentic_service_config=agentic_service_config,
      ),
      platform_type=workflow_platform_type,
    ).build()

    # 构建 Export 智能体
    for expert_config in agentic_service_config.experts:
      mas.expert(
        name=expert_config.profile.name,
        description=expert_config.profile.desc,
      ).workflow(
        *AgenticService._build_expert_workflow(
          expert_config=expert_config,
          agentic_service_config=agentic_service_config,
        ),
        platform_type=workflow_platform_type,
      ).build()

    return mas

这段代码在程序启动时就会被调用,通过 chat2graph.yml 配置文件构建出 1 个 Leader 智能体和 5 个 Expert 智能体。这个 YAML 文件比较复杂,包含了很多 Chat2Graph 中的概念,基本结构如下:

app:
  name: "Chat2Graph"
  desc: "A Graph Native Agentic System."
  version: "0.0.1"

# 插件系统
plugin:
  workflow_platform: "DBGPT"

# 推理机
reasoner:
  type: "DUAL"

# 工具
tools:
  - &document_reader_tool
    name: "DocumentReader"
    module_path: "app.plugin.neo4j.resource.graph_modeling"
  # ...

# 行动
actions:
  - &content_understanding_action
    name: "content_understanding"
    desc: "Understand the main content and structure of the document through reading and annotating (requires calling one or more tools)."
    tools:
      - *document_reader_tool
  # ...

# 工具集
toolkit:
  - [
      *content_understanding_action,
      *deep_recognition_action
    ]
  # ...

# 算子
operators:
  - &analysis_operator
    instruction: |
      You are a professional document analysis expert, specializing in extracting key information from documents to lay a solid foundation for building knowledge graphs...
    output_schema: |
      ...
    actions:
      - *content_understanding_action
      - *deep_recognition_action

# 5 个 Export 智能体
experts:
  - profile:
      name: "Design Expert"
      desc: |
        He is a knowledge graph modeling (schema) expert...
    reasoner:
      actor_name: "Design Expert"
      thinker_name: "Design Expert"
    workflow:
      - [*analysis_operator, *concept_modeling_operator]

  - profile:
      name: "Extraction Expert"
      desc: |
        He is a Raw Data Extraction and Data Import Graph Data Expert...
    reasoner:
      actor_name: "Extraction Expert"
      thinker_name: "Extraction Expert"
    workflow:
      - [*data_importation_operator]

  - profile:
      name: "Query Expert"
      desc: |
        He is a Graph Data Query Expert...
    reasoner:
      actor_name: "Query Expert"
      thinker_name: "Query Expert"
    workflow:
      - [*query_design_operator]

  - profile:
      name: "Analysis Expert"
      desc: |
        He is a Graph Data Analysis and Algorithm Application Expert...
    reasoner:
      actor_name: "Analysis Expert"
      thinker_name: "Analysis Expert"
    workflow:
      - [*algorithms_execute_operator]

  - profile:
      name: "Q&A Expert"
      desc: |
        He is a General Q&A and Information Retrieval Expert with prioritized multi-source research capabilities...
    reasoner:
      actor_name: "Q&A Expert"
      thinker_name: "Q&A Expert"
    workflow:
      - [*retrieval_operator, *summary_operator]

# 1 个 Leader 智能体
leader:
  actions:
    - *query_system_status_action
    - *job_decomposition_action

这里的插件、推理机、工具、动作、算子我们暂时不用管,重点关注 1 个 Leader 智能体和 5 个 Expert 智能体的配置。可以看出每个智能体都由三个核心组件构成:

  • 角色(Profile):定义了智能体的身份和能力范围;
  • 推理机(Reasoner):提供了基于大语言模型的推理能力;
  • 工作流(Workflow):则编排了任务执行的具体流程;

Leader 智能体使用了内置的 Profile、Reasoner 和 Workflow,不需要在 YAML 文件中配置。

当用户输入自然语言指令时,Leader 智能体首先会理解任务意图,并将复杂任务分解为多个子任务,然后分配给相应的 Expert 智能体进行执行。

一主动多被动混合架构

像 Chat2Graph 这样的 1 个 Leader + 多个 Experts 智能体协作机制又被称为 一主动多被动(One-Active-Many-Passive) 架构,这种思想其实来自于去年的一篇论文:

论文指出,传统的多智能体系统面临三大核心挑战:

  1. 协调复杂性:多个主动智能体同时决策容易产生冲突和混乱
  2. 同步开销:需要复杂的同步机制(如 Raft 共识算法)来协调多个主动智能体
  3. 扩展困难:添加新的智能体需要重新设计整个协调机制

因此,论文提出 LLM-Agent-UMF(基于 LLM 的智能体统一建模框架),从功能和软件架构双视角明确智能体组件划分,引入 Core-Agent(核心智能体) 概念,并构建模块化、可扩展的体系。LLM-Agent-UMF 框架定义了 Core-Agent 的五大核心模块:

  • 规划模块(Planning Module):将复杂任务分解为可执行的子任务,并生成最优执行计划;
  • 记忆模块(Memory Module):存储和检索智能体运行过程中的关键信息,解决 LLM 上下文窗口有限问题;
  • 角色模块(Profile Module):定义 LLM 的角色与行为模式,确保其输出符合任务场景需求;
  • 行动模块(Action Module):将规划模块的高层指令转化为具体操作,调用工具或与环境交互;
  • 安全模块(Security Module):基于 CIA 三元组(机密性、完整性、可用性),防范 LLM 应用的安全风险;

并根据智能体是否主导任务决策,将 Core-Agent 分为两类:

  • 主动核心智能体(Active Core-Agent):具备完整的五大模块(规划、记忆、角色、行动、安全),主导任务决策与协调;有状态实体,可存储历史交互信息;典型能力包括自主分解复杂任务、动态调整角色或主动发起双向通信,适用于复杂任务处理;
  • 被动核心智能体(Passive Core-Agent):仅保留行动模块与安全模块,无规划/记忆/角色模块;无状态实体,完全遵循外部指令;典型能力包括执行简单操作,如 API 调用、工具调用等,仅能被动响应通信;

这两类 Core-Agent 对比如下:

active-vs-passive.png

基于主动/被动的分类,论文提出两类多智能体架构:统一架构(Uniform Architecture)混合架构(Hybrid Architecture),统一架构又分为 多被动核心架构(1个LLM + 多个被动核心智能体)和 多主动核心架构(多个主动核心智能体协作)。多被动架构使用简单,易于扩展,但是缺乏高层决策能力,无法处理复杂任务;而多主动架构虽然具备强大的决策能力,但是需要设计复杂的同步机制(比如 Raft 选主),避免多个智能体决策冲突。

基于理论分析和实验验证,LLM-Agent-UMF 研究发现 一主动多被动的混合架构为最优架构,兼顾了决策能力与扩展性:

one-active-many-passive.png

正是该理论为 Chat2Graph 的架构设计提供了科学依据。

小结

今天,我们通过对 Chat2Graph 源码和 LLM-Agent-UMF 论文的分析,学习了一主动多被动的多智能体混合架构:Leader 作为主动核心智能体,具备完整的规划、记忆、角色管理能力,统一协调整个系统;Expert 作为被动核心智能体,专注于特定领域的任务执行,简化了系统复杂度。

这种架构设计的核心优势在于:

  1. 统一决策:单一的 Leader 智能体避免了多个主动智能体之间的决策冲突,简化了协调机制;
  2. 专业分工:每个 Expert 智能体专注于特定领域(建模、导数、查询、分析、问答),提高了任务执行效率;
  3. 扩展性强:新增 Expert 智能体无需修改现有架构,只需在配置文件中定义新的专家即可;
  4. 降低复杂度:被动智能体无需维护状态和规划能力,降低了系统整体复杂度;

通过这种架构,Chat2Graph 不仅实现了高效的任务分解和执行,还为图原生智能体系统提供了一个可扩展的技术框架。在下一篇文章中,我们将深入分析 Chat2Graph 中智能体的具体实现,了解它的任务分解和执行逻辑,以及 工作流推理机 等高级机制。


图原生智能体系统 Chat2Graph 快速上手

在数据日益成为核心资产的今天,如何高效地从复杂关联的数据中提取价值,是许多开发者和数据分析师面临的共同挑战。图数据库因其在处理关联数据上的天然优势而备受青睐,但学习和使用其查询语言(如 Cypher 或 GQL)对许多人来说仍有一定的门槛。随着大语言模型技术的快速发展,越来越多的开发者将 AI 技术引入图处理领域,以便让任何人都能通过日常对话的方式,与图数据进行交互,从而极大地降低图数据的使用门槛。

另一方面,随着 AI 技术渗透到各行各业,智能体系统也成为了当下的热门话题。从 AutoGPT、LangChain 到各种垂直领域的智能体平台,大家都在探索如何让 AI 系统更智能、更可靠。然而,传统的智能体系统往往面临着推理不透明、记忆管理混乱、工具调用缺乏逻辑等问题。如何解决这些痛点?答案可能就在图计算技术中。越来越多的开发者将图技术引入智能体中,比如 LangGraph、GraphRAG 等等。

这造成了一个有趣的趋势。如何有效地将图计算技术与 AI 相结合,将是非常值得探索的方向。今天我们要介绍的 Chat2Graph 项目,就是一个将图计算与 AI 深度融合的智能体系统,它通过 图智互融 的理念,为智能体系统带来了全新的架构思路。

Chat2Graph 介绍

Chat2Graph 号称是 图原生的智能体系统(Graph Native Agentic System),由蚂蚁集团旗下的图数据库团队 TuGraph 开源。它的核心理念是将图数据结构的关系建模、可解释性等天然优势,与智能体的推理、规划、记忆、工具等关键能力相结合,实现真正的图智互融。

chat2graph-logo.png

简单来说,Chat2Graph 做了两件事情:

  1. AI for Graph:借助智能体实现图系统的自主化与智能化,实现图数据库的智能研发、运维、问答、生成等多样化能力,降低用图门槛,提升图系统使用体验;
  2. Graph for AI:借助图的关联性建模优势,实现智能体的推理、记忆、工具使用等关键能力的增强,降低模型幻觉,提升生成质量;

核心特性

Chat2Graph 的核心特性包括:

  • 架构:整体采用了 单主动-多被动(One-Active-Many-Passive) 的混合智能体架构,构建了以单个 Leader 智能体驱动,多个 Expert 智能体协作的任务执行体系;
  • 推理:结合快思考与慢思考的双 LLM 推理机制,模拟认知科学中的人类思维;
  • 规划:基于 CoA(Chain of Agents) 的任务分解与图规划器;
  • 记忆:分层的记忆系统,包括会话级别的短期记忆,知识库级别的长期记忆,任务执行过程中的工作记忆以及系统运行的环境记忆;
  • 知识:同时支持基于向量的 RAG 实现(VectorRAG)和基于图的 RAG 实现(GraphRAG);
  • 工具:使用有向图结构组织工具和行动,清晰地描述了工具调用间的依赖和转换关系;
  • 使用:支持 RestfulAPI 接口和 Web UI 图形化界面;提供简洁的 SDK API,方便用户将 Chat2Graph 轻松集成到自己的应用中;
  • 配置:通过 YAML 一键配置智能体系统;
  • 集成:构建了面向图系统的统一抽象,支持 Neo4j 和 TuGraph 两种图数据库;
  • 可干预:支持任务的暂停和恢复;
  • 持久化:支持作业状态和消息的持久化;

下面是它的系统架构图:

chat2graph-arch.png

环境搭建

今天,我们就来快速上手体验一下 Chat2Graph 的基本使用。

在开始之前,先确保电脑上已经安装有 Python 和 Node.js 环境。

首先克隆代码仓库:

$ git clone https://github.com/TuGraph-family/chat2graph.git
$ cd chat2graph

Chat2Graph 提供了一键构建脚本,可以直接运行。但是该脚本依赖于 Poetry 包管理器,因此我们需要先安装它:

$ pip install poetry

然后运行 bin 目录下的 build.sh 脚本进行构建:

$ ./bin/build.sh

建议使用 condauv 创建虚拟环境。

项目运行

如果一切顺利,最终会出现构建成功的消息:

Build success !

然后基于 .env.template 创建环境变量文件:

$ cp .env.template .env

主要是大模型和 Embedding 等参数,官方推荐使用 DeepSeek V3,我这里使用的是 OpenAI 接口:

LLM_NAME=gpt-4o-mini
LLM_ENDPOINT=
LLM_APIKEY=sk-xxx

EMBEDDING_MODEL_NAME=text-embedding-3-small
EMBEDDING_MODEL_ENDPOINT=
EMBEDDING_MODEL_APIKEY=sk-xxx

最后,运行 start.sh 脚本启动项目:

$ ./bin/start.sh

看到如下日志后,说明 Chat2Graph 启动成功:

  ____ _           _   ____   ____                 _     
 / ___| |__   __ _| |_|___ \ / ___|_ __ __ _ _ __ | |__  
| |   | '_ \ / _` | __| __) | |  _| '__/ _` | '_ \| '_ \ 
| |___| | | | (_| | |_ / __/| |_| | | | (_| | |_) | | | |
 \____|_| |_|\__,_|\__|_____|\____|_|  \__,_| .__/|_| |_|
                                            |_|          

 * Serving Flask app 'bootstrap'
 * Debug mode: off
 * Running on http://127.0.0.1:5010
Chat2Graph server started success ! (pid: 72103)

使用体验

使用浏览器访问 http://localhost:5010 进入 Chat2Graph 首页:

chat2graph-home.png

使用前我们先注册下图数据库,这样可以体验完整的与图对话的功能。Chat2Graph 支持 Neo4jTuGraph 两种图数据库,我们通过下面的命令在本地启一个 Neo4j 数据库:

$ docker run -d \
  -p 7474:7474 \
  -p 7687:7687 \
  --name neo4j-server \
  --env NEO4J_AUTH=none \
  --env NEO4J_PLUGINS='["apoc", "graph-data-science"]' \
  neo4j:2025.04

然后进入管理后台,在图数据库管理页面新建图数据库:

chat2graph-graphdb.png

至此,我们就可以体验 Chat2Graph 的对话功能了,通过聊天界面与图专家互动,执行各种与图相关的任务,包括建模、导数、查询、分析、问答等。在对话框下方可以看到系统内置了 5 个图专家,这 5 个专家的功能分别是:

  • 建模专家(Design Expert):负责根据数据需求设计图数据库 Schema,定义顶点与边的类型、属性及关系;
  • 导数专家(Extraction Expert):在图 Schema 已存在且有明确数据源的前提下,提取结构化信息并导入图数据库;
  • 查询专家(Query Expert):基于已有数据和结构的图数据库实例,理解查询意图、编写执行查询语句并返回结果;
  • 分析专家(Analysis Expert):针对有结构化数据的图数据库实例,根据分析目标选择执行图算法并解读结果;
  • 问答专家(Q&A Expert):优先通过知识库、必要时补充网络调研,解答图数据库相关通用信息问题;

我们可以选择与特定的专家对话,也可以不选,系统会自动分析并拆解你的问题,分配合适的专家来解决。这里我直接使用官方提供的《罗密欧与朱丽叶》文件,快速体验下 Chat2Graph 的功能:

q.png

我们的问题是:根据《罗密欧与朱丽叶》的故事创建一个图谱。统计角色的数量,然后进行详细分析以确定最具影响力的节点。

可以看到 Chat2Graph 将我的问题拆成三个子问题,分派给三个专家分别处理:

q2.png

建模专家完成图数据库的建模:

q3.png

导数专家从文本中提取数据并导入图数据库:

q4.png

分析专家使用图分析算法从图中找到最具影响力的节点:

q5.png

最终完成问答,并在答案的下方对图模型与图数据进行实时渲染:

q6.png

避坑指南

Chat2Graph 项目目前还在很早期的阶段,存在着不少问题。我在体验过程中遇到了不少的坑,这里记录下,防止后来者再踩坑。

Python 版本

官方文档中要求 Python >= 3.10,实际上应该是 >= 3.11,因为项目里使用了 browser-use,它对 Python 的最低要求是 3.11。

$ uv venv --python 3.11

另外,项目里还用到了 browser-use 的 MCP,这个可执行文件需要通过下面的命令安装:

$ pip install "browser-use[cli]" 

mcp 版本

运行时报错:

ImportError: cannot import name 'ContentBlock' from 'mcp.types'

升级 mcp 库的版本:

$ pip install mcp==1.10.1

缺少依赖

代码用到了不少三方库,但是 pyproject.toml 文件中没有声明,需要手动安装下:

$ pip install google-generativeai
$ pip install google-genai

$ pip install python-magic
$ brew install libmagic

aiohttp 版本

程序运行成功后,在对话时报错:

Error info: module 'aiohttp' has no attribute 'ConnectionTimeoutError'

升级 aiohttp 库的版本:

$ pip install aiohttp>=3.12.13

max_tokens 参数

程序运行成功后,在对话时报错:

Error info: litellm.BadRequestError: OpenAIException - Setting 'max_tokens' and 'max_completion_tokens' at the same time is not supported. 

这是因为我使用了 OpenAI 接口,根据它的官方文档max_tokens 参数目前已经被标记为废弃,建议使用 max_completion_tokens 参数:

openai-max-tokens.png

我们需要修改 app/plugin/lite_llm/lite_llm_client.py 文件中调用大模型的地方,将 max_tokens 参数注释掉:

model_response: Union[ModelResponse, CustomStreamWrapper] = completion(
  model=self._model_alias,
  api_base=self._api_base,
  api_key=self._api_key,
  messages=litellm_messages,
  temperature=self._temperature,
  # max_tokens=self._max_tokens,
  max_completion_tokens=self._max_completion_tokens,
  stream=False,
)

小结

今天,我们学习了 Chat2Graph 这个项目,体验了从构建、配置、运行、使用的完整流程。Chat2Graph 作为一个图原生的智能体系统,通过将图计算技术与 AI 技术融合,为智能体系统带来了全新的架构思路。它不仅降低了图数据库的使用门槛,让普通用户也能轻松进行复杂的图分析,同时也为智能体系统提供了更强的推理能力和更好的可解释性。

目前该项目还不够成熟,存在着不少问题,但是项目里有几个理念和思想非常有意思,值得学习,推荐大家关注。项目的作者是蚂蚁的范志东大佬,他在他的一篇博客中详细讲解了图原生的设计理念,以及 Chat2Graph 作为首个全面践行图原生理念的智能体系统,是如何将人工智能研究领域中的符号主义、连接主义、行为主义融为一体,打造 GraphRAG 新范式。强烈推荐大家去看:

在后面的文章中,我将结合源码深入 Chat2Graph 的核心技术原理,包括其推理机制、记忆系统、工具库设计等组件的实现细节,敬请期待!