Fork me on GitHub

分类 GraphRAG 下的文章

学习 GraphRAG 四大搜索策略

在前面的系列文章中,我们深入学习了 GraphRAG 索引构建的完整流程,从文档加载、文本分片,到实体关系提取、社区检测,最终生成了包括实体表、关系表、社区报告等在内的结构化输出文件。这些文件构成了 GraphRAG 的知识库基础,为查询阶段提供了丰富的数据源。现在,让我们转入查询阶段的学习,探索 GraphRAG 如何基于这些数据实现智能问答。

在之前的入门篇中,我们已经体验了通过 query 命令来查询,并通过 --method 参数指定搜索策略:

$ uv run poe query \
    --root ./ragtest \
    --method global \
    --query "What are the top themes in this story?"

GraphRAG 提供了四种不同的搜索策略,按照复杂程度递增的顺序,它们分别是:基础搜索(Basic Search)本地搜索(Local Search)全局搜索(Global Search)漂移搜索(DRIFT Search)。本文将由浅入深地剖析这四种搜索策略的工作原理和使用场景。

基础搜索(Basic Search)

基础搜索是 GraphRAG 中最简单的查询方式,本质上是传统向量 RAG 的实现。它主要用于与其他搜索策略进行对比,帮助用户了解 GraphRAG 相对于传统方法的优势。

basic-search.png

基础搜索的核心思想非常直接:对用户查询进行向量化,然后在文本单元的向量数据库中进行相似性搜索,找到最相关的前 K 个文本片段,最后使用这些片段作为上下文,连同原始问题一起提交给大语言模型,生成最终答案。

基础搜索适用于简单的事实性问题,特别是答案可以直接从单个或少数几个文档片段中获得的情况,比如:

  • 什么是机器学习?
  • Python 中如何定义函数?
  • 苹果公司的总部在哪里?

由于基础搜索仅依赖向量相似性,它无法处理需要多步推理或全局理解的复杂问题。这正是 GraphRAG 引入图结构和社区检测的动机。

本地搜索(Local Search)

本地搜索,或者叫 局部搜索,是 GraphRAG 的核心创新之一,它利用知识图谱的实体关系结构来增强传统 RAG 的检索能力。相比基础搜索,本地搜索能够理解查询中的实体,并利用这些实体在知识图谱中的连接关系来查找更丰富、更相关的上下文信息。

本地搜索的流程分为三个主要阶段:实体识别、上下文扩展和答案生成。

local-search.png

这个过程的详细步骤如下:

  1. 实体识别阶段:对用户查询进行向量化,在实体描述向量数据库中进行相似性搜索,获取语义相关的候选实体集合;
  2. 上下文扩展阶段

    • 文本单元扩展:根据实体和文本单元映射关系,找到包含这些实体的原始文本片段;
    • 社区报告扩展:根据实体和社区映射关系,获取相关的社区摘要报告;
    • 实体关系扩展:使用实体名称和描述等信息构建实体上下文,然后再依次查找与实体相连的关系构建关系上下文;
    • 协变量扩展:如果配置了协变量提取,还会包含相关的声明信息;
  3. 排序过滤阶段

    • 排序:对每类候选数据进行重要性排序,比如对于文本单元来说,关系数量更多优先级更高,对于社区来说,包含的选中实体数量更多的优先级更高;
    • 过滤:根据 token 预算进行过滤,确保上下文适合大语言模型的输入窗口;
  4. 答案生成阶段:将过滤后的多源信息整合成结构化上下文,调用大模型生成最终答案;

本地搜索的过程就像是一次从图谱中的特定节点出发,向外探索和收集信息的侦察任务,它的关键优势在于 多源信息融合 能力:通过文本单元保留原始文档的详细信息,通过知识图谱揭示实体间的复杂关系,通过社区报告提供结构化的主题总结。它特别适合需要理解特定实体及其关系的问题,比如:

  • 洋甘菊有什么治疗功效?(需要理解洋甘菊这个实体的属性)
  • 苹果公司和微软公司有什么合作关系?(需要理解两个实体间的关系)
  • 机器学习领域的主要算法有哪些?(需要找到与机器学习相关的算法实体)

全局搜索(Global Search)

全局搜索解决了传统 RAG 的一个根本性问题:无法处理需要理解整个数据集的查询。当用户问 数据中的主要主题是什么?最重要的趋势有哪些? 这种需要全局视角和高度概括性的问题时,传统的向量检索往往无法给出满意的答案,因为这类问题需要对整个数据集进行宏观理解和总结。

全局搜索采用了 Map-Reduce 架构,分为两个核心阶段:

global-search.png

首先是 Map 阶段,收集指定层级的社区报告,并行处理:

  1. 报告收集:从指定层级的社区层次结构中收集所有的社区报告;如果开启了动态社区选择,则使用大模型根据用户问题对社区报告进行打分,选择得分大于阈值的社区报告;
  2. 分批处理:将社区报告按照 token 预算随机分成多个批次;
  3. 并行分析:对于每个批次,将其中的社区报告作为上下文,连同用户查询一起发送给大模型,使用 Map 系统提示词生成一个中间响应;这个响应包含一系列观点,每个观点都有一个重要性评分(1-100 分);
{
  "points": [
    { "description": "观点1的描述 [数据:报告(报告ID)]", "score": "评分值" },
    { "description": "观点2的描述 [数据:报告(报告ID)]", "score": "评分值" }
  ]
}

第一阶段并不回答用户的原始问题,只是生成中间响应,然后再通过 Reduce 阶段,对其聚合生成最终答案:

  1. 重要性排序:收集所有中间响应的观点并按重要性评分排序;
  2. 智能过滤:按评分选出最重要的观点,并确保在 token 预算内;
  3. 最终聚合:使用 Reduce 系统提示将选中的观点整合起来,作为最终的上下文,再次提交给大模型,生成一个全面连贯的最终答案;

可以看出,全局搜索通过直接利用最高度浓缩的知识(社区报告),避免了在海量细碎文本块中进行检索,特别适合需要宏观理解和数据集级别分析的问题:

  • 这个数据集中讨论的主要主题有哪些?
  • 文档中提到的最重要的趋势是什么?
  • 数据中的关键人物和组织有哪些?
  • 这些文档反映了什么样的总体情况?

它的效果很大程度上取决于选择的社区层级:

  • 底层社区:包含更详细的信息,答案更全面,但需要更多的计算资源和时间;
  • 顶层社区:处理速度更快,成本更低,但答案可能较为宏观;

用户可以根据问题的复杂程度和计算预算选择合适的社区层级。

漂移搜索(DRIFT Search)

我们看到,全局搜索通过检索社区报告来回答宏观问题,而本地搜索则通过图遍历和多源信息融合来回答关于具体实体的微观问题,用户在使用时,必须先判断出问题的类型,然后再明确指定对应的策略,使用起来非常费脑子,如果能将两种搜索策略融合起来那就完美了。

因此,GraphRAG 推出了一种更高级的混合搜索策略 —— 漂移搜索(DRIFT Search),这里的 DRIFT 其实是首字母缩写,全称为 Dynamic Reasoning and Inference with Flexible Traversal,它巧妙地结合了全局搜索和本地搜索的优势,通过动态推理和灵活遍历实现了更全面、更深入的查询能力。

漂移搜索的设计理念是渐进式查询精化,首先进行全局层面的主题探索,建立宏观认知,然后基于初步发现生成深入的后续问题,最后通过本地搜索验证和细化这些问题的答案。这种方式避免了全局搜索过于宏观和本地搜索过于狭隘的问题,实现了广度和深度的平衡。它的运行流程包含三个核心阶段:

drift-search.png

阶段一被称为 Primer,用于建立初步的、较宽泛的全局认知,分为两步:

  1. 报告选择:将用户查询与所有社区报告进行语义相似性比较,选择最相关的 Top-K 个社区报告作为初始上下文;这里使用了 HyDE(Hypothetical Document Embeddings) 策略,首先根据用户查询生成和社区报告结构类似的假设性文档,然后使用这个假设性文档的嵌入来选择社区报告,这通常比原始的用户查询嵌入效果更好;
  2. 初步回答:将这些报告拆成 N 个批次,针对每个批次,生成一个宏观层面的初步答案,分析初步答案和原始查询的相关性,并识别需要进一步探索的方向,生成具体的后续问题;
{
  "intermediate_answer": "<初步答案>",
  "score": "<和原始查询的相关性>",
  "follow_up_queries": [
    "<后续问题1>",
    "<后续问题2>"
  ]
}

阶段二叫做 Follow-Up,对生成的后续问题进一步的深入探索:

  1. 问题选择:从后续问题列表中选择评分最高的 Top-K 个问题,如果没有评分则随机选择;
  2. 局部问答:使用本地搜索(Local Search)对选定的问题进行详细回答,这个过程会产生更精确的中间答案,并可能生成新一轮的、更具针对性的后续问题;
{
  "response": "<对后续问题进行回答,注意不要回答原始查询>",
  "score": "<和原始查询的相关性>",
  "follow_up_queries": [
    "<后续问题1>",
    "<后续问题2>"
  ]
}

阶段二是一个 while 循环,这个循环过程会持续进行,直到满足停止条件,当达到预设的深度限制(默认 3 层)或所有后续问题均已回答时。然后进入阶段三 Reduce,系统将所有的中间答案汇聚起来,为用户提供一个既有全局概览又有局部细节的综合性答案。

漂移搜索通过动态地在全局和局部信息之间“漂移”,模拟了人类分析师的探索过程,这和最近流行的 Deep Research 技术非常类似,能够更智能地应对复杂的、探索性的查询需求,这类需求通常会从一个宽泛的起点开始逐步深入到具体细节,比如:

  • 这个数据集中的关键问题是什么,它们是如何相互关联的?
  • 分析一下某个领域的发展趋势及其影响因素
  • 深入探讨某个复杂事件的原因、过程和影响
  • 比较分析多个概念或实体的异同及其关系

综合对比

为了更好地理解这四种搜索策略的特点和适用场景,我们来做一个综合对比:

search-diff.png

只有根据问题场景选择最适合的搜索策略,才能达到最好的效果。

技术实现细节

最后,我们来看下 GraphRAG 搜索策略的技术实现,源码位于 query/structured_search 目录下:

├── base.py
├── basic_search
│   ├── basic_context.py
│   └── search.py
├── drift_search
│   ├── action.py
│   ├── drift_context.py
│   ├── primer.py
│   ├── search.py
│   └── state.py
├── global_search
│   ├── community_context.py
│   └── search.py
└── local_search
    ├── mixed_context.py
    └── search.py

GraphRAG 采用了一致的架构设计,所有的搜索策略都统一继承自 BaseSearch 基类,提供 search()stream_search() 两个方法:

class BasicSearch(BaseSearch[BasicContextBuilder]):
  '''基础搜索'''
class LocalSearch(BaseSearch[LocalContextBuilder]):
  '''本地搜索'''
class GlobalSearch(BaseSearch[GlobalContextBuilder]):
  '''全局搜索'''
class DRIFTSearch(BaseSearch[DRIFTSearchContextBuilder]):
  '''漂移搜索'''

每个搜索策略都对应一个 上下文构建器(Context Builder),负责从索引数据中构建查询上下文。不同的搜索策略使用专门优化的系统提示:

  • 基础搜索:

    • BASIC_SEARCH_SYSTEM_PROMPT:基于文本片段回答用户问题
  • 本地搜索:

    • LOCAL_SEARCH_SYSTEM_PROMPT:基于实体关系、社区报告、文本片段等回答用户问题
  • 全局搜索:

    • RATE_QUERY:根据用户问题对社区报告进行打分,用于选择最合适的社区报告;
    • MAP_SYSTEM_PROMPT:根据社区报告,让大模型生成一个中间响应,包含一系列关于用户查询的观点及重要性评分;
    • REDUCE_SYSTEM_PROMPT:聚合中间观点,生成最终答案;
  • 漂移搜索:

    • HyDE:根据用户问题生成假设性文档;
    • DRIFT_PRIMER_PROMPT:基于社区报告对用户问题作出初步回答,生成后续问题;
    • DRIFT_LOCAL_SYSTEM_PROMPT:针对后续问题调用本地搜索后,生成中间答案,并生成新一轮的、更具针对性的后续问题;
    • DRIFT_REDUCE_PROMPT:基于所有的中间答案,回答用户的原始问题;

具体的源码解读就不再展开了,有兴趣的朋友可以对照着上面各节的讲解去看下对应的代码。

小结

今天,我们详细剖析了 GraphRAG 的四大搜索策略,从简单到复杂,从局部到全局,从静态到动态,每种策略都有其独特的设计理念,通过合理选择和组合这些策略,GraphRAG 能够处理从简单事实查询到复杂推理问题的各种场景。

至此,我们对 GraphRAG 的探索之旅也告一段落。从最初的快速上手,到深入索引构建的三个核心阶段(文档处理、知识提取、图谱增强),再到今天的搜索策略,我们系统地学习了 GraphRAG 如何将扁平的非结构化文本,一步步转化为一个结构丰富、可深度推理的知识库,并最终利用它来赋能更智能的问答系统。

希望这个系列的文章能帮助你全面理解 GraphRAG 的核心思想与实现原理,并为你构建自己的高级 RAG 应用带来启发。


详解 GraphRAG 索引构建的输出文件

在前面的系列文章中,我们深入学习了 GraphRAG 索引构建的完整流程,从文档加载、文本分片,到实体关系提取、社区检测和向量化。经过这些复杂的处理步骤,GraphRAG 最终生成了一系列结构化的输出文件,这些文件就是整个知识图谱的数字化资产。通过这些文件,GraphRAG 实现了不同的检索策略,可以回答用户的各种问题。但是在正式进入检索阶段之前,让我们先来详细剖析下这些输出文件,了解每个文件的格式、字段信息和存储机制。

输出文件概览

当 GraphRAG 索引构建完成后,所有结果都保存在 output 目录中。让我们先来看看完整的文件结构:

output/
├── documents.parquet              # 文档表
├── text_units.parquet             # 文本单元表
├── entities.parquet               # 实体表
├── relationships.parquet          # 关系表
├── communities.parquet            # 社区表
├── community_reports.parquet      # 社区报告表
├── embeddings.entity.description.parquet       # 实体描述向量
├── embeddings.community.full_content.parquet   # 社区内容向量
├── embeddings.text_unit.text.parquet          # 文本单元向量
├── graph.graphml                  # 图结构文件
├── lancedb/                       # LanceDB 向量数据库
├── stats.json                     # 统计信息
└── context.json                   # 上下文信息

这些文件可以分为三类:

  1. 核心数据表:存储结构化数据,包括文档、文本单元、实体、关系、社区、社区报告等,使用 Parquet 格式;
  2. 向量数据:存储嵌入向量,包括文本单元、实体描述、社区内容等,使用 Parquet 和 LanceDB 双重存储;
  3. 元数据文件:包含图结构、统计信息等辅助数据;其中 graph.graphml 文件在之前学习可视化的时候已经详细介绍过,此处不再赘述;

核心数据表详解

这一节对所有的核心数据表做个总结。

1. documents.parquet - 文档表

文档表存储了原始文档的基础信息和预处理后的文本内容:

字段描述
id文档唯一标识符(SHA-256 哈希)
human_readable_id人类可读的递增 ID
title文档标题(通常是文件名,或 CSV 中的 title 列)
text文档的完整文本内容
text_unit_ids关联的文本单元 ID 列表
creation_date创建时间戳
metadata额外的元数据信息

文档表是整个知识图谱的源头,每个文档都会被分解成多个文本单元进行后续处理。

2. text_units.parquet - 文本单元表

文本单元是 GraphRAG 处理的基本单位,通过 tokens 策略将长文档分解成固定长度的片段,或者通过 sentence 策略将其分解成自然语言的句子:

字段描述
id文本单元唯一标识符
human_readable_id人类可读的递增 ID
text文本单元的具体内容
n_tokens文本的 token 长度,应该和配置文件中的 chunk_size 相等,除了最后一个分块
document_ids所属文档 ID 列表,通常情况下只有一个,但是也可以修改分组策略让一个分块跨多个文档
entity_ids包含的实体 ID 列表
relationship_ids包含的关系 ID 列表
covariate_ids关联的协变量 ID 列表(如果存在)

文本单元表是连接文档和知识图谱的桥梁,每个文本单元都详细记录了从中提取的结构化信息。

3. entities.parquet - 实体表

实体表存储了从文本中识别出的所有命名实体及其详细信息:

字段描述
id实体唯一标识符
human_readable_id人类可读的递增 ID
title实体名称
type实体类型,默认支持 "organization", "person", "geo" 和 "event" 四种
description实体详细描述,一个实体可能出现在多个文本单元中,使用大模型生成总结摘要
text_unit_ids出现的文本单元 ID 列表
frequency出现频次
degree图中的度数(连接的边数量)
x图布局中的 x 坐标,必须开启图嵌入和 UMAP 配置
y图布局中的 y 坐标,必须开启图嵌入和 UMAP 配置

实体表是知识图谱的节点集合,每个实体都包含了丰富的语义信息和图结构属性。

4. relationships.parquet - 关系表

关系表记录了实体之间的连接关系:

字段描述
id关系唯一标识符
human_readable_id人类可读的递增 ID
source源实体名称
target目标实体名称
description关系的详细描述,一个关系也可能出现在多个文本单元中,使用大模型生成总结摘要
weight关系的权重,如果是基于大模型的提取,让大模型评估关系的强度,如果是基于 NLP 的提取,则通过共现频次计算
combined_degree源和目标实体的度数总和
text_unit_ids支持该关系的文本单元 ID 列表

关系表定义了知识图谱的边集合,捕捉了实体间的语义关联。

5. communities.parquet - 社区表

社区表存储了通过 Leiden 算法识别出的图社区结构:

字段描述
id社区唯一标识符
human_readable_id人类可读的递增 ID
level社区层级(支持层次化聚类)
community社区 ID,通过 Leiden 算法生成的唯一标识
parent父社区 ID
children子社区 ID 列表
title社区标题
entity_ids包含的实体 ID 列表
relationship_ids包含的关系 ID 列表
text_unit_ids关联的文本单元 ID 列表
period创建时间,用于增量更新合并
size社区大小(实体数量),用于增量更新合并

社区表将知识图谱组织成层次化的主题集群,为生成高质量摘要提供了基础。

6. community_reports.parquet - 社区报告表

社区报告是通过大模型对每个社区生成的结构化摘要:

字段描述
id报告唯一标识符
human_readable_id人类可读的递增 ID
community关联的社区 ID
parent父社区 ID
children子社区 ID 列表
level社区层级
title报告标题
summary执行摘要
full_content完整报告内容
rank重要性排名
rank_explanation排名解释
findings关于社区的前 5-10 条关键发现(JSON 格式)
full_content_json大模型返回的完整内容,大多数内容已经被提取到对应列中
period创建时间,用于增量更新合并
size社区大小(实体数量),用于增量更新合并

社区报告表是知识图谱的智能摘要,将复杂的图结构转化为人类可理解的洞察。

Parquet:高效的列式存储

上述这些核心数据表最终都使用 Parquet 格式进行存储。Apache Parquet 是一种开源的列式数据文件格式,专为大数据场景设计,旨在优化数据存储效率和查询性能。它由 Twitter 和 Cloudera 联合开发,现为 Apache 顶级开源项目,广泛应用于 Hadoop 生态系统及各类大数据处理框架。

apache-parquet.png

传统数据库(如 MySQL)采用行式存储(一行数据连续存储),适合 整行读取 场景(如事务处理);而 Parquet 的列式存储(一列数据连续存储),更适合大数据 多列筛选、聚合 场景(如数据分析)。它的核心特性包括:

  • 列式存储:按列而非按行存储数据,查询时仅读取目标列(而非整行),减少 I/O 开销;
  • 高效压缩:基于列的相似数据特征(如同一列数据类型一致、重复值多),支持多种高效的编码和压缩算法,存储成本降低 3-5 倍;
  • 谓词下推(Predicate Pushdown):配合 Spark 等查询引擎,将过滤条件下推到存储层,提前过滤无效数据,减少数据传输量;
  • Schema 演进:支持 Schema 的动态修改,比如新增列或修改列类型,无需重写历史数据,兼容新旧数据查询;
  • 平台无关性:不依赖特定计算框架或存储系统,可在 Spark、Flink、Hive、Presto 等工具中通用;
  • 支持嵌套数据:原生支持数组、字典、结构体等复杂数据类型,无需将嵌套数据扁平化,适配 JSON、Avro 等半结构化数据;

