Fork me on GitHub

2025年8月

GraphRAG 索引构建之知识提取(二)

昨天我们学习了知识提取阶段的提取图谱工作流,今天我们继续学习另外两个:图谱规范化提取事实声明

图谱规范化

提取出的原始实体和关系需要进一步处理才能构成完整的知识图谱,finalize_graph 工作流负责这一规范化过程:

async def run_workflow(
  config: GraphRagConfig,
  context: PipelineRunContext,
) -> WorkflowFunctionOutput:

  # 1. 加载实体和关系
  entities = await load_table_from_storage("entities", context.output_storage)
  relationships = await load_table_from_storage("relationships", context.output_storage)

  # 2. 执行图谱规范化
  final_entities, final_relationships = finalize_graph(
    entities,
    relationships,
    embed_config=config.embed_graph,
    layout_enabled=config.umap.enabled,
  )

  # 3. 保存结果
  await write_table_to_storage(final_entities, "entities", context.output_storage)
  await write_table_to_storage(final_relationships, "relationships", context.output_storage)
  if config.snapshots.graphml:
    # 生成图描述文件
    graph = create_graph(final_relationships, edge_attr=["weight"])
    await snapshot_graphml(
      graph,
      name="graph",
      storage=context.output_storage,
    )

处理流程很清晰,首先加载上一步提取的实体和关系,然后调用 finalize_graph() 函数对其进行规范化处理,最后将处理后的数据重新保存,并根据配置生成 GraphML 图描述文件,便于外部工具进行图分析和可视化,这也就是我们之前学习可视化时所导入的文件。

其中 finalize_graph() 函数的实现如下:

def finalize_graph(
  entities: pd.DataFrame,
  relationships: pd.DataFrame,
  embed_config: EmbedGraphConfig | None = None,
  layout_enabled: bool = False,
) -> tuple[pd.DataFrame, pd.DataFrame]:
  # 实体规范化
  final_entities = finalize_entities(
    entities, relationships, embed_config, layout_enabled
  )
  # 关系规范化
  final_relationships = finalize_relationships(relationships)
  return (final_entities, final_relationships)

可以看出,它主要包括实体规范化和关系规范化:

  • 实体规范化:使用 NetworkX 创建网络图,计算每个节点的度数,使用 Node2Vec 生成图嵌入向量,使用 UMAP 降维算法进行图布局计算,得到 2D 可视化坐标,为每个实体分配唯一的 UUID 和人类可读的 ID 并输出最终规范化的实体对象;
  • 关系规范化:基于 source 和 target 去除重复的关系,计算每个关系的组合度数,为每个关系分配唯一的 UUID 和人类可读的 ID 并输出最终规范化的关系对象;

下面我们仔细看下这两个部分。

实体规范化

实体规范化的逻辑位于 finalize_entities() 函数:

def finalize_entities(
  entities: pd.DataFrame,
  relationships: pd.DataFrame,
  embed_config: EmbedGraphConfig | None = None,
  layout_enabled: bool = False,
) -> pd.DataFrame:
  # 创建网络图
  graph = create_graph(relationships, edge_attr=["weight"])
  # 生成图嵌入向量
  graph_embeddings = embed_graph(graph, embed_config)
  # 计算图布局
  layout = layout_graph(graph, layout_enabled, embeddings=graph_embeddings)
  # 计算节点度数
  degrees = compute_degree(graph)
  # 生成最终实体表
  final_entities = (
    entities.merge(layout, left_on="title", right_on="label", how="left")
    .merge(degrees, on="title", how="left")
    .drop_duplicates(subset="title")
  )
  # 字段规整
  final_entities = final_entities.loc[entities["title"].notna()].reset_index()
  final_entities["degree"] = final_entities["degree"].fillna(0).astype(int)
  final_entities.reset_index(inplace=True)
  final_entities["human_readable_id"] = final_entities.index
  final_entities["id"] = final_entities["human_readable_id"].apply(
    lambda _x: str(uuid4())
  )
  # 只保留预定义的规范化的字段
  return final_entities.loc[
    :,
    ENTITIES_FINAL_COLUMNS,
  ]

这个函数还比较有意思,涉及了多个图分析和机器学习技术。比如图的构建:

graph = nx.from_pandas_edgelist(edges, edge_attr=["weight"])

这里使用 NetworkX 库从边列表构建无向图,同时保留边的权重属性,支持加权图分析。基于这个图,我们可以计算每个节点的度数:

def compute_degree(graph: nx.Graph) -> pd.DataFrame:
  return pd.DataFrame([
    {"title": node, "degree": int(degree)}
    for node, degree in graph.degree
  ])

节点度数反映了实体的重要性和连接性,为后续的图分析提供数据基础。

再比如使用 Node2Vec 对图进行嵌入:

embeddings = embed_node2vec(
  graph=graph,
  dimensions=config.dimensions,   # 默认1536维
  num_walks=config.num_walks,     # 默认10次随机游走
  walk_length=config.walk_length, # 默认40步长
  window_size=config.window_size, # 默认2窗口大小
  iterations=config.iterations,   # 默认3次迭代
  random_seed=config.random_seed, # 默认86种子
)

该功能默认禁用,可通过下面的配置开启:

embed_graph:
  enabled: true

这里其实使用的是 graspologic 库的 embed.node2vec_embed() 方法,感兴趣的可以看下 graspologic 的文档:

通过将节点映射到 1536 维向量空间,捕获图的拓扑结构和语义信息,主要用于图的可视化分析。

此外,我们还可以通过下面的配置,在图嵌入之前将图限制为 最大连通分量(Largest Connected Component, LCC)

embed_graph:
  use_lcc: true

这意味着只有图中最大的连通子图会被用于生成嵌入向量,孤立的节点或较小的连通分量会被过滤掉。这样做的好处是:

  • 提高嵌入质量:Node2Vec 算法需要在图上进行随机游走,连通的图能产生更有意义的节点嵌入;
  • 确保稳定性:使用相同的最大连通分量可以确保结果的一致性;
  • 高效率:只处理最重要的连通部分,减少计算开销;

LCC 的计算同样使用的 graspologic 库,参考 utils.largest_connected_component() 函数。

此外,finalize_entities() 函数还使用了降维技术,通过 UMAP 算法对上一步计算的图嵌入执行降维操作,为实体提供 x/y 位置坐标,便于在 2D 平面下进行可视化分析。这个功能同样需要在配置中开启:

umap:
  enabled: true # 必须同时开启 embed_graph 功能

UMAP 的全称为 均匀流形逼近与投影(Uniform Manifold Approximation and Projection),是一种用于高维数据降维的机器学习算法。它在保留数据的局部和全局结构方面表现出色,广泛应用于数据可视化、特征学习等领域,与 t-SNE 等算法类似,但在计算效率和保留全局结构上往往更具优势。

umap.png

直接使用 umap-learn 库计算即可:

embedding_positions = umap.UMAP(
  min_dist=min_dist,         # 默认0.75最小距离,控制点的紧密程度
  n_neighbors=n_neighbors,   # 默认5个邻居,控制局部/全局平衡
  spread=spread,             # 默认1传播参数
  n_components=2,            # 2D可视化
  metric="euclidean",        # 欧几里得距离
  random_state=86,           # 随机种子
).fit_transform(embedding_vectors)

对这些参数感兴趣的同学可以参考 umap-learn 的官方文档:

关于 UMAP 降维,我们之前在学习 RAGFlow 的 RAPTOR 分块策略时也曾简单介绍过,只不过 RAGFlow 是对分块的 Embedding 向量进行降维,方便做聚类,而这里是对 Node2Vec 图嵌入进行降维,用于图的可视化。

上面这些计算结果,最终都会融合到原始的实体表中:

final_entities = (
  entities.merge(layout, left_on="title", right_on="label", how="left")
    .merge(degrees, on="title", how="left")
    .drop_duplicates(subset="title")
)

通过 pandas 的左连接策略,确保所有原始实体都被保留,然后过滤 title 为空的记录,对 degree 为空的用 0 填充,以及 UUID 和 ID 的生成,最后使用预定义的 ENTITIES_FINAL_COLUMNS 生成规范化的实体表,包含如下字段:

  • id:实体唯一标识符
  • human_readable_id:人类可读的递增ID
  • title:实体名称
  • type:实体类型
  • description:实体描述
  • text_unit_ids:来源文本单元ID
  • frequency:出现频次
  • degree:节点度数
  • xy:2D 坐标

关系规范化

关系规范化的逻辑相对简洁,没有那么多的弯弯绕绕,位于 finalize_relationships() 函数:

def finalize_relationships(
  relationships: pd.DataFrame,
) -> pd.DataFrame:
  # 创建网络图
  graph = create_graph(relationships, edge_attr=["weight"])
  # 计算节点度数
  degrees = compute_degree(graph)
  # 基于 source 和 target 去重
  # 注意这里没有区分关系方向,也没有区分关系类型
  final_relationships = relationships.drop_duplicates(subset=["source", "target"])
  # 计算关系的组合度数
  final_relationships["combined_degree"] = compute_edge_combined_degree(final_relationships, degrees)
  # 字段规整
  final_relationships.reset_index(inplace=True)
  final_relationships["human_readable_id"] = final_relationships.index
  final_relationships["id"] = final_relationships["human_readable_id"].apply(
    lambda _x: str(uuid4())
  )
  # 只保留预定义的规范化的字段
  return final_relationships.loc[
    :,
    RELATIONSHIPS_FINAL_COLUMNS,
  ]

首先从关系数据构建图结构,计算节点度数,然后对关系去重处理,计算关系的组合度数,最后字段规整,并通过 RELATIONSHIPS_FINAL_COLUMNS 只保留预定义的规范化字段:

  • id:关系唯一标识符
  • human_readable_id:人类可读的递增ID
  • source:关系的源实体
  • target:关系的目标实体
  • description:关系描述
  • weight:关系权重
  • combined_degree:组合度数
  • text_unit_ids:关联的文本单元

事实声明提取

除了实体和关系,GraphRAG 还可以从文本单元中提取事实性的 声明(Claims),为每个实体提供更丰富的上下文信息。

在 GraphRAG 中,声明(Claims)提取 有的地方也被称为 协变量(Covariates)提取。协变量是一个统计学中的概念,指的是与 因变量(被解释变量) 相关,但并非研究中主要关注的 自变量(解释变量) 的变量,协变量可能干扰自变量与因变量之间的关系。例如,在研究 “教育水平对收入的影响” 时,“工作经验” 就是一个协变量,它与收入相关,若不控制,可能会混淆教育水平与收入之间的真实关系。协变量提取就是通过一系列方法,从原始数据中识别、筛选出对研究目标有潜在影响的协变量,并将其作为分析变量纳入模型或研究设计中的过程。而声明是关于实体之间的事实性陈述,通常包含主体和客体,声明提取是协变量识别的一种方式。

该工作流默认关闭,需要在 settings.yaml 中开启:

extract_claims:
  enabled: true
  model_id: default_chat_model
  prompt: "prompts/extract_claims.txt"
  description: "Any claims or facts that could be relevant to information discovery."
  max_gleanings: 1

从配置可以看出,声明提取和图谱提取非常类似,都是使用预定义的提示词模板指导大模型提取结构化信息,也可以通过 max_gleanings 配置支持多轮提取。提取的声明包含主体、客体、关系类型、状态、时间范围等结构化信息:

  • subject_id: 声明的主体
  • object_id: 声明的客体
  • type: 声明类型
  • status: 声明状态(已确认/虚假/未经验证)
  • start_date: 开始日期
  • end_date: 结束日期
  • description: 声明描述
  • source_text: 来源文本

声明提取的默认提示词如下:

-目标活动-
你是一个智能助手,帮助人类分析师分析文本文档中针对某些实体的声明。

-目标-
给定一个可能与该活动相关的文本文档、一个实体规范和一个声明描述,提取所有符合实体规范的实体以及针对这些实体的所有声明。

-步骤-
1. 提取所有符合预定义实体规范的命名实体。实体规范可以是实体名称列表或实体类型列表。
2. 对于步骤1中识别出的每个实体,提取与该实体相关的所有声明。声明需要与指定的声明描述相匹配,并且该实体应是声明的主体。
对于每个声明,提取以下信息:
- 主体:作为声明主体的实体名称,需大写。主体实体是实施声明中所描述行为的实体。主体必须是步骤1中识别出的命名实体之一。
- 客体:作为声明客体的实体名称,需大写。客体实体是要么报告/处理该行为,要么受该行为影响的实体。如果客体实体未知,使用**NONE**。
- 声明类型:声明的总体类别,需大写。命名方式应能在多个文本输入中重复使用,以便相似的声明具有相同的声明类型。
- 声明状态:**TRUE**、**FALSE**或**SUSPECTED**。TRUE表示声明已确认,FALSE表示声明被证实为虚假,SUSPECTED表示声明未经验证。
- 声明描述:详细说明声明背后的推理,以及所有相关证据和参考资料。
- 声明日期:提出声明的时间段(开始日期,结束日期)。开始日期和结束日期都应采用ISO-8601格式。如果声明是在单个日期而非日期范围内提出的,则开始日期和结束日期设置为相同的日期。如果日期未知,返回**NONE**。
- 声明来源文本:所有与该声明相关的原始文本引语列表。

每个声明的格式为:(<主体实体><|><客体实体><|><声明类型><|><声明状态><|><声明开始日期><|><声明结束日期><|><声明描述><|><声明来源>)

3. 以英文返回所有在步骤1和步骤2中识别出的声明,形成一个单一列表。使用**##**作为列表分隔符。

4. 完成后,输出 <|COMPLETE|>

官方还提供了两个示例,可以直观的看出 GraphRAG 是如何通过实体规范(对应提取的实体或关系)和声明描述(对应 extract_claims.description 配置)从文本中提取出详细的声明信息的:

示例1:
实体规范:组织
声明描述:与实体相关的危险信号
文本:根据2022年1月10日的一篇文章,A公司在参与B政府机构发布的多个公共招标时,因串标被罚款。该公司为C个人所有,C个人在2015年被怀疑参与腐败活动。
输出:

(A公司<|>B政府机构<|>反竞争行为<|>TRUE<|>2022-01-10T00:00:00<|>2022-01-10T00:00:00<|>根据2022年1月10日发表的一篇文章,A公司因在B政府机构发布的多个公共招标中串标被罚款,因此被发现存在反竞争行为<|>根据2022年1月10日的一篇文章,A公司在参与B政府机构发布的多个公共招标时,因串标被罚款。)
<|COMPLETE|>

示例2:
实体规范:A公司、C个人
声明描述:与实体相关的危险信号
文本:根据2022年1月10日的一篇文章,A公司在参与B政府机构发布的多个公共招标时,因串标被罚款。该公司为C个人所有,C个人在2015年被怀疑参与腐败活动。
输出:

(A公司<|>B政府机构<|>反竞争行为<|>TRUE<|>2022-01-10T00:00:00<|>2022-01-10T00:00:00<|>根据2022年1月10日发表的一篇文章,A公司因在B政府机构发布的多个公共招标中串标被罚款,因此被发现存在反竞争行为<|>根据2022年1月10日的一篇文章,A公司在参与B政府机构发布的多个公共招标时,因串标被罚款。)
##
(C个人<|>NONE<|>贪污腐败<|>SUSPECTED<|>2015-01-01T00:00:00<|>2015-12-30T00:00:00<|>C个人在2015年被怀疑参与腐败活动<|>该公司为C个人所有,C个人在2015年被怀疑参与腐败活动。)
<|COMPLETE|>

声明提取工作流为 GraphRAG 知识图谱增加了时间维度和事实信息,使得其能够追踪实体间关系的时间演变,支持基于时间的查询和推理,提供更丰富的上下文信息用于回答复杂问题。这个工作流与实体关系提取互补,共同构建了一个多层次、多维度的知识表示体系。

小结

今天的学习内容主要包括以下两点:

  • 图谱规范化处理:了解实体和关系规范化的复杂流程,包括图嵌入、UMAP 降维、度数计算等多种图分析技术的综合应用;
  • 可选的声明提取:学习如何提取时间相关的事实性声明,为知识图谱增加时间维度和事实验证能力;

通过这个阶段的处理,原本杂乱无序的文本信息被转换为结构化的实体表和关系表,为后续的社区检测和向量化奠定基础。在下一篇文章中,我们将继续学习关于知识提取的内容,还记得我们之前在概述篇提到的,GraphRAG 支持 StandardFast 两种索引方法,Standard 方法基于大模型实现,也就是这两天学习的内容,Fast 方法基于传统的 NLP 算法实现,我们接下来就来看看这一部分内容。


GraphRAG 索引构建之知识提取

在前面的文章中,我们详细学习了 GraphRAG 索引构建的文档处理阶段,了解了如何将各种格式的原始文档转换为标准化的文本单元。今天我们将深入探索整个索引构建流程中最核心的部分 —— 知识提取,看看 GraphRAG 是如何利用大语言模型的力量,从非结构化的文本中挖掘出结构化的实体、关系和声明,并最终构建出完整的知识图谱。

知识提取阶段总览

让我们回顾一下索引构建的整体流程,知识提取正是连接文本处理和图谱分析的关键桥梁:

index-workflow-3-steps.png