在 GraphRAG 中,使用了 Pandas 库对 Parquet 文件进行读取和写入,具体的逻辑位于 utils/storage.py 中:

async def load_table_from_storage(name: str, storage: PipelineStorage) -> pd.DataFrame:
  """从存储中读取表格数据"""
  filename = f"{name}.parquet"
  return pd.read_parquet(BytesIO(await storage.get(filename, as_bytes=True)))

async def write_table_to_storage(
  table: pd.DataFrame, name: str, storage: PipelineStorage
) -> None:
  """向存储中写入表格数据"""
  await storage.set(f"{name}.parquet", table.to_parquet())

其中 pd.read_parquet() 用于读取 Parquet 文件,table.to_parquet() 用于将 DataFrame 转换为 Parquet 格式。下面是一个完整示例,演示了如何使用 Pandas 写入和读取 Parquet 文件:

import pandas as pd
from pathlib import Path

def write_parquet_example(filepath):
  """写入示例数据"""
  
  # 创建示例数据
  data = {
    'id': [1, 2, 3, 4, 5],
    'name': ['Alice', 'Bob', 'Charlie', 'David', 'Eve'],
    'age': [25, 30, 35, 28, 32],
    'city': ['New York', 'London', 'Paris', 'Tokyo', 'Berlin'],
    'salary': [50000.0, 60000.0, 70000.0, 55000.0, 65000.0]
  }
  
  # 创建 DataFrame
  df = pd.DataFrame(data)
  
  # 写入 Parquet 文件
  output_path = Path(filepath)
  df.to_parquet(output_path, engine='pyarrow')
  
  print(f"数据已写入到: {output_path}")
  print(f"数据形状: {df.shape}")
  print(f"数据预览:\n{df.head()}")
  
  return output_path

def read_parquet_example(file_path: str):
  """读取示例文件"""
  
  # 读取完整数据
  df = pd.read_parquet(file_path, engine='pyarrow')
  print(f"\n读取完整数据:\n{df}")
  
  # 读取特定列
  df = pd.read_parquet(file_path, columns=['name', 'age', 'city'])
  print(f"\n读取特定列:\n{df}")
  
  return df

if __name__ == "__main__":
  
  # 1. 写入 Parquet 文件
  parquet_file = write_parquet_example('sample_data.parquet')
  
  # 2. 读取 Parquet 文件
  df = read_parquet_example('sample_data.parquet')

有兴趣的同学可以将 filepath 改成 GraphRAG 的 output 目录,研究下每个 Parquet 文件的内容。

向量数据存储

我们昨天学过,GraphRAG 支持对多种文本字段进行向量化,默认包括文本单元、实体描述和社区完整内容。根据配置,这些向量既保存在 Parquet 文件中(快照存储),也存储在 LanceDB 向量数据库中(用于高效检索):

├── lancedb
│   ├── default-community-full_content.lance
│   ├── default-entity-description.lance
│   └── default-text_unit-text.lance
├── embeddings.community.full_content.parquet
├── embeddings.entity.description.parquet
└── embeddings.text_unit.text.parquet

GraphRAG 的输出文件形成了完整的数据血缘链路,每一级的输出都基于上一级的结果:

data-flow.png

在 LanceDB 中,每条记录都包含以下字段:

columns = [
  'id',         # 源数据的唯一标识符
  'text',       # 原始文本内容
  'vector',     # 嵌入向量(浮点数数组)
  'attributes'  # 额外属性,比如 title 文档标题
]

LanceDB:面向 AI 的向量数据库

LanceDB 是一款基于 Lance 存储格式(一种高性能列式存储格式)构建的开源向量数据库,主打 低延迟查询高吞吐量易用性 三大核心优势,专为大规模向量数据的存储、检索与管理设计。它由 Lance 生态团队开发,旨在解决传统向量数据库在性能、成本、易用性之间的平衡问题,尤其适配机器学习和人工智能场景的需求,比如语义搜索、推荐系统、多模态检索、特征存储等。

lancedb.png

LanceDB 的核心定位是 为 AI 原生应用打造的向量数据库,其设计理念围绕以下几点展开:

  • 存储与计算融合:基于 Lance 格式的底层优化,将向量存储与向量索引、过滤计算深度整合,避免传统数据库 存算分离 带来的性能损耗;
  • 极简易用性:提供 Python、Rust 等简洁 API,支持本地文件级存储,同时兼容 S3、GCS 等云存储,降低开发者上手门槛;
  • 多模态数据支持:原生支持多模态数据管理,不仅存储向量和元数据,还能存储原始数据(文本、图像、视频等);
  • 混合检索能力:支持 向量检索多向量检索全文检索混合检索元数据过滤SQL 检索 等多种检索策略;

在 GraphRAG 中,操作 LanceDB 的代码位于 vector_stores/lancedb.py 文件中的 LanceDBVectorStore 类:

import lancedb

class LanceDBVectorStore(BaseVectorStore):

  def connect(self, **kwargs: Any) -> Any:
    """连接 LanceDB 数据库"""
    self.db_connection = lancedb.connect(kwargs["db_uri"])

  def load_documents(
    self, documents: list[VectorStoreDocument], overwrite: bool = True
  ) -> None:
    """将文档存入 LanceDB 数据库"""
    if overwrite:
      self.document_collection = self.db_connection.create_table(self.collection_name, data=data, mode="overwrite")
    else:
      self.document_collection = self.db_connection.open_table(self.collection_name)
      self.document_collection.add(data)

  def similarity_search_by_vector(
    self, query_embedding: list[float], k: int = 10, **kwargs: Any
  ) -> list[VectorStoreSearchResult]:
    """使用向量,执行向量相似性检索"""
    docs = (
      self.document_collection.search(
        query=query_embedding, vector_column_name="vector"
      )
      .limit(k)
      .to_list()
    )

  def similarity_search_by_text(
    self, text: str, text_embedder: TextEmbedder, k: int = 10, **kwargs: Any
  ) -> list[VectorStoreSearchResult]:
    """使用文本,执行向量相似性检索"""
    query_embedding = text_embedder(text)
    if query_embedding:
      return self.similarity_search_by_vector(query_embedding, k)
    return []

  def search_by_id(self, id: str) -> VectorStoreDocument:
    """根据 ID 搜索文档"""
    doc = (
      self.document_collection.search()
        .where(f"id == '{id}'", prefilter=True)
        .to_list()
    )

我们可以针对 LanceDBVectorStore 类写一个简单的测试脚本:

import openai

# 使用 OpenAI Embedding
def embedder(text: str) -> list[float]:
  return openai.embeddings.create(
    input=text, model="text-embedding-3-small"
  ).data[0].embedding

# 连接 LanceDB 数据库
from graphrag.vector_stores.lancedb import LanceDBVectorStore
vector_store = LanceDBVectorStore(collection_name="default-entity-description")
vector_store.connect(db_uri="./ragtest/output/lancedb")

# 向量相似性检索
results = vector_store.similarity_search_by_text(
  "Who is Scrooge?", embedder, k=2
)
print(results)

感兴趣的同学可以用它来查询 GraphRAG 生成的这几个 LanceDB 数据库文件,研究其数据结构和内容。

小结

今天我们深入学习了 GraphRAG 索引构建的输出文件,这些结构化的输出文件是 GraphRAG 查询阶段的基础数据源。通过逐一分析 6 个核心数据表和 3 个向量数据库的字段结构和存储内容,了解 Parquet 列式存储格式以及专为 AI 优化的 LanceDB 向量数据库,我们对 GraphRAG 如何组织输出数据有了更清晰的认知。

在下一篇文章中,我们将转入查询阶段的学习,探索 GraphRAG 如何基于这些数据文件实现智能问答,包括全局查询、局部查询和混合查询等不同的检索策略。


GraphRAG 索引构建之图谱增强

在前面的文章中,我们已经深入学习了 GraphRAG 索引构建的前两个阶段,从原始文档到文本单元,再从文本单元提取出结构化的知识图谱。现在,我们已经拥有了包含实体和关系的网络图,接下来,我们将进入索引构建的第三阶段 —— 图谱增强,探索 GraphRAG 如何从这张零散的网络图中挖掘出更高层次的结构和洞见。

图谱增强阶段包含四个核心工作流:

  • 创建社区(create_communities - 使用 Leiden 算法对知识图谱进行层次化社区检测,返回多层次的社区结构;
  • 创建最终文本单元(create_final_text_units - 负责将基础文本单元与实体、关系和协变量信息进行关联,生成最终的文本单元数据;
  • 生成社区报告(create_community_reports - 为每个社区生成高质量的摘要报告;
  • 生成文本嵌入(generate_text_embeddings - 将各层次文本信息向量化,并存储到外部的向量数据库;

其中,创建最终文本单元按理应该放在第二阶段的末尾,当图谱提取结束后就可以创建了,它的作用是将文本单元与提取出来的实体、关系和协变量进行关联并重新保存。这里按照源码中顺序,将其放在第三阶段,这一步也比较简单,本篇不做过多赘述,主要关注另三个工作流的实现。

创建社区

创建社区是图谱增强阶段的核心步骤,它通过图聚类算法将零散的实体组织成有意义的主题集群。该工作流的实现位于 index/workflows/create_communities.py 文件中:

async def run_workflow(
  config: GraphRagConfig,
  context: PipelineRunContext,
) -> WorkflowFunctionOutput:

  # 1. 加载实体和关系数据
  entities = await load_table_from_storage("entities", context.output_storage)
  relationships = await load_table_from_storage("relationships", context.output_storage)

  # 2. 获取聚类配置
  max_cluster_size = config.cluster_graph.max_cluster_size
  use_lcc = config.cluster_graph.use_lcc
  seed = config.cluster_graph.seed

  # 3. 执行社区创建
  output = create_communities(
    entities,
    relationships,
    max_cluster_size=max_cluster_size,
    use_lcc=use_lcc,
    seed=seed,
  )

  # 4. 保存结果
  await write_table_to_storage(output, "communities", context.output_storage)

创建社区使用的配置如下:

cluster_graph:
  max_cluster_size: 10 # 最大社区数量
  use_lcc: true        # 是否使用最大连通分量
  seed: 0xDEADBEEF     # 随机种子

其中 max_cluster_size 表示最多生成 10 个社区;use_lcc 表示是否使用最大连通分量,我们在之前计算图嵌入时也见过这个参数,意味着只有图中最大的连通子图会被用于计算,孤立的节点或较小的连通分量会被过滤掉;seed 表示随机种子,一般保持默认即可,主要用于在多次运行聚类算法时获得一致的结果。

核心处理逻辑位于 create_communities() 函数中:

def create_communities(
  entities: pd.DataFrame,
  relationships: pd.DataFrame,
  max_cluster_size: int,
  use_lcc: bool,
  seed: int | None = None,
) -> pd.DataFrame:

  # 1. 构建图结构
  graph = create_graph(relationships, edge_attr=["weight"])

  # 2. 执行层次化聚类
  clusters = cluster_graph(
    graph,
    max_cluster_size,
    use_lcc,
    seed=seed,
  )

  # 3. 格式化聚类结果
  communities = pd.DataFrame(
    clusters, columns=pd.Index(["level", "community", "parent", "title"])
  ).explode("title")

它首先将关系数据转换为图结构,这里使用的是 NetworkX 库的 nx.from_pandas_edgelist() 方法,根据边创建加权无向图,然后调用 Leiden 聚类算法 进行社区检测,返回多层次的社区结构。

使用 Leiden 进行社区检测

在网络科学或图论中,社区(Community) 是指网络中的一组节点,其核心特征是:社区内部的节点之间连接紧密,而与社区外部节点的连接相对稀疏,这种 “内密外疏” 的结构是社区的核心标志,反映了网络中节点的聚类性和关联性。Leiden 算法是一种在图数据中识别社区结构的高效算法,由 Traag 等人在莱顿大学于 2018 年提出。它在经典的 Louvain 算法 基础上进行了改进,解决了 Louvain 算法中可能出现的 “分辨率限制” 和社区划分不精确的问题,因此在复杂网络分析中被广泛应用。

和之前学习的 RAGFlow 一样,GraphRAG 也是使用 graspologic 库的 hierarchical_leiden() 方法进行社区检测,为了搞懂 cluster_graph() 函数的具体逻辑,我们不妨先快速上手 graspologic 库,通过一个简单的示例 ,看看如何使用它对图中的节点执行层次化聚类:

import json
import networkx as nx
from graspologic.partition import hierarchical_leiden

def create_random_graph(num_nodes, edge_probability, seed):
  """生成一个随机图"""
  
  G = nx.erdos_renyi_graph(num_nodes, edge_probability, seed=seed)
  
  # 给节点加上 label 属性
  for node in G.nodes():
    G.nodes[node]['label'] = f"Node_{node}"
  
  return G

def apply_hierarchical_leiden_clustering(graph, max_cluster_size=10, seed=42):
  """调用 Leiden 层次化聚类算法"""
  
  # 层次化聚类
  community_mapping = hierarchical_leiden(
    graph, 
    max_cluster_size=max_cluster_size, 
    random_seed=seed
  )
  
  # 组织聚类结果
  results = {}
  hierarchy = {}
  
  for partition in community_mapping:
    level = partition.level
    if level not in results:
      results[level] = {}
    results[level][partition.node] = partition.cluster
    hierarchy[partition.cluster] = partition.parent_cluster if partition.parent_cluster is not None else -1
  
  return results, hierarchy

if __name__ == "__main__":
  
  # 生成一个随机图
  G = create_random_graph(num_nodes=25, edge_probability=0.2, seed=42)
  
  # 调用 Leiden 层次化聚类算法
  clustering_results, hierarchy = apply_hierarchical_leiden_clustering(
    G, max_cluster_size=8, seed=42
  )

这里先用 nx.erdos_renyi_graph() 创建一个符合 Erdős-Rényi 模型(ER 模型) 的随机图,图中有 25 个节点,然后使用 graspologic 库的 hierarchical_leiden() 方法对其进行聚类,聚类返回的结果是一个列表:

cluster-results.png

列表中的对象结构如下:

class HierarchicalCluster(NamedTuple):
  # 节点 ID
  node: Any
  # 聚类 ID
  cluster: int
  # 父聚类 ID
  parent_cluster: Optional[int]
  # 聚类层次
  level: int
  # 是否为最终聚类
  is_final_cluster: bool

尽管返回的是列表结构,实际上,它是一个层次化的树状结构。每当某个聚类的数量超过某个限制时,Leiden 算法就会为该聚类划分子聚类,每划分一次,聚类层次 level 就会加 1,并将 parent_cluster 设置为父聚类的 ID;如果聚类不会进一步划分,就将 is_final_cluster 设置为 true;下面是聚类返回结果的示例:

[
  {
    "node": 0,
    "cluster": 0,
    "parent_cluster": null,
    "level": 0,
    "is_final_cluster": true
  },
  // 省略 5 个 cluster == 0 的节点
  {
    "node": 24,
    "cluster": 1,
    "parent_cluster": null,
    "level": 0,
    "is_final_cluster": false
  },
  // 省略 7 个 cluster == 1 的节点
  {
    "node": 13,
    "cluster": 2,
    "parent_cluster": null,
    "level": 0,
    "is_final_cluster": true
  },
  // 省略 5 个 cluster == 2 的节点
  {
    "node": 21,
    "cluster": 3,
    "parent_cluster": null,
    "level": 0,
    "is_final_cluster": true
  },
  // 省略 4 个 cluster == 3 的节点
  {
    "node": 24,
    "cluster": 4,
    "parent_cluster": 1,
    "level": 1,
    "is_final_cluster": true
  },
  // 省略 1 个 cluster == 4 的节点
  {
    "node": 22,
    "cluster": 5,
    "parent_cluster": 1,
    "level": 1,
    "is_final_cluster": true
  },
  // 省略 3 个 cluster == 5 的节点
  {
    "node": 3,
    "cluster": 6,
    "parent_cluster": 1,
    "level": 1,
    "is_final_cluster": true
  },
  // 省略 1 个 cluster == 6 的节点
]

这个图共有 25 个节点,通过聚类算法检测出了 6 个聚类(即社区):

clusters-1.png

可以看到 1 号聚类共 8 个节点,被进一步划分成了 4、5、6 三个聚类,它们的 parent_cluster 是 1,表示由 1 号聚类派生而来,level 是 1,表示聚类的层级:

clusters-2.png

接着 GraphRAG 将聚类结果组织成层次关系,results 是每个层级下的节点和节点所属的聚类:

{
  "0": {
    "0": 0,
    "1": 1,
    "2": 0,
    "3": 1,
    "4": 2,
    // ...
    "21": 3,
    "22": 1,
    "23": 1,
    "24": 1
  },
  "1": {
    "1": 4,
    "3": 6,
    "12": 5,
    "16": 5,
    "19": 5,
    "22": 5,
    "23": 6,
    "24": 4
  }
}

hierarchy 是每个聚类对应的父聚类:

{
  "0": -1,
  "1": -1,
  "2": -1,
  "3": -1,
  "4": 1,
  "5": 1,
  "6": 1
}

然后进一步将其格式化成最终的社区数据,包含以下字段:

  • id: 社区唯一标识符
  • human_readable_id: 人类可读的递增ID
  • level: 社区层级
  • community: 社区ID
  • parent: 父社区ID
  • children: 子社区ID列表
  • title: 社区标题,字符串 Community N
  • entity_ids: 包含的实体ID列表
  • relationship_ids: 包含的关系ID列表
  • text_unit_ids: 关联的文本单元ID列表
  • period: 创建时间
  • size: 社区大小(实体数量)

生成社区报告

识别出社区结构后,下一步通过 LLM 对社区内的信息进行深度分析和总结,为每个社区生成摘要报告,代码如下:

async def create_community_reports(
  edges_input: pd.DataFrame,
  entities: pd.DataFrame,
  communities: pd.DataFrame,
  claims_input: pd.DataFrame | None
) -> pd.DataFrame:
  
  # 1. 分解社区数据,建立社区和实体映射
  nodes = explode_communities(communities, entities)

  # 2. 准备节点、边和声明数据
  nodes = _prep_nodes(nodes)
  edges = _prep_edges(edges_input)
  claims = _prep_claims(claims_input)

  # 3. 构建本地上下文
  local_contexts = build_local_context(
    nodes,
    edges,
    claims,
  )

  # 4. 生成社区摘要
  community_reports = await summarize_communities(
    nodes,
    communities,
    local_contexts,
    build_level_context,
  )

  # 5. 最终化报告
  return finalize_community_reports(community_reports, communities)

其中 local_contexts 被称为本地上下文,包含所有社区的详细信息,build_local_context() 函数将社区内的所有节点、边和声明汇总成一个字符串 CONTEXT_STRING,同时计算上下文长度是否超过 max_context_tokens 配置,如果超过则打上 CONTEXT_EXCEED_FLAG 标记。

生成社区摘要的核心逻辑位于 summarize_communities() 函数,它首先获取所有的社区层级,并从高到低排序,这里的倒序是关键,先生成最底层(层级最大)的社区报告,因为越底层包含的节点越少,不容易超出 token 限制;然后通过 build_level_context() 构建层级上下文,只包含特定层级的社区信息,针对每一个社区,有两种情况:

  • 如果上下文未超出 token 限制,则直接使用该社区的上下文;
  • 如果上下文超出 token 限制,则判断是否存在子社区报告;

    • 不存在子社区报告,一般出现在最底层的社区,则对该社区的上下文进行裁剪;
    • 存在子社区报告,则使用子社区报告作为该社区的上下文,如果超出 token 限制,也需要裁剪;

就这样从最底层的社区开始,一层层往上遍历,生成每个社区的摘要,使用的提示词如下:

你是一个帮助人类分析师进行一般信息挖掘的人工智能助手。信息挖掘是在一个网络中识别和评估与特定实体(如组织和个人)相关的信息的过程。

# 目标
根据属于某个社区的实体列表及其关系和可选的相关声明,撰写一份关于该社区的综合报告。这份报告将用于向决策者告知与该社区相关的信息及其潜在影响。报告内容包括社区主要实体的概述、它们的合规性、技术能力、声誉以及值得关注的声明。

# 报告结构

报告应包含以下部分:
- 标题:代表社区主要实体的名称——标题应简短但具体。可能的话,在标题中包含有代表性的命名实体。
- 摘要:对社区整体结构、其内部实体之间的关系以及与这些实体相关的重要信息的执行摘要。
- 影响严重程度评级:一个介于0-10之间的浮点分数,代表社区内实体所带来的影响的严重程度。影响是对一个社区重要性的评分。
- 评级解释:用一句话解释影响严重程度评级。
- 详细发现:关于该社区的 5-10 个关键见解。每个见解都应有一个简短摘要,随后是根据以下依据规则展开的多个段落的解释性文本。内容需全面详实。

这里和 RAGFlow 是一模一样的,因为 RAGFlow 就是参考了 GraphRAG 的实现。

最终的报告输出为 JSON 格式:

{
  "title": "<报告标题>",
  "summary": "<执行摘要>",
  "rating": "<影响严重程度评级>",
  "rating_explanation": "<评级解释>",
  "findings": [
    {
      "summary":"<见解1摘要>",
      "explanation": "<见解1解释>"
    },
    {
      "summary":"<见解2摘要>",
      "explanation": "<见解2解释>"
    }
  ]
}

生成文本嵌入

社区报告生成后,索引构建就基本上结束了。最后一步是将各个工作流中生成的文本信息向量化,为了在查询阶段能够快速地进行语义检索。该步骤位于 generate_text_embeddings 工作流:

async def run_workflow(
  config: GraphRagConfig,
  context: PipelineRunContext,
) -> WorkflowFunctionOutput:
  
  # 加载所有含文本信息的表
  documents = await load_table_from_storage("documents", context.output_storage)
  relationships = await load_table_from_storage("relationships", context.output_storage)
  text_units = await load_table_from_storage("text_units", context.output_storage)
  entities = await load_table_from_storage("entities", context.output_storage)
  community_reports = await load_table_from_storage("community_reports", context.output_storage)
  
  # 获取配置
  embedded_fields = config.embed_text.names
  text_embed = get_embedding_settings(config)

  # 根据配置,只计算特定文本字段的向量
  output = await generate_text_embeddings(
    documents=documents,
    relationships=relationships,
    text_units=text_units,
    entities=entities,
    community_reports=community_reports,
    text_embed_config=text_embed,
    embedded_fields=embedded_fields,
  )

  # 持久化存储向量数据
  if config.snapshots.embeddings:
    for name, table in output.items():
      await write_table_to_storage(table, f"embeddings.{name}", context.output_storage)

  return WorkflowFunctionOutput(result=output)

值得注意的一点是,GraphRAG 支持 8 个字段的向量计算:

  • entity.title - 实体标题
  • entity.description - 实体描述
  • relationship.description - 关系描述
  • document.text - 文档文本
  • community.title - 社区标题
  • community.summary - 社区摘要
  • community.full_content - 社区完整内容
  • text_unit.text - 文本单元

但是默认只计算实体描述、社区内容和文本单元的向量,可以在 embed_text 配置进行调整:

embed_text:
  model: text-embedding-3-small
  batch_size: 16
  batch_max_tokens: 8191
  model_id: default_embedding_model
  vector_store_id: default_vector_store
  names:
    - entity.description
    - community.full_content
    - text_unit.text

GraphRAG 使用这里配置的嵌入模型(如 text-embedding-3-small)来执行向量化。这里的处理比较有意思,GraphRAG 首先使用 TokenTextSplitter 将长文本按 batch_max_tokens 分割为符合 token 限制的片段;如果只有一个片段,则返回该片段的向量,如果文本被分成了多个片段,那么计算平均向量并归一化:

if size == 0:
  embeddings.append(None)
elif size == 1:
  embedding = raw_embeddings[cursor]
  embeddings.append(embedding)
else:
  chunk = raw_embeddings[cursor : cursor + size]
  average = np.average(chunk, axis=0)
  normalized = average / np.linalg.norm(average)
  embeddings.append(normalized.tolist())

生成的向量会与对应的文本和 ID 一同存储在向量数据库中,支持批量处理,默认使用 LanceDB,这是一个现代化的、为 AI 优化的高性能向量数据库,它将索引文件直接存储在本地文件系统中。除此之外,GraphRAG 还支持 Azure 认知搜索服务 Azure AI Search 以及 Azure 的 NoSQL 数据库 CosmosDB 等向量存储。可以通过 vector_store 配置进行修改:

vector_store:
  default_vector_store:
    type: lancedb
    db_uri: output/lancedb
    container_name: default
    overwrite: True

另外,如果我们配置了 snapshots.embeddings,向量数据还会被存到独立的 Parquet 文件中:

  • embeddings.entity.description.parquet
  • embeddings.community.full_content.parquet
  • embeddings.text_unit.text.parquet

小结

今天我们深入学习了 GraphRAG 索引构建流程中的最后一个阶段,将零散的知识组织成层次化结构的社区。主要内容总结如下:

  • Leiden 聚类:详细了解了 GraphRAG 如何使用 Leiden 算法对知识图谱进行社区检测,构建层次化的社区结构;
  • 层次化报告生成:探讨了如何从底层社区开始一层一层的往上为每个社区生成摘要报告,以及高层社区上下文超出 token 限制的解决方法;
  • 文本列向量化:了解了如何将文本单元、实体描述和社区报告等不同层次的文本信息转化为向量嵌入,为语义检索提供支持。

至此,GraphRAG 的索引构建阶段就完成了,索引构建的产物位于 output 目录中,包括一系列 Parquet 格式的文件和 LanceDB 数据:

  • 文档表(documents.parquet
  • 文本单元表(text_units.parquet
  • 实体表(entities.parquet
  • 关系表(relationships.parquet
  • 社区表(communities.parquet
  • 社区报告(community_reports.parquet
  • 向量数据库(lancedb

接下来,我们就可以通过 query 命令来查询这些数据,正式进入检索和问答环节。


GraphRAG 索引构建之知识提取(四)

我们昨天大概了解了基于 NLP 的知识提取流程,为了更好地学习相关的提取技术,我们简单介绍了 SpaCy 和 TextBlob 这两个 NLP 库的基本使用。有了 SpaCy 和 TextBlob 的基础,我们再回过头来看看 extract_graph_nlp 工作流中的三大名词短语提取器。

句法分析提取器(Syntactic)

首先是句法分析提取器,其核心实现位于 SyntacticNounPhraseExtractor 类:

def __init__(
  self,
  model_name: str,
  max_word_length: int,
  include_named_entities: bool,
  exclude_entity_tags: list[str],
  exclude_pos_tags: list[str],
  exclude_nouns: list[str],
  word_delimiter: str,
):
  # 根据是否需要命名实体识别来决定加载的组件
  if not include_named_entities:
    # 排除命名实体识别器,提升处理速度
    self.nlp = self.load_spacy_model(model_name, exclude=["lemmatizer", "ner"])
  else:
    # 保留命名实体识别器,只排除词形还原器
    self.nlp = self.load_spacy_model(model_name, exclude=["lemmatizer"])

从初始化函数可以看出该提取器有不少配置参数:

extract_graph_nlp:
  text_analyzer:
    extractor_type: syntactic_parser
    model_name: en_core_web_md   # SpaCy 模式
    max_word_length: 15          # 最大单词长度
    word_delimiter: " "          # 单词分隔符
    include_named_entities: true # 是否开启命名实体识别
    exclude_nouns:               # 排除停用词,大多是日常交流中使用频繁但本身不携带太多关键信息的词汇
      - stuff
      - thing
      - things
      - ...
    exclude_entity_tags:         # 排除指定类型的实体
      - DATE   # 日期
    exclude_pos_tags:            # 排除特定的词性标签
      - DET    # 限定词,如 a an the 等
      - PRON   # 代词,如 he she it 等
      - INTJ   # 感叹词,如 oh wow hey 等
      - X      # 其他

句法分析提取器根据 include_named_entities 参数加载 SpaCy 模型,默认模型是 en_core_web_md,为提高性能,加载模型时排除了 lemmatizer 模块(词形还原器),如果用户关闭了命名实体识别,还会排除 ner 模块(命名实体识别器)。

该提取器的核心实现如下:

def extract(self, text: str) -> list[str]:
  
  # SpaCy 文本分析
  doc = self.nlp(text)

  if self.include_named_entities:
    # 命名实体 + 名词块
    entities = [
      ent for ent in doc.ents
      if ent.label_ not in self.exclude_entity_tags
    ]
    spans = entities + list(doc.noun_chunks)

    # 根据规则过滤
    tagged_noun_phrases = [
      self._tag_noun_phrases(span, entities) for span in spans
    ]
  else:
    # 直接获取名词块,根据规则过滤
    tagged_noun_phrases = [
      self._tag_noun_phrases(chunk, []) for chunk in doc.noun_chunks
    ]

如果开启了命名实体识别,则将 SpaCy 提取的命名实体(ents)和名词块(noun_chunks)合并,然后通过一些规则进行过滤得到结果;如果未开启,则直接获取名词块,根据规则过滤。过滤规则比较细碎,比如根据 exclude_entity_tags 排除特定类型的实体,根据 exclude_pos_tags 排除特定词性,根据 exclude_nouns 排除停用词,排除空格和标点,根据 max_word_length 判断长度是否有效,等等。

上下文无关文法提取器(CFG)

第二个是上下文无关文法(CFG,Context-Free Grammar)提取器,其核心实现位于 CFGNounPhraseExtractor 类:

def __init__(
  self,
  model_name: str,
  max_word_length: int,
  include_named_entities: bool,
  exclude_entity_tags: list[str],
  exclude_pos_tags: list[str],
  exclude_nouns: list[str],
  word_delimiter: str,
  noun_phrase_grammars: dict[tuple, str],
  noun_phrase_tags: list[str],
):
  # 根据是否需要命名实体识别来决定加载的组件
  if not include_named_entities:
    # 排除命名实体识别器,提升处理速度
    self.nlp = self.load_spacy_model(model_name, exclude=["lemmatizer", "parser", "ner"])
  else:
    # 保留命名实体识别器,只排除词形还原器和句法分析器
    self.nlp = self.load_spacy_model(model_name, exclude=["lemmatizer", "parser"])

可以看出这个提取器和句法分析提取器非常像,同样是基于 SpaCy 实现的,而且配置参数也基本上一样。只不过 CFG 提取器在加载模型时排除了句法分析器(parser),因为它不依赖复杂的依存句法分析,只需要基本的词性标注功能即可。另外,它在初始化函数里多了两个参数:

extract_graph_nlp:
  text_analyzer:
    extractor_type: cfg
    # ... 同上
    noun_phrase_tags:            # 识别名词短语的词性标签
      - PROPN  # 专有名词,指特定的人、地方、组织等的名称,如 "London"(伦敦)、"Apple"(苹果公司)、"Alice"(爱丽丝)等
      - NOUNS  # 普通名词,如 "book"(书)、"city"(城市)、"idea"(想法)等
    noun_phrase_grammars:        # CFG 语法规则
      "PROPN,PROPN": "PROPN"
      "NOUN,NOUN": "NOUNS"
      "NOUNS,NOUN": "NOUNS"
      "ADJ,ADJ": "ADJ"
      "ADJ,NOUN": "NOUNS"

这两个参数就是 CFG 提取器的核心,CFG 其实是一种形式语法,由四元组 G = (V, Σ, R, S) 定义:

  • V: 非终结符集合 (如 NP, VP)
  • Σ: 终结符集合 (如具体单词)
  • R: 产生式规则集合 (如 NP → DT NN)
  • S: 起始符号

光看这个定义可能比较抽象,实际上就是先定义一个 CFG 规则集合,集合中包含多个键值对,键为词性标签对 (POS1, POS2),值为合并后的词性,比如上面的配置对应下面的语法规则:

"PROPN,PROPN": "PROPN", # 专有名词 + 专有名词 → 专有名词
"NOUN,NOUN": "NOUNS",   # 名词 + 名词 → 复合名词  
"NOUNS,NOUN": "NOUNS",  # 复合名词 + 名词 → 复合名词
"ADJ,ADJ": "ADJ",       # 形容词 + 形容词 → 形容词
"ADJ,NOUN": "NOUNS",    # 形容词 + 名词 → 复合名词

然后依次获取相邻两个词的词性标签对,在规则集合中查找匹配的规则并合并,通过反复应用语法规则,将简单词汇组合成更复杂的名词短语。我们看一个具体的例子,假设有文本 "big red car",处理过程如下:

  1. 初始标注: [("big", "ADJ"), ("red", "ADJ"), ("car", "NOUN")]
  2. 第一轮合并:

    • 检查 ("ADJ", "ADJ") → 找到规则 "ADJ,ADJ": "ADJ"
    • 合并: [("big red", "ADJ"), ("car", "NOUN")]
  3. 第二轮合并:

    • 检查 ("ADJ", "NOUN") → 找到规则 "ADJ,NOUN": "NOUNS"
    • 合并: [("big red car", "NOUNS")]
  4. 结果筛选:

    • 如果 "NOUNS" 在 noun_phrase_tags 中,则提取为名词短语

CFG 提取器的核心就是一个迭代式的合并算法:

def extract_cfg_matches(self, doc: Doc) -> list[tuple[str, str]]:
  # 1. 预处理:提取词汇和词性标注对,过滤无效词汇
  tagged_tokens = [
    (token.text, token.pos_) for token in doc
    if token.pos_ not in self.exclude_pos_tags  # 过滤指定词性
    and token.is_space is False                 # 过滤空格
    and token.text != "-"                       # 过滤连字符
  ]

  # 2. 迭代式合并:持续寻找匹配的语法模式
  merge = True
  while merge:
    merge = False
    for index in range(len(tagged_tokens) - 1):
      first, second = tagged_tokens[index], tagged_tokens[index + 1]
      key = first[1], second[1]  # 构建POS标签对作为查找键
      value = self.noun_phrase_grammars.get(key, None)  # 查找语法规则
      if value:
        # 找到匹配模式:移除原来的两个词汇,插入合并结果
        merge = True
        tagged_tokens.pop(index)     # 移除第一个词汇
        tagged_tokens.pop(index)     # 移除第二个词汇(索引已向前移动)
        match = f"{first[0]}{self.word_delimiter}{second[0]}"  # 合并文本
        pos = value  # 使用语法规则定义的新词性
        tagged_tokens.insert(index, (match, pos))  # 插入合并结果
        break
    
  # 3. 筛选结果:只返回符合目标词性的词汇
  return [t for t in tagged_tokens if t[1] in self.noun_phrase_tags]

通过自底向上的迭代合并,CFG 提取器从词汇级别逐步构建更复杂的语法结构,能有效识别复合名词短语,如 "Microsoft Corporation"、"machine learning algorithm" 等。由于少了依存句法分析,CFG 提取器一般比句法分析提取器更快,但需要为不同语言定制语法规则。另外有趣的是,CFG 的提取逻辑实际上参考了 TextBlob 的 FastNPExtractor 实现,感兴趣的可以看下 TextBlob 的源码。

正则表达式提取器(RegexEnglish)

这是基于 TextBlob 的最快速提取器,专门针对英语文本优化,它充分利用了前面介绍的 TextBlob 库的名词短语提取能力。核心实现位于 RegexENNounPhraseExtractor 类:

def __init__(
    self,
    exclude_nouns: list[str],
    max_word_length: int,
    word_delimiter: str,
):
    # 自动下载必需的语料库
    download_if_not_exists("brown")    # 布朗语料库
    download_if_not_exists("treebank") # 宾州树库
    download_if_not_exists("averaged_perceptron_tagger_eng") # 词性标注器

    # 下载分词器
    download_if_not_exists("punkt")     # 句子分割器
    download_if_not_exists("punkt_tab") # 句子分割器的表格版本

    # 预加载语料库,避免多线程竞争条件
    nltk.corpus.brown.ensure_loaded()
    nltk.corpus.treebank.ensure_loaded()

提取器在初始化时会自动下载和预加载所需的 NLTK 资源,另外能看到它的配置参数要简单的多:

extract_graph_nlp:
  text_analyzer:
    extractor_type: regex_english
    max_word_length: 15          # 最大单词长度
    word_delimiter: " "          # 单词分隔符
    exclude_nouns:               # 排除停用词,大多是日常交流中使用频繁但本身不携带太多关键信息的词汇
      - stuff
      - thing
      - things
      - ...

提取器的核心实现基于 TextBlob 的词性标注和名词短语提取能力:

def extract(self, text: str) -> list[str]:
  # 1. 创建TextBlob对象,自动执行分词和词性标注
  blob = TextBlob(text)
  
  # 2. 提取专有名词:识别NNP标签的词汇
  proper_nouns = [token[0].upper() for token in blob.tags if token[1] == "NNP"]
  
  # 3. 使用TextBlob内置的noun_phrases属性提取名词短语  
  tagged_noun_phrases = [
    self._tag_noun_phrases(chunk, proper_nouns)
    for chunk in blob.noun_phrases  # 
  ]
  
  # 4. 应用过滤规则,保留有效的名词短语
  filtered_noun_phrases = set()
  for tagged_np in tagged_noun_phrases:
    if (
      tagged_np["has_proper_nouns"]           # 包含专有名词
      or len(tagged_np["cleaned_tokens"]) > 1  # 多词短语
      or tagged_np["has_compound_words"]      # 复合词
    ) and tagged_np["has_valid_tokens"]:       # 词汇长度有效
      filtered_noun_phrases.add(tagged_np["cleaned_text"])
      
  return list(filtered_noun_phrases)

值得注意的是,这里的 noun_phrases 是通过 TextBlob 的 FastNPExtractor 提取的,正如上面所说,CFG 提取器就是参考它实现的,因此这个提取器虽然名字叫正则表达式提取器,实际上仍然是基于 CFG 实现的。

名词图谱构建

选定名词短语提取器后,下一步是构建名词图谱,这个过程由 build_noun_graph() 函数实现:

async def build_noun_graph(
  text_unit_df: pd.DataFrame,
  text_analyzer: BaseNounPhraseExtractor,
  normalize_edge_weights: bool,
  num_threads: int = 4,
  cache: PipelineCache | None = None,
) -> tuple[pd.DataFrame, pd.DataFrame]:

  # 提取节点
  nodes_df = await _extract_nodes(
    text_units, text_analyzer, num_threads=num_threads, cache=cache
  )
  # 提取边
  edges_df = _extract_edges(
    nodes_df, normalize_edge_weights=normalize_edge_weights
  )

  return (nodes_df, edges_df)

整个构建过程分为两个关键步骤:节点提取和边提取。

节点提取

节点提取的核心逻辑位于 _extract_nodes() 函数:

async def _extract_nodes(
  text_unit_df: pd.DataFrame,
  text_analyzer: BaseNounPhraseExtractor,
  num_threads: int = 4,
  cache: PipelineCache | None = None,
) -> pd.DataFrame:

  # 定义提取函数
  async def extract(row):
    text = row["text"]
    attrs = {"text": text, "analyzer": str(text_analyzer)}
    key = gen_sha512_hash(attrs, attrs.keys())
    
    # 缓存机制:先检查缓存
    result = await cache.get(key)
    if not result:
      # 缓存未命中,使用提取器提取
      result = text_analyzer.extract(text)
      await cache.set(key, result)
    return result

  # 并行处理所有文本单元
  text_unit_df["noun_phrases"] = await derive_from_rows(
    text_unit_df,
    extract,
    num_threads=num_threads,
    async_type=AsyncType.Threaded,
  )

  # 展开名词短语到单独行
  noun_node_df = text_unit_df.explode("noun_phrases")
  noun_node_df = noun_node_df.rename(
    columns={"noun_phrases": "title", "id": "text_unit_id"}
  )

  # 按名词短语标题分组,统计频率
  grouped_node_df = (
    noun_node_df.groupby("title").agg({"text_unit_id": list}).reset_index()
  )
  grouped_node_df["frequency"] = grouped_node_df["text_unit_id"].apply(len)

  return grouped_node_df[["title", "frequency", "text_unit_id"]]

整个过程比较简单,在代码中已有详细注释,核心点有几个:

  • 提取器调用:根据配置调用对应的提取器,从文本单元中提取出名词短语
  • 异步并发处理:使用 derive_from_rows 进行多线程并行处理
  • 缓存机制:通过 SHA512 哈希键实现缓存,避免重复计算
  • 频率统计:统计每个名词短语在多少个文本单元中出现

边提取

边提取的核心逻辑位于 _extract_edges() 函数:

def _extract_edges(
  nodes_df: pd.DataFrame,
  normalize_edge_weights: bool = True,
) -> pd.DataFrame:

  # 1. 展开节点到文本单元级别
  text_units_df = nodes_df.explode("text_unit_ids")
  text_units_df = text_units_df.rename(columns={"text_unit_ids": "text_unit_id"})

  # 2. 按文本单元分组,只保留包含2个或更多名词短语的文本单元
  text_units_df = (
    text_units_df.groupby("text_unit_id")
    .agg({"title": lambda x: list(x) if len(x) > 1 else np.nan})
    .reset_index()
  )
  text_units_df = text_units_df.dropna()

  # 3. 使用组合算法生成所有可能的边对
  from itertools import combinations
  titles = text_units_df["title"].tolist()
  all_edges = [list(combinations(t, 2)) for t in titles]

  # 4. 标准化边方向(确保 source ≤ target)
  edge_df[["source", "target"]] = edge_df.loc[:, "edges"].to_list()
  edge_df["min_source"] = edge_df[["source", "target"]].min(axis=1)
  edge_df["max_target"] = edge_df[["source", "target"]].max(axis=1)
  edge_df = edge_df.rename(
    columns={"min_source": "source", "max_target": "target"}
  )

  # 5. 按源节点和目标节点分组,计算权重
  grouped_edge_df = (
    edge_df.groupby(["source", "target"]).agg({"text_unit_id": list}).reset_index()
  )
  grouped_edge_df["weight"] = grouped_edge_df["text_unit_id"].apply(len)

  # 6. 可选的 PMI 权重归一化
  if normalize_edge_weights:
    grouped_edge_df = calculate_pmi_edge_weights(nodes_df, grouped_edge_df)

  return grouped_edge_df

边提取采用了基于 共现(Co-occurrence) 的关系提取策略,核心思想是:在同一个文本单元中出现的名词短语之间存在潜在关系。假设文本单元包含名词短语 ["Apple", "iPhone", "Technology"],算法会首先使用 combinations(t, 2) 生成所有可能的两两组合:

[
  ("Apple","iPhone"), 
  ("Apple","Technology"), 
  ("iPhone","Technology")
]

然后标准化边的方向,确保无向图中的边始终按统一方向存储,避免重复边(如 A-B 和 B-A 被视为同一条边);接着根据每对边出现的次数计算权重,原始权重=1;再根据各实体的全局频率调整权重,这里的使用的是 PMI 归一化。

PMI(Pointwise Mutual Information,点互信息) 是信息论中用于衡量两个事件(或特征)之间关联程度的指标,它描述了两个事件同时发生的概率与它们独立发生时概率乘积之间的偏离程度。使用 PMI 可以减少对低频实体的偏重,它的计算公式如下:

pmi.png

最终生成三条带权重的无向边。

图谱修剪

至此,我们终于完成了基于 NLP 的图谱构建,通过传统的自然语言技术,从文本单元中提取出节点和边。不过使用 NLP 提取的原始图谱通常包含大量的噪声节点和边,我们还需要对其进行进一步的修剪,这也就是 prune_graph 工作流要做的事:

def prune_graph(
  entities: pd.DataFrame,
  relationships: pd.DataFrame,
  pruning_config: PruneGraphConfig,
) -> tuple[pd.DataFrame, pd.DataFrame]:
  
  # 创建一个临时图
  graph = create_graph(relationships, edge_attr=["weight"], nodes=entities)

  # 根据配置对图进行修剪
  pruned = prune_graph_operation(
    graph,
    min_node_freq=pruning_config.min_node_freq,
    max_node_freq_std=pruning_config.max_node_freq_std,
    min_node_degree=pruning_config.min_node_degree,
    max_node_degree_std=pruning_config.max_node_degree_std,
    min_edge_weight_pct=pruning_config.min_edge_weight_pct,
    remove_ego_nodes=pruning_config.remove_ego_nodes,
    lcc_only=pruning_config.lcc_only,
  )

  # 将修剪后的图还原成 DataFrame 并返回
  pruned_nodes, pruned_edges = graph_to_dataframes(
    pruned, node_columns=["title"], edge_columns=["source", "target"]
  )

首先将 DataFrame 转换成 NetworkX 图,然后对图进行修剪,最后还原成 DataFrame 并返回。修剪的策略主要是通过 settings.yaml 文件进行配置:

prune_graph:
  min_node_freq: 2           # 最小节点频率阈值
  max_node_freq_std: 2.0     # 节点频率标准差上限  
  min_node_degree: 1         # 最小节点度数阈值
  max_node_degree_std: 2.0   # 节点度数标准差上限
  min_edge_weight_pct: 0.1   # 最小边权重百分位数
  remove_ego_nodes: false    # 是否移除 ego 节点
  lcc_only: true             # 是否只保留最大连通分量

这里涉及一些统计学和图形学中的基础概念,比如节点频率、节点度数、标准差、边的权重、百分位数、ego 节点、最大连通分量等,下面是我对这些参数做一个简单的总结:

  1. 频率修剪

    • 移除频率低于最小阈值的节点,如果节点频率(出现次数)低于 min_node_freq 则移除;
    • 移除频率高于标准差上限的节点,通过 np.std 计算标准差,通过 np.mean 计算平均值,如果节点频率高于 mean + max_node_freq_std * std 则移除;
  2. 度数修剪

    • 移除度数低于最小阈值的节点,如果节点度数(连接的边个数)低于 min_node_degree 则移除;
    • 移除度数高于标准差上限的节点,通过 np.std 计算标准差,通过 np.mean 计算平均值,如果节点度数高于 mean + max_node_degree_std * std 则移除;
  3. 权重修剪:移除权重低于百分位数阈值的边,通过 np.percentile 计算所有边权重相对于 min_edge_weight_pct 的百分位数,如果边权重低于该值则移除;
  4. 中心节点修剪:可选移除度数最高的节点,ego 节点通常是图中连接最密集的中心节点,可能会过度影响图的结构和分析结果,移除中心节点后,更容易发现图中真实的社区结构和聚类模式;
  5. 连通性修剪:可选只保留图中最大的连通子图,孤立的节点或较小的连通分量会被移除;

小结

通过今天的学习,我们掌握了以下要点:

  • 三种名词短语提取器:详细了解了 Syntactic、CFG 和 RegexEnglish 三种提取方法的原理、特点和适用场景,学会了如何根据需求选择合适的提取器;
  • 共现关系构建边:深入理解了基于文本单元内名词短语共现的关系提取算法,包括组合生成、权重计算和 PMI 归一化等关键技术;
  • 图谱修剪策略:掌握了多维度统计过滤的修剪方法,了解如何通过频率、度数、权重等指标清理噪声节点和边;

通过基于 NLP 的知识提取,我们已经从文本单元中构建出了包含实体和关系的初步知识图谱。相比于基于大模型的知识提取,传统 NLP 方法不仅更便宜,而且处理速度也有着显著的提升,不过这种方法的局限性也很明显:

  1. 语义理解有限:很多功能都是基于统计方法实现,缺乏深层语义理解;
  2. 关系类型单一:只能提取隐式共现关系,无法识别具体的关系类型;
  3. 上下文依赖:依赖文本单元的分块质量,为减少这方面的影响,分块大小不要设置的过大,一般在 50 到 100 即可;
  4. 噪声问题:容易引入无意义的共现关系;

因此在技术选型时也需要根据实际情况综合对比,如果你主要就是想做全局搜索,生成点摘要,那 NLP 方法就挺合适,另外一种比较好的做法是将 NLP 方法作为基准。

好了,关于 GraphRAG 知识提取的内容就学到这了。在下一篇文章中,我们将学习 GraphRAG 如何基于这些提取的实体和关系进行更深层的图谱分析,包括社区检测、层次化聚类和向量化索引,最终构建出完整的多层次知识图谱索引体系。


GraphRAG 索引构建之知识提取(三)

今天,我们将继续学习 GraphRAG 中关于知识提取的内容。上周的 extract_graph 工作流,完全基于大语言模型进行实体关系提取,质量高但速度较慢;为此 GraphRAG 还提供了另一种实现,基于传统的自然语言处理(NLP)技术,实现更快速、更低成本的实体关系提取。我们可以通过 Fast 索引方法 开启该功能:

$ graphrag index --root ./ragtest --method fast

我们重点学习 Fast 方法中的两个核心工作流:

  • 基于 NLP 的图谱提取(extract_graph_nlp - 使用 NLP 技术从文本中提取名词短语作为实体
  • 图谱修剪(prune_graph - 对提取结果进行统计过滤和清理

基于 NLP 的图谱提取

extract_graph_nlp 工作流的核心实现位于 index/workflows/extract_graph_nlp.py 文件,整个流程相当简洁:

async def run_workflow(
  config: GraphRagConfig,
  context: PipelineRunContext,
) -> WorkflowFunctionOutput:
  # 1. 加载文本单元
  text_units = await load_table_from_storage("text_units", context.output_storage)

  # 2. 执行 NLP 图谱提取
  entities, relationships = await extract_graph_nlp(
    text_units,
    extraction_config=config.extract_graph_nlp,
  )

  # 3. 保存结果
  await write_table_to_storage(entities, "entities", context.output_storage)
  await write_table_to_storage(relationships, "relationships", context.output_storage)

与基于大模型的 extract_graph 相比,这个工作流的实现要简单得多,核心逻辑都封装在 extract_graph_nlp() 函数中:

async def extract_graph_nlp(
  text_units: pd.DataFrame,
  extraction_config: ExtractGraphNLPConfig,
) -> tuple[pd.DataFrame, pd.DataFrame]:
  # 创建名词短语提取器
  text_analyzer = create_noun_phrase_extractor(text_analyzer_config)

  # 构建名词图谱
  extracted_nodes, extracted_edges = await build_noun_graph(
    text_units,
    text_analyzer=text_analyzer,
    normalize_edge_weights=extraction_config.normalize_edge_weights,
    num_threads=extraction_config.concurrent_requests,
  )

  # 添加下游工作流所需的字段
  extracted_nodes["type"] = "NOUN PHRASE"
  extracted_nodes["description"] = ""
  extracted_edges["description"] = ""

  return (extracted_nodes, extracted_edges)

整个工作流的执行流程如下:

extract-graph-nlp-flow.png

大致可分为三个步骤:首先创建名词短语提取器,然后构建名词图谱,最后为提取的节点和边添加必要的元数据字段。

名词短语提取器

GraphRAG 提供了三种不同的名词短语提取方法,每种都有其独特的优势和适用场景,通过工厂模式进行统一管理:

class NounPhraseExtractorFactory:
  @classmethod
  def get_np_extractor(cls, config: TextAnalyzerConfig) -> BaseNounPhraseExtractor:
    # 根据配置创建名词短语提取器
    match np_extractor_type:
      # 句法分析提取器
      case NounPhraseExtractorType.Syntactic:
        return SyntacticNounPhraseExtractor(...)
      # 上下文无关文法提取器
      case NounPhraseExtractorType.CFG:
        return CFGNounPhraseExtractor(...)
      # 正则表达式提取器
      case NounPhraseExtractorType.RegexEnglish:
        return RegexENNounPhraseExtractor(...)

我们可以在 settings.yaml 中配置使用哪种提取器:

extract_graph_nlp:
  text_analyzer:
    extractor_type: regex_english # [regex_english, syntactic_parser, cfg]

三种提取器的特点如下:

  • 句法分析提取器(Syntactic):基于 依存句法分析(Dependency Parsing)命名实体识别(NER) 的名词短语提取器,使用 SpaCy 实现。该提取器相比基于正则表达式的提取器往往能产生更准确的结果,但速度较慢。此外,通过使用 SpaCy 相应的模型,它可以用于不同的语言。
  • 上下文无关文法提取器(CFG):基于 上下文无关文法(CFG)命名实体识别(NER) 的名词短语提取器,同样使用 SpaCy 实现。该提取器往往比基于句法分析的提取器更快,但对于不同语言可能需要修改语法规则。
  • 正则表达式提取器(RegexEnglish):基于正则表达式的名词短语提取器,使用 TextBlob 实现。它是默认提取器,也是 LazyGraphRAG 首次基准测试中使用的提取器,但它只适用于英文。它比基于句法分析的提取器更快,但准确性可能更低。

下表是三种提取器的一个简单对比:

特性SyntacticCFGRegexEnglish
准确性最高中等较低
速度最慢中等最快
多语言支持需配置仅英语
资源占用中等
依赖复杂度SpaCy 模型SpaCy 基础TextBlob+NLTK
可定制性配置参数语法规则有限

建议根据实际的需求进行选择,比如:高质量需求选择 Syntactic 提取器,快速原型选择 RegexEnglish 提取器,平衡性能选择 CFG 提取器。下面我们将详细介绍这三种提取器的实现,但在深入具体源码之前,让我们先了解一下这些提取器所依赖的核心 NLP 库。

SpaCy 简单介绍

SpaCy 是一个专为生产环境设计的现代自然语言处理库,在 GraphRAG 的名词短语提取中扮演着重要角色。它提供了高效的文本处理管道,支持多种语言和复杂的 NLP 任务。

spacy.png

SpaCy 基于 Cython 实现,处理速度极快,支持 75+ 不同的语言,并内置了多种语言的预训练模型,下面是一些常用的 SpaCy 模型:

  • en_core_web_sm: 英语小型模型(12MB)- 基础功能
  • en_core_web_md: 英语中型模型(31MB)- 包含词向量
  • en_core_web_lg: 英语大型模型(382MB)- 高精度
  • zh_core_web_sm: 中文小型模型(46MB)- 支持中文处理

每一种模型都采用管道(Pipeline)架构,文本按顺序经过多个处理组件,每个组件可以选择性的启用或禁用:

spacy-pipeline.png

这些组件包括:

  • tok2vec: 将文本转换为向量表示
  • tagger: 词性标注器,为每个词(token)分配词性标签(Part-of-Speech Tag)
  • parser: 句法分析器,分析句子的句法结构,包括词语之间的依赖关系(Dependency Parsing)
  • attribute_ruler: 属性规则器,通过自定义规则修正或补充文本的属性(如词性、形态等)
  • lemmatizer: 词形还原器,将词语还原为其基本形式(词根)
  • ner: 命名实体识别器,识别文本中的命名实体(Named Entity)并分类

更多关于 SpaCy 模型、管道和组件的内容,可参考下面的文档:

让我们通过一个简单的示例来了解 SpaCy 的基本功能。首先,下载我们所需的模型:

$ python -m spacy download en_core_web_sm

通过 spacy.load 加载模型:

import spacy

nlp = spacy.load("en_core_web_sm")

然后准备一段简单的测试文本:

text = "Apple Inc, founded by Steve Jobs, develops innovative iPhone technology in California."
doc = nlp(text)

接下来就可以使用 SpaCy 的各个功能了,比如分词和词性标注:

for token in doc:
  print(f"{token.text:12} {token.pos_:8} {token.tag_:6} {token.lemma_}")

这里的几个属性解释如下:

  • text 为单词的原始文本;
  • pos_ 为单词的词性,比如名词(noun)、动词(verb)、形容词(adjective)、副词(adverb)等;
  • tag 为词性标签,用于表示具体词性的符号或缩写,不同的标注体系会定义不同的 TAG 符号,例如在英文的 Penn Treebank 标注体系中,"NN" 表示名词(noun, singular),"VB" 表示动词(verb, base form),"JJ" 表示形容词(adjective);而在中文的北大分词标注体系中,"n" 表示名词,"v" 表示动词,"a" 表示形容词等;
  • lemma 为词根或词元,指的是将词语的各种变形形式(如过去式、复数、比较级等)还原为其最基本的形式,比如将 "running" 还原为 "run","better" 还原为 "good" 等;

上面的代码输出如下:

Apple        PROPN    NNP    Apple
Inc          PROPN    NNP    Inc
,            PUNCT    ,      ,
founded      VERB     VBN    found
by           ADP      IN     by
Steve        PROPN    NNP    Steve
Jobs         PROPN    NNP    Jobs
,            PUNCT    ,      ,
develops     VERB     VBZ    develop
innovative   ADJ      JJ     innovative
iPhone       PROPN    NNP    iPhone
technology   NOUN     NN     technology
in           ADP      IN     in
California   PROPN    NNP    California
.            PUNCT    .      .

命名实体识别和名词块提取功能:

for ent in doc.ents:
  print(f"{ent.text:20} {ent.label_:10} {spacy.explain(ent.label_)}")

for chunk in doc.noun_chunks:
  print(f"{chunk.text:25} {chunk.root.text:10} {chunk.root.dep_}")

输出如下:

# 命名实体
Apple Inc            ORG        Companies, agencies, institutions, etc.
Steve Jobs           PERSON     People, including fictional
California           GPE        Countries, cities, states

# 名词块
Apple Inc                 Inc        nsubj
Steve Jobs                Jobs       pobj
innovative iPhone technology technology dobj
California                California pobj

命名实体(entities)和名词块(noun_chunks)是 SpaCy 中两个不同的概念,但很容易混淆。命名实体基于 NER 模型识别,而名词块基于句法分析识别,命名实体相对于名词块更具语义性,比如 "The big red car was expensive" 这句话,通过识别语法上的名词短语结构,可以抽取出 "The big red car" 名词块,但是可能抽不出命名实体。

TextBlob 简单介绍

TextBlob 是一个简单而强大的 Python 文本处理库,专为快速原型开发和教学设计。在 GraphRAG 的 RegexEnglish 提取器中,TextBlob 提供了高效的英语文本处理能力。

textblob.png

TextBlob 的核心特性如下:

  • 简洁的 API:提供直观的文本处理接口
  • 快速处理:基于 NLTK 和正则表达式的高效实现
  • 内置功能:集成了常用的 NLP 任务
  • 轻量级:依赖少,安装简单

TextBlob 提供了丰富的文本处理功能:

from textblob import TextBlob

# 创建 TextBlob 对象
text = "Apple Inc, founded by Steve Jobs, develops innovative iPhone technology in California."
blob = TextBlob(text)

# 1. 分词
print("分词结果:")
print(blob.words)
# 输出:['Apple', 'Inc', 'founded', 'by', 'Steve', 'Jobs', 'develops', 'innovative', 'iPhone', 'technology', 'in', 'California']

# 2. 句子分割
print("\n句子分割:")
for sentence in blob.sentences:
    print(f"- {sentence}")
# 输出:- Apple Inc, founded by Steve Jobs, develops innovative iPhone technology in California.

# 3. 词性标注
print("\n词性标注:")
for word, pos in blob.tags:
    print(f"{word:12} {pos}")
# 输出:
# Apple        NNP
# Inc          NNP  
# founded      VBN
# by           IN
# Steve        NNP
# ...

# 4. 名词短语
print("\n名词短语:")
for noun in blob.noun_phrases:
    print(f"- {noun}")
# 输出:
# - apple inc
# - steve jobs
# - innovative iphone technology
# - california

由于 TextBlob 基于 NLTK 实现,它依赖 NLTK 的一些资源,比如 punkt_tab 用于分割句子,averaged_perceptron_tagger_eng 用于词性标注,brown 用于名词短语提取(布朗语料库),可以通过 textblob.download_corpora 全量下载:

$ python -m textblob.download_corpora

可以看出,TextBlob 的功能和 SpaCy 是有部分重合的,TextBlob 的优势是轻量级、简单易用、快速上手,比较适合原型开发和教学,但是它准确性有限、多语言支持不足、扩展性和配置选项相对较少,在生产环境建议还是使用 SpaCy。因此在 GraphRAG 的代码里也可以看到,作者计划将 TextBlob 移除,使用 SpaCy 来重新实现 RegexEnglish 提取器。

未完待续

我们今天主要学习 GraphRAG 中基于 NLP 的知识提取流程,这是 Fast 索引方法的核心组件。我们了解到 Fast 索引方法中关于知识提取的有两个工作流:基于 NLP 的图谱提取图谱修剪。为了更好地学习相关的提取技术,我们先介绍了 SpaCy 和 TextBlob 这两个 NLP 库的使用,通过简单示例加深基本概念的理解。

明天我们将继续学习这部分内容,包括三种名词短语提取器的实现原理,通过共现关系构建边,以及多种不同的图谱修剪策略。


GraphRAG 索引构建之知识提取(二)

昨天我们学习了知识提取阶段的提取图谱工作流,今天我们继续学习另外两个:图谱规范化提取事实声明

图谱规范化

提取出的原始实体和关系需要进一步处理才能构成完整的知识图谱,finalize_graph 工作流负责这一规范化过程:

async def run_workflow(
  config: GraphRagConfig,
  context: PipelineRunContext,
) -> WorkflowFunctionOutput:

  # 1. 加载实体和关系
  entities = await load_table_from_storage("entities", context.output_storage)
  relationships = await load_table_from_storage("relationships", context.output_storage)

  # 2. 执行图谱规范化
  final_entities, final_relationships = finalize_graph(
    entities,
    relationships,
    embed_config=config.embed_graph,
    layout_enabled=config.umap.enabled,
  )

  # 3. 保存结果
  await write_table_to_storage(final_entities, "entities", context.output_storage)
  await write_table_to_storage(final_relationships, "relationships", context.output_storage)
  if config.snapshots.graphml:
    # 生成图描述文件
    graph = create_graph(final_relationships, edge_attr=["weight"])
    await snapshot_graphml(
      graph,
      name="graph",
      storage=context.output_storage,
    )

处理流程很清晰,首先加载上一步提取的实体和关系,然后调用 finalize_graph() 函数对其进行规范化处理,最后将处理后的数据重新保存,并根据配置生成 GraphML 图描述文件,便于外部工具进行图分析和可视化,这也就是我们之前学习可视化时所导入的文件。

其中 finalize_graph() 函数的实现如下:

def finalize_graph(
  entities: pd.DataFrame,
  relationships: pd.DataFrame,
  embed_config: EmbedGraphConfig | None = None,
  layout_enabled: bool = False,
) -> tuple[pd.DataFrame, pd.DataFrame]:
  # 实体规范化
  final_entities = finalize_entities(
    entities, relationships, embed_config, layout_enabled
  )
  # 关系规范化
  final_relationships = finalize_relationships(relationships)
  return (final_entities, final_relationships)

可以看出,它主要包括实体规范化和关系规范化:

  • 实体规范化:使用 NetworkX 创建网络图,计算每个节点的度数,使用 Node2Vec 生成图嵌入向量,使用 UMAP 降维算法进行图布局计算,得到 2D 可视化坐标,为每个实体分配唯一的 UUID 和人类可读的 ID 并输出最终规范化的实体对象;
  • 关系规范化:基于 source 和 target 去除重复的关系,计算每个关系的组合度数,为每个关系分配唯一的 UUID 和人类可读的 ID 并输出最终规范化的关系对象;

下面我们仔细看下这两个部分。

实体规范化

实体规范化的逻辑位于 finalize_entities() 函数:

def finalize_entities(
  entities: pd.DataFrame,
  relationships: pd.DataFrame,
  embed_config: EmbedGraphConfig | None = None,
  layout_enabled: bool = False,
) -> pd.DataFrame:
  # 创建网络图
  graph = create_graph(relationships, edge_attr=["weight"])
  # 生成图嵌入向量
  graph_embeddings = embed_graph(graph, embed_config)
  # 计算图布局
  layout = layout_graph(graph, layout_enabled, embeddings=graph_embeddings)
  # 计算节点度数
  degrees = compute_degree(graph)
  # 生成最终实体表
  final_entities = (
    entities.merge(layout, left_on="title", right_on="label", how="left")
    .merge(degrees, on="title", how="left")
    .drop_duplicates(subset="title")
  )
  # 字段规整
  final_entities = final_entities.loc[entities["title"].notna()].reset_index()
  final_entities["degree"] = final_entities["degree"].fillna(0).astype(int)
  final_entities.reset_index(inplace=True)
  final_entities["human_readable_id"] = final_entities.index
  final_entities["id"] = final_entities["human_readable_id"].apply(
    lambda _x: str(uuid4())
  )
  # 只保留预定义的规范化的字段
  return final_entities.loc[
    :,
    ENTITIES_FINAL_COLUMNS,
  ]

这个函数还比较有意思,涉及了多个图分析和机器学习技术。比如图的构建:

graph = nx.from_pandas_edgelist(edges, edge_attr=["weight"])

这里使用 NetworkX 库从边列表构建无向图,同时保留边的权重属性,支持加权图分析。基于这个图,我们可以计算每个节点的度数:

def compute_degree(graph: nx.Graph) -> pd.DataFrame:
  return pd.DataFrame([
    {"title": node, "degree": int(degree)}
    for node, degree in graph.degree
  ])

节点度数反映了实体的重要性和连接性,为后续的图分析提供数据基础。

再比如使用 Node2Vec 对图进行嵌入:

embeddings = embed_node2vec(
  graph=graph,
  dimensions=config.dimensions,   # 默认1536维
  num_walks=config.num_walks,     # 默认10次随机游走
  walk_length=config.walk_length, # 默认40步长
  window_size=config.window_size, # 默认2窗口大小
  iterations=config.iterations,   # 默认3次迭代
  random_seed=config.random_seed, # 默认86种子
)

该功能默认禁用,可通过下面的配置开启:

embed_graph:
  enabled: true

这里其实使用的是 graspologic 库的 embed.node2vec_embed() 方法,感兴趣的可以看下 graspologic 的文档:

通过将节点映射到 1536 维向量空间,捕获图的拓扑结构和语义信息,主要用于图的可视化分析。

此外,我们还可以通过下面的配置,在图嵌入之前将图限制为 最大连通分量(Largest Connected Component, LCC)

embed_graph:
  use_lcc: true

这意味着只有图中最大的连通子图会被用于生成嵌入向量,孤立的节点或较小的连通分量会被过滤掉。这样做的好处是:

  • 提高嵌入质量:Node2Vec 算法需要在图上进行随机游走,连通的图能产生更有意义的节点嵌入;
  • 确保稳定性:使用相同的最大连通分量可以确保结果的一致性;
  • 高效率:只处理最重要的连通部分,减少计算开销;

LCC 的计算同样使用的 graspologic 库,参考 utils.largest_connected_component() 函数。

此外,finalize_entities() 函数还使用了降维技术,通过 UMAP 算法对上一步计算的图嵌入执行降维操作,为实体提供 x/y 位置坐标,便于在 2D 平面下进行可视化分析。这个功能同样需要在配置中开启:

umap:
  enabled: true # 必须同时开启 embed_graph 功能

UMAP 的全称为 均匀流形逼近与投影(Uniform Manifold Approximation and Projection),是一种用于高维数据降维的机器学习算法。它在保留数据的局部和全局结构方面表现出色,广泛应用于数据可视化、特征学习等领域,与 t-SNE 等算法类似,但在计算效率和保留全局结构上往往更具优势。

umap.png

直接使用 umap-learn 库计算即可:

embedding_positions = umap.UMAP(
  min_dist=min_dist,         # 默认0.75最小距离,控制点的紧密程度
  n_neighbors=n_neighbors,   # 默认5个邻居,控制局部/全局平衡
  spread=spread,             # 默认1传播参数
  n_components=2,            # 2D可视化
  metric="euclidean",        # 欧几里得距离
  random_state=86,           # 随机种子
).fit_transform(embedding_vectors)

对这些参数感兴趣的同学可以参考 umap-learn 的官方文档:

关于 UMAP 降维,我们之前在学习 RAGFlow 的 RAPTOR 分块策略时也曾简单介绍过,只不过 RAGFlow 是对分块的 Embedding 向量进行降维,方便做聚类,而这里是对 Node2Vec 图嵌入进行降维,用于图的可视化。

上面这些计算结果,最终都会融合到原始的实体表中:

final_entities = (
  entities.merge(layout, left_on="title", right_on="label", how="left")
    .merge(degrees, on="title", how="left")
    .drop_duplicates(subset="title")
)

通过 pandas 的左连接策略,确保所有原始实体都被保留,然后过滤 title 为空的记录,对 degree 为空的用 0 填充,以及 UUID 和 ID 的生成,最后使用预定义的 ENTITIES_FINAL_COLUMNS 生成规范化的实体表,包含如下字段:

  • id:实体唯一标识符
  • human_readable_id:人类可读的递增ID
  • title:实体名称
  • type:实体类型
  • description:实体描述
  • text_unit_ids:来源文本单元ID
  • frequency:出现频次
  • degree:节点度数
  • xy:2D 坐标

关系规范化

关系规范化的逻辑相对简洁,没有那么多的弯弯绕绕,位于 finalize_relationships() 函数:

def finalize_relationships(
  relationships: pd.DataFrame,
) -> pd.DataFrame:
  # 创建网络图
  graph = create_graph(relationships, edge_attr=["weight"])
  # 计算节点度数
  degrees = compute_degree(graph)
  # 基于 source 和 target 去重
  # 注意这里没有区分关系方向,也没有区分关系类型
  final_relationships = relationships.drop_duplicates(subset=["source", "target"])
  # 计算关系的组合度数
  final_relationships["combined_degree"] = compute_edge_combined_degree(final_relationships, degrees)
  # 字段规整
  final_relationships.reset_index(inplace=True)
  final_relationships["human_readable_id"] = final_relationships.index
  final_relationships["id"] = final_relationships["human_readable_id"].apply(
    lambda _x: str(uuid4())
  )
  # 只保留预定义的规范化的字段
  return final_relationships.loc[
    :,
    RELATIONSHIPS_FINAL_COLUMNS,
  ]

首先从关系数据构建图结构,计算节点度数,然后对关系去重处理,计算关系的组合度数,最后字段规整,并通过 RELATIONSHIPS_FINAL_COLUMNS 只保留预定义的规范化字段:

  • id:关系唯一标识符
  • human_readable_id:人类可读的递增ID
  • source:关系的源实体
  • target:关系的目标实体
  • description:关系描述
  • weight:关系权重
  • combined_degree:组合度数
  • text_unit_ids:关联的文本单元

事实声明提取

除了实体和关系,GraphRAG 还可以从文本单元中提取事实性的 声明(Claims),为每个实体提供更丰富的上下文信息。

在 GraphRAG 中,声明(Claims)提取 有的地方也被称为 协变量(Covariates)提取。协变量是一个统计学中的概念,指的是与 因变量(被解释变量) 相关,但并非研究中主要关注的 自变量(解释变量) 的变量,协变量可能干扰自变量与因变量之间的关系。例如,在研究 “教育水平对收入的影响” 时,“工作经验” 就是一个协变量,它与收入相关,若不控制,可能会混淆教育水平与收入之间的真实关系。协变量提取就是通过一系列方法,从原始数据中识别、筛选出对研究目标有潜在影响的协变量,并将其作为分析变量纳入模型或研究设计中的过程。而声明是关于实体之间的事实性陈述,通常包含主体和客体,声明提取是协变量识别的一种方式。

该工作流默认关闭,需要在 settings.yaml 中开启:

extract_claims:
  enabled: true
  model_id: default_chat_model
  prompt: "prompts/extract_claims.txt"
  description: "Any claims or facts that could be relevant to information discovery."
  max_gleanings: 1

从配置可以看出,声明提取和图谱提取非常类似,都是使用预定义的提示词模板指导大模型提取结构化信息,也可以通过 max_gleanings 配置支持多轮提取。提取的声明包含主体、客体、关系类型、状态、时间范围等结构化信息:

  • subject_id: 声明的主体
  • object_id: 声明的客体
  • type: 声明类型
  • status: 声明状态(已确认/虚假/未经验证)
  • start_date: 开始日期
  • end_date: 结束日期
  • description: 声明描述
  • source_text: 来源文本

声明提取的默认提示词如下:

-目标活动-
你是一个智能助手,帮助人类分析师分析文本文档中针对某些实体的声明。

-目标-
给定一个可能与该活动相关的文本文档、一个实体规范和一个声明描述,提取所有符合实体规范的实体以及针对这些实体的所有声明。

-步骤-
1. 提取所有符合预定义实体规范的命名实体。实体规范可以是实体名称列表或实体类型列表。
2. 对于步骤1中识别出的每个实体,提取与该实体相关的所有声明。声明需要与指定的声明描述相匹配,并且该实体应是声明的主体。
对于每个声明,提取以下信息:
- 主体:作为声明主体的实体名称,需大写。主体实体是实施声明中所描述行为的实体。主体必须是步骤1中识别出的命名实体之一。
- 客体:作为声明客体的实体名称,需大写。客体实体是要么报告/处理该行为,要么受该行为影响的实体。如果客体实体未知,使用**NONE**。
- 声明类型:声明的总体类别,需大写。命名方式应能在多个文本输入中重复使用,以便相似的声明具有相同的声明类型。
- 声明状态:**TRUE**、**FALSE**或**SUSPECTED**。TRUE表示声明已确认,FALSE表示声明被证实为虚假,SUSPECTED表示声明未经验证。
- 声明描述:详细说明声明背后的推理,以及所有相关证据和参考资料。
- 声明日期:提出声明的时间段(开始日期,结束日期)。开始日期和结束日期都应采用ISO-8601格式。如果声明是在单个日期而非日期范围内提出的,则开始日期和结束日期设置为相同的日期。如果日期未知,返回**NONE**。
- 声明来源文本:所有与该声明相关的原始文本引语列表。

每个声明的格式为:(<主体实体><|><客体实体><|><声明类型><|><声明状态><|><声明开始日期><|><声明结束日期><|><声明描述><|><声明来源>)

3. 以英文返回所有在步骤1和步骤2中识别出的声明,形成一个单一列表。使用**##**作为列表分隔符。

4. 完成后,输出 <|COMPLETE|>

官方还提供了两个示例,可以直观的看出 GraphRAG 是如何通过实体规范(对应提取的实体或关系)和声明描述(对应 extract_claims.description 配置)从文本中提取出详细的声明信息的:

示例1:
实体规范:组织
声明描述:与实体相关的危险信号
文本:根据2022年1月10日的一篇文章,A公司在参与B政府机构发布的多个公共招标时,因串标被罚款。该公司为C个人所有,C个人在2015年被怀疑参与腐败活动。
输出:

(A公司<|>B政府机构<|>反竞争行为<|>TRUE<|>2022-01-10T00:00:00<|>2022-01-10T00:00:00<|>根据2022年1月10日发表的一篇文章,A公司因在B政府机构发布的多个公共招标中串标被罚款,因此被发现存在反竞争行为<|>根据2022年1月10日的一篇文章,A公司在参与B政府机构发布的多个公共招标时,因串标被罚款。)
<|COMPLETE|>

示例2:
实体规范:A公司、C个人
声明描述:与实体相关的危险信号
文本:根据2022年1月10日的一篇文章,A公司在参与B政府机构发布的多个公共招标时,因串标被罚款。该公司为C个人所有,C个人在2015年被怀疑参与腐败活动。
输出:

(A公司<|>B政府机构<|>反竞争行为<|>TRUE<|>2022-01-10T00:00:00<|>2022-01-10T00:00:00<|>根据2022年1月10日发表的一篇文章,A公司因在B政府机构发布的多个公共招标中串标被罚款,因此被发现存在反竞争行为<|>根据2022年1月10日的一篇文章,A公司在参与B政府机构发布的多个公共招标时,因串标被罚款。)
##
(C个人<|>NONE<|>贪污腐败<|>SUSPECTED<|>2015-01-01T00:00:00<|>2015-12-30T00:00:00<|>C个人在2015年被怀疑参与腐败活动<|>该公司为C个人所有,C个人在2015年被怀疑参与腐败活动。)
<|COMPLETE|>

声明提取工作流为 GraphRAG 知识图谱增加了时间维度和事实信息,使得其能够追踪实体间关系的时间演变,支持基于时间的查询和推理,提供更丰富的上下文信息用于回答复杂问题。这个工作流与实体关系提取互补,共同构建了一个多层次、多维度的知识表示体系。

小结

今天的学习内容主要包括以下两点:

  • 图谱规范化处理:了解实体和关系规范化的复杂流程,包括图嵌入、UMAP 降维、度数计算等多种图分析技术的综合应用;
  • 可选的声明提取:学习如何提取时间相关的事实性声明,为知识图谱增加时间维度和事实验证能力;

通过这个阶段的处理,原本杂乱无序的文本信息被转换为结构化的实体表和关系表,为后续的社区检测和向量化奠定基础。在下一篇文章中,我们将继续学习关于知识提取的内容,还记得我们之前在概述篇提到的,GraphRAG 支持 StandardFast 两种索引方法,Standard 方法基于大模型实现,也就是这两天学习的内容,Fast 方法基于传统的 NLP 算法实现,我们接下来就来看看这一部分内容。


GraphRAG 索引构建之知识提取

在前面的文章中,我们详细学习了 GraphRAG 索引构建的文档处理阶段,了解了如何将各种格式的原始文档转换为标准化的文本单元。今天我们将深入探索整个索引构建流程中最核心的部分 —— 知识提取,看看 GraphRAG 是如何利用大语言模型的力量,从非结构化的文本中挖掘出结构化的实体、关系和声明,并最终构建出完整的知识图谱。

知识提取阶段总览

让我们回顾一下索引构建的整体流程,知识提取正是连接文本处理和图谱分析的关键桥梁:

index-workflow-3-steps.png

知识提取阶段包含三个核心工作流:

  • 提取图谱(extract_graph - 从文本中提取实体和关系,对同一个实体和关系的描述进行总结合并;
  • 图谱规范化(finalize_graph - 对提取结果进行规范化处理,输出最终的实体表和关系表;
  • 提取事实声明(extract_covariates - 提取事实性声明,这一步是可选的,声明通常用于识别诸如欺诈等恶意行为,因此并不适用于所有数据集;

提取图谱

图谱提取的实现位于 index/workflows/extract_graph.py 文件中,整个工作流包含以下关键步骤:

async def run_workflow(
  config: GraphRagConfig,
  context: PipelineRunContext,
) -> WorkflowFunctionOutput:
  
  # 1. 加载文本单元
  text_units = await load_table_from_storage("text_units", context.output_storage)
  
  # 2. 配置提取策略和摘要策略
  extraction_strategy = config.extract_graph.resolved_strategy()
  summarization_strategy = config.summarize_descriptions.resolved_strategy()
  
  # 3. 执行图谱提取
  entities, relationships, raw_entities, raw_relationships = await extract_graph(
    text_units=text_units,
    extraction_strategy=extraction_strategy,
    summarization_strategy=summarization_strategy
  )
  
  # 4. 保存结果
  await write_table_to_storage(entities, "entities", context.output_storage)
  await write_table_to_storage(relationships, "relationships", context.output_storage)

这里值得注意是提取策略和摘要策略,它们对应 settings.yaml 文件中的 extract_graphsummarize_descriptions 两个配置:

extract_graph:
  model_id: default_chat_model
  prompt: "prompts/extract_graph.txt"
  entity_types: [organization,person,geo,event]
  max_gleanings: 1

summarize_descriptions:
  model_id: default_chat_model
  prompt: "prompts/summarize_descriptions.txt"
  max_length: 500

从这里可以看出,整个图谱提取分为两步:首先遍历上一阶段生成的所有文本单元,通过大模型从每个文本片段中识别出实体和关系;然后将相同的实体和关系进行合并,再通过大模型总结合并之后的实体和关系描述;

extract-graph.png

核心提示词

第一步所使用的提示词位于 prompts/extract_graph.txt,翻译如下:

-目标-
给定一个可能与本活动相关的文本文档和一个实体类型列表,从文本中识别出所有属于这些类型的实体以及已识别实体之间的所有关系。

-步骤-
1. 识别所有实体。对于每个已识别的实体,提取以下信息:
- entity_name: 实体名称,首字母大写
- entity_type: 以下类型之一:[{entity_types}]
- entity_description: 对实体属性和活动的全面描述
每个实体的格式为("entity"<|><entity_name><|><entity_type><|><entity_description>)

2. 从步骤1中识别出的实体中,识别所有(源实体,目标实体)对,这些实体对之间存在*明确的关系*。
对于每对相关实体,提取以下信息:
- source_entity: 源实体的名称,如步骤1中所识别的
- target_entity: 目标实体的名称,如步骤1中所识别的
- relationship_description: 解释为什么认为源实体和目标实体相关
- relationship_strength: 一个数值分数,表示源实体和目标实体之间关系的强度
每个关系的格式为("relationship"<|><source_entity><|><target_entity><|><relationship_description><|><relationship_strength>)

3. 以英文返回输出,作为步骤1和步骤2中识别的所有实体和关系的单个列表。使用 **##** 作为列表分隔符。

4. 完成后,输出 <|COMPLETE|>

输出的结果每一行包含一条实体信息或关系信息,不同字段之间使用 <|> 分割,记录之间使用 ## 分割,并以 <|COMPLETE|> 结尾,下面是一个示例:

("entity"<|>ALICE JOHNSON<|>PERSON<|>Alice Johnson is a software engineer at X Corp)
##
("entity"<|>X CORP<|>ORGANIZATION<|>X Corp is a technology company)
##
("relationship"<|>ALICE JOHNSON<|>X CORP<|>Alice Johnson works as a software engineer at X Corp<|>8)
<|COMPLETE|>

第二步所使用的提示词位于 prompts/summarize_descriptions.txt,这个就比较简单了:

你是一名乐于助人的助手,负责对下方提供的数据生成一份全面的总结。
给定一个或多个实体,以及一份描述列表,这些都与同一个实体或一组实体相关。
请将所有这些内容合并成一个单一的、全面的描述。确保包含从所有描述中收集到的信息。
如果所提供的描述存在矛盾,请解决这些矛盾并提供一个单一的、连贯的总结。
确保用第三人称撰写,并包含实体名称,以便我们了解完整的背景。
将最终描述的长度限制在 {max_length} 个单词以内。

#######
-数据-
实体:{entity_name}
描述列表:{description_list}
#######
输出:

如果提取实体和关系时,同一个实体或关系出现了多次,就有可能导致信息不完整或矛盾的情况,比如下面两条记录:

("entity"<|>ALICE JOHNSON<|>PERSON<|>Alice Johnson is a software engineer at X Corp)
("entity"<|>ALICE JOHNSON<|>PERSON<|>Alice Johnson is a software engineer at Y Corp)

通过对描述进行总结,得到一条关于 ALICE JOHNSON 的完整记录:

("entity"<|>ALICE JOHNSON<|>PERSON<|>Alice Johnson is a software engineer at X Corp and Y Corp)

多轮迭代提取

此外,在提取图谱时,GraphRAG 还实现了一个多轮迭代提取的机制,通过多次调用 LLM 来确保提取的完整性,可以通过 max_gleanings 参数进行配置。其核心逻辑如下:

# 如果指定了 `max_gleanings` 参数,就进入循环以提取更多实体
# 有两个退出条件:(a)达到配置的最大值,(b)模型表示没有更多实体了
if self._max_gleanings > 0:
  for i in range(self._max_gleanings):
    response = await self._model.achat(
      CONTINUE_PROMPT,
      name=f"extract-continuation-{i}",
      history=response.history,
    )
    results += response.output.content or ""

    # 达到配置的最大值
    if i >= self._max_gleanings - 1:
      break

    response = await self._model.achat(
      LOOP_PROMPT,
      name=f"extract-loopcheck-{i}",
      history=response.history,
    )

    # 模型表示没有更多实体了
    if response.output.content != "Y":
      break

每次迭代会调用两次大模型,第一次告诉大模型提取有遗漏,让它继续提取,提示词 CONTINUE_PROMPT 如下:

在上一次提取中遗漏了许多实体和关系。
请记住,只输出与之前提取的任何类型相匹配的实体。
使用相同的格式在下方添加它们:

第二次让大模型进一步确认是否还有遗漏,提示词 LOOP_PROMPT 如下:

似乎仍有一些实体和关系可能被遗漏了。
如果还有需要添加的实体或关系,请回答Y;如果没有,请回答N。
请只用一个字母Y或N回答。

未完待续

今天我们开始学习 GraphRAG 索引构建流程中最核心的知识提取阶段,它包含 提取图谱图谱规范化提取事实声明 三个工作流。今天学习的主要是提取图谱的几个核心要点:

  • 两阶段图谱提取:掌握 GraphRAG 如何通过大模型首先从文本单元中识别实体和关系,然后对相同的实体和关系进行合并和总结,形成完整的知识图谱结构;
  • 精心设计的提示词工程:介绍了 extract_graph.txtsummarize_descriptions.txt 两个核心提示词,了解了如何通过结构化的指令引导大模型提取高质量的实体关系信息;
  • 多轮迭代提取机制:学习了 GraphRAG 通过 max_gleanings 配置实现的多轮迭代提取,确保信息提取的完整性和准确性;

关于知识提取阶段还有另外两个工作流,我们明天继续。


GraphRAG 索引构建之文档处理

昨天我们对 GraphRAG 的索引构建做了一个基本的概述,了解了其索引构建的整体流程和工作流引擎的设计。今天我们将深入第一个具体阶段 —— 文档处理,这是整个知识图谱构建的起点,负责将各种格式的原始文档转换为标准化的文本单元,为后续的知识提取奠定基础。

回顾索引构建流程

根据前一篇文章的分析,我们可以将 GraphRAG 的索引构建流程分为三个主要阶段:

index-workflow-3-steps.png

今天我们重点关注文档处理阶段,它的核心任务是将非结构化的原始数据转换为后续处理流程所需的标准化文本单元。这个阶段包含三个关键工作流:

  • 加载原始文档(load_input_documents - 从各种数据源加载原始文档
  • 文本分块处理(create_base_text_units - 将长文档分割成可处理的文本块
  • 文档规范化(create_final_documents - 对文档进行最终的标准化整理

加载原始文档

在 GraphRAG 中,所有工作流的入口都是 run_workflow() 方法,加载原始文档也是如此。它的核心逻辑位于 index/workflows/load_input_documents.py 文件:

async def run_workflow(
  config: GraphRagConfig,
  context: PipelineRunContext,
) -> WorkflowFunctionOutput:
  
  # 加载输入文档,转换为标准的 pd.DataFrame 格式
  output = await load_input_documents(config.input, context.input_storage)

  # 将原始文档写入 documents 表中
  await write_table_to_storage(output, "documents", context.output_storage)
  return WorkflowFunctionOutput(result=output)

这个工作流首先加载输入文档,将其转换为标准的 pd.DataFrame 格式,然后写入 documents 表中。GraphRAG 通过工厂模式实现了对纯文本、CSV、JSON 三种文件格式的统一处理:

loaders: dict[str, Callable[..., Awaitable[pd.DataFrame]]] = {
  InputFileType.text: load_text,
  InputFileType.csv: load_csv,
  InputFileType.json: load_json,
}

如果你有不同的格式,官方的建议是编写脚本将其转换为其中一种。

同时,它也支持 Azure Blob、CosmosDB、本地文件、内存这四种不同的存储类型:

StorageFactory.register(StorageType.blob.value, create_blob_storage)
StorageFactory.register(StorageType.cosmosdb.value, create_cosmosdb_storage)
StorageFactory.register(StorageType.file.value, create_file_storage)
StorageFactory.register(StorageType.memory.value, lambda **_: MemoryPipelineStorage())

默认情况下是从本地的 input 目录下读取文件,并使用 text 加载器处理。我们可以在 settings.yaml 文件中的 input 部分进行配置:

input:
  storage:
    type: file # [blob, cosmosdb, file, memory]
    base_dir: "input"
  file_type: text # [csv, text, json]

文本文件处理

对于纯文本文件,处理逻辑相对简单,在 index/input/text.py 中:

async def load_file(path: str, group: dict | None = None) -> pd.DataFrame:
  text = await storage.get(path, encoding=config.encoding)
  new_item = {**group, "text": text}
  new_item["id"] = gen_sha512_hash(new_item, new_item.keys())
  new_item["title"] = str(Path(path).name)
  new_item["creation_date"] = await storage.get_creation_date(path)
  return pd.DataFrame([new_item])

每个文本文件被读取为一个完整的字符串,并自动生成标准字段:

  • text: 文件的完整内容
  • id: 基于内容的 SHA512 哈希值,确保唯一性
  • title: 文件名(不含路径)
  • creation_date: 文件创建日期

JSON 文件处理

JSON 文件的处理位于 index/input/json.py 文件,直接调用 json.loads() 加载数据。支持两种格式,一种是单个对象格式:

{
  "text": "文档内容",
  "title": "文档标题"
}

另一种是数组格式:

[
  {"text": "文档1内容", "title": "文档1标题"},
  {"text": "文档2内容", "title": "文档2标题"}
]

注意,目前暂不支持 JSONL 格式(每行一个完整的 JSON 对象),需转换为数组格式。

JSON 解析器会自动为对象增加 idcreation_date 两个字段。另外,如果 JSON 中没有 texttitle 字段,还可以在配置文件中设置:

input:
  file_type: json
  text_column: content  # 指定 text 列
  title_column: name    # 指定 title 列

CSV 文件处理

CSV 文件的处理位于 index/input/csv.py 文件,直接调用 pd.read_csv() 加载数据,它的格式如下:

text,title
"文档1内容","文档1标题"
"文档2内容","文档2标题"

同样的,CSV 解析器也会自动为对象增加 idcreation_date 两个字段,也支持在配置文件中设置 texttitle 字段映射。

文本分块处理

加载原始文档后,下一步是将长文档切分成更小、更易于处理的文本单元,这一步骤对后续的 LLM 处理至关重要。分块策略通过 settings.yaml 中的 chunks 部分配置:

chunks:
  size: 1200                   # 每个分块的目标 token 数量
  overlap: 100                 # 相邻分块之间重叠的 token 数量
  group_by_columns: [id]       # 按指定列分组,确保分块在文档内部进行
  strategy: tokens             # 分块策略:tokens 或 sentence
  encoding_model: cl100k_base  # 用于计算 token 的编码模型

其中 sizeoverlap 比较好理解,分别表示每个分块的大小和相邻分块之间的重叠;group_by_columns 通常设置为 ["doc_id"] 或 ["id"] 表示在分块前先按文档 ID 进行分组,确保文本分块不会跨越不同的文档;strategy 表示分块策略,支持 tokenssentence 两种。

def load_strategy(strategy: ChunkStrategyType) -> ChunkStrategy:
  """分块策略定义"""
  match strategy:
    case ChunkStrategyType.tokens:
      # tokens 分块策略
      from graphrag.index.operations.chunk_text.strategies import run_tokens
      return run_tokens
    case ChunkStrategyType.sentence:
      # sentence 分块策略
      from graphrag.index.operations.chunk_text.bootstrap import bootstrap
      from graphrag.index.operations.chunk_text.strategies import run_sentences
      bootstrap()
      return run_sentences

基于 tokens 的精确分块

tokens 分块策略 是 GraphRAG 的默认分块策略,它基于 tiktoken 库进行精确的 token 计算和分块,核心实现如下:

def run_tokens(
  input: list[str], config: ChunkingConfig, tick: ProgressTicker,
) -> Iterable[TextChunk]:

  # 获取编码和解码函数
  encode, decode = get_encoding_fn(encoding_name)
  
  # 基于编码 token 将文本分块
  return split_multiple_texts_on_tokens(
    input,
    Tokenizer(
      chunk_overlap=chunk_overlap,
      tokens_per_chunk=tokens_per_chunk,
      encode=encode,
      decode=decode,
    ),
    tick,
  )

其中 get_encoding_fn() 获取编码和解码函数,它支持通过 tiktoken 库加载不同的编码模型,比如 GPT-3.5 和 GPT-4 的 cl100k_base 或者 GPT-4.1 的 o200k_base 等,可以在 tiktoken 源码中 找到完整的模型列表:

tiktoken-models.png

tokens 分块的核心算法位于 split_multiple_texts_on_tokens() 函数:

def split_multiple_texts_on_tokens(
  texts: list[str], tokenizer: Tokenizer, tick: ProgressTicker
) -> list[TextChunk]:
  """将多个文本进行分块并返回带元数据的块"""
  result = []
  mapped_ids = []

  # 第一步:编码所有文本并记录源文档索引
  for source_doc_idx, text in enumerate(texts):
    encoded = tokenizer.encode(text)
    mapped_ids.append((source_doc_idx, encoded))

  # 第二步:创建全局 token 序列,每个 token 都记录其来源文档
  input_ids = [
    (source_doc_idx, id) for source_doc_idx, ids in mapped_ids for id in ids
  ]

  # 第三步:滑动窗口分块
  start_idx = 0
  cur_idx = min(start_idx + tokenizer.tokens_per_chunk, len(input_ids))
  chunk_ids = input_ids[start_idx:cur_idx]

  while start_idx < len(input_ids):
    # 解码当前块的文本内容
    chunk_text = tokenizer.decode([id for _, id in chunk_ids])
    # 记录当前块涉及的源文档索引
    doc_indices = list({doc_idx for doc_idx, _ in chunk_ids})
    result.append(TextChunk(chunk_text, doc_indices, len(chunk_ids)))
    # 已完成
    if cur_idx == len(input_ids):
      break
    # 滑动窗口:下一个块的起始位置考虑重叠
    start_idx += tokenizer.tokens_per_chunk - tokenizer.chunk_overlap
    cur_idx = min(start_idx + tokenizer.tokens_per_chunk, len(input_ids))
    chunk_ids = input_ids[start_idx:cur_idx]

  return result

该算法首先使用 tiktoken 库将所有文档编码成 token 序列(即 mapped_ids 数组);然后将 token 序列展开,变成一个全局的整数序列,每个 token 都保留其来源信息(即 input_ids 数组);然后按 tokens_per_chunk 数量对 token 序列进行第一个分块并解码得到文本内容;接着使用滑动窗口往后依次分块,同时考虑相邻块之间有指定数量的重叠 token,保持上下文连续性。

有趣的是,这里的分块算法参考了 LangChain 的实现,有兴趣的朋友可以看下 LangChain 的 TokenTextSplitter

基于句子的自然分块

sentence 分块策略 使用 NLTK 库进行基于句子边界的自然语言分块,它首先调用 bootstrap() 初始化必要的 NLTK 资源:

def bootstrap():
  """初始化 NLTK 资源"""
  global initialized_nltk
  if not initialized_nltk:
    import nltk
    from nltk.corpus import wordnet as wn
    # 句子分词
    nltk.download("punkt")
    nltk.download("punkt_tab")
    # 词性标注
    nltk.download("averaged_perceptron_tagger")
    nltk.download("averaged_perceptron_tagger_eng")
    # 命名实体识别相关
    nltk.download("maxent_ne_chunker")
    nltk.download("maxent_ne_chunker_tab")
    # 词汇资源
    nltk.download("words")
    nltk.download("wordnet")
    # 确保 wordnet 数据已加载
    wn.ensure_loaded()
    # 完成初始化
    initialized_nltk = True

这里我稍微介绍下这几个 NLTK 资源:

  • 句子分词相关punkt 用于句子边界检测和分词,将文本分割成句子,识别句子的开始和结束;punkt_tabpunkt 的表格化版本,提供更高效的句子分词性能;
  • 词性标注相关averaged_perceptron_tagger 用于词性标注,基于 平均感知机算法,为每个单词标注词性(名词、动词、形容词等);averaged_perceptron_tagger_eng 是其英语特化版本,专门针对英语文本的词性标注;
  • 命名实体识别相关maxent_ne_chunker 用于命名实体识别和分块,基于 最大熵模型,识别人名、地名、组织名等命名实体;maxent_ne_chunker_tabmaxent_ne_chunker 的表格化版本,提供更快的命名实体识别性能;
  • 词汇资源words 是英语单词词典,包含大量英语单词,用于拼写检查和词汇验证;wordnet 是英语词汇的语义网络,提供词汇间的语义关系(同义词、反义词、上下位关系等),支持词义消歧、语义相似度计算等;

然后使用 NLTK 的句子分割器,也就是 sent_tokenize() 方法,进行按句子分块:

def run_sentences(
  input: list[str], _config: ChunkingConfig, tick: ProgressTicker
) -> Iterable[TextChunk]:
  """按句子将文本分成多个部分"""
  for doc_idx, text in enumerate(input):
    # 使用 NLTK 进行句子分割
    sentences = nltk.sent_tokenize(text)
    for sentence in sentences:
      yield TextChunk(
        text_chunk=sentence,
        source_doc_indices=[doc_idx],
      )
    tick(1)

可以看到,这种分块策略的实现相对简单,每个句子就是一个独立的文本块,句子之间没有重叠,避免信息重复。

两种策略的对比

下表对两种策略做了个对比:

特性tokens 策略sentence 策略
分块依据Token 数量句子边界
块大小控制精确控制依赖句子长度
上下文保持可配置重叠自然语言完整性
跨文档支持支持不支持
计算复杂度较高较低
适用场景严格 token 限制的 LLM 处理需要保持句子完整性的分析

可以根据需要选择适当的分块策略。

无论是哪种分块策略,都返回统一的 TextChunk 数据结构:

@dataclass
class TextChunk:
  """文本块类定义"""
  text_chunk: str              # 文本块内容
  source_doc_indices: list[int] # 源文档索引列表
  n_tokens: int | None = None  # token 数量(可选)

此外,我们还可以为文档添加一下元数据,分块时也有一些处理元数据的策略,相关配置如下:

input:
  metadata: [title,tag] # 增加元数据列
chunks:
  prepend_metadata: true  # 将元数据复制到每个文本块的开头
  chunk_size_includes_metadata: false # 计算分块大小时是否包含元数据

关于元数据的使用,可以参考 GraphRAG 官方教程:

文档规范化

在完成文本分块后,create_final_documents 步骤会对处理结果进行最终的整理和规范化,将分块后的文本单元重新与原始文档关联。让我们详细分析实际的源码实现:

def create_final_documents(documents: pd.DataFrame, text_units: pd.DataFrame) -> pd.DataFrame:
  
  # 展开文本单元的文档ID关联并选择需要的列
  exploded = (
    text_units.explode("document_ids")
    .loc[:, ["id", "document_ids", "text"]]
    .rename(
      columns={
        "document_ids": "chunk_doc_id",
        "id": "chunk_id", 
        "text": "chunk_text",
      }
    )
  )
  
  # 与原始文档信息合并
  joined = exploded.merge(
    documents,
    left_on="chunk_doc_id",
    right_on="id",
    how="inner",
    copy=False,
  )
  
  # 按文档聚合所有相关的文本单元ID
  docs_with_text_units = joined.groupby("id", sort=False).agg(
    text_unit_ids=("chunk_id", list)
  )
  
  # 重新与文档信息合并,形成最终结构
  rejoined = docs_with_text_units.merge(
    documents,
    on="id",
    how="right",
    copy=False,
  ).reset_index(drop=True)
  
  # 数据格式化和添加序号
  rejoined["id"] = rejoined["id"].astype(str)
  rejoined["human_readable_id"] = rejoined.index + 1
  
  # 返回标准化的列结构
  return rejoined.loc[:, DOCUMENTS_FINAL_COLUMNS]

这一步的代码看起来很长,其实它的逻辑并不复杂,核心在于数据的操作和转换,看懂这一步的代码需要熟练掌握 pandas 各种数据处理技巧,比如:

  • .explode("document_ids"):将数组列展开为多行,每个数组元素对应一行
  • .loc[:, ["id", "document_ids", "text"]]:列切片操作,只保留需要的列,减少内存使用
  • .rename(columns={}):重命名列以更清晰地表示其含义
  • .merge(documents, ...):对两个表执行合并操作,使用 left_onright_on 明确指定连接键
  • .groupby("id", sort=False):按文档 ID 分组,保持原有顺序,提高性能
  • agg(text_unit_ids=("chunk_id", list)):聚合操作,将每个文档对应的所有文本单元 ID 收集成列表

最后使用预定义的 DOCUMENTS_FINAL_COLUMNS 确保输出格式的一致性,包括以下列:

  • id: 文档唯一标识符
  • human_readable_id: 人类可读的递增ID
  • title: 文档标题
  • text: 原始文档内容
  • text_unit_ids: 关联的文本单元ID列表
  • metadata: 元数据信息
  • creation_date: 创建日期

小结

今天我们深入探讨了 GraphRAG 索引构建流程中的文档处理阶段,这个阶段是整个知识图谱构建的基础,负责将各种格式的原始文档转换为标准化的文本单元,为后续的知识提取和图谱构建做好准备。通过本次学习,我们掌握了以下核心要点:

  • 统一的文档加载:学习了 GraphRAG 如何通过工厂模式,支持从多种数据源(如本地文件、Azure Blob)加载不同格式(纯文本、CSV、JSON)的原始文档;
  • 灵活的文本分块:详细分析了两种核心的文本分块策略。tokens 策略基于 tiktoken 实现精确的 token 级切片和重叠,适合对 LLM 输入有严格限制的场景;而 sentence 策略则利用 NLTK 库进行基于句子边界的自然语言分块,更好地保持了语义的完整性;
  • 标准化的数据输出:文档处理的最后一步,将分块后的文本单元与原始文档进行重新关联,最终输出包含标准化字段的文档数据,为后续的索引流程提供了结构清晰、内容一致的输入。

在下一篇文章中,我们将进入知识提取阶段,学习 GraphRAG 如何调用大模型从这些精心处理的文本单元中提取实体和关系,最终构建出结构化的知识图谱。


昨天我们对 GraphRAG 的索引构建做了一个基本的概述,了解了其索引构建的整体流程和工作流引擎的设计。今天我们将深入第一个具体阶段 —— 文档处理,这是整个知识图谱构建的起点,负责将各种格式的原始文档转换为标准化的文本单元,为后续的知识提取奠定基础。

回顾索引构建流程

根据前一篇文章的分析,我们可以将 GraphRAG 的索引构建流程分为三个主要阶段:

今天我们重点关注文档处理阶段,它的核心任务是将非结构化的原始数据转换为后续处理流程所需的标准化文本单元。这个阶段包含三个关键工作流:

  • 加载原始文档(load_input_documents - 从各种数据源加载原始文档
  • 文本分块处理(create_base_text_units - 将长文档分割成可处理的文本块
  • 文档规范化(create_final_documents - 对文档进行最终的标准化整理

加载原始文档

在 GraphRAG 中,所有工作流的入口都是 run_workflow() 方法,加载原始文档也是如此。它的核心逻辑位于 index/workflows/load_input_documents.py 文件:

async def run_workflow(
  config: GraphRagConfig,
  context: PipelineRunContext,
) -> WorkflowFunctionOutput:
  
  # 加载输入文档,转换为标准的 pd.DataFrame 格式
  output = await load_input_documents(config.input, context.input_storage)

  # 将原始文档写入 documents 表中
  await write_table_to_storage(output, "documents", context.output_storage)
  return WorkflowFunctionOutput(result=output)

这个工作流首先加载输入文档,将其转换为标准的 pd.DataFrame 格式,然后写入 documents 表中。GraphRAG 通过工厂模式实现了对纯文本、CSV、JSON 三种文件格式的统一处理:

loaders: dict[str, Callable[..., Awaitable[pd.DataFrame]]] = {
  InputFileType.text: load_text,
  InputFileType.csv: load_csv,
  InputFileType.json: load_json,
}

如果你有不同的格式,官方的建议是编写脚本将其转换为其中一种。

同时,它也支持 Azure Blob、CosmosDB、本地文件、内存这四种不同的存储类型:

StorageFactory.register(StorageType.blob.value, create_blob_storage)
StorageFactory.register(StorageType.cosmosdb.value, create_cosmosdb_storage)
StorageFactory.register(StorageType.file.value, create_file_storage)
StorageFactory.register(StorageType.memory.value, lambda **_: MemoryPipelineStorage())

默认情况下是从本地的 input 目录下读取文件,并使用 text 加载器处理。我们可以在 settings.yaml 文件中的 input 部分进行配置:

input:
  storage:
    type: file # [blob, cosmosdb, file, memory]
    base_dir: "input"
  file_type: text # [csv, text, json]

文本文件处理

对于纯文本文件,处理逻辑相对简单,在 index/input/text.py 中:

async def load_file(path: str, group: dict | None = None) -> pd.DataFrame:
  text = await storage.get(path, encoding=config.encoding)
  new_item = {**group, "text": text}
  new_item["id"] = gen_sha512_hash(new_item, new_item.keys())
  new_item["title"] = str(Path(path).name)
  new_item["creation_date"] = await storage.get_creation_date(path)
  return pd.DataFrame([new_item])

每个文本文件被读取为一个完整的字符串,并自动生成标准字段:

  • text: 文件的完整内容
  • id: 基于内容的 SHA512 哈希值,确保唯一性
  • title: 文件名(不含路径)
  • creation_date: 文件创建日期

JSON 文件处理

JSON 文件的处理位于 index/input/json.py 文件,直接调用 json.loads() 加载数据。支持两种格式,一种是单个对象格式:

{
  "text": "文档内容",
  "title": "文档标题"
}

另一种是数组格式:

[
  {"text": "文档1内容", "title": "文档1标题"},
  {"text": "文档2内容", "title": "文档2标题"}
]

注意,目前暂不支持 JSONL 格式(每行一个完整的 JSON 对象),需转换为数组格式。

JSON 解析器会自动为对象增加 idcreation_date 两个字段。另外,如果 JSON 中没有 texttitle 字段,还可以在配置文件中设置:

input:
  file_type: json
  text_column: content  # 指定 text 列
  title_column: name    # 指定 title 列

CSV 文件处理

CSV 文件的处理位于 index/input/csv.py 文件,直接调用 pd.read_csv() 加载数据,它的格式如下:

text,title
"文档1内容","文档1标题"
"文档2内容","文档2标题"

同样的,CSV 解析器也会自动为对象增加 idcreation_date 两个字段,也支持在配置文件中设置 texttitle 字段映射。

文本分块处理

加载原始文档后,下一步是将长文档切分成更小、更易于处理的文本单元,这一步骤对后续的 LLM 处理至关重要。分块策略通过 settings.yaml 中的 chunks 部分配置:

chunks:
  size: 1200                   # 每个分块的目标 token 数量
  overlap: 100                 # 相邻分块之间重叠的 token 数量
  group_by_columns: [id]       # 按指定列分组,确保分块在文档内部进行
  strategy: tokens             # 分块策略:tokens 或 sentence
  encoding_model: cl100k_base  # 用于计算 token 的编码模型

其中 sizeoverlap 比较好理解,分别表示每个分块的大小和相邻分块之间的重叠;group_by_columns 通常设置为 ["doc_id"] 或 ["id"] 表示在分块前先按文档 ID 进行分组,确保文本分块不会跨越不同的文档;strategy 表示分块策略,支持 tokenssentence 两种。

def load_strategy(strategy: ChunkStrategyType) -> ChunkStrategy:
  """分块策略定义"""
  match strategy:
    case ChunkStrategyType.tokens:
      # tokens 分块策略
      from graphrag.index.operations.chunk_text.strategies import run_tokens
      return run_tokens
    case ChunkStrategyType.sentence:
      # sentence 分块策略
      from graphrag.index.operations.chunk_text.bootstrap import bootstrap
      from graphrag.index.operations.chunk_text.strategies import run_sentences
      bootstrap()
      return run_sentences

基于 tokens 的精确分块

tokens 分块策略 是 GraphRAG 的默认分块策略,它基于 tiktoken 库进行精确的 token 计算和分块,核心实现如下:

def run_tokens(
  input: list[str], config: ChunkingConfig, tick: ProgressTicker,
) -> Iterable[TextChunk]:

  # 获取编码和解码函数
  encode, decode = get_encoding_fn(encoding_name)
  
  # 基于编码 token 将文本分块
  return split_multiple_texts_on_tokens(
    input,
    Tokenizer(
      chunk_overlap=chunk_overlap,
      tokens_per_chunk=tokens_per_chunk,
      encode=encode,
      decode=decode,
    ),
    tick,
  )

其中 get_encoding_fn() 获取编码和解码函数,它支持通过 tiktoken 库加载不同的编码模型,比如 GPT-3.5 和 GPT-4 的 cl100k_base 或者 GPT-4.1 的 o200k_base 等,可以在 tiktoken 源码中 找到完整的模型列表:

tokens 分块的核心算法位于 split_multiple_texts_on_tokens() 函数:

def split_multiple_texts_on_tokens(
  texts: list[str], tokenizer: Tokenizer, tick: ProgressTicker
) -> list[TextChunk]:
  """将多个文本进行分块并返回带元数据的块"""
  result = []
  mapped_ids = []

  # 第一步:编码所有文本并记录源文档索引
  for source_doc_idx, text in enumerate(texts):
    encoded = tokenizer.encode(text)
    mapped_ids.append((source_doc_idx, encoded))

  # 第二步:创建全局 token 序列,每个 token 都记录其来源文档
  input_ids = [
    (source_doc_idx, id) for source_doc_idx, ids in mapped_ids for id in ids
  ]

  # 第三步:滑动窗口分块
  start_idx = 0
  cur_idx = min(start_idx + tokenizer.tokens_per_chunk, len(input_ids))
  chunk_ids = input_ids[start_idx:cur_idx]

  while start_idx < len(input_ids):
    # 解码当前块的文本内容
    chunk_text = tokenizer.decode([id for _, id in chunk_ids])
    # 记录当前块涉及的源文档索引
    doc_indices = list({doc_idx for doc_idx, _ in chunk_ids})
    result.append(TextChunk(chunk_text, doc_indices, len(chunk_ids)))
    # 已完成
    if cur_idx == len(input_ids):
      break
    # 滑动窗口:下一个块的起始位置考虑重叠
    start_idx += tokenizer.tokens_per_chunk - tokenizer.chunk_overlap
    cur_idx = min(start_idx + tokenizer.tokens_per_chunk, len(input_ids))
    chunk_ids = input_ids[start_idx:cur_idx]

  return result

该算法首先使用 tiktoken 库将所有文档编码成 token 序列(即 mapped_ids 数组);然后将 token 序列展开,变成一个全局的整数序列,每个 token 都保留其来源信息(即 input_ids 数组);然后按 tokens_per_chunk 数量对 token 序列进行第一个分块并解码得到文本内容;接着使用滑动窗口往后依次分块,同时考虑相邻块之间有指定数量的重叠 token,保持上下文连续性。

有趣的是,这里的分块算法参考了 LangChain 的实现,有兴趣的朋友可以看下 LangChain 的 TokenTextSplitter

基于句子的自然分块

sentence 分块策略 使用 NLTK 库进行基于句子边界的自然语言分块,它首先调用 bootstrap() 初始化必要的 NLTK 资源:

def bootstrap():
  """初始化 NLTK 资源"""
  global initialized_nltk
  if not initialized_nltk:
    import nltk
    from nltk.corpus import wordnet as wn
    # 句子分词
    nltk.download("punkt")
    nltk.download("punkt_tab")
    # 词性标注
    nltk.download("averaged_perceptron_tagger")
    nltk.download("averaged_perceptron_tagger_eng")
    # 命名实体识别相关
    nltk.download("maxent_ne_chunker")
    nltk.download("maxent_ne_chunker_tab")
    # 词汇资源
    nltk.download("words")
    nltk.download("wordnet")
    # 确保 wordnet 数据已加载
    wn.ensure_loaded()
    # 完成初始化
    initialized_nltk = True

这里我稍微介绍下这几个 NLTK 资源:

  • 句子分词相关punkt 用于句子边界检测和分词,将文本分割成句子,识别句子的开始和结束;punkt_tabpunkt 的表格化版本,提供更高效的句子分词性能;
  • 词性标注相关averaged_perceptron_tagger 用于词性标注,基于 平均感知机算法,为每个单词标注词性(名词、动词、形容词等);averaged_perceptron_tagger_eng 是其英语特化版本,专门针对英语文本的词性标注;
  • 命名实体识别相关maxent_ne_chunker 用于命名实体识别和分块,基于 最大熵模型,识别人名、地名、组织名等命名实体;maxent_ne_chunker_tabmaxent_ne_chunker 的表格化版本,提供更快的命名实体识别性能;
  • 词汇资源words 是英语单词词典,包含大量英语单词,用于拼写检查和词汇验证;wordnet 是英语词汇的语义网络,提供词汇间的语义关系(同义词、反义词、上下位关系等),支持词义消歧、语义相似度计算等;

然后使用 NLTK 的句子分割器,也就是 sent_tokenize() 方法,进行按句子分块:

def run_sentences(
  input: list[str], _config: ChunkingConfig, tick: ProgressTicker
) -> Iterable[TextChunk]:
  """按句子将文本分成多个部分"""
  for doc_idx, text in enumerate(input):
    # 使用 NLTK 进行句子分割
    sentences = nltk.sent_tokenize(text)
    for sentence in sentences:
      yield TextChunk(
        text_chunk=sentence,
        source_doc_indices=[doc_idx],
      )
    tick(1)

可以看到,这种分块策略的实现相对简单,每个句子就是一个独立的文本块,句子之间没有重叠,避免信息重复。

两种策略的对比

下表对两种策略做了个对比:

特性tokens 策略sentence 策略
分块依据Token 数量句子边界
块大小控制精确控制依赖句子长度
上下文保持可配置重叠自然语言完整性
跨文档支持支持不支持
计算复杂度较高较低
适用场景严格 token 限制的 LLM 处理需要保持句子完整性的分析

可以根据需要选择适当的分块策略。

无论是哪种分块策略,都返回统一的 TextChunk 数据结构:

@dataclass
class TextChunk:
  """文本块类定义"""
  text_chunk: str              # 文本块内容
  source_doc_indices: list[int] # 源文档索引列表
  n_tokens: int | None = None  # token 数量(可选)

此外,我们还可以为文档添加一下元数据,分块时也有一些处理元数据的策略,相关配置如下:

input:
  metadata: [title,tag] # 增加元数据列
chunks:
  prepend_metadata: true  # 将元数据复制到每个文本块的开头
  chunk_size_includes_metadata: false # 计算分块大小时是否包含元数据

关于元数据的使用,可以参考 GraphRAG 官方教程:

文档规范化

在完成文本分块后,create_final_documents 步骤会对处理结果进行最终的整理和规范化,将分块后的文本单元重新与原始文档关联。让我们详细分析实际的源码实现:

def create_final_documents(documents: pd.DataFrame, text_units: pd.DataFrame) -> pd.DataFrame:
  
  # 展开文本单元的文档ID关联并选择需要的列
  exploded = (
    text_units.explode("document_ids")
    .loc[:, ["id", "document_ids", "text"]]
    .rename(
      columns={
        "document_ids": "chunk_doc_id",
        "id": "chunk_id", 
        "text": "chunk_text",
      }
    )
  )
  
  # 与原始文档信息合并
  joined = exploded.merge(
    documents,
    left_on="chunk_doc_id",
    right_on="id",
    how="inner",
    copy=False,
  )
  
  # 按文档聚合所有相关的文本单元ID
  docs_with_text_units = joined.groupby("id", sort=False).agg(
    text_unit_ids=("chunk_id", list)
  )
  
  # 重新与文档信息合并,形成最终结构
  rejoined = docs_with_text_units.merge(
    documents,
    on="id",
    how="right",
    copy=False,
  ).reset_index(drop=True)
  
  # 数据格式化和添加序号
  rejoined["id"] = rejoined["id"].astype(str)
  rejoined["human_readable_id"] = rejoined.index + 1
  
  # 返回标准化的列结构
  return rejoined.loc[:, DOCUMENTS_FINAL_COLUMNS]

这一步的代码看起来很长,其实它的逻辑并不复杂,核心在于数据的操作和转换,看懂这一步的代码需要熟练掌握 pandas 各种数据处理技巧,比如:

  • .explode("document_ids"):将数组列展开为多行,每个数组元素对应一行
  • .loc[:, ["id", "document_ids", "text"]]:列切片操作,只保留需要的列,减少内存使用
  • .rename(columns={}):重命名列以更清晰地表示其含义
  • .merge(documents, ...):对两个表执行合并操作,使用 left_onright_on 明确指定连接键
  • .groupby("id", sort=False):按文档 ID 分组,保持原有顺序,提高性能
  • agg(text_unit_ids=("chunk_id", list)):聚合操作,将每个文档对应的所有文本单元 ID 收集成列表

最后使用预定义的 DOCUMENTS_FINAL_COLUMNS 确保输出格式的一致性,包括以下列:

  • id: 文档唯一标识符
  • human_readable_id: 人类可读的递增ID
  • title: 文档标题
  • text: 原始文档内容
  • text_unit_ids: 关联的文本单元ID列表
  • metadata: 元数据信息
  • creation_date: 创建日期

小结

今天我们深入探讨了 GraphRAG 索引构建流程中的文档处理阶段,这个阶段是整个知识图谱构建的基础,负责将各种格式的原始文档转换为标准化的文本单元,为后续的知识提取和图谱构建做好准备。通过本次学习,我们掌握了以下核心要点:

  • 统一的文档加载:学习了 GraphRAG 如何通过工厂模式,支持从多种数据源(如本地文件、Azure Blob)加载不同格式(纯文本、CSV、JSON)的原始文档;
  • 灵活的文本分块:详细分析了两种核心的文本分块策略。tokens 策略基于 tiktoken 实现精确的 token 级切片和重叠,适合对 LLM 输入有严格限制的场景;而 sentence 策略则利用 NLTK 库进行基于句子边界的自然语言分块,更好地保持了语义的完整性;
  • 标准化的数据输出:文档处理的最后一步,将分块后的文本单元与原始文档进行重新关联,最终输出包含标准化字段的文档数据,为后续的索引流程提供了结构清晰、内容一致的输入。

在下一篇文章中,我们将进入知识提取阶段,学习 GraphRAG 如何调用大模型从这些精心处理的文本单元中提取实体和关系,最终构建出结构化的知识图谱。


GraphRAG 索引构建概述

经过前面对 GraphRAG 项目结构的深入了解,我们已经掌握了它的整体架构和技术栈。今天我们将顺着 graphrag index 命令的调用链,深入探索 GraphRAG 索引构建的核心流程,包括命令行入口、配置加载机制、工作流引擎,以及不同索引方法的实现细节。

命令行入口

当我们执行 graphrag index 命令时,整个调用过程如下:

  1. 入口点__main__.pybin/graphraggraphrag.cli.main:app
  2. CLI 解析cli/main.py@app.command("index") 装饰器
  3. 配置加载cli/index.pyindex_cli() 函数
  4. API 调用api.build_index() 函数

其中,前两步我们昨天已经学习过了:__main__.py 的作用是让 GraphRAG 能以包的方式运行,pyproject.toml 文件中的 [project.scripts] 配置让 GraphRAG 能以可执行文件的方式运行,运行的入口都是 graphrag.cli.main 包的 app 方法;该方法基于 Typer 这个现代化的 Python CLI 库实现,通过 @app.command 装饰器,定义了 GraphRAG 的 5 个子命令,index 就是其中之一。

跟随调用链,我们接着看一下 cli/index.py 文件中 index_cli() 函数的实现:

def index_cli(root_dir: Path, method: IndexingMethod, ...):
  # 配置加载
  config = load_config(root_dir, config_filepath, cli_overrides)
  # 索引构建
  _run_index(config=config, method=method, ...)

这里我们可以看到两个关键步骤:

  1. 配置加载:调用 load_config() 函数加载并合并配置,支持通过命令行参数覆盖配置文件中的设置
  2. 索引执行:调用 _run_index() 执行实际的索引构建

配置加载

配置加载是 GraphRAG 的核心功能之一,位于 config/load_config.py 文件中,其实现如下:

def load_config(
  root_dir: Path,
  config_filepath: Path | None = None,
  cli_overrides: dict[str, Any] | None = None,
) -> GraphRagConfig:
  # 路径规范化,确保使用绝对路径
  root = root_dir.resolve()
  # 在根目录中搜索配置文件
  config_path = _get_config_path(root, config_filepath)
  # 加载 .env 文件中的环境变量
  _load_dotenv(config_path)
  # 读取配置文件文本内容
  config_text = config_path.read_text(encoding="utf-8")
  # 解析环境变量引用
  config_text = _parse_env_variables(config_text)
  # 根据文件类型解析为字典(支持 YAML 和 JSON)
  config_extension = config_path.suffix
  config_data = _parse(config_extension, config_text)
  # 应用命令行参数覆盖
  if cli_overrides:
    _apply_overrides(config_data, cli_overrides)
  # 创建并验证最终配置对象
  return create_graphrag_config(config_data, root_dir=str(root))

这里的 load_config() 函数实现了一个完整的类型安全的配置管理流程,整体流程非常清晰:

  1. 路径规范化:通过 root_dir.resolve() 确保使用绝对路径,其中 root_dir 是用户通过 --root-r 参数指定
  2. 配置发现:如果用户指定了 --config-c 参数,则使用用户自定义配置;否则在根目录中搜索默认配置文件,比如 settings.yamlsettings.ymlsettings.json
  3. 环境变量加载:加载 .env 文件中的环境变量
  4. 配置读取:读取配置文件文本内容
  5. 环境变量替换:解析配置文件中环境变量引用,比如 ${GRAPHRAG_API_KEY}
  6. 格式解析:根据文件类型将配置解析为字典,支持 YAML 和 JSON 两种方式
  7. 覆盖应用:应用命令行参数覆盖,比如 --output-o 参数会覆盖 output.base_dir 配置
  8. 对象创建:创建并验证最终的 GraphRagConfig 配置对象,使用 Pydantic 保证类型安全

这里有几个点比较有意思,可以展开介绍一下。

环境变量支持

GraphRAG 支持在配置文件中注入环境变量,这是使用 Python 的 Template 类实现的:

def _parse_env_variables(text: str) -> str:
  return Template(text).substitute(os.environ)

用户可以在配置文件中使用 ${VAR_NAME} 格式引用环境变量:

models:
  default_chat_model:
    type: openai_chat
    api_base: ${GRAPHRAG_API_BASE}
    auth_type: api_key
    api_key: ${GRAPHRAG_API_KEY}
    model: gpt-4o-mini

点号分隔覆盖机制

配置覆盖支持点号分隔的嵌套路径,比如当用户设置 --output-o 参数时,会覆盖下面三个配置:

cli_overrides = {}
if output_dir:
  cli_overrides["output.base_dir"] = str(output_dir)
  cli_overrides["reporting.base_dir"] = str(output_dir)
  cli_overrides["update_index_output.base_dir"] = str(output_dir)

这种点号分隔的路径语法允许精确覆盖嵌套配置项,为用户提供了灵活的配置管理能力,这块的实现比较通用,有类似需求的话,可以参考下 _apply_overrides() 的函数实现。

使用 Pydantic 创建配置

在配置加载的最后,通过 create_graphrag_config() 函数创建最终的 GraphRagConfig 配置对象:

def create_graphrag_config(
  values: dict[str, Any] | None = None,
  root_dir: str | None = None,
) -> GraphRagConfig:

  values = values or {}
  if root_dir:
    root_path = Path(root_dir).resolve()
    values["root_dir"] = str(root_path)
  return GraphRagConfig(**values)

这里使用了 Pydantic 库,其中 GraphRagConfig 类被定义为一个 Pydantic 模型,我们直接将 values 字典展开(两个星号 ** 用于解包字典)变成一个类型安全的配置对象。Pydantic 不仅能保证类型安全,还支持自定义参数验证,我们下面重点介绍一下它。

简单介绍 Pydantic 库

GraphRAG 使用 Pydantic 进行配置管理,这是一个基于类型提示的数据验证库。

pydantic.png

它的核心优势包括:

  • 类型安全:基于 Python 类型提示自动验证数据类型
  • 数据转换:自动进行数据类型转换和标准化
  • 验证规则:支持复杂的验证逻辑和自定义验证器
  • 错误报告:提供详细的验证错误信息
  • IDE 支持:完美的代码补全和类型检查支持

下面是一个简化的例子,展示如何使用 Pydantic 和 YAML 实现类似 GraphRAG 的配置管理。首先通过 Pydantic 定义配置类:

from enum import Enum
from pydantic import BaseModel, Field, field_validator

class StorageType(str, Enum):
  """存储类型枚举"""
  FILE = "file"
  AZURE_BLOB = "azure_blob"
  S3 = "s3"

class DatabaseConfig(BaseModel):
  """数据库配置模型"""
  host: str = Field(default="localhost", description="数据库主机")
  port: int = Field(default=5432, description="数据库端口")
  username: str = Field(description="用户名")
  password: str = Field(description="密码")
  
  @field_validator("port")
  @classmethod
  def validate_port(cls, v):
    if not 1 <= v <= 65535:
      raise ValueError("端口必须在 1-65535 范围内")
    return v

class AppConfig(BaseModel):
  """主配置模型"""
  app_name: str = Field(default="MyApp", description="应用名称")
  debug: bool = Field(default=False, description="调试模式")
  storage_type: StorageType = Field(default=StorageType.FILE, description="存储类型")
  database: DatabaseConfig = Field(description="数据库配置")
  
  # 自定义验证器
  @field_validator("app_name")
  @classmethod
  def validate_app_name(cls, v):
    if not v.strip():
      raise ValueError("应用名称不能为空")
    return v.strip()

然后创建一个配置文件 config.yaml 内容如下:

app_name: "Demo"
debug: true
storage_type: "file"
database:
  host: "localhost"
  port: 5432
  username: "admin"
  password: "secret"

接着我们就可以从 YAML 文件加载配置,并将其转换为强类型的配置对象:

import yaml
from pathlib import Path

def load_config_from_yaml(yaml_path: Path) -> AppConfig:
  """从 YAML 文件加载配置"""
  with open(yaml_path, 'r', encoding='utf-8') as f:
    config_data = yaml.safe_load(f)
  # Pydantic 自动验证和转换
  return AppConfig(**config_data)

# 使用示例
config_file = Path("config.yaml")
config = load_config_from_yaml(config_file)
print(f"应用名称: {config.app_name}")
print(f"数据库配置: {config.database.host}:{config.database.port}")

运行结果如下:

应用名称: Demo
数据库配置: localhost:5432

这个例子展示了 Pydantic 的核心特性,包括类型声明、默认值、验证器和自动数据转换等。

工作流引擎

配置加载之后,GraphRAG 调用 _run_index() 执行实际的索引构建,而它则是调用 API 层的 build_index() 函数:

async def build_index(
  config: GraphRagConfig,
  method: IndexingMethod | str = IndexingMethod.Standard,
  is_update_run: bool = False,
  ...
) -> list[PipelineRunResult]:
  
  outputs: list[PipelineRunResult] = []
  # 根据 method 创建对应的工作流管道
  method = _get_method(method, is_update_run)
  pipeline = PipelineFactory.create_pipeline(config, method)
  # 依次运行管道中的每个工作流
  async for output in run_pipeline(pipeline, config, is_update_run=is_update_run, additional_context=additional_context):
    outputs.append(output)
    logger.info("Workflow %s completed successfully", output.workflow)
  return outputs

GraphRAG 的索引构建采用了灵活的管道架构,其中 PipelineFactory 采用工厂设计模式,负责管理和创建处理工作流和管道:

class PipelineFactory:
  """工作流管道工厂类"""
  
  workflows: ClassVar[dict[str, WorkflowFunction]] = {}
  pipelines: ClassVar[dict[str, list[str]]] = {}
  
  @classmethod
  def register(cls, name: str, workflow: WorkflowFunction):
    """注册自定义工作流函数"""
    cls.workflows[name] = workflow

  @classmethod
  def register_all(cls, workflows: dict[str, WorkflowFunction]):
    """批量注册自定义工作流函数"""
    for name, workflow in workflows.items():
      cls.register(name, workflow)
  
  @classmethod
  def register_pipeline(cls, name: str, workflows: list[str]):
    """注册自定义管道,一个管道包含多个工作流函数"""
    cls.pipelines[name] = workflows

  @classmethod
  def create_pipeline(cls, config: GraphRagConfig, method: IndexingMethod) -> Pipeline:
    """根据 method 创建管道"""
    workflows = config.workflows or cls.pipelines.get(method, [])
    return Pipeline([(name, cls.workflows[name]) for name in workflows])

这里涉及 GraphRAG 中的两个核心概念:工作流(Workflow)管道(Pipeline)。工作流是一个个独立的模块,比如加载文档、文本分片、提取图谱等等,程序启动时会自动注册所有内置的工作流函数:

PipelineFactory.register_all({
  "load_input_documents": run_load_input_documents,
  "create_base_text_units": run_create_base_text_units,
  "extract_graph": run_extract_graph,
  "create_communities": run_create_communities,
  # ...
})

而管道则是由多个工作流串起来的一个集合,系统预定义了四个管道:

PipelineFactory.register_pipeline(
  IndexingMethod.Standard, ["load_input_documents", *_standard_workflows]
)
PipelineFactory.register_pipeline(
  IndexingMethod.Fast, ["load_input_documents", *_fast_workflows]
)
PipelineFactory.register_pipeline(
  IndexingMethod.StandardUpdate, ["load_update_documents", *_standard_workflows, *_update_workflows],
)
PipelineFactory.register_pipeline(
  IndexingMethod.FastUpdate, ["load_update_documents", *_fast_workflows, *_update_workflows],
)

分别对应四种不同的构建索引的方法,当我们执行 graphrag index 命令时,可以通过 --method-m 参数指定:

# 标准方法
$ graphrag index --root ./ragtest --method standard

# 快速方法
$ graphrag index --root ./ragtest --method fast

# 标准方法(用于更新)
$ graphrag index --root ./ragtest --method standard-update

# 快速方法(用于更新)
$ graphrag index --root ./ragtest --method fast-update

GraphRAG 通过 create_pipeline() 方法,根据 method 找到对应的管道,然后依次运行管道中的每个工作流函数。

索引构建方法

上面提到 GraphRAG 内置了四种索引构建方法,每种都有其特定的适用场景:

索引方法速度质量适用场景主要特点
Standard首次索引,追求高质量全 LLM 驱动的图构建
Fast快速原型,大数据集NLP + LLM 混合方式
StandardUpdate增量更新,保持高质量标准方法 + 增量更新
FastUpdate频繁更新,快速处理快速方法 + 增量更新

其中后两个 Update 方法是在前两个方法的基础上增加了增量更新的能力,能够在不重新构建整个索引的情况下,处理新增或修改的文档,大大提高了增量处理的效率。我们这里主要关注 StandardFast 这两个方法,它们的主要差异点在于:

  • Standard 方法的 LLM 驱动流程

    • extract_graph:使用大语言模型从文本中提取实体和关系
    • extract_covariates:使用 LLM 提取声明和协变量信息
    • create_community_reports:基于图上下文生成高质量社区报告
  • Fast 方法的混合流程

    • extract_graph_nlp:使用传统 NLP 技术(如 spaCy)进行实体识别
    • prune_graph:对提取的实体进行过滤和清理
    • create_community_reports_text:基于文本单元上下文生成报告(更快但质量稍低)

下图展示了两种方法的工作流执行顺序:

index-workflow.png

蓝色为 Standard 方法使用的工作流,红色为 Fast 方法使用的工作流,绿色为二者共用的工作流。

小结

我们今天学习了当执行 graphrag index 命令时的整个调用过程,主要内容包括:

  1. 命令行入口:详细分析了从 graphrag index 命令到核心逻辑的完整调用链;
  2. 配置加载:深入解读了 load_config() 函数的实现,包括环境变量支持和点号分隔覆盖机制;
  3. Pydantic 介绍:通过一个简单的示例演示如何使用 Pydantic 和 YAML 实现类似 GraphRAG 的配置管理;
  4. 工作流引擎:学习 PipelineFactory 的设计模式和工作流注册机制;
  5. 索引构建方法:对比了四种不同的索引构建方法及其适用场景,通过 Mermaid 图表展示了索引构建的完整处理流程;

在下一篇文章中,我们将深入索引构建的具体流程,先从文档处理阶段开始,详细分析 GraphRAG 如何从各种格式的原始文档中提取和预处理文本数据,为后续的知识提取做好准备。


剖析 GraphRAG 的项目结构

经过这几天的动手实践和可视化体验,我们已经对 GraphRAG 的核心功能有了一个基本的了解。今天,我们正式开始深入研究它的源码,首先从熟悉项目结构入手,为我们后续学习其核心工作流(比如索引和查询)打下基础。

项目概览

使用 tree 命令查看 GraphRAG 项目的项目结构如下:

$ tree -L 1 graphrag
├── __init__.py
├── __main__.py     # 模块入口
├── api             # API 层
├── cache
├── callbacks
├── cli             # CLI 入口
├── config          # 配置管理
├── data_model      # 数据模型
├── index           # 索引系统
├── language_model  # 语言模型
├── logger
├── prompt_tune     # 提示词调优
├── prompts         # 提示词系统
├── query           # 查询系统
├── storage         # 存储抽象层
├── utils
└── vector_stores   # 向量数据库

其中基础模块包括:

  • API 层(api:这里提供了高层次的编程接口,是外部调用 GraphRAG 功能的主要入口,包括索引构建 API(index.py)、查询 API(query.py)和提示词调优 API(prompt_tune.py)三大块;
  • CLI 入口(cli:通过 Typer 装饰器定义命令,解析命令行参数并调用 API 层;
  • 配置管理(config:使用 Pydantic 定义类型安全的配置模型,包括主配置、LLM 配置、向量配置、存储配置等;
  • 数据模型(data_model:定义核心数据结构,包括实体、关系、社区、社区报告、文本单元、文档等;

从入口处可以看出 GraphRAG 有三大功能:索引构建、查询、提示词调优,分别位于下面这些模块中:

  • 索引系统(index:这是 GraphRAG 的核心模块,负责从原始文档构建知识图谱,这里的内容也是最丰富的;比如 input 目录用于输入处理,支持 CSV、JSON、纯文本多种格式的文档;workflows 是 GraphRAG 的工作流引擎,定义了标准的数据处理工作流,使用工厂模式动态创建流水线,支持增量更新和全量重建;operations 目录实现了大量的工作流原子操作,包括文本处理、图谱构建、社区识别、总结分析等;下面是一些典型的工作流操作:

    • chunk_text - 文本分块,将长文档切分为可处理的片段;
    • embed_text - 文本向量化,支持多种嵌入模型;
    • extract_graph - 从文本提取实体和关系;
    • build_noun_graph - 构建名词图,识别重要概念;
    • create_graph.py - 创建完整的知识图谱;
    • cluster_graph.py - 图聚类,发现社区结构;
    • embed_graph - 图嵌入,支持 Node2Vec 等算法;
    • layout_graph - 图布局,支持 UMAP 降维可视化;
    • summarize_communities - 社区摘要生成;
    • summarize_descriptions - 实体描述摘要;
    • extract_covariates - 协变量提取(如声明检测);
  • 查询系统(query:为 GraphRAG 提供了多种检索策略,包括:

    • 全局搜索:基于社区报告的高层次查询,适合回答概括性或主题性问题;使用 Map-Reduce 模式处理大规模数据;
    • 本地搜索:基于实体和关系的细粒度查询,适合回答具体的事实性问题;混合向量检索和图遍历;
    • 漂移搜索:基于动态选择社区查询,在查询过程中调整搜索范围,平衡查询深度和广度;
  • 提示词系统(prompts:包括索引和查询所使用的所有提示词;
  • 提示词调优(prompt_tune:根据具体数据领域自动生成最适合的提示词,提高知识图谱构建质量;

此外,为实现索引构建和查询,大模型和存储服务也是必不可少的:

  • 语言模型抽象(language_model:提供统一的 LLM 和 Embedding 接口,主要通过 fnllm 库实现;
  • 存储抽象层(storage:提供统一的存储接口,支持多种后端,包括本地文件系统、Azure Blob、CosmosDB、内存存储等;
  • 向量数据库(vector_stores:支持多种向量数据库,包括:高性能向量数据库 LanceDB、Azure 认知搜索服务 Azure AI Search 以及 Azure 的 NoSQL 数据库 CosmosDB

技术栈分析

通过分析 pyproject.toml 文件,我们可以对 GraphRAG 的核心依赖和开发工作链有一个初步了解。

  • 大语言模型

    • fnllm[azure,openai] - 统一的大语言模型接口库
    • openai - OpenAI 官方提供的 API 客户端
    • tiktoken - 一款高效的 BPE 分词库,能实现文本分词,并计算出文本对应的 token 数量
  • 数据科学

    • pandas - 数据处理和分析
    • numpy - 数值计算
    • networkx - 图算法库,用于创建、操作和研究各种图结构,支持图的遍历、最短路径查找等多种图算法实现
    • graspologic - 图统计和机器学习,提供了图数据的统计分析方法和基于图的机器学习模型,助力从图结构数据中挖掘有价值的信息
    • umap-learn - 降维算法库,基于 UMAP 算法,能将高维数据映射到低维空间,同时保留数据的局部和全局结构,便于数据可视化和后续分析
  • 向量存储

    • lancedb - 高性能的向量数据库,具备高效的向量存储和检索能力
    • azure-search-documents - Azure 搜索服务的客户端库
  • 配置和 CLI

    • pydantic - 数据验证和配置管理,基于类型提示进行数据验证,能确保数据的完整性和正确性,同时便于配置信息的管理和解析
    • typer - 现代化的 CLI 框架,基于 Python 的类型提示构建,让开发者能快速、简洁地创建功能丰富的命令行工具
    • pyyaml - 用于处理 YAML 格式配置文件
  • 代码质量

    • ruff - 现代化的 Python 代码检查和格式化工具
    • pyright - 由微软开发的 Python 类型检查器
  • 测试框架

    • pytest - 测试框架
    • pytest-asyncio - 提供异步测试支持
    • coverage - 用于代码覆盖率分析
  • 文档和发布

    • mkdocs-material - 现代化的文档站点生成工具,基于 MkDocs 构建
    • semversioner - 用于语义化版本管理的工具,帮助开发者规范地管理项目的版本号,记录版本变更信息,简化版本发布流程

命令行入口

GraphRAG 是一个命令行程序,正如我们在入门篇里所体验的,使用 uv run poe init 命令初始化工作空间:

$ uv run poe init --root ./ragtest

其中 poePoe The Poet 的简称,它是一个任务管理工具,允许在 pyproject.toml 中定义和运行各种任务:

[project]
name = "graphrag"

[tool.poe.tasks]
...
index = "python -m graphrag index"
update = "python -m graphrag update"
init = "python -m graphrag init"
query = "python -m graphrag query"
prompt_tune = "python -m graphrag prompt-tune"

可以看出 poe 命令实际上是通过 python -m 执行的:

$ python -m graphrag init

包括 init 在内,GraphRAG 一共支持 5 个不同的子命令:

  • graphrag init - 初始化项目配置
  • graphrag index - 构建知识图谱索引
  • graphrag query - 执行查询操作
  • graphrag update - 增量更新索引
  • graphrag prompt-tune - 提示词调优

GraphRAG 采用了标准的 Python 包结构,__main__.py 文件是模块的入口:

from graphrag.cli.main import app

app(prog_name="graphrag")

这个文件允许用户通过 python -m graphrag 直接运行包,为开发和测试提供便利。

另外,如果你是通过 pip install 安装的,还可以直接运行 graphrag 命令:

$ graphrag init --root ./ragtest

要实现这一点,其秘密在于 pyproject.toml 文件中的下面这行配置:

[project.scripts]
graphrag = "graphrag.cli.main:app"

这里的 [project.scripts] 用于定义脚本入口,当执行 pip install 后,会在 Python 环境的 bin 目录下生成一个可执行文件,我们不妨看下这个文件内容:

# -*- coding: utf-8 -*-
import sys
from graphrag.cli.main import app
if __name__ == "__main__":
  if sys.argv[0].endswith("-script.pyw"):
    sys.argv[0] = sys.argv[0][:-11]
  elif sys.argv[0].endswith(".exe"):
    sys.argv[0] = sys.argv[0][:-4]
  sys.exit(app())

这个文件根据 [project.scripts] 定义,导入 graphrag.cli.main 包下的 app 方法并执行,因此我们可以直接运行 graphrag 命令。

命令行实现

GraphRAG 命令行是基于 Typer 库开发的,这是一个现代化的 Python CLI 库,基于类型提示构建命令行应用程序。

typer.png

它的核心特点如下:

  • 类型驱动:基于 Python 类型提示自动生成 CLI 接口
  • 易用性:简洁的 API,最少的样板代码
  • 自动化:自动生成帮助文档、参数验证和错误处理
  • 现代化:支持 Python 3.6+ 的现代特性
  • 高级功能:子命令组织、回调函数、进度条、颜色输出、确认提示等

通过寥寥几行代码,我们就可以使用 Typer 开发一个命令行程序:

import typer

app = typer.Typer()

@app.command()
def hello(name: str):
  print(f"Hello {name}")

if __name__ == "__main__":
  app()

再回到 GraphRAG 的代码,当我们执行 python -mgraphrag 命令时,实际上调用的是 __main__.pybin/graphrag 文件,它们都是指向 graphrag.cli.main 包下的 app 方法:

graphrag-typer-app.png

从这里可以看到,GraphRAG 基于 Typer 实现了多个选项和子命令的功能:

graphrag-cli.png

我们以 graphrag index 为例看下命令的调用链:

  1. 入口点__main__.pybin/graphraggraphrag.cli.main:app
  2. CLI 解析cli/main.py@app.command("index") 装饰器
  3. 配置加载cli/index.pyindex_cli() 函数
  4. API 调用api.build_index() 函数

小结

至此,我们对 GraphRAG 的项目结构有了大致的了解,并对 graphrag 命令行的运行原理有了一定的认识。接下来,我们将顺着 graphrag index 的调用链,展开聊聊 GraphRAG 索引构建的具体逻辑。