知识提取阶段包含三个核心工作流:

  • 提取图谱(extract_graph - 从文本中提取实体和关系,对同一个实体和关系的描述进行总结合并;
  • 图谱规范化(finalize_graph - 对提取结果进行规范化处理,输出最终的实体表和关系表;
  • 提取事实声明(extract_covariates - 提取事实性声明,这一步是可选的,声明通常用于识别诸如欺诈等恶意行为,因此并不适用于所有数据集;

提取图谱

图谱提取的实现位于 index/workflows/extract_graph.py 文件中,整个工作流包含以下关键步骤:

async def run_workflow(
  config: GraphRagConfig,
  context: PipelineRunContext,
) -> WorkflowFunctionOutput:
  
  # 1. 加载文本单元
  text_units = await load_table_from_storage("text_units", context.output_storage)
  
  # 2. 配置提取策略和摘要策略
  extraction_strategy = config.extract_graph.resolved_strategy()
  summarization_strategy = config.summarize_descriptions.resolved_strategy()
  
  # 3. 执行图谱提取
  entities, relationships, raw_entities, raw_relationships = await extract_graph(
    text_units=text_units,
    extraction_strategy=extraction_strategy,
    summarization_strategy=summarization_strategy
  )
  
  # 4. 保存结果
  await write_table_to_storage(entities, "entities", context.output_storage)
  await write_table_to_storage(relationships, "relationships", context.output_storage)

这里值得注意是提取策略和摘要策略,它们对应 settings.yaml 文件中的 extract_graphsummarize_descriptions 两个配置:

extract_graph:
  model_id: default_chat_model
  prompt: "prompts/extract_graph.txt"
  entity_types: [organization,person,geo,event]
  max_gleanings: 1

summarize_descriptions:
  model_id: default_chat_model
  prompt: "prompts/summarize_descriptions.txt"
  max_length: 500

从这里可以看出,整个图谱提取分为两步:首先遍历上一阶段生成的所有文本单元,通过大模型从每个文本片段中识别出实体和关系;然后将相同的实体和关系进行合并,再通过大模型总结合并之后的实体和关系描述;

extract-graph.png

核心提示词

第一步所使用的提示词位于 prompts/extract_graph.txt,翻译如下:

-目标-
给定一个可能与本活动相关的文本文档和一个实体类型列表,从文本中识别出所有属于这些类型的实体以及已识别实体之间的所有关系。

-步骤-
1. 识别所有实体。对于每个已识别的实体,提取以下信息:
- entity_name: 实体名称,首字母大写
- entity_type: 以下类型之一:[{entity_types}]
- entity_description: 对实体属性和活动的全面描述
每个实体的格式为("entity"<|><entity_name><|><entity_type><|><entity_description>)

2. 从步骤1中识别出的实体中,识别所有(源实体,目标实体)对,这些实体对之间存在*明确的关系*。
对于每对相关实体,提取以下信息:
- source_entity: 源实体的名称,如步骤1中所识别的
- target_entity: 目标实体的名称,如步骤1中所识别的
- relationship_description: 解释为什么认为源实体和目标实体相关
- relationship_strength: 一个数值分数,表示源实体和目标实体之间关系的强度
每个关系的格式为("relationship"<|><source_entity><|><target_entity><|><relationship_description><|><relationship_strength>)

3. 以英文返回输出,作为步骤1和步骤2中识别的所有实体和关系的单个列表。使用 **##** 作为列表分隔符。

4. 完成后,输出 <|COMPLETE|>

输出的结果每一行包含一条实体信息或关系信息,不同字段之间使用 <|> 分割,记录之间使用 ## 分割,并以 <|COMPLETE|> 结尾,下面是一个示例:

("entity"<|>ALICE JOHNSON<|>PERSON<|>Alice Johnson is a software engineer at X Corp)
##
("entity"<|>X CORP<|>ORGANIZATION<|>X Corp is a technology company)
##
("relationship"<|>ALICE JOHNSON<|>X CORP<|>Alice Johnson works as a software engineer at X Corp<|>8)
<|COMPLETE|>

第二步所使用的提示词位于 prompts/summarize_descriptions.txt,这个就比较简单了:

你是一名乐于助人的助手,负责对下方提供的数据生成一份全面的总结。
给定一个或多个实体,以及一份描述列表,这些都与同一个实体或一组实体相关。
请将所有这些内容合并成一个单一的、全面的描述。确保包含从所有描述中收集到的信息。
如果所提供的描述存在矛盾,请解决这些矛盾并提供一个单一的、连贯的总结。
确保用第三人称撰写,并包含实体名称,以便我们了解完整的背景。
将最终描述的长度限制在 {max_length} 个单词以内。

#######
-数据-
实体:{entity_name}
描述列表:{description_list}
#######
输出:

如果提取实体和关系时,同一个实体或关系出现了多次,就有可能导致信息不完整或矛盾的情况,比如下面两条记录:

("entity"<|>ALICE JOHNSON<|>PERSON<|>Alice Johnson is a software engineer at X Corp)
("entity"<|>ALICE JOHNSON<|>PERSON<|>Alice Johnson is a software engineer at Y Corp)

通过对描述进行总结,得到一条关于 ALICE JOHNSON 的完整记录:

("entity"<|>ALICE JOHNSON<|>PERSON<|>Alice Johnson is a software engineer at X Corp and Y Corp)

多轮迭代提取

此外,在提取图谱时,GraphRAG 还实现了一个多轮迭代提取的机制,通过多次调用 LLM 来确保提取的完整性,可以通过 max_gleanings 参数进行配置。其核心逻辑如下:

# 如果指定了 `max_gleanings` 参数,就进入循环以提取更多实体
# 有两个退出条件:(a)达到配置的最大值,(b)模型表示没有更多实体了
if self._max_gleanings > 0:
  for i in range(self._max_gleanings):
    response = await self._model.achat(
      CONTINUE_PROMPT,
      name=f"extract-continuation-{i}",
      history=response.history,
    )
    results += response.output.content or ""

    # 达到配置的最大值
    if i >= self._max_gleanings - 1:
      break

    response = await self._model.achat(
      LOOP_PROMPT,
      name=f"extract-loopcheck-{i}",
      history=response.history,
    )

    # 模型表示没有更多实体了
    if response.output.content != "Y":
      break

每次迭代会调用两次大模型,第一次告诉大模型提取有遗漏,让它继续提取,提示词 CONTINUE_PROMPT 如下:

在上一次提取中遗漏了许多实体和关系。
请记住,只输出与之前提取的任何类型相匹配的实体。
使用相同的格式在下方添加它们:

第二次让大模型进一步确认是否还有遗漏,提示词 LOOP_PROMPT 如下:

似乎仍有一些实体和关系可能被遗漏了。
如果还有需要添加的实体或关系,请回答Y;如果没有,请回答N。
请只用一个字母Y或N回答。

未完待续

今天我们开始学习 GraphRAG 索引构建流程中最核心的知识提取阶段,它包含 提取图谱图谱规范化提取事实声明 三个工作流。今天学习的主要是提取图谱的几个核心要点:

  • 两阶段图谱提取:掌握 GraphRAG 如何通过大模型首先从文本单元中识别实体和关系,然后对相同的实体和关系进行合并和总结,形成完整的知识图谱结构;
  • 精心设计的提示词工程:介绍了 extract_graph.txtsummarize_descriptions.txt 两个核心提示词,了解了如何通过结构化的指令引导大模型提取高质量的实体关系信息;
  • 多轮迭代提取机制:学习了 GraphRAG 通过 max_gleanings 配置实现的多轮迭代提取,确保信息提取的完整性和准确性;

关于知识提取阶段还有另外两个工作流,我们明天继续。


GraphRAG 索引构建之文档处理

昨天我们对 GraphRAG 的索引构建做了一个基本的概述,了解了其索引构建的整体流程和工作流引擎的设计。今天我们将深入第一个具体阶段 —— 文档处理,这是整个知识图谱构建的起点,负责将各种格式的原始文档转换为标准化的文本单元,为后续的知识提取奠定基础。

回顾索引构建流程

根据前一篇文章的分析,我们可以将 GraphRAG 的索引构建流程分为三个主要阶段:

index-workflow-3-steps.png

今天我们重点关注文档处理阶段,它的核心任务是将非结构化的原始数据转换为后续处理流程所需的标准化文本单元。这个阶段包含三个关键工作流:

  • 加载原始文档(load_input_documents - 从各种数据源加载原始文档
  • 文本分块处理(create_base_text_units - 将长文档分割成可处理的文本块
  • 文档规范化(create_final_documents - 对文档进行最终的标准化整理

加载原始文档

在 GraphRAG 中,所有工作流的入口都是 run_workflow() 方法,加载原始文档也是如此。它的核心逻辑位于 index/workflows/load_input_documents.py 文件:

async def run_workflow(
  config: GraphRagConfig,
  context: PipelineRunContext,
) -> WorkflowFunctionOutput:
  
  # 加载输入文档,转换为标准的 pd.DataFrame 格式
  output = await load_input_documents(config.input, context.input_storage)

  # 将原始文档写入 documents 表中
  await write_table_to_storage(output, "documents", context.output_storage)
  return WorkflowFunctionOutput(result=output)

这个工作流首先加载输入文档,将其转换为标准的 pd.DataFrame 格式,然后写入 documents 表中。GraphRAG 通过工厂模式实现了对纯文本、CSV、JSON 三种文件格式的统一处理:

loaders: dict[str, Callable[..., Awaitable[pd.DataFrame]]] = {
  InputFileType.text: load_text,
  InputFileType.csv: load_csv,
  InputFileType.json: load_json,
}

如果你有不同的格式,官方的建议是编写脚本将其转换为其中一种。

同时,它也支持 Azure Blob、CosmosDB、本地文件、内存这四种不同的存储类型:

StorageFactory.register(StorageType.blob.value, create_blob_storage)
StorageFactory.register(StorageType.cosmosdb.value, create_cosmosdb_storage)
StorageFactory.register(StorageType.file.value, create_file_storage)
StorageFactory.register(StorageType.memory.value, lambda **_: MemoryPipelineStorage())

默认情况下是从本地的 input 目录下读取文件,并使用 text 加载器处理。我们可以在 settings.yaml 文件中的 input 部分进行配置:

input:
  storage:
    type: file # [blob, cosmosdb, file, memory]
    base_dir: "input"
  file_type: text # [csv, text, json]

文本文件处理

对于纯文本文件,处理逻辑相对简单,在 index/input/text.py 中:

async def load_file(path: str, group: dict | None = None) -> pd.DataFrame:
  text = await storage.get(path, encoding=config.encoding)
  new_item = {**group, "text": text}
  new_item["id"] = gen_sha512_hash(new_item, new_item.keys())
  new_item["title"] = str(Path(path).name)
  new_item["creation_date"] = await storage.get_creation_date(path)
  return pd.DataFrame([new_item])

每个文本文件被读取为一个完整的字符串,并自动生成标准字段:

  • text: 文件的完整内容
  • id: 基于内容的 SHA512 哈希值,确保唯一性
  • title: 文件名(不含路径)
  • creation_date: 文件创建日期

JSON 文件处理

JSON 文件的处理位于 index/input/json.py 文件,直接调用 json.loads() 加载数据。支持两种格式,一种是单个对象格式:

{
  "text": "文档内容",
  "title": "文档标题"
}

另一种是数组格式:

[
  {"text": "文档1内容", "title": "文档1标题"},
  {"text": "文档2内容", "title": "文档2标题"}
]

注意,目前暂不支持 JSONL 格式(每行一个完整的 JSON 对象),需转换为数组格式。

JSON 解析器会自动为对象增加 idcreation_date 两个字段。另外,如果 JSON 中没有 texttitle 字段,还可以在配置文件中设置:

input:
  file_type: json
  text_column: content  # 指定 text 列
  title_column: name    # 指定 title 列

CSV 文件处理

CSV 文件的处理位于 index/input/csv.py 文件,直接调用 pd.read_csv() 加载数据,它的格式如下:

text,title
"文档1内容","文档1标题"
"文档2内容","文档2标题"

同样的,CSV 解析器也会自动为对象增加 idcreation_date 两个字段,也支持在配置文件中设置 texttitle 字段映射。

文本分块处理

加载原始文档后,下一步是将长文档切分成更小、更易于处理的文本单元,这一步骤对后续的 LLM 处理至关重要。分块策略通过 settings.yaml 中的 chunks 部分配置:

chunks:
  size: 1200                   # 每个分块的目标 token 数量
  overlap: 100                 # 相邻分块之间重叠的 token 数量
  group_by_columns: [id]       # 按指定列分组,确保分块在文档内部进行
  strategy: tokens             # 分块策略:tokens 或 sentence
  encoding_model: cl100k_base  # 用于计算 token 的编码模型

其中 sizeoverlap 比较好理解,分别表示每个分块的大小和相邻分块之间的重叠;group_by_columns 通常设置为 ["doc_id"] 或 ["id"] 表示在分块前先按文档 ID 进行分组,确保文本分块不会跨越不同的文档;strategy 表示分块策略,支持 tokenssentence 两种。

def load_strategy(strategy: ChunkStrategyType) -> ChunkStrategy:
  """分块策略定义"""
  match strategy:
    case ChunkStrategyType.tokens:
      # tokens 分块策略
      from graphrag.index.operations.chunk_text.strategies import run_tokens
      return run_tokens
    case ChunkStrategyType.sentence:
      # sentence 分块策略
      from graphrag.index.operations.chunk_text.bootstrap import bootstrap
      from graphrag.index.operations.chunk_text.strategies import run_sentences
      bootstrap()
      return run_sentences

基于 tokens 的精确分块

tokens 分块策略 是 GraphRAG 的默认分块策略,它基于 tiktoken 库进行精确的 token 计算和分块,核心实现如下:

def run_tokens(
  input: list[str], config: ChunkingConfig, tick: ProgressTicker,
) -> Iterable[TextChunk]:

  # 获取编码和解码函数
  encode, decode = get_encoding_fn(encoding_name)
  
  # 基于编码 token 将文本分块
  return split_multiple_texts_on_tokens(
    input,
    Tokenizer(
      chunk_overlap=chunk_overlap,
      tokens_per_chunk=tokens_per_chunk,
      encode=encode,
      decode=decode,
    ),
    tick,
  )

其中 get_encoding_fn() 获取编码和解码函数,它支持通过 tiktoken 库加载不同的编码模型,比如 GPT-3.5 和 GPT-4 的 cl100k_base 或者 GPT-4.1 的 o200k_base 等,可以在 tiktoken 源码中 找到完整的模型列表:

tiktoken-models.png

tokens 分块的核心算法位于 split_multiple_texts_on_tokens() 函数:

def split_multiple_texts_on_tokens(
  texts: list[str], tokenizer: Tokenizer, tick: ProgressTicker
) -> list[TextChunk]:
  """将多个文本进行分块并返回带元数据的块"""
  result = []
  mapped_ids = []

  # 第一步:编码所有文本并记录源文档索引
  for source_doc_idx, text in enumerate(texts):
    encoded = tokenizer.encode(text)
    mapped_ids.append((source_doc_idx, encoded))

  # 第二步:创建全局 token 序列,每个 token 都记录其来源文档
  input_ids = [
    (source_doc_idx, id) for source_doc_idx, ids in mapped_ids for id in ids
  ]

  # 第三步:滑动窗口分块
  start_idx = 0
  cur_idx = min(start_idx + tokenizer.tokens_per_chunk, len(input_ids))
  chunk_ids = input_ids[start_idx:cur_idx]

  while start_idx < len(input_ids):
    # 解码当前块的文本内容
    chunk_text = tokenizer.decode([id for _, id in chunk_ids])
    # 记录当前块涉及的源文档索引
    doc_indices = list({doc_idx for doc_idx, _ in chunk_ids})
    result.append(TextChunk(chunk_text, doc_indices, len(chunk_ids)))
    # 已完成
    if cur_idx == len(input_ids):
      break
    # 滑动窗口:下一个块的起始位置考虑重叠
    start_idx += tokenizer.tokens_per_chunk - tokenizer.chunk_overlap
    cur_idx = min(start_idx + tokenizer.tokens_per_chunk, len(input_ids))
    chunk_ids = input_ids[start_idx:cur_idx]

  return result

该算法首先使用 tiktoken 库将所有文档编码成 token 序列(即 mapped_ids 数组);然后将 token 序列展开,变成一个全局的整数序列,每个 token 都保留其来源信息(即 input_ids 数组);然后按 tokens_per_chunk 数量对 token 序列进行第一个分块并解码得到文本内容;接着使用滑动窗口往后依次分块,同时考虑相邻块之间有指定数量的重叠 token,保持上下文连续性。

有趣的是,这里的分块算法参考了 LangChain 的实现,有兴趣的朋友可以看下 LangChain 的 TokenTextSplitter

基于句子的自然分块

sentence 分块策略 使用 NLTK 库进行基于句子边界的自然语言分块,它首先调用 bootstrap() 初始化必要的 NLTK 资源:

def bootstrap():
  """初始化 NLTK 资源"""
  global initialized_nltk
  if not initialized_nltk:
    import nltk
    from nltk.corpus import wordnet as wn
    # 句子分词
    nltk.download("punkt")
    nltk.download("punkt_tab")
    # 词性标注
    nltk.download("averaged_perceptron_tagger")
    nltk.download("averaged_perceptron_tagger_eng")
    # 命名实体识别相关
    nltk.download("maxent_ne_chunker")
    nltk.download("maxent_ne_chunker_tab")
    # 词汇资源
    nltk.download("words")
    nltk.download("wordnet")
    # 确保 wordnet 数据已加载
    wn.ensure_loaded()
    # 完成初始化
    initialized_nltk = True

这里我稍微介绍下这几个 NLTK 资源:

  • 句子分词相关punkt 用于句子边界检测和分词,将文本分割成句子,识别句子的开始和结束;punkt_tabpunkt 的表格化版本,提供更高效的句子分词性能;
  • 词性标注相关averaged_perceptron_tagger 用于词性标注,基于 平均感知机算法,为每个单词标注词性(名词、动词、形容词等);averaged_perceptron_tagger_eng 是其英语特化版本,专门针对英语文本的词性标注;
  • 命名实体识别相关maxent_ne_chunker 用于命名实体识别和分块,基于 最大熵模型,识别人名、地名、组织名等命名实体;maxent_ne_chunker_tabmaxent_ne_chunker 的表格化版本,提供更快的命名实体识别性能;
  • 词汇资源words 是英语单词词典,包含大量英语单词,用于拼写检查和词汇验证;wordnet 是英语词汇的语义网络,提供词汇间的语义关系(同义词、反义词、上下位关系等),支持词义消歧、语义相似度计算等;

然后使用 NLTK 的句子分割器,也就是 sent_tokenize() 方法,进行按句子分块:

def run_sentences(
  input: list[str], _config: ChunkingConfig, tick: ProgressTicker
) -> Iterable[TextChunk]:
  """按句子将文本分成多个部分"""
  for doc_idx, text in enumerate(input):
    # 使用 NLTK 进行句子分割
    sentences = nltk.sent_tokenize(text)
    for sentence in sentences:
      yield TextChunk(
        text_chunk=sentence,
        source_doc_indices=[doc_idx],
      )
    tick(1)

可以看到,这种分块策略的实现相对简单,每个句子就是一个独立的文本块,句子之间没有重叠,避免信息重复。

两种策略的对比

下表对两种策略做了个对比:

特性tokens 策略sentence 策略
分块依据Token 数量句子边界
块大小控制精确控制依赖句子长度
上下文保持可配置重叠自然语言完整性
跨文档支持支持不支持
计算复杂度较高较低
适用场景严格 token 限制的 LLM 处理需要保持句子完整性的分析

可以根据需要选择适当的分块策略。

无论是哪种分块策略,都返回统一的 TextChunk 数据结构:

@dataclass
class TextChunk:
  """文本块类定义"""
  text_chunk: str              # 文本块内容
  source_doc_indices: list[int] # 源文档索引列表
  n_tokens: int | None = None  # token 数量(可选)

此外,我们还可以为文档添加一下元数据,分块时也有一些处理元数据的策略,相关配置如下:

input:
  metadata: [title,tag] # 增加元数据列
chunks:
  prepend_metadata: true  # 将元数据复制到每个文本块的开头
  chunk_size_includes_metadata: false # 计算分块大小时是否包含元数据

关于元数据的使用,可以参考 GraphRAG 官方教程:

文档规范化

在完成文本分块后,create_final_documents 步骤会对处理结果进行最终的整理和规范化,将分块后的文本单元重新与原始文档关联。让我们详细分析实际的源码实现:

def create_final_documents(documents: pd.DataFrame, text_units: pd.DataFrame) -> pd.DataFrame:
  
  # 展开文本单元的文档ID关联并选择需要的列
  exploded = (
    text_units.explode("document_ids")
    .loc[:, ["id", "document_ids", "text"]]
    .rename(
      columns={
        "document_ids": "chunk_doc_id",
        "id": "chunk_id", 
        "text": "chunk_text",
      }
    )
  )
  
  # 与原始文档信息合并
  joined = exploded.merge(
    documents,
    left_on="chunk_doc_id",
    right_on="id",
    how="inner",
    copy=False,
  )
  
  # 按文档聚合所有相关的文本单元ID
  docs_with_text_units = joined.groupby("id", sort=False).agg(
    text_unit_ids=("chunk_id", list)
  )
  
  # 重新与文档信息合并,形成最终结构
  rejoined = docs_with_text_units.merge(
    documents,
    on="id",
    how="right",
    copy=False,
  ).reset_index(drop=True)
  
  # 数据格式化和添加序号
  rejoined["id"] = rejoined["id"].astype(str)
  rejoined["human_readable_id"] = rejoined.index + 1
  
  # 返回标准化的列结构
  return rejoined.loc[:, DOCUMENTS_FINAL_COLUMNS]

这一步的代码看起来很长,其实它的逻辑并不复杂,核心在于数据的操作和转换,看懂这一步的代码需要熟练掌握 pandas 各种数据处理技巧,比如:

  • .explode("document_ids"):将数组列展开为多行,每个数组元素对应一行
  • .loc[:, ["id", "document_ids", "text"]]:列切片操作,只保留需要的列,减少内存使用
  • .rename(columns={}):重命名列以更清晰地表示其含义
  • .merge(documents, ...):对两个表执行合并操作,使用 left_onright_on 明确指定连接键
  • .groupby("id", sort=False):按文档 ID 分组,保持原有顺序,提高性能
  • agg(text_unit_ids=("chunk_id", list)):聚合操作,将每个文档对应的所有文本单元 ID 收集成列表

最后使用预定义的 DOCUMENTS_FINAL_COLUMNS 确保输出格式的一致性,包括以下列:

  • id: 文档唯一标识符
  • human_readable_id: 人类可读的递增ID
  • title: 文档标题
  • text: 原始文档内容
  • text_unit_ids: 关联的文本单元ID列表
  • metadata: 元数据信息
  • creation_date: 创建日期

小结

今天我们深入探讨了 GraphRAG 索引构建流程中的文档处理阶段,这个阶段是整个知识图谱构建的基础,负责将各种格式的原始文档转换为标准化的文本单元,为后续的知识提取和图谱构建做好准备。通过本次学习,我们掌握了以下核心要点:

  • 统一的文档加载:学习了 GraphRAG 如何通过工厂模式,支持从多种数据源(如本地文件、Azure Blob)加载不同格式(纯文本、CSV、JSON)的原始文档;
  • 灵活的文本分块:详细分析了两种核心的文本分块策略。tokens 策略基于 tiktoken 实现精确的 token 级切片和重叠,适合对 LLM 输入有严格限制的场景;而 sentence 策略则利用 NLTK 库进行基于句子边界的自然语言分块,更好地保持了语义的完整性;
  • 标准化的数据输出:文档处理的最后一步,将分块后的文本单元与原始文档进行重新关联,最终输出包含标准化字段的文档数据,为后续的索引流程提供了结构清晰、内容一致的输入。

在下一篇文章中,我们将进入知识提取阶段,学习 GraphRAG 如何调用大模型从这些精心处理的文本单元中提取实体和关系,最终构建出结构化的知识图谱。


昨天我们对 GraphRAG 的索引构建做了一个基本的概述,了解了其索引构建的整体流程和工作流引擎的设计。今天我们将深入第一个具体阶段 —— 文档处理,这是整个知识图谱构建的起点,负责将各种格式的原始文档转换为标准化的文本单元,为后续的知识提取奠定基础。

回顾索引构建流程

根据前一篇文章的分析,我们可以将 GraphRAG 的索引构建流程分为三个主要阶段:

今天我们重点关注文档处理阶段,它的核心任务是将非结构化的原始数据转换为后续处理流程所需的标准化文本单元。这个阶段包含三个关键工作流:

  • 加载原始文档(load_input_documents - 从各种数据源加载原始文档
  • 文本分块处理(create_base_text_units - 将长文档分割成可处理的文本块
  • 文档规范化(create_final_documents - 对文档进行最终的标准化整理

加载原始文档

在 GraphRAG 中,所有工作流的入口都是 run_workflow() 方法,加载原始文档也是如此。它的核心逻辑位于 index/workflows/load_input_documents.py 文件:

async def run_workflow(
  config: GraphRagConfig,
  context: PipelineRunContext,
) -> WorkflowFunctionOutput:
  
  # 加载输入文档,转换为标准的 pd.DataFrame 格式
  output = await load_input_documents(config.input, context.input_storage)

  # 将原始文档写入 documents 表中
  await write_table_to_storage(output, "documents", context.output_storage)
  return WorkflowFunctionOutput(result=output)

这个工作流首先加载输入文档,将其转换为标准的 pd.DataFrame 格式,然后写入 documents 表中。GraphRAG 通过工厂模式实现了对纯文本、CSV、JSON 三种文件格式的统一处理:

loaders: dict[str, Callable[..., Awaitable[pd.DataFrame]]] = {
  InputFileType.text: load_text,
  InputFileType.csv: load_csv,
  InputFileType.json: load_json,
}

如果你有不同的格式,官方的建议是编写脚本将其转换为其中一种。

同时,它也支持 Azure Blob、CosmosDB、本地文件、内存这四种不同的存储类型:

StorageFactory.register(StorageType.blob.value, create_blob_storage)
StorageFactory.register(StorageType.cosmosdb.value, create_cosmosdb_storage)
StorageFactory.register(StorageType.file.value, create_file_storage)
StorageFactory.register(StorageType.memory.value, lambda **_: MemoryPipelineStorage())

默认情况下是从本地的 input 目录下读取文件,并使用 text 加载器处理。我们可以在 settings.yaml 文件中的 input 部分进行配置:

input:
  storage:
    type: file # [blob, cosmosdb, file, memory]
    base_dir: "input"
  file_type: text # [csv, text, json]

文本文件处理

对于纯文本文件,处理逻辑相对简单,在 index/input/text.py 中:

async def load_file(path: str, group: dict | None = None) -> pd.DataFrame:
  text = await storage.get(path, encoding=config.encoding)
  new_item = {**group, "text": text}
  new_item["id"] = gen_sha512_hash(new_item, new_item.keys())
  new_item["title"] = str(Path(path).name)
  new_item["creation_date"] = await storage.get_creation_date(path)
  return pd.DataFrame([new_item])

每个文本文件被读取为一个完整的字符串,并自动生成标准字段:

  • text: 文件的完整内容
  • id: 基于内容的 SHA512 哈希值,确保唯一性
  • title: 文件名(不含路径)
  • creation_date: 文件创建日期

JSON 文件处理

JSON 文件的处理位于 index/input/json.py 文件,直接调用 json.loads() 加载数据。支持两种格式,一种是单个对象格式:

{
  "text": "文档内容",
  "title": "文档标题"
}

另一种是数组格式:

[
  {"text": "文档1内容", "title": "文档1标题"},
  {"text": "文档2内容", "title": "文档2标题"}
]

注意,目前暂不支持 JSONL 格式(每行一个完整的 JSON 对象),需转换为数组格式。

JSON 解析器会自动为对象增加 idcreation_date 两个字段。另外,如果 JSON 中没有 texttitle 字段,还可以在配置文件中设置:

input:
  file_type: json
  text_column: content  # 指定 text 列
  title_column: name    # 指定 title 列

CSV 文件处理

CSV 文件的处理位于 index/input/csv.py 文件,直接调用 pd.read_csv() 加载数据,它的格式如下:

text,title
"文档1内容","文档1标题"
"文档2内容","文档2标题"

同样的,CSV 解析器也会自动为对象增加 idcreation_date 两个字段,也支持在配置文件中设置 texttitle 字段映射。

文本分块处理

加载原始文档后,下一步是将长文档切分成更小、更易于处理的文本单元,这一步骤对后续的 LLM 处理至关重要。分块策略通过 settings.yaml 中的 chunks 部分配置:

chunks:
  size: 1200                   # 每个分块的目标 token 数量
  overlap: 100                 # 相邻分块之间重叠的 token 数量
  group_by_columns: [id]       # 按指定列分组,确保分块在文档内部进行
  strategy: tokens             # 分块策略:tokens 或 sentence
  encoding_model: cl100k_base  # 用于计算 token 的编码模型

其中 sizeoverlap 比较好理解,分别表示每个分块的大小和相邻分块之间的重叠;group_by_columns 通常设置为 ["doc_id"] 或 ["id"] 表示在分块前先按文档 ID 进行分组,确保文本分块不会跨越不同的文档;strategy 表示分块策略,支持 tokenssentence 两种。

def load_strategy(strategy: ChunkStrategyType) -> ChunkStrategy:
  """分块策略定义"""
  match strategy:
    case ChunkStrategyType.tokens:
      # tokens 分块策略
      from graphrag.index.operations.chunk_text.strategies import run_tokens
      return run_tokens
    case ChunkStrategyType.sentence:
      # sentence 分块策略
      from graphrag.index.operations.chunk_text.bootstrap import bootstrap
      from graphrag.index.operations.chunk_text.strategies import run_sentences
      bootstrap()
      return run_sentences

基于 tokens 的精确分块

tokens 分块策略 是 GraphRAG 的默认分块策略,它基于 tiktoken 库进行精确的 token 计算和分块,核心实现如下:

def run_tokens(
  input: list[str], config: ChunkingConfig, tick: ProgressTicker,
) -> Iterable[TextChunk]:

  # 获取编码和解码函数
  encode, decode = get_encoding_fn(encoding_name)
  
  # 基于编码 token 将文本分块
  return split_multiple_texts_on_tokens(
    input,
    Tokenizer(
      chunk_overlap=chunk_overlap,
      tokens_per_chunk=tokens_per_chunk,
      encode=encode,
      decode=decode,
    ),
    tick,
  )

其中 get_encoding_fn() 获取编码和解码函数,它支持通过 tiktoken 库加载不同的编码模型,比如 GPT-3.5 和 GPT-4 的 cl100k_base 或者 GPT-4.1 的 o200k_base 等,可以在 tiktoken 源码中 找到完整的模型列表:

tokens 分块的核心算法位于 split_multiple_texts_on_tokens() 函数:

def split_multiple_texts_on_tokens(
  texts: list[str], tokenizer: Tokenizer, tick: ProgressTicker
) -> list[TextChunk]:
  """将多个文本进行分块并返回带元数据的块"""
  result = []
  mapped_ids = []

  # 第一步:编码所有文本并记录源文档索引
  for source_doc_idx, text in enumerate(texts):
    encoded = tokenizer.encode(text)
    mapped_ids.append((source_doc_idx, encoded))

  # 第二步:创建全局 token 序列,每个 token 都记录其来源文档
  input_ids = [
    (source_doc_idx, id) for source_doc_idx, ids in mapped_ids for id in ids
  ]

  # 第三步:滑动窗口分块
  start_idx = 0
  cur_idx = min(start_idx + tokenizer.tokens_per_chunk, len(input_ids))
  chunk_ids = input_ids[start_idx:cur_idx]

  while start_idx < len(input_ids):
    # 解码当前块的文本内容
    chunk_text = tokenizer.decode([id for _, id in chunk_ids])
    # 记录当前块涉及的源文档索引
    doc_indices = list({doc_idx for doc_idx, _ in chunk_ids})
    result.append(TextChunk(chunk_text, doc_indices, len(chunk_ids)))
    # 已完成
    if cur_idx == len(input_ids):
      break
    # 滑动窗口:下一个块的起始位置考虑重叠
    start_idx += tokenizer.tokens_per_chunk - tokenizer.chunk_overlap
    cur_idx = min(start_idx + tokenizer.tokens_per_chunk, len(input_ids))
    chunk_ids = input_ids[start_idx:cur_idx]

  return result

该算法首先使用 tiktoken 库将所有文档编码成 token 序列(即 mapped_ids 数组);然后将 token 序列展开,变成一个全局的整数序列,每个 token 都保留其来源信息(即 input_ids 数组);然后按 tokens_per_chunk 数量对 token 序列进行第一个分块并解码得到文本内容;接着使用滑动窗口往后依次分块,同时考虑相邻块之间有指定数量的重叠 token,保持上下文连续性。

有趣的是,这里的分块算法参考了 LangChain 的实现,有兴趣的朋友可以看下 LangChain 的 TokenTextSplitter

基于句子的自然分块

sentence 分块策略 使用 NLTK 库进行基于句子边界的自然语言分块,它首先调用 bootstrap() 初始化必要的 NLTK 资源:

def bootstrap():
  """初始化 NLTK 资源"""
  global initialized_nltk
  if not initialized_nltk:
    import nltk
    from nltk.corpus import wordnet as wn
    # 句子分词
    nltk.download("punkt")
    nltk.download("punkt_tab")
    # 词性标注
    nltk.download("averaged_perceptron_tagger")
    nltk.download("averaged_perceptron_tagger_eng")
    # 命名实体识别相关
    nltk.download("maxent_ne_chunker")
    nltk.download("maxent_ne_chunker_tab")
    # 词汇资源
    nltk.download("words")
    nltk.download("wordnet")
    # 确保 wordnet 数据已加载
    wn.ensure_loaded()
    # 完成初始化
    initialized_nltk = True

这里我稍微介绍下这几个 NLTK 资源:

  • 句子分词相关punkt 用于句子边界检测和分词,将文本分割成句子,识别句子的开始和结束;punkt_tabpunkt 的表格化版本,提供更高效的句子分词性能;
  • 词性标注相关averaged_perceptron_tagger 用于词性标注,基于 平均感知机算法,为每个单词标注词性(名词、动词、形容词等);averaged_perceptron_tagger_eng 是其英语特化版本,专门针对英语文本的词性标注;
  • 命名实体识别相关maxent_ne_chunker 用于命名实体识别和分块,基于 最大熵模型,识别人名、地名、组织名等命名实体;maxent_ne_chunker_tabmaxent_ne_chunker 的表格化版本,提供更快的命名实体识别性能;
  • 词汇资源words 是英语单词词典,包含大量英语单词,用于拼写检查和词汇验证;wordnet 是英语词汇的语义网络,提供词汇间的语义关系(同义词、反义词、上下位关系等),支持词义消歧、语义相似度计算等;

然后使用 NLTK 的句子分割器,也就是 sent_tokenize() 方法,进行按句子分块:

def run_sentences(
  input: list[str], _config: ChunkingConfig, tick: ProgressTicker
) -> Iterable[TextChunk]:
  """按句子将文本分成多个部分"""
  for doc_idx, text in enumerate(input):
    # 使用 NLTK 进行句子分割
    sentences = nltk.sent_tokenize(text)
    for sentence in sentences:
      yield TextChunk(
        text_chunk=sentence,
        source_doc_indices=[doc_idx],
      )
    tick(1)

可以看到,这种分块策略的实现相对简单,每个句子就是一个独立的文本块,句子之间没有重叠,避免信息重复。

两种策略的对比

下表对两种策略做了个对比:

特性tokens 策略sentence 策略
分块依据Token 数量句子边界
块大小控制精确控制依赖句子长度
上下文保持可配置重叠自然语言完整性
跨文档支持支持不支持
计算复杂度较高较低
适用场景严格 token 限制的 LLM 处理需要保持句子完整性的分析

可以根据需要选择适当的分块策略。

无论是哪种分块策略,都返回统一的 TextChunk 数据结构:

@dataclass
class TextChunk:
  """文本块类定义"""
  text_chunk: str              # 文本块内容
  source_doc_indices: list[int] # 源文档索引列表
  n_tokens: int | None = None  # token 数量(可选)

此外,我们还可以为文档添加一下元数据,分块时也有一些处理元数据的策略,相关配置如下:

input:
  metadata: [title,tag] # 增加元数据列
chunks:
  prepend_metadata: true  # 将元数据复制到每个文本块的开头
  chunk_size_includes_metadata: false # 计算分块大小时是否包含元数据

关于元数据的使用,可以参考 GraphRAG 官方教程:

文档规范化

在完成文本分块后,create_final_documents 步骤会对处理结果进行最终的整理和规范化,将分块后的文本单元重新与原始文档关联。让我们详细分析实际的源码实现:

def create_final_documents(documents: pd.DataFrame, text_units: pd.DataFrame) -> pd.DataFrame:
  
  # 展开文本单元的文档ID关联并选择需要的列
  exploded = (
    text_units.explode("document_ids")
    .loc[:, ["id", "document_ids", "text"]]
    .rename(
      columns={
        "document_ids": "chunk_doc_id",
        "id": "chunk_id", 
        "text": "chunk_text",
      }
    )
  )
  
  # 与原始文档信息合并
  joined = exploded.merge(
    documents,
    left_on="chunk_doc_id",
    right_on="id",
    how="inner",
    copy=False,
  )
  
  # 按文档聚合所有相关的文本单元ID
  docs_with_text_units = joined.groupby("id", sort=False).agg(
    text_unit_ids=("chunk_id", list)
  )
  
  # 重新与文档信息合并,形成最终结构
  rejoined = docs_with_text_units.merge(
    documents,
    on="id",
    how="right",
    copy=False,
  ).reset_index(drop=True)
  
  # 数据格式化和添加序号
  rejoined["id"] = rejoined["id"].astype(str)
  rejoined["human_readable_id"] = rejoined.index + 1
  
  # 返回标准化的列结构
  return rejoined.loc[:, DOCUMENTS_FINAL_COLUMNS]

这一步的代码看起来很长,其实它的逻辑并不复杂,核心在于数据的操作和转换,看懂这一步的代码需要熟练掌握 pandas 各种数据处理技巧,比如:

  • .explode("document_ids"):将数组列展开为多行,每个数组元素对应一行
  • .loc[:, ["id", "document_ids", "text"]]:列切片操作,只保留需要的列,减少内存使用
  • .rename(columns={}):重命名列以更清晰地表示其含义
  • .merge(documents, ...):对两个表执行合并操作,使用 left_onright_on 明确指定连接键
  • .groupby("id", sort=False):按文档 ID 分组,保持原有顺序,提高性能
  • agg(text_unit_ids=("chunk_id", list)):聚合操作,将每个文档对应的所有文本单元 ID 收集成列表

最后使用预定义的 DOCUMENTS_FINAL_COLUMNS 确保输出格式的一致性,包括以下列:

  • id: 文档唯一标识符
  • human_readable_id: 人类可读的递增ID
  • title: 文档标题
  • text: 原始文档内容
  • text_unit_ids: 关联的文本单元ID列表
  • metadata: 元数据信息
  • creation_date: 创建日期

小结

今天我们深入探讨了 GraphRAG 索引构建流程中的文档处理阶段,这个阶段是整个知识图谱构建的基础,负责将各种格式的原始文档转换为标准化的文本单元,为后续的知识提取和图谱构建做好准备。通过本次学习,我们掌握了以下核心要点:

  • 统一的文档加载:学习了 GraphRAG 如何通过工厂模式,支持从多种数据源(如本地文件、Azure Blob)加载不同格式(纯文本、CSV、JSON)的原始文档;
  • 灵活的文本分块:详细分析了两种核心的文本分块策略。tokens 策略基于 tiktoken 实现精确的 token 级切片和重叠,适合对 LLM 输入有严格限制的场景;而 sentence 策略则利用 NLTK 库进行基于句子边界的自然语言分块,更好地保持了语义的完整性;
  • 标准化的数据输出:文档处理的最后一步,将分块后的文本单元与原始文档进行重新关联,最终输出包含标准化字段的文档数据,为后续的索引流程提供了结构清晰、内容一致的输入。

在下一篇文章中,我们将进入知识提取阶段,学习 GraphRAG 如何调用大模型从这些精心处理的文本单元中提取实体和关系,最终构建出结构化的知识图谱。


GraphRAG 索引构建概述

经过前面对 GraphRAG 项目结构的深入了解,我们已经掌握了它的整体架构和技术栈。今天我们将顺着 graphrag index 命令的调用链,深入探索 GraphRAG 索引构建的核心流程,包括命令行入口、配置加载机制、工作流引擎,以及不同索引方法的实现细节。

命令行入口

当我们执行 graphrag index 命令时,整个调用过程如下:

  1. 入口点__main__.pybin/graphraggraphrag.cli.main:app
  2. CLI 解析cli/main.py@app.command("index") 装饰器
  3. 配置加载cli/index.pyindex_cli() 函数
  4. API 调用api.build_index() 函数

其中,前两步我们昨天已经学习过了:__main__.py 的作用是让 GraphRAG 能以包的方式运行,pyproject.toml 文件中的 [project.scripts] 配置让 GraphRAG 能以可执行文件的方式运行,运行的入口都是 graphrag.cli.main 包的 app 方法;该方法基于 Typer 这个现代化的 Python CLI 库实现,通过 @app.command 装饰器,定义了 GraphRAG 的 5 个子命令,index 就是其中之一。

跟随调用链,我们接着看一下 cli/index.py 文件中 index_cli() 函数的实现:

def index_cli(root_dir: Path, method: IndexingMethod, ...):
  # 配置加载
  config = load_config(root_dir, config_filepath, cli_overrides)
  # 索引构建
  _run_index(config=config, method=method, ...)

这里我们可以看到两个关键步骤:

  1. 配置加载:调用 load_config() 函数加载并合并配置,支持通过命令行参数覆盖配置文件中的设置
  2. 索引执行:调用 _run_index() 执行实际的索引构建

配置加载

配置加载是 GraphRAG 的核心功能之一,位于 config/load_config.py 文件中,其实现如下:

def load_config(
  root_dir: Path,
  config_filepath: Path | None = None,
  cli_overrides: dict[str, Any] | None = None,
) -> GraphRagConfig:
  # 路径规范化,确保使用绝对路径
  root = root_dir.resolve()
  # 在根目录中搜索配置文件
  config_path = _get_config_path(root, config_filepath)
  # 加载 .env 文件中的环境变量
  _load_dotenv(config_path)
  # 读取配置文件文本内容
  config_text = config_path.read_text(encoding="utf-8")
  # 解析环境变量引用
  config_text = _parse_env_variables(config_text)
  # 根据文件类型解析为字典(支持 YAML 和 JSON)
  config_extension = config_path.suffix
  config_data = _parse(config_extension, config_text)
  # 应用命令行参数覆盖
  if cli_overrides:
    _apply_overrides(config_data, cli_overrides)
  # 创建并验证最终配置对象
  return create_graphrag_config(config_data, root_dir=str(root))

这里的 load_config() 函数实现了一个完整的类型安全的配置管理流程,整体流程非常清晰:

  1. 路径规范化:通过 root_dir.resolve() 确保使用绝对路径,其中 root_dir 是用户通过 --root-r 参数指定
  2. 配置发现:如果用户指定了 --config-c 参数,则使用用户自定义配置;否则在根目录中搜索默认配置文件,比如 settings.yamlsettings.ymlsettings.json
  3. 环境变量加载:加载 .env 文件中的环境变量
  4. 配置读取:读取配置文件文本内容
  5. 环境变量替换:解析配置文件中环境变量引用,比如 ${GRAPHRAG_API_KEY}
  6. 格式解析:根据文件类型将配置解析为字典,支持 YAML 和 JSON 两种方式
  7. 覆盖应用:应用命令行参数覆盖,比如 --output-o 参数会覆盖 output.base_dir 配置
  8. 对象创建:创建并验证最终的 GraphRagConfig 配置对象,使用 Pydantic 保证类型安全

这里有几个点比较有意思,可以展开介绍一下。

环境变量支持

GraphRAG 支持在配置文件中注入环境变量,这是使用 Python 的 Template 类实现的:

def _parse_env_variables(text: str) -> str:
  return Template(text).substitute(os.environ)

用户可以在配置文件中使用 ${VAR_NAME} 格式引用环境变量:

models:
  default_chat_model:
    type: openai_chat
    api_base: ${GRAPHRAG_API_BASE}
    auth_type: api_key
    api_key: ${GRAPHRAG_API_KEY}
    model: gpt-4o-mini

点号分隔覆盖机制

配置覆盖支持点号分隔的嵌套路径,比如当用户设置 --output-o 参数时,会覆盖下面三个配置:

cli_overrides = {}
if output_dir:
  cli_overrides["output.base_dir"] = str(output_dir)
  cli_overrides["reporting.base_dir"] = str(output_dir)
  cli_overrides["update_index_output.base_dir"] = str(output_dir)

这种点号分隔的路径语法允许精确覆盖嵌套配置项,为用户提供了灵活的配置管理能力,这块的实现比较通用,有类似需求的话,可以参考下 _apply_overrides() 的函数实现。

使用 Pydantic 创建配置

在配置加载的最后,通过 create_graphrag_config() 函数创建最终的 GraphRagConfig 配置对象:

def create_graphrag_config(
  values: dict[str, Any] | None = None,
  root_dir: str | None = None,
) -> GraphRagConfig:

  values = values or {}
  if root_dir:
    root_path = Path(root_dir).resolve()
    values["root_dir"] = str(root_path)
  return GraphRagConfig(**values)

这里使用了 Pydantic 库,其中 GraphRagConfig 类被定义为一个 Pydantic 模型,我们直接将 values 字典展开(两个星号 ** 用于解包字典)变成一个类型安全的配置对象。Pydantic 不仅能保证类型安全,还支持自定义参数验证,我们下面重点介绍一下它。

简单介绍 Pydantic 库

GraphRAG 使用 Pydantic 进行配置管理,这是一个基于类型提示的数据验证库。

pydantic.png

它的核心优势包括:

  • 类型安全:基于 Python 类型提示自动验证数据类型
  • 数据转换:自动进行数据类型转换和标准化
  • 验证规则:支持复杂的验证逻辑和自定义验证器
  • 错误报告:提供详细的验证错误信息
  • IDE 支持:完美的代码补全和类型检查支持

下面是一个简化的例子,展示如何使用 Pydantic 和 YAML 实现类似 GraphRAG 的配置管理。首先通过 Pydantic 定义配置类:

from enum import Enum
from pydantic import BaseModel, Field, field_validator

class StorageType(str, Enum):
  """存储类型枚举"""
  FILE = "file"
  AZURE_BLOB = "azure_blob"
  S3 = "s3"

class DatabaseConfig(BaseModel):
  """数据库配置模型"""
  host: str = Field(default="localhost", description="数据库主机")
  port: int = Field(default=5432, description="数据库端口")
  username: str = Field(description="用户名")
  password: str = Field(description="密码")
  
  @field_validator("port")
  @classmethod
  def validate_port(cls, v):
    if not 1 <= v <= 65535:
      raise ValueError("端口必须在 1-65535 范围内")
    return v

class AppConfig(BaseModel):
  """主配置模型"""
  app_name: str = Field(default="MyApp", description="应用名称")
  debug: bool = Field(default=False, description="调试模式")
  storage_type: StorageType = Field(default=StorageType.FILE, description="存储类型")
  database: DatabaseConfig = Field(description="数据库配置")
  
  # 自定义验证器
  @field_validator("app_name")
  @classmethod
  def validate_app_name(cls, v):
    if not v.strip():
      raise ValueError("应用名称不能为空")
    return v.strip()

然后创建一个配置文件 config.yaml 内容如下:

app_name: "Demo"
debug: true
storage_type: "file"
database:
  host: "localhost"
  port: 5432
  username: "admin"
  password: "secret"

接着我们就可以从 YAML 文件加载配置,并将其转换为强类型的配置对象:

import yaml
from pathlib import Path

def load_config_from_yaml(yaml_path: Path) -> AppConfig:
  """从 YAML 文件加载配置"""
  with open(yaml_path, 'r', encoding='utf-8') as f:
    config_data = yaml.safe_load(f)
  # Pydantic 自动验证和转换
  return AppConfig(**config_data)

# 使用示例
config_file = Path("config.yaml")
config = load_config_from_yaml(config_file)
print(f"应用名称: {config.app_name}")
print(f"数据库配置: {config.database.host}:{config.database.port}")

运行结果如下:

应用名称: Demo
数据库配置: localhost:5432

这个例子展示了 Pydantic 的核心特性,包括类型声明、默认值、验证器和自动数据转换等。

工作流引擎

配置加载之后,GraphRAG 调用 _run_index() 执行实际的索引构建,而它则是调用 API 层的 build_index() 函数:

async def build_index(
  config: GraphRagConfig,
  method: IndexingMethod | str = IndexingMethod.Standard,
  is_update_run: bool = False,
  ...
) -> list[PipelineRunResult]:
  
  outputs: list[PipelineRunResult] = []
  # 根据 method 创建对应的工作流管道
  method = _get_method(method, is_update_run)
  pipeline = PipelineFactory.create_pipeline(config, method)
  # 依次运行管道中的每个工作流
  async for output in run_pipeline(pipeline, config, is_update_run=is_update_run, additional_context=additional_context):
    outputs.append(output)
    logger.info("Workflow %s completed successfully", output.workflow)
  return outputs

GraphRAG 的索引构建采用了灵活的管道架构,其中 PipelineFactory 采用工厂设计模式,负责管理和创建处理工作流和管道:

class PipelineFactory:
  """工作流管道工厂类"""
  
  workflows: ClassVar[dict[str, WorkflowFunction]] = {}
  pipelines: ClassVar[dict[str, list[str]]] = {}
  
  @classmethod
  def register(cls, name: str, workflow: WorkflowFunction):
    """注册自定义工作流函数"""
    cls.workflows[name] = workflow

  @classmethod
  def register_all(cls, workflows: dict[str, WorkflowFunction]):
    """批量注册自定义工作流函数"""
    for name, workflow in workflows.items():
      cls.register(name, workflow)
  
  @classmethod
  def register_pipeline(cls, name: str, workflows: list[str]):
    """注册自定义管道,一个管道包含多个工作流函数"""
    cls.pipelines[name] = workflows

  @classmethod
  def create_pipeline(cls, config: GraphRagConfig, method: IndexingMethod) -> Pipeline:
    """根据 method 创建管道"""
    workflows = config.workflows or cls.pipelines.get(method, [])
    return Pipeline([(name, cls.workflows[name]) for name in workflows])

这里涉及 GraphRAG 中的两个核心概念:工作流(Workflow)管道(Pipeline)。工作流是一个个独立的模块,比如加载文档、文本分片、提取图谱等等,程序启动时会自动注册所有内置的工作流函数:

PipelineFactory.register_all({
  "load_input_documents": run_load_input_documents,
  "create_base_text_units": run_create_base_text_units,
  "extract_graph": run_extract_graph,
  "create_communities": run_create_communities,
  # ...
})

而管道则是由多个工作流串起来的一个集合,系统预定义了四个管道:

PipelineFactory.register_pipeline(
  IndexingMethod.Standard, ["load_input_documents", *_standard_workflows]
)
PipelineFactory.register_pipeline(
  IndexingMethod.Fast, ["load_input_documents", *_fast_workflows]
)
PipelineFactory.register_pipeline(
  IndexingMethod.StandardUpdate, ["load_update_documents", *_standard_workflows, *_update_workflows],
)
PipelineFactory.register_pipeline(
  IndexingMethod.FastUpdate, ["load_update_documents", *_fast_workflows, *_update_workflows],
)

分别对应四种不同的构建索引的方法,当我们执行 graphrag index 命令时,可以通过 --method-m 参数指定:

# 标准方法
$ graphrag index --root ./ragtest --method standard

# 快速方法
$ graphrag index --root ./ragtest --method fast

# 标准方法(用于更新)
$ graphrag index --root ./ragtest --method standard-update

# 快速方法(用于更新)
$ graphrag index --root ./ragtest --method fast-update

GraphRAG 通过 create_pipeline() 方法,根据 method 找到对应的管道,然后依次运行管道中的每个工作流函数。

索引构建方法

上面提到 GraphRAG 内置了四种索引构建方法,每种都有其特定的适用场景:

索引方法速度质量适用场景主要特点
Standard首次索引,追求高质量全 LLM 驱动的图构建
Fast快速原型,大数据集NLP + LLM 混合方式
StandardUpdate增量更新,保持高质量标准方法 + 增量更新
FastUpdate频繁更新,快速处理快速方法 + 增量更新

其中后两个 Update 方法是在前两个方法的基础上增加了增量更新的能力,能够在不重新构建整个索引的情况下,处理新增或修改的文档,大大提高了增量处理的效率。我们这里主要关注 StandardFast 这两个方法,它们的主要差异点在于:

  • Standard 方法的 LLM 驱动流程

    • extract_graph:使用大语言模型从文本中提取实体和关系
    • extract_covariates:使用 LLM 提取声明和协变量信息
    • create_community_reports:基于图上下文生成高质量社区报告
  • Fast 方法的混合流程

    • extract_graph_nlp:使用传统 NLP 技术(如 spaCy)进行实体识别
    • prune_graph:对提取的实体进行过滤和清理
    • create_community_reports_text:基于文本单元上下文生成报告(更快但质量稍低)

下图展示了两种方法的工作流执行顺序:

index-workflow.png

蓝色为 Standard 方法使用的工作流,红色为 Fast 方法使用的工作流,绿色为二者共用的工作流。

小结

我们今天学习了当执行 graphrag index 命令时的整个调用过程,主要内容包括:

  1. 命令行入口:详细分析了从 graphrag index 命令到核心逻辑的完整调用链;
  2. 配置加载:深入解读了 load_config() 函数的实现,包括环境变量支持和点号分隔覆盖机制;
  3. Pydantic 介绍:通过一个简单的示例演示如何使用 Pydantic 和 YAML 实现类似 GraphRAG 的配置管理;
  4. 工作流引擎:学习 PipelineFactory 的设计模式和工作流注册机制;
  5. 索引构建方法:对比了四种不同的索引构建方法及其适用场景,通过 Mermaid 图表展示了索引构建的完整处理流程;

在下一篇文章中,我们将深入索引构建的具体流程,先从文档处理阶段开始,详细分析 GraphRAG 如何从各种格式的原始文档中提取和预处理文本数据,为后续的知识提取做好准备。


剖析 GraphRAG 的项目结构

经过这几天的动手实践和可视化体验,我们已经对 GraphRAG 的核心功能有了一个基本的了解。今天,我们正式开始深入研究它的源码,首先从熟悉项目结构入手,为我们后续学习其核心工作流(比如索引和查询)打下基础。

项目概览

使用 tree 命令查看 GraphRAG 项目的项目结构如下:

$ tree -L 1 graphrag
├── __init__.py
├── __main__.py     # 模块入口
├── api             # API 层
├── cache
├── callbacks
├── cli             # CLI 入口
├── config          # 配置管理
├── data_model      # 数据模型
├── index           # 索引系统
├── language_model  # 语言模型
├── logger
├── prompt_tune     # 提示词调优
├── prompts         # 提示词系统
├── query           # 查询系统
├── storage         # 存储抽象层
├── utils
└── vector_stores   # 向量数据库

其中基础模块包括:

  • API 层(api:这里提供了高层次的编程接口,是外部调用 GraphRAG 功能的主要入口,包括索引构建 API(index.py)、查询 API(query.py)和提示词调优 API(prompt_tune.py)三大块;
  • CLI 入口(cli:通过 Typer 装饰器定义命令,解析命令行参数并调用 API 层;
  • 配置管理(config:使用 Pydantic 定义类型安全的配置模型,包括主配置、LLM 配置、向量配置、存储配置等;
  • 数据模型(data_model:定义核心数据结构,包括实体、关系、社区、社区报告、文本单元、文档等;

从入口处可以看出 GraphRAG 有三大功能:索引构建、查询、提示词调优,分别位于下面这些模块中:

  • 索引系统(index:这是 GraphRAG 的核心模块,负责从原始文档构建知识图谱,这里的内容也是最丰富的;比如 input 目录用于输入处理,支持 CSV、JSON、纯文本多种格式的文档;workflows 是 GraphRAG 的工作流引擎,定义了标准的数据处理工作流,使用工厂模式动态创建流水线,支持增量更新和全量重建;operations 目录实现了大量的工作流原子操作,包括文本处理、图谱构建、社区识别、总结分析等;下面是一些典型的工作流操作:

    • chunk_text - 文本分块,将长文档切分为可处理的片段;
    • embed_text - 文本向量化,支持多种嵌入模型;
    • extract_graph - 从文本提取实体和关系;
    • build_noun_graph - 构建名词图,识别重要概念;
    • create_graph.py - 创建完整的知识图谱;
    • cluster_graph.py - 图聚类,发现社区结构;
    • embed_graph - 图嵌入,支持 Node2Vec 等算法;
    • layout_graph - 图布局,支持 UMAP 降维可视化;
    • summarize_communities - 社区摘要生成;
    • summarize_descriptions - 实体描述摘要;
    • extract_covariates - 协变量提取(如声明检测);
  • 查询系统(query:为 GraphRAG 提供了多种检索策略,包括:

    • 全局搜索:基于社区报告的高层次查询,适合回答概括性或主题性问题;使用 Map-Reduce 模式处理大规模数据;
    • 本地搜索:基于实体和关系的细粒度查询,适合回答具体的事实性问题;混合向量检索和图遍历;
    • 漂移搜索:基于动态选择社区查询,在查询过程中调整搜索范围,平衡查询深度和广度;
  • 提示词系统(prompts:包括索引和查询所使用的所有提示词;
  • 提示词调优(prompt_tune:根据具体数据领域自动生成最适合的提示词,提高知识图谱构建质量;

此外,为实现索引构建和查询,大模型和存储服务也是必不可少的:

  • 语言模型抽象(language_model:提供统一的 LLM 和 Embedding 接口,主要通过 fnllm 库实现;
  • 存储抽象层(storage:提供统一的存储接口,支持多种后端,包括本地文件系统、Azure Blob、CosmosDB、内存存储等;
  • 向量数据库(vector_stores:支持多种向量数据库,包括:高性能向量数据库 LanceDB、Azure 认知搜索服务 Azure AI Search 以及 Azure 的 NoSQL 数据库 CosmosDB

技术栈分析

通过分析 pyproject.toml 文件,我们可以对 GraphRAG 的核心依赖和开发工作链有一个初步了解。

  • 大语言模型

    • fnllm[azure,openai] - 统一的大语言模型接口库
    • openai - OpenAI 官方提供的 API 客户端
    • tiktoken - 一款高效的 BPE 分词库,能实现文本分词,并计算出文本对应的 token 数量
  • 数据科学

    • pandas - 数据处理和分析
    • numpy - 数值计算
    • networkx - 图算法库,用于创建、操作和研究各种图结构,支持图的遍历、最短路径查找等多种图算法实现
    • graspologic - 图统计和机器学习,提供了图数据的统计分析方法和基于图的机器学习模型,助力从图结构数据中挖掘有价值的信息
    • umap-learn - 降维算法库,基于 UMAP 算法,能将高维数据映射到低维空间,同时保留数据的局部和全局结构,便于数据可视化和后续分析
  • 向量存储

    • lancedb - 高性能的向量数据库,具备高效的向量存储和检索能力
    • azure-search-documents - Azure 搜索服务的客户端库
  • 配置和 CLI

    • pydantic - 数据验证和配置管理,基于类型提示进行数据验证,能确保数据的完整性和正确性,同时便于配置信息的管理和解析
    • typer - 现代化的 CLI 框架,基于 Python 的类型提示构建,让开发者能快速、简洁地创建功能丰富的命令行工具
    • pyyaml - 用于处理 YAML 格式配置文件
  • 代码质量

    • ruff - 现代化的 Python 代码检查和格式化工具
    • pyright - 由微软开发的 Python 类型检查器
  • 测试框架

    • pytest - 测试框架
    • pytest-asyncio - 提供异步测试支持
    • coverage - 用于代码覆盖率分析
  • 文档和发布

    • mkdocs-material - 现代化的文档站点生成工具,基于 MkDocs 构建
    • semversioner - 用于语义化版本管理的工具,帮助开发者规范地管理项目的版本号,记录版本变更信息,简化版本发布流程

命令行入口

GraphRAG 是一个命令行程序,正如我们在入门篇里所体验的,使用 uv run poe init 命令初始化工作空间:

$ uv run poe init --root ./ragtest

其中 poePoe The Poet 的简称,它是一个任务管理工具,允许在 pyproject.toml 中定义和运行各种任务:

[project]
name = "graphrag"

[tool.poe.tasks]
...
index = "python -m graphrag index"
update = "python -m graphrag update"
init = "python -m graphrag init"
query = "python -m graphrag query"
prompt_tune = "python -m graphrag prompt-tune"

可以看出 poe 命令实际上是通过 python -m 执行的:

$ python -m graphrag init

包括 init 在内,GraphRAG 一共支持 5 个不同的子命令:

  • graphrag init - 初始化项目配置
  • graphrag index - 构建知识图谱索引
  • graphrag query - 执行查询操作
  • graphrag update - 增量更新索引
  • graphrag prompt-tune - 提示词调优

GraphRAG 采用了标准的 Python 包结构,__main__.py 文件是模块的入口:

from graphrag.cli.main import app

app(prog_name="graphrag")

这个文件允许用户通过 python -m graphrag 直接运行包,为开发和测试提供便利。

另外,如果你是通过 pip install 安装的,还可以直接运行 graphrag 命令:

$ graphrag init --root ./ragtest

要实现这一点,其秘密在于 pyproject.toml 文件中的下面这行配置:

[project.scripts]
graphrag = "graphrag.cli.main:app"

这里的 [project.scripts] 用于定义脚本入口,当执行 pip install 后,会在 Python 环境的 bin 目录下生成一个可执行文件,我们不妨看下这个文件内容:

# -*- coding: utf-8 -*-
import sys
from graphrag.cli.main import app
if __name__ == "__main__":
  if sys.argv[0].endswith("-script.pyw"):
    sys.argv[0] = sys.argv[0][:-11]
  elif sys.argv[0].endswith(".exe"):
    sys.argv[0] = sys.argv[0][:-4]
  sys.exit(app())

这个文件根据 [project.scripts] 定义,导入 graphrag.cli.main 包下的 app 方法并执行,因此我们可以直接运行 graphrag 命令。

命令行实现

GraphRAG 命令行是基于 Typer 库开发的,这是一个现代化的 Python CLI 库,基于类型提示构建命令行应用程序。

typer.png

它的核心特点如下:

  • 类型驱动:基于 Python 类型提示自动生成 CLI 接口
  • 易用性:简洁的 API,最少的样板代码
  • 自动化:自动生成帮助文档、参数验证和错误处理
  • 现代化:支持 Python 3.6+ 的现代特性
  • 高级功能:子命令组织、回调函数、进度条、颜色输出、确认提示等

通过寥寥几行代码,我们就可以使用 Typer 开发一个命令行程序:

import typer

app = typer.Typer()

@app.command()
def hello(name: str):
  print(f"Hello {name}")

if __name__ == "__main__":
  app()

再回到 GraphRAG 的代码,当我们执行 python -mgraphrag 命令时,实际上调用的是 __main__.pybin/graphrag 文件,它们都是指向 graphrag.cli.main 包下的 app 方法:

graphrag-typer-app.png

从这里可以看到,GraphRAG 基于 Typer 实现了多个选项和子命令的功能:

graphrag-cli.png

我们以 graphrag index 为例看下命令的调用链:

  1. 入口点__main__.pybin/graphraggraphrag.cli.main:app
  2. CLI 解析cli/main.py@app.command("index") 装饰器
  3. 配置加载cli/index.pyindex_cli() 函数
  4. API 调用api.build_index() 函数

小结

至此,我们对 GraphRAG 的项目结构有了大致的了解,并对 graphrag 命令行的运行原理有了一定的认识。接下来,我们将顺着 graphrag index 的调用链,展开聊聊 GraphRAG 索引构建的具体逻辑。


可视化探索 GraphRAG 的知识图谱

昨天,我们通过一个端到端的示例,快速体验了 GraphRAG 从数据索引到查询的完整流程。我们知道,GraphRAG 的核心是将非结构化文本转化为结构化的知识图谱。然而,单纯查看 Parquet 格式的输出文件很难直观地理解图谱的结构。今天,我们将学习如何将这个知识图谱可视化,从而更清晰地洞察数据中的实体、关系和社区结构。

准备工作

在我们昨天的 ragtest 示例中,uv run poe index 命令已经为我们生成了一系列 Parquet 文件,还有一个图描述文件 graph.graphml,包含了图谱的节点和边信息。为了生成这个文件,在构建索引之前,其实我对 settings.yaml 配置文件做了一点小小的修改,启用了 graphml 快照选项:

snapshots:
  graphml: true

为了支持其他可视化工具,还可以启用下面这些额外参数:

embed_graph:
  enabled: true  # 生成节点的 node2vec 嵌入
umap:
  enabled: true  # 生成 UMAP 嵌入,为实体提供 x/y 位置坐标

运行索引命令后,确保在 output 目录下能找到关键的 graph.graphml 文件,这是许多可视化工具支持的标准文件格式。

GraphML 文件格式

GraphML (Graph Markup Language) 是一种基于 XML 的通用图形描述格式,旨在表示结构化的图形数据,包括节点、边及其属性关系。它支持多种图类型,如有向图、无向图和混合图,并允许用户自定义属性,比如节点的权重和边的类型。GraphML 提供了一种灵活、可扩展的格式,在图论、网络分析和数据可视化等领域有着广泛的应用。

GraphML 文件以 XML 声明开头,核心标签嵌套关系如下:

<?xml version="1.0" encoding="UTF-8"?>
<graphml xmlns="http://graphml.graphdrawing.org/xmlns">
  <!-- 1. 属性定义(key):声明图形元素的属性类型 -->
  <key id="weight" for="edge" attr.name="weight" attr.type="double"/>
  <key id="label" for="node" attr.name="label" attr.type="string"/>
  
  <!-- 2. 图形定义(graph):包含节点和边 -->
  <graph id="example" edgedefault="directed">
    <!-- 节点(node) -->
    <node id="n1">
      <data key="label">Node 1</data>
    </node>
    <node id="n2">
      <data key="label">Node 2</data>
    </node>
    
    <!-- 边(edge) -->
    <edge id="e1" source="n1" target="n2">
      <data key="weight">3.5</data>
    </edge>
  </graph>
</graphml>

整个文件结构包含:

  • 根标签 <graphml>,必须包含 xmlns 命名空间声明,定义 GraphML 的语法规则;
  • 属性定义 <key>,用于预先声明图形元素(节点、边、图形本身)的属性,包括属性名称和数据类型,类似于数据字典;id 为属性的唯一标识,后续可通过 key 引用;
  • 图形标签 <graph>,表示一个具体的图形,可包含多个节点和边,edgedefault 为默认边类型,支持 directed(有向边)和 undirected(无向边);
  • 节点 <node>,表示图形中的节点,id 为唯一标识;
  • <edge>,表示节点间的关系,必须包含 source(起点)和 target(终点)属性;

在 Python 中,常常使用 NetworkX 库读写 GraphML 文件,GraphRAG 用的也是这个库:

import networkx as nx

# 创建无向图
G = nx.Graph()

# 添加节点
nodes = [
  ("n1", {"label": "Node 1"}),
  ("n2", {"label": "Node 2"}),
]

for name, attrs in nodes:
  G.add_node(name, **attrs)

# 添加边
relationships = [
  ("n1", "n2", {"weight": 3.5}),
]

for source, target, attrs in relationships:
  G.add_edge(source, target, **attrs)

# 将图保存为 GraphML 格式
nx.write_graphml(G, "demo.graphml", encoding='utf-8')

使用 Gephi 进行可视化

Gephi 是一款非常流行的开源图可视化和探索工具,非常适合用来分析 GraphRAG 生成的 GraphML 文件。

gephi.jpg

首先,从 Gephi 官网下载并安装最新版本,然后点击 “Open Graph File...” 导入 graph.graphml 文件,你将看到一个包含所有节点和边的基础图谱视图:

gephi-import.png

接着我们安装 Leiden Algorithm 插件,点击 工具插件,搜索 Leiden 关键词,找到并安装该插件,这是进行社区检测的关键工具:

gephi-plugin-leiden.png

然后在统计面板中找到 “平均度数” 和 “Leiden 算法”,点击右侧的 “Run” 运行分析,为后续的可视化提供数据基础。运行 “Leiden 算法” 时调整配置如下:

gephi-plugin-leiden-setting.png

有两点修改:

  • 使用 模块度(Modularity) 作为 质量函数(Quality function),在社区发现中,模块度是衡量社区划分质量的重要指标,其取值范围通常在 [-1, 1] 之间;
  • 分辨率(Resolution) 设置为 1,这个值用于控制社区检测的粒度,如果 > 1 倾向于发现更小、更细粒度的社区,如果 < 1 倾向于发现更大、更粗粒度的社区,如果 = 1 则使用标准模块度优化,产生自然大小的社区;

分析结果如下:

gephi-statistics-run.png

从图中可以看出平均度数为 2.771,模块度质量为 0.602。

度数表示每个节点连接的边数,而平均度数则表示所有节点度数的平均值,2.771 意味着平均每个实体连接到约 2.8 个其他实体,这是一个相对较低的平均度数,表明知识图谱相对稀疏,实体间连接不密集,低连接度有助于形成清晰的社区边界。

另外,0.602 是一个很好的模块度值,说明算法发现了清晰、良好分离的社区。模块度并非越高越好,通常 0.3 ~ 0.7 被广泛认为是较好的社区划分范围,表明网络中的节点确实呈现出明显的 社区内连接紧密、社区间连接稀疏 的特征,划分结果具有较强的合理性;过高的模块度(> 0.7)可能是算法过度优化的结果,甚至可能将本应属于同一社区的节点强行拆分,导致划分结果与实际结构偏离;过低的模块度(< 0.3)通常认为社区结构不明显,划分结果质量较差。

高级可视化技巧

有了上面两个统计值之后,我们就可以对图进行进一步的布局和美化。

首先,使用 Leiden 算法的结果对节点进行分区着色:

  • 通过外观面板选择 Nodes 然后 Partition,并点击右上角的调色板图标,从下拉菜单中选择 Cluster;
  • 点击 Palette 超链接,然后 Generate,取消选中 Limit number of colors ,点击 Generate ,然后 Ok,生成不限颜色数量的调色板,让每个社区都有独特的视觉标识;
  • 点击 Apply 按钮,这将根据 Leiden 发现的分区为图表着色;

gephi-cluster-coloring.png

然后,根据度数调整节点大小,这样可以直观地识别图谱中的关键实体和枢纽节点:

  • 通过外观面板选择 Nodes 然后 Ranking,并点击右上角的 Sizing 图标,从下拉菜单中选择 Degree;
  • 将最小值设置为 10,最大值设置为 50,点击 Apply 按钮;

gephi-degree-resizing.png

接着,使用布局算法对节点进行智能布局优化:

  • 在左下角的 Layout 标签中,选择 OpenORD 布局算法;
  • 将 Liquid 和 Expansion 设置为 50,其他全部设置为 0;
  • 点击 Run 进行初始排列;

gephi-openord-layout.png

  • 继续使用 Force Atlas 2 进行精细调整;
  • 缩放设置为 15,选中 Dissuade Hubs 和 Prevent Overlap;
  • 点击 Run 运行,当图节点看起来已经稳定且位置不再发生显著变化时,按下 Stop 停止;

gephi-force-atlas-2-layout.png

OpenORD 和 Force Atlas 2 都是基于 力引导(force-directed) 的图布局算法,通过模拟节点之间的作用力,基于力的作用对节点位置进行调整,如节点之间的斥力大于引力时,使节点远离,反之则使节点靠近。通过不断调整节点位置,最终使力达到平衡,从而得到图的布局。

最后,在底部的工具栏中,你可以点击黑色的 “T” 图标来显示节点标签,还可以通过旁边的滑块调整标签的大小,防止重叠。

经过这些步骤,你将获得一个结构清晰、层次分明的可视化知识图谱:使用颜色区分社区,不同颜色的节点群代表不同的主题或概念领域;大小表示重要性,较大的节点通常是连接多个概念的关键实体;位置反映关系,相近的节点在语义或功能上具有更强的关联性。

你可以通过缩放、平移来探索图的细节,识别出关键的实体和社区,并理解它们之间的复杂关系。

使用 Unified Search 探索知识图谱

GraphRAG 内置了一个可视化的 Web 应用程序 Unified Search App,它为我们提供了一个全新的交互式探索体验,与静态的 Gephi 可视化不同,Unified Search 允许用户通过直观的界面进行实时查询和探索。

其主要功能包括:

  • 搜索对比: 统一界面比较不同 GraphRAG 搜索方法的结果;
  • 多数据集支持: 通过 listing.json 管理多个 GraphRAG 索引;
  • 图探索: 可视化社区报告、实体图和选定报告;
  • 问题建议: 自动分析数据集生成重要问题;

注意,使用 Unified Search 需要预先运行 GraphRAG 索引,且必须启用图嵌入和 UMAP 配置参数。

首先,进入 unified-search-app 目录,安装所需依赖:

$ cd unified-search-app
$ uv sync --extra dev

在启动 Unified Search 之前,我们还需要创建一个 listing.json 配置文件:

[{
  "key": "ragtest-demo",
  "path": "ragtest",
  "name": "A Christmas Carol",
  "description": "Getting Started index of the novel A Christmas Carol",
  "community_level": 2
}]

Unified Search 支持多数据集,该配置文件包含每个数据集的名称、描述、位置等基本信息。注意 listing.json 文件和数据集目录处于同一级,如下:

- listing.json
- dataset_1
  - settings.yaml
  - .env
  - output
  - prompts
- dataset_2
  - settings.yaml
  - .env
  - output
  - prompts
- ...

然后通过下面的命令启动 Unified Search 应用(注意使用 DATA_ROOT 指定数据集根目录):

$ export DATA_ROOT=/the/path/to/graphrag/
$ uv run poe start

这是一个使用 Streamlit 开发的 Web 程序,启动成功后会自动打开 http://localhost:8501 页面:

unified-search-app.png

整个页面分为两个主面板。左侧为配置面板,提供了多项应用配置选项,该面板可被关闭:

  1. 数据集:以下拉菜单形式按顺序展示你在 listing.json 文件中定义的所有数据集;
  2. 建议问题数量:该选项允许用户设置生成建议问题的数量;
  3. 搜索选项:用于选择应用中启用的搜索方式,至少需要启用一种;

右侧为搜索面板,顶部显示所选数据集的常规信息,例如名称和描述。下方有一个 Suggest some questions 按钮和一个 Ask a question to compare the results 输入框,我们可以在输入框中填写待提交的问题,或者点击按钮通过全局搜索分析数据集并生成最关键的问题:

unified-search-app-suggest-questions.png

勾选问题左侧的复选框,将自动执行搜索和问答。下面有两个标签页,分别为搜索和图探索。搜索页同时展示不同搜索算法的结果,包括本地搜索、全局搜索等:

unified-search-app-search.png

图探索页分为三个模块,社区报告列表、实体图谱和已选报告详情:

unified-search-app-graph-explorer.png

结合 Unified Search 的这些功能,我们可以对图结构进行可视化分析,通过并行对比,用户可以理解不同搜索方法的优势和适用场景。

小结

今天,我们学习了如何将 GraphRAG 生成的知识图谱进行可视化。使用 Gephi 软件,我们可以进行深入的拓扑分析,通过布局、着色、调整大小等一系列操作,发现知识图谱的整体结构和关键模式;而 Unified Search 则提供了动态、交互式的探索体验,让用户能够实时查询和验证假设。

通过可视化,我们不仅能更好地理解数据的内在结构,还能发现隐藏在文本背后的模式和联系,帮助你从复杂的信息网络中提取真正有价值的洞察。


GraphRAG 快速入门

在我们之前的学习过程中,曾简单介绍过 RAGFlow 的一个高级特性 —— 提取知识图谱。通过运用 GraphRAG 和 LightRAG 技术,RAGFlow 可以从实体、关系的提取到子图的构建和合并,再到实体消歧和社区报告的生成,形成完整的知识图谱。因此,这种方法在处理涉及复杂关系与多个实体的文档时,尤其是在多跳问答场景中,表现得尤为卓越。

最早关于 GraphRAG 的概念实际上可以追溯到去年微软研究院发表的论文《From Local to Global: A Graph RAG Approach to Query-Focused Summarization》中:

随后,微软开源了 GraphRAG 项目,在 RAG 领域又掀起了一波新的浪潮:

传统的 RAG 技术通常依赖于基于向量相似度来检索原始文本块,然而,当面对需要整合离散信息以解答的复杂问题时,这种方法往往表现得力不从心。GraphRAG 则采取了一种创新的路径,首先利用大型语言模型从非结构化文本数据中提取出实体和关系,并据此构建一个知识图谱。接着,依托图谱的拓扑结构进行社区发现,对每个社区进行逐层总结,最终形成一个分层且结构化的知识网络。在进行查询时,GraphRAG 利用这一知识网络来强化信息检索,从而为模型提供更具启发性的上下文,以回答那些需要深度推理和全局视角的问题。

核心概念

GraphRAG 的流程主要分为 索引(Indexing)查询(Querying) 两个阶段:

  • 索引(Indexing):这是 GraphRAG 的数据处理阶段,它将非结构化文本转化为结构化知识。主要步骤包括:

    • 文本分块:将长文档切分成小的文本单元(TextUnit);
    • 图谱提取:利用大模型从文本单元中提取出实体(Entities)和关系(Relationships),构建知识图谱;
    • 社区发现:使用 Leiden 等算法对图谱进行层次化社区聚类;
    • 社区总结:利用大模型为每个层级的社区生成摘要报告;
  • 查询(Querying):这是 GraphRAG 的数据检索与问答阶段,利用构建好的知识图谱和摘要来回答问题。比较常见的查询方式有:

    • 全局搜索(Global Search):利用社区摘要,通过 Map-Reduce 的方式对整个数据集进行归纳总结,回答宏观问题;
    • 本地搜索(Local Search):当问题涉及特定实体时,从该实体出发,在知识图谱中向外扩展,聚合其邻近的实体、关系以及相关的原始文本块,为大模型提供精准的局部上下文;

以下是 GraphRAG 的整个流水线示意图:

graphrag-pipeline.png

今天,我们就来快速上手体验一下 GraphRAG 的基本使用。

安装

GraphRAG 是一个 Python 库,可以通过 pip 轻松安装:

$ pip install graphrag

也可以通过源码进行安装,这里我采用这种方法,这样可以一边体验 GraphRAG 一边研究它的源码。首先克隆代码仓库:

$ git clone https://github.com/microsoft/graphrag.git

进入项目目录:

$ cd graphrag

使用 uv 创建 Python 虚拟环境:

$ uv venv --python 3.10
$ source .venv/bin/activate

安装 Python 依赖:

$ uv sync --extra dev

uv 是一个极速的 Python 包和项目管理工具,uv sync 是它的一个命令,用于更新项目的环境,确保项目环境中的依赖与 uv.lock 文件一致。

快速上手

下面,我们就按照官方文档的步骤,通过一个完整的端到端示例,来体验 GraphRAG 的基本使用。首先,我们创建一个测试目录:

$ mkdir -p ./ragtest/input

该目录用于存放原始文档,GraphRAG 支持 txt、json 和 csv 三种格式的文档。

官网给的文档示例是查尔斯・狄更斯(Charles Dickens)创作的一部著名小说《圣诞颂歌》,讲述吝啬鬼斯克鲁奇(Scrooge)在圣诞节前夜经历的奇妙故事,通过三个幽灵的拜访,他的自私和冷酷逐渐崩塌,人性中的同情、仁慈、爱心及喜悦被唤醒,从此成为了一个乐善好施的人。我们可以从 古腾堡计划 下载这本书:

$ curl https://www.gutenberg.org/cache/epub/24022/pg24022.txt -o ./ragtest/input/book.txt

古腾堡计划是一个致力于创建和分发免费电子书的志愿者项目,它提供了大量版权已过期的经典文学作品,可以在上面找到很多免费的图书。

接着,使用 init 命令初始化 GraphRAG 工作空间:

$ uv run poe init --root ./ragtest

其中 poePoe The Poet 的简称,它是一个任务运行器工具,允许在 pyproject.toml 中定义和运行各种任务(类似 npm scriptsMakefile)。通过 poe 可以简化常见操作,如运行测试、启动服务、构建项目等。

如果你是通过 pip 安装的,可以直接使用 graphrag 命令:

$ graphrag init --root ./ragtest

该命令会在 ./ragtest 目录下创建两个核心配置文件以及一些提示词文件:

  • .env:用于存放环境变量,主要是 API Key;
  • settings.yaml:GraphRAG 的主配置文件,包含了数据输入、模型配置、工作流等所有设置;
  • prompts:运行过程中使用的一些提示词,用户可以对其进行微调;

GraphRAG 的核心流程严重依赖大模型,因此我们需要配置模型服务。打开刚刚生成的 .env 文件,将其中的 GRAPHRAG_API_KEY 替换成你自己的 OpenAI API Key:

GRAPHRAG_API_KEY="sk-..."

如果你使用的是 OpenAI 兼容接口,除了 API Key,还需要在 settings.yaml 文件中配置 api_base 参数;如果你使用的是 Azure OpenAI,可能还需要配置 api_versiondeployment_name 等参数:

models:
  default_chat_model:
    type: openai_chat # or azure_openai_chat
    api_base: ${GRAPHRAG_API_BASE}  # set this in the generated .env file
    # api_version: 2024-05-01-preview
    auth_type: api_key # or azure_managed_identity
    api_key: ${GRAPHRAG_API_KEY}    # set this in the generated .env file
    model: gpt-4o-mini
    # deployment_name: <azure_model_deployment_name>
  default_embedding_model:
    type: openai_embedding # or azure_openai_embedding
    api_base: ${GRAPHRAG_API_BASE}
    auth_type: api_key # or azure_managed_identity
    api_key: ${GRAPHRAG_API_KEY}
    model: text-embedding-3-small
    # deployment_name: <azure_model_deployment_name>

配置完成后,我们就可以开始构建索引了:

$ uv run poe index --root ./ragtest

这个过程会花费一些时间,具体取决于你的数据大小和模型性能。GraphRAG 会在后台执行一系列复杂的流程,包括:文本分块、实体与关系提取、图谱构建、社区发现、社区报告生成等。执行成功后,你会在 ./ragtest/output 目录下看到一系列 Parquet 格式的结果文件,这些就是我们构建好的知识图谱数据:

$ tree ./ragtest/output
├── communities.parquet        # 社区表
├── community_reports.parquet  # 社区报告
├── context.json
├── documents.parquet          # 文档表
├── embeddings.community.full_content.parquet
├── embeddings.entity.description.parquet
├── embeddings.text_unit.text.parquet
├── entities.parquet           # 实体表
├── graph.graphml              # 知识图谱
├── lancedb                    # 向量数据库
│   ├── default-community-full_content.lance
│   ├── default-entity-description.lance
│   └── default-text_unit-text.lance
├── relationships.parquet      # 关系表
├── stats.json
└── text_units.parquet         # 文本单元

GraphRAG 使用 Parquet 存储数据,这是一种列式存储的二进制文件格式,专为高效存储和处理大规模结构化数据而设计,广泛用于大数据处理和分析场景。另外,LanceDB 是一个为机器学习优化的向量数据库,使用 Apache Arrow 格式存储。GraphRAG 使用它来存储文本嵌入向量,用于相似性搜索。

索引构建完成后,我们就可以通过 query 命令来查询,GraphRAG 支持多种查询模式,我们来体验下最常用的两种。

  • 全局搜索(Global Search):适用于需要对整个数据集进行宏观理解和总结的问题:
$ uv run poe query \
    --root ./ragtest \
    --method global \
    --query "What are the top themes in this story? 用中文回答"

graphrag-query-1.png

  • 本地搜索(Local Search):适用于查询关于特定实体的具体信息:
$ uv run poe query \
    --root ./ragtest \
    --method local \
    --query "Who is Scrooge and what are his main relationships? 用中文回答"

graphrag-query-2.png

小结

今天,我们初步探索了 GraphRAG,通过动手实践,体验了其从安装、配置到索引构建和查询的完整流程。GraphRAG 通过将非结构化文本转化成结构化的知识图谱,提供了一种创新的 RAG 范式。这种方法不仅提高了复杂问题的解决能力,还为多跳问答和深度推理提供了强有力的支持。

当然,GraphRAG 的功能远不止于此,它还支持更高级的功能,如提示词自动调优、自定义工作流、图可视化等。

我们后面将结合其源码,继续探索 GraphRAG 的强大能力和实现原理。


学习 Coze Studio 的知识库检索逻辑

经过前面几天的学习,我们已经深入探讨了 Coze Studio 从知识库的创建到文档入库的完整流程。今天,我们将继续这一探索之旅,深入研究知识库的最后一个核心环节 —— 检索(Retrieval),看看在 Coze Studio 中,智能体是如何从知识库中精准高效地找到与用户问题最相关的信息的。Coze Studio 在这方面设计了一套颇为完善的检索流水线,它融合了查询重写、多路并行检索(向量、全文、NL2SQL)以及重排序等多种技术。下面就让我们一起看看它背后的实现细节。

知识库检索配置

在 Coze Studio 中,我们可以为智能体提供特定领域的背景知识,包括文本、表格和图片,让其可以回答领域内的问题:

coze-agent-kb-settings.png

我们还可以对知识库进行一些配置,以便智能体更好地对知识库进行检索:

coze-agent-kb-settings-2.png

这些配置参数包括:

  • 调用方式:分为 自动调用按需调用 两种,自动调用指的是每次对话时都会发起一次检索,将检索结果放置在内置的系统提示词中,而按需调用则是将知识库检索封装成一个工具供智能体调用,只有当用户问题确实需要检索知识库时才触发调用。实际上,在开源版本中暂时只支持自动调用,我们之前在学习智能体执行逻辑时已经看到了,通过 Graph API 编排出来的智能体图,知识库检索是必然执行的节点之一,而且在源码中搜索 recallKnowledge 工具,也找不到其他地方用到;
  • 搜索策略:分为 语义全文混合 三种,语义表示基于向量的文本相关性查询,推荐在需要理解语义关联度和跨语言查询的场景使用,全文表示依赖于关键词的全文搜索,推荐在搜索具有特定名称、缩写词、短语或 ID 的场景使用,混合表示结合全文检索与语义检索的优势,并对结果进行综合排序。
  • 最大召回数量:从知识库中返回给大模型的最大段落数,注意默认是 1,可以适当调大一点;
  • 最小匹配度:根据设置的匹配度选取段落返回给大模型,低于设定匹配度的内容不会被召回;
  • 表格 SQL 查询:同时将查询的自然语言转为 SQL 语句进行查询,SQL 执行结果与 RAG 召回段落一同输入给模型;该选项仅对表格知识库有效;
  • 查询改写:结合对话上下文,对用户的问题进行改写,使得改写后的查询更适合检索;
  • 结果重排:通过分析用户查询的意图,对召回结果重新排序,使得最相关的内容排在前面;

知识库检索流程

回顾之前学习的智能体执行逻辑,其中有一路负责知识库的检索:

// 新建知识库检索器
kr, err := newKnowledgeRetriever(ctx, &retrieverConfig{
  knowledgeConfig: conf.Agent.Knowledge,
})

// 第一个节点检索
_ = g.AddLambdaNode(keyOfKnowledgeRetriever,
  compose.InvokableLambda[*AgentRequest, []*schema.Document](kr.Retrieve),
  compose.WithNodeName(keyOfKnowledgeRetriever))

// 第二个节点组装成字符串
_ = g.AddLambdaNode(keyOfKnowledgeRetrieverPack,
  compose.InvokableLambda[[]*schema.Document, string](kr.PackRetrieveResultInfo),
  compose.WithOutputKey(placeholderOfKnowledge),
)

首先新建一个知识库检索器,将其 Retrieve() 方法作为第一个节点,知识库检索后,该方法返回 []*schema.Document 文档列表,因此还需要第二个节点,调用 PackRetrieveResultInfo 将其组装成字符串,替换系统提示词模版中的 {{ knowledge }} 占位符。

可以看出只要智能体关联了知识库,用户每次对话时总是会触发一次知识库检索,检索的逻辑位于 knowledge 领域层,如下:

func (k *knowledgeSVC) Retrieve(ctx context.Context, request *RetrieveRequest) (response *RetrieveResponse, err error) {

  // 检索上下文
  retrieveContext, err := k.newRetrieveContext(ctx, request)

  // 查询重写
  rewriteNode := compose.InvokableLambda(k.queryRewriteNode)
  // 向量化召回
  vectorRetrieveNode := compose.InvokableLambda(k.vectorRetrieveNode)
  // ES召回
  EsRetrieveNode := compose.InvokableLambda(k.esRetrieveNode)
  // Nl2Sql召回
  Nl2SqlRetrieveNode := compose.InvokableLambda(k.nl2SqlRetrieveNode)
  // 用户查询透传
  passRequestContextNode := compose.InvokableLambda(k.passRequestContext)
  // 重排序
  reRankNode := compose.InvokableLambda(k.reRankNode)
  // 打包查询结果
  packResult := compose.InvokableLambda(k.packResults)

  // 多路召回
  parallelNode := compose.NewParallel().
    AddLambda("vectorRetrieveNode", vectorRetrieveNode).
    AddLambda("esRetrieveNode", EsRetrieveNode).
    AddLambda("nl2SqlRetrieveNode", Nl2SqlRetrieveNode).
    AddLambda("passRequestContext", passRequestContextNode)

  // 编排检索链
  chain := compose.NewChain[*RetrieveContext, []*knowledgeModel.RetrieveSlice]()
  r, err := chain.
    AppendLambda(rewriteNode).
    AppendParallel(parallelNode).
    AppendLambda(reRankNode).
    AppendLambda(packResult).
    Compile(ctx)
  // 调用链
  output, err := r.Invoke(ctx, retrieveContext)
  return &RetrieveResponse{
    RetrieveSlices: output,
  }, nil
}

可以看到,这里使用了 Eino 的编排(Compose)技术,通过 Chain API 实现了多路检索 + 重排序的知识库检索流程:

coze-studio-kb-retriever-flow.png

下面将依次学习下检索流程中各个节点的实现逻辑。

查询重写

该节点只有在启用查询重写且有聊天历史时才会执行,其作用为对用户原始查询进行改写优化,结合聊天上下文,将用户的当前查询改写成更适合检索的查询语句。查询重写的目的主要有两个:

  1. 使模糊问题更明确:比如用户的问题是 听说最近武汉的樱花挺不错,打算下周二带家人去看看,不知道那天天气怎么样?,需要把问题改成更明确的 武汉下周二天气怎么样?
  2. 多轮对话中的指代消解:比如用户先问 合肥明天的天气怎么样?,系统回答之后,用户接着又问 那后天呢?,这时要把问题改写成 合肥后天的天气怎么样?

其核心就一段 Prompt,位于 backend/conf/prompt/messages_to_query_template_jinja2.json 配置文件:

# 角色:
你是一名专业的查询重构工程师,擅长根据用户提供的上下文信息重写查询语句,使其更清晰、更完整,并与用户的意图相符。你应使用与用户输入相同的语言进行回复。

## 输出格式:
输出内容应为重构后的查询语句,以纯文本格式呈现。

## 示例:
示例 1:
输入:
[
  {
    "role": "user",
    "content": "世界上最大的沙漠在哪里?"
  },
  {
    "role": "assistant",
    "content": "世界上最大的沙漠是撒哈拉沙漠。"
  },
  {
    "role": "user",
    "content": "怎么去那里?"
  }
]
输出:怎么去撒哈拉沙漠?

示例 2:
输入:
[
  {
    "role": "user",
    "content": "分析当前网红欺骗公众以获取流量对当今社会的影响。"
  }
]
输出:当前网红欺骗公众以获取流量,分析这一现象对当今社会的影响。

值得注意的是,开启查询重写功能需要配置 AI 生成模型,可以打开 .env 文件,配置如下:

export BUILTIN_CM_TYPE="openai"

或者单独配置 M2Q 模型:

export M2Q_BUILTIN_CM_TYPE="openai"

由于修改的是环境变量,通过 --force-recreate 重启服务:

$ docker compose --profile '*' up -d --force-recreate --no-deps coze-server

多路检索策略

检索流程中的第二部分包含四个并行执行的子节点:

  • 向量检索节点:进行语义相似度检索,使用向量数据库,基于语义相似度查找相关文档;当搜索策略设置为纯全文搜索时跳过;
  • ES 检索节点:进行全文文本检索,使用 Elasticsearch,基于关键词匹配查找相关文档;当搜索策略设置为纯语义搜索时跳过;
  • NL2SQL 检索节点:专门处理表格数据的检索,将自然语言查询转换为 SQL 语句,查询结构化表格数据;只有在启用 NL2SQL 且存在表格文档时执行;
  • 上下文传递节点:传递原始请求上下文,确保后续节点能够访问完整的检索上下文信息;由于这里使用的是 Chain API,它是基于 Graph API 实现的,不支持跨节点传递参数;

这里的向量检索和 ES 检索都没什么特别的,主要看下 NL2SQL 检索的实现:

func (k *knowledgeSVC) nl2SqlExec(ctx context.Context, doc *model.KnowledgeDocument, retrieveCtx *RetrieveContext, opts []nl2sql.Option) (retrieveResult []*schema.Document, err error) {
    
  // 调用大模型,将自然语言查询转换为 SQL 语句
  sql, err := k.nl2Sql.NL2SQL(ctx, retrieveCtx.ChatHistory, []*document.TableSchema{packNL2SqlRequest(doc)}, opts...)

  // 对 SQL 中表名和列名进行替换
  parsedSQL, err := sqlparser.NewSQLParser().ParseAndModifySQL(sql, replaceMap)

  // 执行 SQL
  resp, err := k.rdb.ExecuteSQL(ctx, &rdb.ExecuteSQLRequest{
    SQL: parsedSQL,
  })

  return retrieveResult, nil
}

第一步仍然是调用 AI 生成大模型,因此也需要配置 AI 生成模型:

export BUILTIN_CM_TYPE="openai"

或单独配置 NL2SQL 模型:

export NL2SQL_BUILTIN_CM_TYPE="openai"

其核心也是一段 Prompt,位于 backend/conf/prompt/nl2sql_template_jinja2.json 配置文件:

角色:NL2SQL顾问

目标:
将自然语言陈述转换为MySQL标准的SQL查询。遵循约束条件,且仅返回JSON格式。

格式:
- 仅JSON格式。JSON包含字段“sql”(用于生成的SQL)、字段“err_code”(用于原因类型)、字段“err_msg”(用于详细原因,最好超过10个字)
- 不要使用“```json”标记格式

技能:
- 擅长将自然语言陈述转换为MySQL标准的SQL查询。

定义:
“err_code”原因类型定义:
- 0表示已生成SQL
- 3002表示因超时无法生成SQL
- 3003表示因缺少表结构无法生成SQL
- 3005表示因某些术语不明确无法生成SQL

示例:
问:帮我实现NL2SQL。
表结构描述:CREATE TABLE `sales_records` (
  `sales_id` bigint(20) unsigned NOT NULL COMMENT '销售员ID',
  `product_id` bigint(64) COMMENT '产品ID',
  `sale_date` datetime(3) COMMENT '销售日期和时间',
  `quantity_sold` int(11) COMMENT '销售量',
  PRIMARY KEY (`sales_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='销售记录表';
SQL需求的自然语言描述:查询上月的销量总额第一名的销售员和他的销售总额
答:{
  "sql":"SELECT sales_id, SUM(quantity_sold) AS total_sales FROM sales_records WHERE MONTH(sale_date) = MONTH(CURRENT_DATE - INTERVAL 1 MONTH) AND YEAR(sale_date) = YEAR(CURRENT_DATE - INTERVAL 1 MONTH) GROUP BY sales_id ORDER BY total_sales DESC LIMIT 1",
  "err_code":0,
  "err_msg":"SQL查询生成成功"
}

值得一提的是,给大模型的表名和列名是原始的名称:

SELECT 成绩 FROM 学生信息 WHERE 姓名 = '胡阳';

而我们之前学习表格知识库入库逻辑时提到过,Coze Studio 会动态生成一个表名和列名,都是 ID 格式,如下:

create-table-db.png

因此还需要将大模型生成的 SQL 语句中的表名和列名做一个替换,这里使用的是 TiDB 的 SQL Parser 对 SQL 语句进行解析和替换,如果有类似需求的话,可以参考下这块的逻辑。

重排序

多路召回后,为了使得和用户查询最相关的内容排在前面,我们还需要对召回结果重新排序。Coze Studio 提供了两种重排序实现:

  • RRF 实现:基于倒序排名融合算法,通过计算每个文档在各个检索结果中的排名来综合评分;
  • VikingDB 实现:调用火山引擎的重排序 API 接口;

不过看代码当前只使用了 RRF 重排序,RRF 全称为 Reciprocal Rank Fusion(倒数排名融合),是滑铁卢大学和谷歌合作开发的一种算法,它可以将具有不同相关性指标的多个结果集组合成单个结果集,感兴趣的同学可以看下它的论文:

其中最关键的部分就是下面这个公式:

rrf-score.png

其中,D 表示文档集,R 是从 1 到 |D| 的排列,k 是一个常量,默认值为 60,r(d) 表示文档在某个检索结果中的排名。从 RRF 分数的计算中,我们可以看出,RRF 不依赖于每次检索分配的绝对分数,而是依赖于相对排名,这使得它非常适合组合来自可能具有不同分数尺度或分布的查询结果。

小结

今天,我们学习了 Coze Studio 知识库检索的完整流程,通过 Eino 框架编排出一套包含查询重写、多路并行检索以及结果重排的检索流水线。其核心技术点可总结如下:

  • 查询重写:利用大模型结合对话上下文,对用户的原始问题进行优化和消歧,生成更适合机器检索的查询语句;
  • 多路并行检索:同时发起向量、全文和 NL2SQL 三种检索方式,最大化地召回相关信息,并通过并行处理提升检索效率;
  • 结果重排:采用先进的 RRF 算法,对来自不同检索通路的结果进行智能融合与排序,确保最相关的内容能够优先呈现给大模型;

至此,我们对 Coze Studio 的学习之旅也要告一段落了。从快速上手、基本功能实战,到深入后端源码,我们系统地剖析了其智能体、插件、工作流、知识库等核心模块的设计与实现。希望这一系列的文章能帮助大家对 Coze Studio 有一个全面而深入的理解,并为大家利用其构建自己的 AI 应用提供有价值的参考。


学习 Coze Studio 的知识库入库逻辑(续)

书接上文,当用户在知识库上传文档、表格或图片,然后对知识库进行设置和确认后,Coze Studio 通过发送 IndexDocuments 事件开始了文档的异步处理。这个事件被发送到消息队列后,由 knowledge 领域的事件处理器消费,我们今天就来看下这块的逻辑。

事件处理器

事件处理器的代码位于 backend/domain/knowledge/service/event_handle.go 文件,这是一个 switch 结构,负责分发知识库相关的各类事件,比如批量文档处理(IndexDocuments)、单个文档处理(IndexDocument)、修改文档分片(IndexSlice)、删除知识库数据(DeleteKnowledgeData)、文档预览(DocumentReview)等:

func (k *knowledgeSVC) HandleMessage(ctx context.Context, msg *eventbus.Message) (err error) {

  event := &entity.Event{}
  err = sonic.Unmarshal(msg.Body, event)

  // 事件处理器
  switch event.Type {
  case entity.EventTypeIndexDocuments:
    // 批量文档处理
    k.indexDocuments(ctx, event)
  case entity.EventTypeIndexDocument:
    // 单个文档处理
    k.indexDocument(ctx, event)
  case entity.EventTypeIndexSlice:
    // 修改文档分片
    k.indexSlice(ctx, event)
  case entity.EventTypeDeleteKnowledgeData:
    // 删除知识库数据
    k.deleteKnowledgeDataEventHandler(ctx, event)
  case entity.EventTypeDocumentReview:
    // 文档预览
    k.documentReviewEventHandler(ctx, event)
  }
  return nil
}

其中批量文档处理(IndexDocuments)事件的逻辑很简单,就是遍历所有文档,为每个文档单独发送一个单文档处理(IndexDocument)事件。

文档处理流程

单文档处理(IndexDocument)事件的代码位于 indexDocument() 函数,这个处理函数是整个知识库入库流程的核心,其执行流程可分为以下几个关键步骤:

  1. 状态检查与清理

    • 首先检查知识库和文档是否处于可写入状态,防止并发操作导致数据不一致;
    • 如果不是追加模式,会先清除该文档在数据库(knowledge_document_slice 表)和向量数据库中的旧数据;
  2. 解析与分片

    • 将文档状态设置为分片中(Chunking);
    • 从对象存储(TOS)下载文档的原始文件;
    • 根据文档类型(文本、图片、表格)获取对应的解析器(Parser),对文件内容进行解析和分片,生成一系列文档分片(schema.Document);
  3. 数据持久化

    • 为每个文档分片生成唯一的 ID;
    • 将文档分片批量存入 knowledge_document_slice 数据表中;
    • 对于表格类型,还会将解析出的结构化数据行插入到之前动态创建的物理数据表中;
  4. 向量化与索引

    • 将持久化后的文档分片转换为向量数据库要求的格式;
    • 调用 searchstoreStore 方法,将文档分片进行向量化,并存入 Milvus 等向量数据库中。这个过程会根据文档 ID 进行分区(Partition),便于后续的高效检索和管理;
  5. 状态更新

    • 所有文档分片成功存入向量数据库后,将其在 knowledge_document_slice 表中的状态更新为 Done
    • 最后,将 knowledge_document 表中对应文档的状态更新为 Enable,表示该文档已处理完成并可供检索;

接下来,我们详细看看每一步的实现逻辑。

状态检查与清理

// 检查知识库和文档状态
if valid, err := k.isWritableKnowledgeAndDocument(ctx, doc.KnowledgeID, doc.ID); err != nil {
  return err
} else if !valid {
  return errorx.New(errno.ErrKnowledgeNonRetryableCode, ...)
}

// 清除旧数据
collectionName := getCollectionName(doc.KnowledgeID)
if !doc.IsAppend {
  ids, err := k.sliceRepo.GetDocumentSliceIDs(ctx, []int64{doc.ID})
  if len(ids) > 0 {
    // 删除分片记录
    err = k.sliceRepo.DeleteByDocument(ctx, doc.ID)
    for _, manager := range k.searchStoreManagers {
      // 删除 search store 中的数据
      s, err := manager.GetSearchStore(ctx, collectionName)
      s.Delete(ctx, slices.Transform(event.SliceIDs, func(id int64) string {
        return strconv.FormatInt(id, 10)
      }))
    }
  }
}

首先通过 isWritableKnowledgeAndDocument() 函数分别检查知识库和文档的状态是否可写(不是被禁用或被删除),只有两者都可写时才继续后面的流程。然后如果是非追加模式,查询当前文档在 knowledge_document_slice 表中是否已存在分片记录,如果存在,则删除该文档的所有分片,同时删除该文档在搜索存储中的数据。

这里的 搜索存储(Search Store) 是 Coze Studio 对搜索引擎的统一抽象层,为不同的数据库(Milvus、Elasticsearch、VikingDB)提供统一的操作接口,它继承 indexer.Indexerretriever.Retriever,支持向量或文本的存储和索引构建,并提供基于向量或文本的相似性搜索能力。要注意的是,搜索存储要通过 Manager 的 GetSearchStore() 来获取,Manager 是搜索存储的管理器,负责管理搜索存储的生命周期,比如创建、删除、获取等。目前支持全文搜索和向量搜索两种 Manager 如下:

var sManagers []searchstore.Manager

// es full text search
sManagers = append(sManagers, sses.NewManager(&sses.ManagerConfig{Client: c.ES}))

// vector search
mgr, err := getVectorStore(ctx)
sManagers = append(sManagers, mgr)

其中向量搜索根据 VECTOR_STORE_TYPE 环境变量可选 Milvus 或 VikingDB,默认是 Milvus 数据库。

解析与分片

// 将文档状态设置为分片中
k.documentRepo.SetStatus(ctx, doc.ID, int32(entity.DocumentStatusChunking), "")

// 从对象存储下载文档的原始文件
bodyBytes, err := k.storage.GetObject(ctx, doc.URI)

// 根据文档类型(文本、图片、表格)获取对应的解析器
docParser, err := k.parseManager.GetParser(convert.DocumentToParseConfig(doc))

// 对文件内容进行解析和分片,生成一系列文档分片
parseResult, err := docParser.Parse(ctx, bytes.NewReader(bodyBytes), parser.WithExtraMeta(map[string]any{
  document.MetaDataKeyCreatorID: doc.CreatorID,
  document.MetaDataKeyExternalStorage: map[string]any{
    "document_id": doc.ID,
  },
}))

接下来,从对象存储下载文档的原始文件,然后对文件内容进行解析和分片。根据不同的文档类型,Coze Studio 提供了不同的解析器:

func (m *manager) GetParser(config *parser.Config) (parser.Parser, error) {
  var pFn parseFn
  switch config.FileExtension {
  case parser.FileExtensionPDF:
    pFn = parseByPython(config, m.storage, m.ocr, goutil.GetPython3Path(), goutil.GetPythonFilePath("parse_pdf.py"))
  case parser.FileExtensionTXT:
    pFn = parseText(config)
  case parser.FileExtensionMarkdown:
    pFn = parseMarkdown(config, m.storage, m.ocr)
  case parser.FileExtensionDocx:
    pFn = parseByPython(config, m.storage, m.ocr, goutil.GetPython3Path(), goutil.GetPythonFilePath("parse_docx.py"))
  case parser.FileExtensionCSV:
    pFn = parseCSV(config)
  case parser.FileExtensionXLSX:
    pFn = parseXLSX(config)
  case parser.FileExtensionJSON:
    pFn = parseJSON(config)
  case parser.FileExtensionJsonMaps:
    pFn = parseJSONMaps(config)
  case parser.FileExtensionJPG, parser.FileExtensionJPEG, parser.FileExtensionPNG:
    pFn = parseImage(config, m.model)
  default:
    return nil, fmt.Errorf("[Parse] document type not support, type=%s", config.FileExtension)
  }
  return &p{parseFn: pFn}, nil
}

解析器做的比较通用,将文本、表格、图片知识库的所有配置参数混在一起,不用区分知识库是什么类型。这里对这些解析器的实现流程做个大概的总结:

  • PDF 解析器:运行 Python 脚本 parse_pdf.py 从 PDF 文件中提取出文本、图片、表格等内容,主要使用的是 pdfplumber 库;文本内容调用 chunkCustom 进行分块;图片转为 base64 存储到对象存储,如果开启了 OCR 功能,还调用 OCR 接口将图片转换为文本;表格数据则转为 CSV 格式处理;
  • TXT 解析器:直接读取文件内容,调用 chunkCustom 进行文本分块;
  • Markdown 解析器:使用 yuin/goldmark 将 Markdown 文件解析成 AST,遍历文本节点,按分隔符和块大小分块,支持自动下载图片并保存到对象存储,如果开启了 OCR 功能,也会调用 OCR 接口做文字识别;
  • Docx 解析器:通过运行 Python 脚本 parse_docx.py 从 DOCX 文件中提取文本、图片和表格内容,主要使用的是 python-docx 库;之后的逻辑和 PDF 解析器一样;
  • CSV 解析器:通过 encoding/csv 逐行读取 CSV 数据,支持自动处理 BOM 头,转换为行迭代器统一处理;
  • XLSX 解析器:通过 qax-os/excelize 库打开 Excel 文件,逐行读取,并自动补齐列数,转换为行迭代器统一处理;
  • JSON 解析器:解析 JSON 数组为 map 切片,支持动态提取表头,即首个对象的 key,支持自定义列配置,转换为行迭代器统一处理;
  • 图片解析器:读取图片,转为 base64,并调用多模态大模型生成图片描述,支持手动标注模式;

可以看出,Coze Studio 对常见的文件类型都提供了很好的支持:对于 PDF 和 Word 等复杂文档,使用 Python 来处理,通过 Go 与 Python 脚本的管道通信机制;对于表格数据,都使用了行迭代器统一处理;对于图像数据,使用 OCR 进行文字识别,或调用多模态大模型生成图片描述。

我们之前在介绍 Eino 组件时学习过,Eino Ext 内置了一些文档处理类的组件,支持解析 TXT、HTML、DOCX、XLSX、PDF 等格式的文件。但是很显然,Go 在这方面的生态相比于 Python 来说还不够成熟,因此 Coze Studio 采用了 Go + Python 这种折中的方式。

数据持久化

// 批量生成所有文档分片的 ID
allIDs := make([]int64, 0, len(parseResult))
for l := 0; l < len(parseResult); l += 100 {
    ids, err := k.idgen.GenMultiIDs(ctx, batchSize)
    allIDs = append(allIDs, ids...)
}

// 将 parseResult 转换为 sliceModels
sliceModels := make([]*model.KnowledgeDocumentSlice, 0, len(parseResult))
for i, src := range parseResult {
    sliceModel := &model.KnowledgeDocumentSlice{
        ID:          allIDs[i],
        KnowledgeID: doc.KnowledgeID,
        DocumentID:  doc.ID,
        Content:     parseResult[i].Content,
        // 将分片状态设置为处理中(Processing)
        Status:      int32(model.SliceStatusProcessing),
    }
    // 表格类型的分片 特殊处理
    if doc.Type == knowledge.DocumentTypeTable {
        sliceEntity, err := convertFn(src, doc.KnowledgeID, doc.ID, doc.CreatorID)
        sliceModel.Content = sliceEntity.GetSliceContent()
    }
    sliceModels = append(sliceModels, sliceModel)
}

// 批量保存文档分片
err = k.sliceRepo.BatchCreate(ctx, sliceModels)

// 保存表格类型的数据
if doc.Type == knowledge.DocumentTypeTable {
    err = k.upsertDataToTable(ctx, &doc.TableInfo, sliceEntities)
}

经过上一步后,我们得到了一系列的文档分片 parseResult,接下来的逻辑就是将其批量存入 knowledge_document_slice 数据表中。首先通过 GenMultiIDs 为每个分片生成唯一的 ID,这里使用批量生成的方式,一次生成 100 个;然后将 parseResult 转换为 KnowledgeDocumentSlice 类型,并将分片状态设置为处理中(SliceStatusProcessing);最后调用 sliceRepo.BatchCreate 批量保存。

这里值得注意的一点是对表格数据的处理,表格按行分片,内容是一行数据的 JSON 格式,Key 为列名,Value 为单元格的值,如下:

table-slice.png

此外,我们回顾下昨天学习文档处理器的内容:对于表格知识库,首次插入文档时,除了会新建一条 knowledge_document 记录,还会根据 Excel 的列结构动态地创建一个物理数据表;这里在解析完表格之后,表格的分片数据还会插入到这个物理数据表中:

create-table-db.png

向量化与索引

// 字段列表和索引字段
fields, err := k.mapSearchFields(doc)
indexingFields := getIndexingFields(fields)

for _, manager := range k.searchStoreManagers {
  // 为每个知识库创建一个独立的集合或索引
  manager.Create(ctx, &searchstore.CreateRequest{
    CollectionName: collectionName,
    Fields:         fields,
    CollectionMeta: nil,
  })
  // 将文档分片保存到搜索存储中(Milvus、Elasticsearch、VikingDB)
  ss, err := manager.GetSearchStore(ctx, collectionName)
  _, err = ss.Store(ctx, ssDocs,
    searchstore.WithIndexerPartitionKey(fieldNameDocumentID),
    searchstore.WithPartition(strconv.FormatInt(doc.ID, 10)),
    searchstore.WithIndexingFields(indexingFields),
  )
}

接下来,我们还需要将文档分片保存到搜索存储(Milvus、Elasticsearch、VikingDB)中。首先,使用 Manager 的 Create 方法为每个知识库创建一个独立的集合,集合的名字为 opencoze_<kb_id>,每种搜索存储的概念不太一样,创建集合的方式也是不一样的。比如在 Milvus 中,分为创建集合、创建索引和加载集合三步,确保 Milvus 集合加载到内存中,使其可用于搜索操作:

func (m *milvusManager) Create(ctx context.Context, req *searchstore.CreateRequest) error {
  // 创建集合
  if err := m.createCollection(ctx, req); err != nil {
    return fmt.Errorf("[Create] create collection failed, %w", err)
  }
  // 创建索引
  if err := m.createIndexes(ctx, req); err != nil {
    return fmt.Errorf("[Create] create indexes failed, %w", err)
  }
  // 加载集合
  if exists, err := m.loadCollection(ctx, req.CollectionName); err != nil {
    return fmt.Errorf("[Create] load collection failed, %w", err)
  } else if !exists {
    return fmt.Errorf("[Create] load collection failed, collection=%v does not exist", req.CollectionName)
  }
  return nil
}

在 Milvus 向量数据库中,load_collection 是一个非常重要的操作,其主要作用是将指定的集合(Collection)从磁盘加载到内存中,以便进行高效的向量检索和查询操作,刚创建或刚插入数据的集合需要先执行 load_collection 后才能进行查询操作。

而在 Elasticsearch 中,直接创建索引即可:

func (e *esManager) Create(ctx context.Context, req *searchstore.CreateRequest) error {
  cli := e.config.Client
  index := req.CollectionName
  // 创建索引
  err = cli.CreateIndex(ctx, index, properties)
  return err
}

在 Milvus 中,每个知识库对应一个集合:

milvus-indexes.png

根据文档类型创建对应的字段和索引:

milvus-indexes-schema.png

而在 Elasticsearch 中,每个知识库对应一个索引:

es-indexes.png

然后,将文档分片依次保存到搜索存储中,对于 Milvus 来说,会有一个向量列,通过 Embedding 计算:

milvus-indexes-data.png

而 Elasticsearch 则直接存储文档分片:

es-indexes-data.png

整个过程中规中矩,并没有多少需要特别关注的地方。其中 Embedding 我们之前介绍过,支持三种接入方式:

  • OpenAI - 兼容 OpenAI 协议的 Embedding 接口;
  • ARK - 火山引擎提供的 Embedding 服务,支持多种模型,如 doubao-embeddingdoubao-embedding-largedoubao-embedding-visionbge-large-zhbge-m3bge-visualized-m3 等;其中 bge-m3 比较特殊,它是唯一一个 同时支持稠密向量和稀疏向量 的模型,如果使用这个模型,在 Milvus 集合中不仅会创建 dense_text_content 稠密向量列,还会创建一个 sparse_text_content 稀疏向量列;
  • HTTP - 调用本地部署的模型服务,需要满足 Coze 自己的一套 接口协议,暂不支持 Ollama 或 Xinference 协议;

状态更新

// 将分片状态设置为已完成(Done)
err = k.sliceRepo.BatchSetStatus(ctx, allIDs, int32(model.SliceStatusDone), "")

// 将文档状态设置为启用(Enable)
err = k.documentRepo.SetStatus(ctx, doc.ID, int32(entity.DocumentStatusEnable), "")

// 更新文档的分片信息
// 1. 统计文档的分片数量
// 2. 计算文档所有分片内容的总大小
err = k.documentRepo.UpdateDocumentSliceInfo(ctx, event.Document.ID)

最后一步,将分片状态设置为已完成(Done),将文档状态设置为启用(Enable),同时更新文档的分片信息,包括分片数量和分片大小,对应 knowledge_document 表中的 slice_countsize 字段。

小结

今天我们接续昨天的内容,深入探究了 Coze Studio 知识库文档入库的异步处理全流程。整个过程由消息队列驱动,其核心逻辑总结如下:

  • 事件驱动架构:文档上传后,系统通过发送 IndexDocuments 事件触发异步处理流程,该事件再分发为针对单个文档的 IndexDocument 事件,由专门的事件处理器进行消费和处理;
  • 核心处理流水线indexDocument 函数是整个流程的核心,它编排了从文档解析到最终索引完成的五个关键步骤:状态检查与清理、解析与分片、数据持久化、向量化与索引,以及最终的状态更新;
  • 多格式文档解析:系统内置了针对不同文件类型(PDF, DOCX, Markdown, 表格, 图片等)的精细化解析器。特别地,对于 PDF、DOCX 等复杂格式,结合了 Python 脚本进行处理,弥补了 Go 生态在文档解析方面的不足;
  • 双重存储策略:解析后的文档分片不仅被持久化到 MySQL 数据库(knowledge_document_slice 表)中,同时也被向量化并存入由 Search Store 抽象层管理的 Milvus 或 Elasticsearch 中,实现了元数据与向量索引的分离存储;

至此,一份用户上传的文档就完成了从解析、切片、存储到向量化的完整入库流程,可以被智能体检索和使用了。


学习 Coze Studio 的知识库入库逻辑

经过前面几天的学习,我们已经深入探讨了 Coze Studio 的智能体、插件、工作流等核心功能的实现原理。今天,我们将继续探索 Coze Studio 的知识库功能,学习当我们在前端页面上点击 “创建知识库” 并上传文档时,后端服务是如何处理这些入库请求的。

知识库创建接口

我们知道,创建知识库是构建 RAG 应用的第一步,Coze Studio 支持 文本表格图片 三种不同类型的知识库:

coze-kb-create.png

这三种知识库的创建流程基本类似,都分为两步:

  1. 输入名称、描述、图标和导入类型,点击 “完成创建” 或 “创建并导入”,这一步调用 /api/knowledge/create 接口,创建的知识库实例对应 knowledge 数据表;
  2. 上传文档、表格或图片,对知识库进行设置,预览,然后开始对文档进行处理,这一步调用 /api/knowledge/document/create 接口,上传的文档对应 knowledge_document 数据表;

下面我们着重看下第二个接口的实现逻辑,接口层代码位于 backend/api/handler/coze/knowledge_service.go 文件:

// @router /api/knowledge/document/create [POST]
func CreateDocument(ctx context.Context, c *app.RequestContext) {

  // 绑定入参
  var req dataset.CreateDocumentRequest
  err = c.BindAndValidate(&req)

  // 调用 knowledge 应用层
  resp := new(dataset.CreateDocumentResponse)
  resp, err = application.KnowledgeSVC.CreateDocument(ctx, &req)
  c.JSON(consts.StatusOK, resp)
}

然后,调用 knowledge 应用层:

func (k *KnowledgeApplicationService) CreateDocument(ctx context.Context, req *dataset.CreateDocumentRequest) (*dataset.CreateDocumentResponse, error) {

  // 调用 knowledge 领域层
  resp := dataset.NewCreateDocumentResponse()
  createResp, err := k.DomainSVC.CreateDocument(ctx, &service.CreateDocumentRequest{
    Documents: documents,
  })
  return resp, nil
}

紧接着,做一些参数的转换,再调用 knowledge 领域层,这里是文档处理的核心:

func (k *knowledgeSVC) CreateDocument(ctx context.Context, request *CreateDocumentRequest) (response *CreateDocumentResponse, err error) {

  // 根据不同的知识库类型,创建对应的文档处理器
  docProcessor := impl.NewDocProcessor(ctx, &impl.DocProcessorConfig{
    ...
  })

  // 1. 前置的动作,上传 tos 等
  err = docProcessor.BeforeCreate()
  // 2. 构建 落库
  err = docProcessor.BuildDBModel()
  // 3. 插入数据库
  err = docProcessor.InsertDBModel()
  // 4. 发起索引任务
  err = docProcessor.Indexing()
  // 5. 返回处理后的文档信息
  docs := docProcessor.GetResp()
  return &CreateDocumentResponse{
    Documents: docs,
  }, nil
}

文档处理器

Coze Studio 使用工厂模式根据文档来源和类型创建不同的文档处理器:

  • customDocProcessor: 自定义内容 + 非表格类型
  • customTableProcessor: 自定义内容 + 表格类型
  • baseDocProcessor: 本地文件 + 非表格类型 或 默认
  • localTableProcessor: 本地文件 + 表格类型

其中,本地文件指的是用户上传文件,自定义内容指的是手动录入文档内容。所有处理器都实现相同的接口:BeforeCreateBuildDBModelInsertDBModelIndexingGetResp,流程图如下:

create-document-flow.png

前置动作

前置动作 BeforeCreate 是文档创建流程中的第一个步骤,主要是对自定义内容和表格进行处理,比如在 customDocProcessor 中,将用户输入的文本内容上传到对象存储:

func (c *customDocProcessor) BeforeCreate() error {
  for i := range c.Documents {
    // 文件扩展名
    c.Documents[i].FileExtension = getFormatType(c.Documents[i].Type)
    // 根据当前时间为文档生成唯一的存储路径
    uri := getTosUri(c.UserID, string(c.Documents[i].FileExtension))
    // 把用户直接输入的文本内容持久化到存储系统
    _ := c.storage.PutObject(c.ctx, uri, []byte(c.Documents[i].RawContent))
    c.Documents[i].URI = uri
  }
  return nil
}

其中文件扩展名只支持 .txt.json 两种,分别对应文本知识库和表格知识库:

func getFormatType(tp knowledge.DocumentType) parser.FileExtension {
  docType := parser.FileExtensionTXT
  if tp == knowledge.DocumentTypeTable {
    docType = parser.FileExtensionJSON
  }
  return docType
}

生成的文件位于 FileBizType.Knowledge 目录下,文件名由用户 ID 和当前时间组成:

func getTosUri(userID int64, fileType string) string {
  fileName := fmt.Sprintf("FileBizType.Knowledge/%d_%d.%s", userID, time.Now().UnixNano(), fileType)
  return fileName
}

可以在 Minio 的 opencoze 桶下找到对应的文件:

minio-file-biz-type.png

构建数据模型

第二步 BuildDBModel 比较简单,构建文档的数据模型,为后面插入 knowledge_document 数据表做准备。这里有两点提一下:

  1. 文档 ID 使用 idgen.GenMultiIDs() 批量生成,通过 Redis 实现,参考 backend/infra/impl/idgen/idgen.go 文件;
  2. 对于表格型知识库,上传的表格文件必须具备相同的数据结构,如果往已存在的表格知识库中添加文档,也就是追加模式,这时会复用之前表格的数据结构,直接跳过插入数据表这一步;这也意味着,每个表格知识库里只会有一条文档记录,这条记录的 table_info 字段至关重要,参见下一节;

插入数据

第三步 InsertDBModel 将上一步构建好的文档记录批量插入到 knowledge_document 数据表中:

func (p *baseDocProcessor) InsertDBModel() (err error) {
  
  // 如果是表格知识库,且不是追加模式
  // 说明是首次创建,使用 CREATE TABLE 创建一个数据表
  if !isTableAppend(p.Documents) {
    err = p.createTable()
  }

  // 开启事务
  tx, err := p.knowledgeRepo.InitTx()
  
  defer func() {
    if err != nil {
      // 出错时回滚
      tx.Rollback()
    } else {
      // 正常时提交
      tx.Commit()
    }
  }()

  // 批量插入文档记录
  err = p.documentRepo.CreateWithTx(ctx, tx, p.docModels)
  
  // 更新知识库时间
  err = p.knowledgeRepo.UpdateWithTx(ctx, tx, p.Documents[0].KnowledgeID, map[string]interface{}{
    "updated_at": time.Now().UnixMilli(),
  })
  return nil
}

这里的逻辑也有两点值得提一下:

  1. 支持批量插入多条文档记录,使用数据库事务,确保一批数据全部插入成功或失败;
  2. 对于表格知识库,首次插入文档时,除了会新建一条 knowledge_document 记录,还会根据 Excel 的列结构动态地创建一个数据表;

比如下面这个 Excel 文件,包含五列:

create-table-excel.png

查看新建的 knowledge_document 记录,有一个 table_info 字段,内容如下:

{
  "columns": [
    {
      "ID": 7533389719145545728,
      "Name": "姓名",
      "Type": 1,
      "Indexing": true,
      "Sequence": 0,
      "Description": ""
    },
    {
      "ID": 7533389719145562112,
      "Name": "学号",
      "Type": 2,
      "Indexing": false,
      "Sequence": 1,
      "Description": ""
    },
    {
      "ID": 7533389719145578496,
      "Name": "年龄",
      "Type": 2,
      "Indexing": false,
      "Sequence": 2,
      "Description": ""
    },
    {
      "ID": 7533389719145594880,
      "Name": "性别",
      "Type": 1,
      "Indexing": false,
      "Sequence": 3,
      "Description": ""
    },
    {
      "ID": 7533389719145611264,
      "Name": "成绩",
      "Type": 2,
      "Indexing": false,
      "Sequence": 4,
      "Description": ""
    },
    {
      "ID": 7533389719145627648,
      "Name": "_knowledge_document_slice_id",
      "Type": 2,
      "Indexing": false,
      "Sequence": -1,
      "Description": "主键ID"
    }
  ],
  "table_desc": "",
  "virtual_table_name": "学生信息",
  "physical_table_name": "table_7533389719149740032"
}

这里的 physical_table_name 就是对应创建的数据表,注意表里的列名都变成了 ID 格式:

create-table-db.png

后面在查询表格知识库时,实际上就是在查询这个表。

计算索引

经过前面的几步,我们完成了自定义内容的处理、知识库和文档的插入、数据表的创建等等,不过这些都还只是开胃小菜,接下来开始才是重头戏。在索引计算 Indexing 中,我们通过 MQ 发送 IndexDocuments 事件,开启异步处理文件:

func (p *baseDocProcessor) Indexing() error {

  // 创建 IndexDocuments 事件
  event := events.NewIndexDocumentsEvent(p.Documents[0].KnowledgeID, p.Documents)
  body, err := sonic.Marshal(event)
  
  // 通过 MQ 发送事件
  err = p.producer.Send(p.ctx, body)
  return nil
}

Coze Studio 支持多种 MQ 实现:

事件 IndexDocuments 发送之后,再通过 GetResp 返回创建文档的响应,整个文档上传的接口就结束了。

未完待续

今天,我们简单过了一遍 Coze Studio 知识库创建和文档上传的后端逻辑。整个过程从用户上传文档触发 CreateDocument 接口开始,在完成文档元信息入库后,通过发送 IndexDocuments 事件,将耗时的文档处理转为异步执行任务。尽管文档上传的接口结束了,但是这只是 Coze Studio 文档处理流程的冰山一角,我们明天将继续研读 Coze Studio 关于文档处理流程的代码。