Fork me on GitHub

分类 GraphRAG 下的文章

GraphRAG 索引构建之知识提取

在前面的文章中,我们详细学习了 GraphRAG 索引构建的文档处理阶段,了解了如何将各种格式的原始文档转换为标准化的文本单元。今天我们将深入探索整个索引构建流程中最核心的部分 —— 知识提取,看看 GraphRAG 是如何利用大语言模型的力量,从非结构化的文本中挖掘出结构化的实体、关系和声明,并最终构建出完整的知识图谱。

知识提取阶段总览

让我们回顾一下索引构建的整体流程,知识提取正是连接文本处理和图谱分析的关键桥梁:

index-workflow-3-steps.png

知识提取阶段包含三个核心工作流:

  • 提取图谱(extract_graph - 从文本中提取实体和关系,对同一个实体和关系的描述进行总结合并;
  • 图谱规范化(finalize_graph - 对提取结果进行规范化处理,输出最终的实体表和关系表;
  • 提取事实声明(extract_covariates - 提取事实性声明,这一步是可选的,声明通常用于识别诸如欺诈等恶意行为,因此并不适用于所有数据集;

提取图谱

图谱提取的实现位于 index/workflows/extract_graph.py 文件中,整个工作流包含以下关键步骤:

async def run_workflow(
  config: GraphRagConfig,
  context: PipelineRunContext,
) -> WorkflowFunctionOutput:
  
  # 1. 加载文本单元
  text_units = await load_table_from_storage("text_units", context.output_storage)
  
  # 2. 配置提取策略和摘要策略
  extraction_strategy = config.extract_graph.resolved_strategy()
  summarization_strategy = config.summarize_descriptions.resolved_strategy()
  
  # 3. 执行图谱提取
  entities, relationships, raw_entities, raw_relationships = await extract_graph(
    text_units=text_units,
    extraction_strategy=extraction_strategy,
    summarization_strategy=summarization_strategy
  )
  
  # 4. 保存结果
  await write_table_to_storage(entities, "entities", context.output_storage)
  await write_table_to_storage(relationships, "relationships", context.output_storage)

这里值得注意是提取策略和摘要策略,它们对应 settings.yaml 文件中的 extract_graphsummarize_descriptions 两个配置:

extract_graph:
  model_id: default_chat_model
  prompt: "prompts/extract_graph.txt"
  entity_types: [organization,person,geo,event]
  max_gleanings: 1

summarize_descriptions:
  model_id: default_chat_model
  prompt: "prompts/summarize_descriptions.txt"
  max_length: 500

从这里可以看出,整个图谱提取分为两步:首先遍历上一阶段生成的所有文本单元,通过大模型从每个文本片段中识别出实体和关系;然后将相同的实体和关系进行合并,再通过大模型总结合并之后的实体和关系描述;

extract-graph.png

核心提示词

第一步所使用的提示词位于 prompts/extract_graph.txt,翻译如下:

-目标-
给定一个可能与本活动相关的文本文档和一个实体类型列表,从文本中识别出所有属于这些类型的实体以及已识别实体之间的所有关系。

-步骤-
1. 识别所有实体。对于每个已识别的实体,提取以下信息:
- entity_name: 实体名称,首字母大写
- entity_type: 以下类型之一:[{entity_types}]
- entity_description: 对实体属性和活动的全面描述
每个实体的格式为("entity"<|><entity_name><|><entity_type><|><entity_description>)

2. 从步骤1中识别出的实体中,识别所有(源实体,目标实体)对,这些实体对之间存在*明确的关系*。
对于每对相关实体,提取以下信息:
- source_entity: 源实体的名称,如步骤1中所识别的
- target_entity: 目标实体的名称,如步骤1中所识别的
- relationship_description: 解释为什么认为源实体和目标实体相关
- relationship_strength: 一个数值分数,表示源实体和目标实体之间关系的强度
每个关系的格式为("relationship"<|><source_entity><|><target_entity><|><relationship_description><|><relationship_strength>)

3. 以英文返回输出,作为步骤1和步骤2中识别的所有实体和关系的单个列表。使用 **##** 作为列表分隔符。

4. 完成后,输出 <|COMPLETE|>

输出的结果每一行包含一条实体信息或关系信息,不同字段之间使用 <|> 分割,记录之间使用 ## 分割,并以 <|COMPLETE|> 结尾,下面是一个示例:

("entity"<|>ALICE JOHNSON<|>PERSON<|>Alice Johnson is a software engineer at X Corp)
##
("entity"<|>X CORP<|>ORGANIZATION<|>X Corp is a technology company)
##
("relationship"<|>ALICE JOHNSON<|>X CORP<|>Alice Johnson works as a software engineer at X Corp<|>8)
<|COMPLETE|>

第二步所使用的提示词位于 prompts/summarize_descriptions.txt,这个就比较简单了:

你是一名乐于助人的助手,负责对下方提供的数据生成一份全面的总结。
给定一个或多个实体,以及一份描述列表,这些都与同一个实体或一组实体相关。
请将所有这些内容合并成一个单一的、全面的描述。确保包含从所有描述中收集到的信息。
如果所提供的描述存在矛盾,请解决这些矛盾并提供一个单一的、连贯的总结。
确保用第三人称撰写,并包含实体名称,以便我们了解完整的背景。
将最终描述的长度限制在 {max_length} 个单词以内。

#######
-数据-
实体:{entity_name}
描述列表:{description_list}
#######
输出:

如果提取实体和关系时,同一个实体或关系出现了多次,就有可能导致信息不完整或矛盾的情况,比如下面两条记录:

("entity"<|>ALICE JOHNSON<|>PERSON<|>Alice Johnson is a software engineer at X Corp)
("entity"<|>ALICE JOHNSON<|>PERSON<|>Alice Johnson is a software engineer at Y Corp)

通过对描述进行总结,得到一条关于 ALICE JOHNSON 的完整记录:

("entity"<|>ALICE JOHNSON<|>PERSON<|>Alice Johnson is a software engineer at X Corp and Y Corp)

多轮迭代提取

此外,在提取图谱时,GraphRAG 还实现了一个多轮迭代提取的机制,通过多次调用 LLM 来确保提取的完整性,可以通过 max_gleanings 参数进行配置。其核心逻辑如下:

# 如果指定了 `max_gleanings` 参数,就进入循环以提取更多实体
# 有两个退出条件:(a)达到配置的最大值,(b)模型表示没有更多实体了
if self._max_gleanings > 0:
  for i in range(self._max_gleanings):
    response = await self._model.achat(
      CONTINUE_PROMPT,
      name=f"extract-continuation-{i}",
      history=response.history,
    )
    results += response.output.content or ""

    # 达到配置的最大值
    if i >= self._max_gleanings - 1:
      break

    response = await self._model.achat(
      LOOP_PROMPT,
      name=f"extract-loopcheck-{i}",
      history=response.history,
    )

    # 模型表示没有更多实体了
    if response.output.content != "Y":
      break

每次迭代会调用两次大模型,第一次告诉大模型提取有遗漏,让它继续提取,提示词 CONTINUE_PROMPT 如下:

在上一次提取中遗漏了许多实体和关系。
请记住,只输出与之前提取的任何类型相匹配的实体。
使用相同的格式在下方添加它们:

第二次让大模型进一步确认是否还有遗漏,提示词 LOOP_PROMPT 如下:

似乎仍有一些实体和关系可能被遗漏了。
如果还有需要添加的实体或关系,请回答Y;如果没有,请回答N。
请只用一个字母Y或N回答。

未完待续

今天我们开始学习 GraphRAG 索引构建流程中最核心的知识提取阶段,它包含 提取图谱图谱规范化提取事实声明 三个工作流。今天学习的主要是提取图谱的几个核心要点:

  • 两阶段图谱提取:掌握 GraphRAG 如何通过大模型首先从文本单元中识别实体和关系,然后对相同的实体和关系进行合并和总结,形成完整的知识图谱结构;
  • 精心设计的提示词工程:介绍了 extract_graph.txtsummarize_descriptions.txt 两个核心提示词,了解了如何通过结构化的指令引导大模型提取高质量的实体关系信息;
  • 多轮迭代提取机制:学习了 GraphRAG 通过 max_gleanings 配置实现的多轮迭代提取,确保信息提取的完整性和准确性;

关于知识提取阶段还有另外两个工作流,我们明天继续。


GraphRAG 索引构建之文档处理

昨天我们对 GraphRAG 的索引构建做了一个基本的概述,了解了其索引构建的整体流程和工作流引擎的设计。今天我们将深入第一个具体阶段 —— 文档处理,这是整个知识图谱构建的起点,负责将各种格式的原始文档转换为标准化的文本单元,为后续的知识提取奠定基础。

回顾索引构建流程

根据前一篇文章的分析,我们可以将 GraphRAG 的索引构建流程分为三个主要阶段:

index-workflow-3-steps.png

今天我们重点关注文档处理阶段,它的核心任务是将非结构化的原始数据转换为后续处理流程所需的标准化文本单元。这个阶段包含三个关键工作流:

  • 加载原始文档(load_input_documents - 从各种数据源加载原始文档
  • 文本分块处理(create_base_text_units - 将长文档分割成可处理的文本块
  • 文档规范化(create_final_documents - 对文档进行最终的标准化整理

加载原始文档

在 GraphRAG 中,所有工作流的入口都是 run_workflow() 方法,加载原始文档也是如此。它的核心逻辑位于 index/workflows/load_input_documents.py 文件:

async def run_workflow(
  config: GraphRagConfig,
  context: PipelineRunContext,
) -> WorkflowFunctionOutput:
  
  # 加载输入文档,转换为标准的 pd.DataFrame 格式
  output = await load_input_documents(config.input, context.input_storage)

  # 将原始文档写入 documents 表中
  await write_table_to_storage(output, "documents", context.output_storage)
  return WorkflowFunctionOutput(result=output)

这个工作流首先加载输入文档,将其转换为标准的 pd.DataFrame 格式,然后写入 documents 表中。GraphRAG 通过工厂模式实现了对纯文本、CSV、JSON 三种文件格式的统一处理:

loaders: dict[str, Callable[..., Awaitable[pd.DataFrame]]] = {
  InputFileType.text: load_text,
  InputFileType.csv: load_csv,
  InputFileType.json: load_json,
}

如果你有不同的格式,官方的建议是编写脚本将其转换为其中一种。

同时,它也支持 Azure Blob、CosmosDB、本地文件、内存这四种不同的存储类型:

StorageFactory.register(StorageType.blob.value, create_blob_storage)
StorageFactory.register(StorageType.cosmosdb.value, create_cosmosdb_storage)
StorageFactory.register(StorageType.file.value, create_file_storage)
StorageFactory.register(StorageType.memory.value, lambda **_: MemoryPipelineStorage())

默认情况下是从本地的 input 目录下读取文件,并使用 text 加载器处理。我们可以在 settings.yaml 文件中的 input 部分进行配置:

input:
  storage:
    type: file # [blob, cosmosdb, file, memory]
    base_dir: "input"
  file_type: text # [csv, text, json]

文本文件处理

对于纯文本文件,处理逻辑相对简单,在 index/input/text.py 中:

async def load_file(path: str, group: dict | None = None) -> pd.DataFrame:
  text = await storage.get(path, encoding=config.encoding)
  new_item = {**group, "text": text}
  new_item["id"] = gen_sha512_hash(new_item, new_item.keys())
  new_item["title"] = str(Path(path).name)
  new_item["creation_date"] = await storage.get_creation_date(path)
  return pd.DataFrame([new_item])

每个文本文件被读取为一个完整的字符串,并自动生成标准字段:

  • text: 文件的完整内容
  • id: 基于内容的 SHA512 哈希值,确保唯一性
  • title: 文件名(不含路径)
  • creation_date: 文件创建日期

JSON 文件处理

JSON 文件的处理位于 index/input/json.py 文件,直接调用 json.loads() 加载数据。支持两种格式,一种是单个对象格式:

{
  "text": "文档内容",
  "title": "文档标题"
}

另一种是数组格式:

[
  {"text": "文档1内容", "title": "文档1标题"},
  {"text": "文档2内容", "title": "文档2标题"}
]

注意,目前暂不支持 JSONL 格式(每行一个完整的 JSON 对象),需转换为数组格式。

JSON 解析器会自动为对象增加 idcreation_date 两个字段。另外,如果 JSON 中没有 texttitle 字段,还可以在配置文件中设置:

input:
  file_type: json
  text_column: content  # 指定 text 列
  title_column: name    # 指定 title 列

CSV 文件处理

CSV 文件的处理位于 index/input/csv.py 文件,直接调用 pd.read_csv() 加载数据,它的格式如下:

text,title
"文档1内容","文档1标题"
"文档2内容","文档2标题"

同样的,CSV 解析器也会自动为对象增加 idcreation_date 两个字段,也支持在配置文件中设置 texttitle 字段映射。

文本分块处理

加载原始文档后,下一步是将长文档切分成更小、更易于处理的文本单元,这一步骤对后续的 LLM 处理至关重要。分块策略通过 settings.yaml 中的 chunks 部分配置:

chunks:
  size: 1200                   # 每个分块的目标 token 数量
  overlap: 100                 # 相邻分块之间重叠的 token 数量
  group_by_columns: [id]       # 按指定列分组,确保分块在文档内部进行
  strategy: tokens             # 分块策略:tokens 或 sentence
  encoding_model: cl100k_base  # 用于计算 token 的编码模型

其中 sizeoverlap 比较好理解,分别表示每个分块的大小和相邻分块之间的重叠;group_by_columns 通常设置为 ["doc_id"] 或 ["id"] 表示在分块前先按文档 ID 进行分组,确保文本分块不会跨越不同的文档;strategy 表示分块策略,支持 tokenssentence 两种。

def load_strategy(strategy: ChunkStrategyType) -> ChunkStrategy:
  """分块策略定义"""
  match strategy:
    case ChunkStrategyType.tokens:
      # tokens 分块策略
      from graphrag.index.operations.chunk_text.strategies import run_tokens
      return run_tokens
    case ChunkStrategyType.sentence:
      # sentence 分块策略
      from graphrag.index.operations.chunk_text.bootstrap import bootstrap
      from graphrag.index.operations.chunk_text.strategies import run_sentences
      bootstrap()
      return run_sentences

基于 tokens 的精确分块

tokens 分块策略 是 GraphRAG 的默认分块策略,它基于 tiktoken 库进行精确的 token 计算和分块,核心实现如下:

def run_tokens(
  input: list[str], config: ChunkingConfig, tick: ProgressTicker,
) -> Iterable[TextChunk]:

  # 获取编码和解码函数
  encode, decode = get_encoding_fn(encoding_name)
  
  # 基于编码 token 将文本分块
  return split_multiple_texts_on_tokens(
    input,
    Tokenizer(
      chunk_overlap=chunk_overlap,
      tokens_per_chunk=tokens_per_chunk,
      encode=encode,
      decode=decode,
    ),
    tick,
  )

其中 get_encoding_fn() 获取编码和解码函数,它支持通过 tiktoken 库加载不同的编码模型,比如 GPT-3.5 和 GPT-4 的 cl100k_base 或者 GPT-4.1 的 o200k_base 等,可以在 tiktoken 源码中 找到完整的模型列表:

tiktoken-models.png

tokens 分块的核心算法位于 split_multiple_texts_on_tokens() 函数:

def split_multiple_texts_on_tokens(
  texts: list[str], tokenizer: Tokenizer, tick: ProgressTicker
) -> list[TextChunk]:
  """将多个文本进行分块并返回带元数据的块"""
  result = []
  mapped_ids = []

  # 第一步:编码所有文本并记录源文档索引
  for source_doc_idx, text in enumerate(texts):
    encoded = tokenizer.encode(text)
    mapped_ids.append((source_doc_idx, encoded))

  # 第二步:创建全局 token 序列,每个 token 都记录其来源文档
  input_ids = [
    (source_doc_idx, id) for source_doc_idx, ids in mapped_ids for id in ids
  ]

  # 第三步:滑动窗口分块
  start_idx = 0
  cur_idx = min(start_idx + tokenizer.tokens_per_chunk, len(input_ids))
  chunk_ids = input_ids[start_idx:cur_idx]

  while start_idx < len(input_ids):
    # 解码当前块的文本内容
    chunk_text = tokenizer.decode([id for _, id in chunk_ids])
    # 记录当前块涉及的源文档索引
    doc_indices = list({doc_idx for doc_idx, _ in chunk_ids})
    result.append(TextChunk(chunk_text, doc_indices, len(chunk_ids)))
    # 已完成
    if cur_idx == len(input_ids):
      break
    # 滑动窗口:下一个块的起始位置考虑重叠
    start_idx += tokenizer.tokens_per_chunk - tokenizer.chunk_overlap
    cur_idx = min(start_idx + tokenizer.tokens_per_chunk, len(input_ids))
    chunk_ids = input_ids[start_idx:cur_idx]

  return result

该算法首先使用 tiktoken 库将所有文档编码成 token 序列(即 mapped_ids 数组);然后将 token 序列展开,变成一个全局的整数序列,每个 token 都保留其来源信息(即 input_ids 数组);然后按 tokens_per_chunk 数量对 token 序列进行第一个分块并解码得到文本内容;接着使用滑动窗口往后依次分块,同时考虑相邻块之间有指定数量的重叠 token,保持上下文连续性。

有趣的是,这里的分块算法参考了 LangChain 的实现,有兴趣的朋友可以看下 LangChain 的 TokenTextSplitter

基于句子的自然分块

sentence 分块策略 使用 NLTK 库进行基于句子边界的自然语言分块,它首先调用 bootstrap() 初始化必要的 NLTK 资源:

def bootstrap():
  """初始化 NLTK 资源"""
  global initialized_nltk
  if not initialized_nltk:
    import nltk
    from nltk.corpus import wordnet as wn
    # 句子分词
    nltk.download("punkt")
    nltk.download("punkt_tab")
    # 词性标注
    nltk.download("averaged_perceptron_tagger")
    nltk.download("averaged_perceptron_tagger_eng")
    # 命名实体识别相关
    nltk.download("maxent_ne_chunker")
    nltk.download("maxent_ne_chunker_tab")
    # 词汇资源
    nltk.download("words")
    nltk.download("wordnet")
    # 确保 wordnet 数据已加载
    wn.ensure_loaded()
    # 完成初始化
    initialized_nltk = True

这里我稍微介绍下这几个 NLTK 资源:

  • 句子分词相关punkt 用于句子边界检测和分词,将文本分割成句子,识别句子的开始和结束;punkt_tabpunkt 的表格化版本,提供更高效的句子分词性能;
  • 词性标注相关averaged_perceptron_tagger 用于词性标注,基于 平均感知机算法,为每个单词标注词性(名词、动词、形容词等);averaged_perceptron_tagger_eng 是其英语特化版本,专门针对英语文本的词性标注;
  • 命名实体识别相关maxent_ne_chunker 用于命名实体识别和分块,基于 最大熵模型,识别人名、地名、组织名等命名实体;maxent_ne_chunker_tabmaxent_ne_chunker 的表格化版本,提供更快的命名实体识别性能;
  • 词汇资源words 是英语单词词典,包含大量英语单词,用于拼写检查和词汇验证;wordnet 是英语词汇的语义网络,提供词汇间的语义关系(同义词、反义词、上下位关系等),支持词义消歧、语义相似度计算等;

然后使用 NLTK 的句子分割器,也就是 sent_tokenize() 方法,进行按句子分块:

def run_sentences(
  input: list[str], _config: ChunkingConfig, tick: ProgressTicker
) -> Iterable[TextChunk]:
  """按句子将文本分成多个部分"""
  for doc_idx, text in enumerate(input):
    # 使用 NLTK 进行句子分割
    sentences = nltk.sent_tokenize(text)
    for sentence in sentences:
      yield TextChunk(
        text_chunk=sentence,
        source_doc_indices=[doc_idx],
      )
    tick(1)

可以看到,这种分块策略的实现相对简单,每个句子就是一个独立的文本块,句子之间没有重叠,避免信息重复。

两种策略的对比

下表对两种策略做了个对比:

特性tokens 策略sentence 策略
分块依据Token 数量句子边界
块大小控制精确控制依赖句子长度
上下文保持可配置重叠自然语言完整性
跨文档支持支持不支持
计算复杂度较高较低
适用场景严格 token 限制的 LLM 处理需要保持句子完整性的分析

可以根据需要选择适当的分块策略。

无论是哪种分块策略,都返回统一的 TextChunk 数据结构:

@dataclass
class TextChunk:
  """文本块类定义"""
  text_chunk: str              # 文本块内容
  source_doc_indices: list[int] # 源文档索引列表
  n_tokens: int | None = None  # token 数量(可选)

此外,我们还可以为文档添加一下元数据,分块时也有一些处理元数据的策略,相关配置如下:

input:
  metadata: [title,tag] # 增加元数据列
chunks:
  prepend_metadata: true  # 将元数据复制到每个文本块的开头
  chunk_size_includes_metadata: false # 计算分块大小时是否包含元数据

关于元数据的使用,可以参考 GraphRAG 官方教程:

文档规范化

在完成文本分块后,create_final_documents 步骤会对处理结果进行最终的整理和规范化,将分块后的文本单元重新与原始文档关联。让我们详细分析实际的源码实现:

def create_final_documents(documents: pd.DataFrame, text_units: pd.DataFrame) -> pd.DataFrame:
  
  # 展开文本单元的文档ID关联并选择需要的列
  exploded = (
    text_units.explode("document_ids")
    .loc[:, ["id", "document_ids", "text"]]
    .rename(
      columns={
        "document_ids": "chunk_doc_id",
        "id": "chunk_id", 
        "text": "chunk_text",
      }
    )
  )
  
  # 与原始文档信息合并
  joined = exploded.merge(
    documents,
    left_on="chunk_doc_id",
    right_on="id",
    how="inner",
    copy=False,
  )
  
  # 按文档聚合所有相关的文本单元ID
  docs_with_text_units = joined.groupby("id", sort=False).agg(
    text_unit_ids=("chunk_id", list)
  )
  
  # 重新与文档信息合并,形成最终结构
  rejoined = docs_with_text_units.merge(
    documents,
    on="id",
    how="right",
    copy=False,
  ).reset_index(drop=True)
  
  # 数据格式化和添加序号
  rejoined["id"] = rejoined["id"].astype(str)
  rejoined["human_readable_id"] = rejoined.index + 1
  
  # 返回标准化的列结构
  return rejoined.loc[:, DOCUMENTS_FINAL_COLUMNS]

这一步的代码看起来很长,其实它的逻辑并不复杂,核心在于数据的操作和转换,看懂这一步的代码需要熟练掌握 pandas 各种数据处理技巧,比如:

  • .explode("document_ids"):将数组列展开为多行,每个数组元素对应一行
  • .loc[:, ["id", "document_ids", "text"]]:列切片操作,只保留需要的列,减少内存使用
  • .rename(columns={}):重命名列以更清晰地表示其含义
  • .merge(documents, ...):对两个表执行合并操作,使用 left_onright_on 明确指定连接键
  • .groupby("id", sort=False):按文档 ID 分组,保持原有顺序,提高性能
  • agg(text_unit_ids=("chunk_id", list)):聚合操作,将每个文档对应的所有文本单元 ID 收集成列表

最后使用预定义的 DOCUMENTS_FINAL_COLUMNS 确保输出格式的一致性,包括以下列:

  • id: 文档唯一标识符
  • human_readable_id: 人类可读的递增ID
  • title: 文档标题
  • text: 原始文档内容
  • text_unit_ids: 关联的文本单元ID列表
  • metadata: 元数据信息
  • creation_date: 创建日期

小结

今天我们深入探讨了 GraphRAG 索引构建流程中的文档处理阶段,这个阶段是整个知识图谱构建的基础,负责将各种格式的原始文档转换为标准化的文本单元,为后续的知识提取和图谱构建做好准备。通过本次学习,我们掌握了以下核心要点:

  • 统一的文档加载:学习了 GraphRAG 如何通过工厂模式,支持从多种数据源(如本地文件、Azure Blob)加载不同格式(纯文本、CSV、JSON)的原始文档;
  • 灵活的文本分块:详细分析了两种核心的文本分块策略。tokens 策略基于 tiktoken 实现精确的 token 级切片和重叠,适合对 LLM 输入有严格限制的场景;而 sentence 策略则利用 NLTK 库进行基于句子边界的自然语言分块,更好地保持了语义的完整性;
  • 标准化的数据输出:文档处理的最后一步,将分块后的文本单元与原始文档进行重新关联,最终输出包含标准化字段的文档数据,为后续的索引流程提供了结构清晰、内容一致的输入。

在下一篇文章中,我们将进入知识提取阶段,学习 GraphRAG 如何调用大模型从这些精心处理的文本单元中提取实体和关系,最终构建出结构化的知识图谱。


昨天我们对 GraphRAG 的索引构建做了一个基本的概述,了解了其索引构建的整体流程和工作流引擎的设计。今天我们将深入第一个具体阶段 —— 文档处理,这是整个知识图谱构建的起点,负责将各种格式的原始文档转换为标准化的文本单元,为后续的知识提取奠定基础。

回顾索引构建流程

根据前一篇文章的分析,我们可以将 GraphRAG 的索引构建流程分为三个主要阶段:

今天我们重点关注文档处理阶段,它的核心任务是将非结构化的原始数据转换为后续处理流程所需的标准化文本单元。这个阶段包含三个关键工作流:

  • 加载原始文档(load_input_documents - 从各种数据源加载原始文档
  • 文本分块处理(create_base_text_units - 将长文档分割成可处理的文本块
  • 文档规范化(create_final_documents - 对文档进行最终的标准化整理

加载原始文档

在 GraphRAG 中,所有工作流的入口都是 run_workflow() 方法,加载原始文档也是如此。它的核心逻辑位于 index/workflows/load_input_documents.py 文件:

async def run_workflow(
  config: GraphRagConfig,
  context: PipelineRunContext,
) -> WorkflowFunctionOutput:
  
  # 加载输入文档,转换为标准的 pd.DataFrame 格式
  output = await load_input_documents(config.input, context.input_storage)

  # 将原始文档写入 documents 表中
  await write_table_to_storage(output, "documents", context.output_storage)
  return WorkflowFunctionOutput(result=output)

这个工作流首先加载输入文档,将其转换为标准的 pd.DataFrame 格式,然后写入 documents 表中。GraphRAG 通过工厂模式实现了对纯文本、CSV、JSON 三种文件格式的统一处理:

loaders: dict[str, Callable[..., Awaitable[pd.DataFrame]]] = {
  InputFileType.text: load_text,
  InputFileType.csv: load_csv,
  InputFileType.json: load_json,
}

如果你有不同的格式,官方的建议是编写脚本将其转换为其中一种。

同时,它也支持 Azure Blob、CosmosDB、本地文件、内存这四种不同的存储类型:

StorageFactory.register(StorageType.blob.value, create_blob_storage)
StorageFactory.register(StorageType.cosmosdb.value, create_cosmosdb_storage)
StorageFactory.register(StorageType.file.value, create_file_storage)
StorageFactory.register(StorageType.memory.value, lambda **_: MemoryPipelineStorage())

默认情况下是从本地的 input 目录下读取文件,并使用 text 加载器处理。我们可以在 settings.yaml 文件中的 input 部分进行配置:

input:
  storage:
    type: file # [blob, cosmosdb, file, memory]
    base_dir: "input"
  file_type: text # [csv, text, json]

文本文件处理

对于纯文本文件,处理逻辑相对简单,在 index/input/text.py 中:

async def load_file(path: str, group: dict | None = None) -> pd.DataFrame:
  text = await storage.get(path, encoding=config.encoding)
  new_item = {**group, "text": text}
  new_item["id"] = gen_sha512_hash(new_item, new_item.keys())
  new_item["title"] = str(Path(path).name)
  new_item["creation_date"] = await storage.get_creation_date(path)
  return pd.DataFrame([new_item])

每个文本文件被读取为一个完整的字符串,并自动生成标准字段:

  • text: 文件的完整内容
  • id: 基于内容的 SHA512 哈希值,确保唯一性
  • title: 文件名(不含路径)
  • creation_date: 文件创建日期

JSON 文件处理

JSON 文件的处理位于 index/input/json.py 文件,直接调用 json.loads() 加载数据。支持两种格式,一种是单个对象格式:

{
  "text": "文档内容",
  "title": "文档标题"
}

另一种是数组格式:

[
  {"text": "文档1内容", "title": "文档1标题"},
  {"text": "文档2内容", "title": "文档2标题"}
]

注意,目前暂不支持 JSONL 格式(每行一个完整的 JSON 对象),需转换为数组格式。

JSON 解析器会自动为对象增加 idcreation_date 两个字段。另外,如果 JSON 中没有 texttitle 字段,还可以在配置文件中设置:

input:
  file_type: json
  text_column: content  # 指定 text 列
  title_column: name    # 指定 title 列

CSV 文件处理

CSV 文件的处理位于 index/input/csv.py 文件,直接调用 pd.read_csv() 加载数据,它的格式如下:

text,title
"文档1内容","文档1标题"
"文档2内容","文档2标题"

同样的,CSV 解析器也会自动为对象增加 idcreation_date 两个字段,也支持在配置文件中设置 texttitle 字段映射。

文本分块处理

加载原始文档后,下一步是将长文档切分成更小、更易于处理的文本单元,这一步骤对后续的 LLM 处理至关重要。分块策略通过 settings.yaml 中的 chunks 部分配置:

chunks:
  size: 1200                   # 每个分块的目标 token 数量
  overlap: 100                 # 相邻分块之间重叠的 token 数量
  group_by_columns: [id]       # 按指定列分组,确保分块在文档内部进行
  strategy: tokens             # 分块策略:tokens 或 sentence
  encoding_model: cl100k_base  # 用于计算 token 的编码模型

其中 sizeoverlap 比较好理解,分别表示每个分块的大小和相邻分块之间的重叠;group_by_columns 通常设置为 ["doc_id"] 或 ["id"] 表示在分块前先按文档 ID 进行分组,确保文本分块不会跨越不同的文档;strategy 表示分块策略,支持 tokenssentence 两种。

def load_strategy(strategy: ChunkStrategyType) -> ChunkStrategy:
  """分块策略定义"""
  match strategy:
    case ChunkStrategyType.tokens:
      # tokens 分块策略
      from graphrag.index.operations.chunk_text.strategies import run_tokens
      return run_tokens
    case ChunkStrategyType.sentence:
      # sentence 分块策略
      from graphrag.index.operations.chunk_text.bootstrap import bootstrap
      from graphrag.index.operations.chunk_text.strategies import run_sentences
      bootstrap()
      return run_sentences

基于 tokens 的精确分块

tokens 分块策略 是 GraphRAG 的默认分块策略,它基于 tiktoken 库进行精确的 token 计算和分块,核心实现如下:

def run_tokens(
  input: list[str], config: ChunkingConfig, tick: ProgressTicker,
) -> Iterable[TextChunk]:

  # 获取编码和解码函数
  encode, decode = get_encoding_fn(encoding_name)
  
  # 基于编码 token 将文本分块
  return split_multiple_texts_on_tokens(
    input,
    Tokenizer(
      chunk_overlap=chunk_overlap,
      tokens_per_chunk=tokens_per_chunk,
      encode=encode,
      decode=decode,
    ),
    tick,
  )

其中 get_encoding_fn() 获取编码和解码函数,它支持通过 tiktoken 库加载不同的编码模型,比如 GPT-3.5 和 GPT-4 的 cl100k_base 或者 GPT-4.1 的 o200k_base 等,可以在 tiktoken 源码中 找到完整的模型列表:

tokens 分块的核心算法位于 split_multiple_texts_on_tokens() 函数:

def split_multiple_texts_on_tokens(
  texts: list[str], tokenizer: Tokenizer, tick: ProgressTicker
) -> list[TextChunk]:
  """将多个文本进行分块并返回带元数据的块"""
  result = []
  mapped_ids = []

  # 第一步:编码所有文本并记录源文档索引
  for source_doc_idx, text in enumerate(texts):
    encoded = tokenizer.encode(text)
    mapped_ids.append((source_doc_idx, encoded))

  # 第二步:创建全局 token 序列,每个 token 都记录其来源文档
  input_ids = [
    (source_doc_idx, id) for source_doc_idx, ids in mapped_ids for id in ids
  ]

  # 第三步:滑动窗口分块
  start_idx = 0
  cur_idx = min(start_idx + tokenizer.tokens_per_chunk, len(input_ids))
  chunk_ids = input_ids[start_idx:cur_idx]

  while start_idx < len(input_ids):
    # 解码当前块的文本内容
    chunk_text = tokenizer.decode([id for _, id in chunk_ids])
    # 记录当前块涉及的源文档索引
    doc_indices = list({doc_idx for doc_idx, _ in chunk_ids})
    result.append(TextChunk(chunk_text, doc_indices, len(chunk_ids)))
    # 已完成
    if cur_idx == len(input_ids):
      break
    # 滑动窗口:下一个块的起始位置考虑重叠
    start_idx += tokenizer.tokens_per_chunk - tokenizer.chunk_overlap
    cur_idx = min(start_idx + tokenizer.tokens_per_chunk, len(input_ids))
    chunk_ids = input_ids[start_idx:cur_idx]

  return result

该算法首先使用 tiktoken 库将所有文档编码成 token 序列(即 mapped_ids 数组);然后将 token 序列展开,变成一个全局的整数序列,每个 token 都保留其来源信息(即 input_ids 数组);然后按 tokens_per_chunk 数量对 token 序列进行第一个分块并解码得到文本内容;接着使用滑动窗口往后依次分块,同时考虑相邻块之间有指定数量的重叠 token,保持上下文连续性。

有趣的是,这里的分块算法参考了 LangChain 的实现,有兴趣的朋友可以看下 LangChain 的 TokenTextSplitter

基于句子的自然分块

sentence 分块策略 使用 NLTK 库进行基于句子边界的自然语言分块,它首先调用 bootstrap() 初始化必要的 NLTK 资源:

def bootstrap():
  """初始化 NLTK 资源"""
  global initialized_nltk
  if not initialized_nltk:
    import nltk
    from nltk.corpus import wordnet as wn
    # 句子分词
    nltk.download("punkt")
    nltk.download("punkt_tab")
    # 词性标注
    nltk.download("averaged_perceptron_tagger")
    nltk.download("averaged_perceptron_tagger_eng")
    # 命名实体识别相关
    nltk.download("maxent_ne_chunker")
    nltk.download("maxent_ne_chunker_tab")
    # 词汇资源
    nltk.download("words")
    nltk.download("wordnet")
    # 确保 wordnet 数据已加载
    wn.ensure_loaded()
    # 完成初始化
    initialized_nltk = True

这里我稍微介绍下这几个 NLTK 资源:

  • 句子分词相关punkt 用于句子边界检测和分词,将文本分割成句子,识别句子的开始和结束;punkt_tabpunkt 的表格化版本,提供更高效的句子分词性能;
  • 词性标注相关averaged_perceptron_tagger 用于词性标注,基于 平均感知机算法,为每个单词标注词性(名词、动词、形容词等);averaged_perceptron_tagger_eng 是其英语特化版本,专门针对英语文本的词性标注;
  • 命名实体识别相关maxent_ne_chunker 用于命名实体识别和分块,基于 最大熵模型,识别人名、地名、组织名等命名实体;maxent_ne_chunker_tabmaxent_ne_chunker 的表格化版本,提供更快的命名实体识别性能;
  • 词汇资源words 是英语单词词典,包含大量英语单词,用于拼写检查和词汇验证;wordnet 是英语词汇的语义网络,提供词汇间的语义关系(同义词、反义词、上下位关系等),支持词义消歧、语义相似度计算等;

然后使用 NLTK 的句子分割器,也就是 sent_tokenize() 方法,进行按句子分块:

def run_sentences(
  input: list[str], _config: ChunkingConfig, tick: ProgressTicker
) -> Iterable[TextChunk]:
  """按句子将文本分成多个部分"""
  for doc_idx, text in enumerate(input):
    # 使用 NLTK 进行句子分割
    sentences = nltk.sent_tokenize(text)
    for sentence in sentences:
      yield TextChunk(
        text_chunk=sentence,
        source_doc_indices=[doc_idx],
      )
    tick(1)

可以看到,这种分块策略的实现相对简单,每个句子就是一个独立的文本块,句子之间没有重叠,避免信息重复。

两种策略的对比

下表对两种策略做了个对比:

特性tokens 策略sentence 策略
分块依据Token 数量句子边界
块大小控制精确控制依赖句子长度
上下文保持可配置重叠自然语言完整性
跨文档支持支持不支持
计算复杂度较高较低
适用场景严格 token 限制的 LLM 处理需要保持句子完整性的分析

可以根据需要选择适当的分块策略。

无论是哪种分块策略,都返回统一的 TextChunk 数据结构:

@dataclass
class TextChunk:
  """文本块类定义"""
  text_chunk: str              # 文本块内容
  source_doc_indices: list[int] # 源文档索引列表
  n_tokens: int | None = None  # token 数量(可选)

此外,我们还可以为文档添加一下元数据,分块时也有一些处理元数据的策略,相关配置如下:

input:
  metadata: [title,tag] # 增加元数据列
chunks:
  prepend_metadata: true  # 将元数据复制到每个文本块的开头
  chunk_size_includes_metadata: false # 计算分块大小时是否包含元数据

关于元数据的使用,可以参考 GraphRAG 官方教程:

文档规范化

在完成文本分块后,create_final_documents 步骤会对处理结果进行最终的整理和规范化,将分块后的文本单元重新与原始文档关联。让我们详细分析实际的源码实现:

def create_final_documents(documents: pd.DataFrame, text_units: pd.DataFrame) -> pd.DataFrame:
  
  # 展开文本单元的文档ID关联并选择需要的列
  exploded = (
    text_units.explode("document_ids")
    .loc[:, ["id", "document_ids", "text"]]
    .rename(
      columns={
        "document_ids": "chunk_doc_id",
        "id": "chunk_id", 
        "text": "chunk_text",
      }
    )
  )
  
  # 与原始文档信息合并
  joined = exploded.merge(
    documents,
    left_on="chunk_doc_id",
    right_on="id",
    how="inner",
    copy=False,
  )
  
  # 按文档聚合所有相关的文本单元ID
  docs_with_text_units = joined.groupby("id", sort=False).agg(
    text_unit_ids=("chunk_id", list)
  )
  
  # 重新与文档信息合并,形成最终结构
  rejoined = docs_with_text_units.merge(
    documents,
    on="id",
    how="right",
    copy=False,
  ).reset_index(drop=True)
  
  # 数据格式化和添加序号
  rejoined["id"] = rejoined["id"].astype(str)
  rejoined["human_readable_id"] = rejoined.index + 1
  
  # 返回标准化的列结构
  return rejoined.loc[:, DOCUMENTS_FINAL_COLUMNS]

这一步的代码看起来很长,其实它的逻辑并不复杂,核心在于数据的操作和转换,看懂这一步的代码需要熟练掌握 pandas 各种数据处理技巧,比如:

  • .explode("document_ids"):将数组列展开为多行,每个数组元素对应一行
  • .loc[:, ["id", "document_ids", "text"]]:列切片操作,只保留需要的列,减少内存使用
  • .rename(columns={}):重命名列以更清晰地表示其含义
  • .merge(documents, ...):对两个表执行合并操作,使用 left_onright_on 明确指定连接键
  • .groupby("id", sort=False):按文档 ID 分组,保持原有顺序,提高性能
  • agg(text_unit_ids=("chunk_id", list)):聚合操作,将每个文档对应的所有文本单元 ID 收集成列表

最后使用预定义的 DOCUMENTS_FINAL_COLUMNS 确保输出格式的一致性,包括以下列:

  • id: 文档唯一标识符
  • human_readable_id: 人类可读的递增ID
  • title: 文档标题
  • text: 原始文档内容
  • text_unit_ids: 关联的文本单元ID列表
  • metadata: 元数据信息
  • creation_date: 创建日期

小结

今天我们深入探讨了 GraphRAG 索引构建流程中的文档处理阶段,这个阶段是整个知识图谱构建的基础,负责将各种格式的原始文档转换为标准化的文本单元,为后续的知识提取和图谱构建做好准备。通过本次学习,我们掌握了以下核心要点:

  • 统一的文档加载:学习了 GraphRAG 如何通过工厂模式,支持从多种数据源(如本地文件、Azure Blob)加载不同格式(纯文本、CSV、JSON)的原始文档;
  • 灵活的文本分块:详细分析了两种核心的文本分块策略。tokens 策略基于 tiktoken 实现精确的 token 级切片和重叠,适合对 LLM 输入有严格限制的场景;而 sentence 策略则利用 NLTK 库进行基于句子边界的自然语言分块,更好地保持了语义的完整性;
  • 标准化的数据输出:文档处理的最后一步,将分块后的文本单元与原始文档进行重新关联,最终输出包含标准化字段的文档数据,为后续的索引流程提供了结构清晰、内容一致的输入。

在下一篇文章中,我们将进入知识提取阶段,学习 GraphRAG 如何调用大模型从这些精心处理的文本单元中提取实体和关系,最终构建出结构化的知识图谱。


GraphRAG 索引构建概述

经过前面对 GraphRAG 项目结构的深入了解,我们已经掌握了它的整体架构和技术栈。今天我们将顺着 graphrag index 命令的调用链,深入探索 GraphRAG 索引构建的核心流程,包括命令行入口、配置加载机制、工作流引擎,以及不同索引方法的实现细节。

命令行入口

当我们执行 graphrag index 命令时,整个调用过程如下:

  1. 入口点__main__.pybin/graphraggraphrag.cli.main:app
  2. CLI 解析cli/main.py@app.command("index") 装饰器
  3. 配置加载cli/index.pyindex_cli() 函数
  4. API 调用api.build_index() 函数

其中,前两步我们昨天已经学习过了:__main__.py 的作用是让 GraphRAG 能以包的方式运行,pyproject.toml 文件中的 [project.scripts] 配置让 GraphRAG 能以可执行文件的方式运行,运行的入口都是 graphrag.cli.main 包的 app 方法;该方法基于 Typer 这个现代化的 Python CLI 库实现,通过 @app.command 装饰器,定义了 GraphRAG 的 5 个子命令,index 就是其中之一。

跟随调用链,我们接着看一下 cli/index.py 文件中 index_cli() 函数的实现:

def index_cli(root_dir: Path, method: IndexingMethod, ...):
  # 配置加载
  config = load_config(root_dir, config_filepath, cli_overrides)
  # 索引构建
  _run_index(config=config, method=method, ...)

这里我们可以看到两个关键步骤:

  1. 配置加载:调用 load_config() 函数加载并合并配置,支持通过命令行参数覆盖配置文件中的设置
  2. 索引执行:调用 _run_index() 执行实际的索引构建

配置加载

配置加载是 GraphRAG 的核心功能之一,位于 config/load_config.py 文件中,其实现如下:

def load_config(
  root_dir: Path,
  config_filepath: Path | None = None,
  cli_overrides: dict[str, Any] | None = None,
) -> GraphRagConfig:
  # 路径规范化,确保使用绝对路径
  root = root_dir.resolve()
  # 在根目录中搜索配置文件
  config_path = _get_config_path(root, config_filepath)
  # 加载 .env 文件中的环境变量
  _load_dotenv(config_path)
  # 读取配置文件文本内容
  config_text = config_path.read_text(encoding="utf-8")
  # 解析环境变量引用
  config_text = _parse_env_variables(config_text)
  # 根据文件类型解析为字典(支持 YAML 和 JSON)
  config_extension = config_path.suffix
  config_data = _parse(config_extension, config_text)
  # 应用命令行参数覆盖
  if cli_overrides:
    _apply_overrides(config_data, cli_overrides)
  # 创建并验证最终配置对象
  return create_graphrag_config(config_data, root_dir=str(root))

这里的 load_config() 函数实现了一个完整的类型安全的配置管理流程,整体流程非常清晰:

  1. 路径规范化:通过 root_dir.resolve() 确保使用绝对路径,其中 root_dir 是用户通过 --root-r 参数指定
  2. 配置发现:如果用户指定了 --config-c 参数,则使用用户自定义配置;否则在根目录中搜索默认配置文件,比如 settings.yamlsettings.ymlsettings.json
  3. 环境变量加载:加载 .env 文件中的环境变量
  4. 配置读取:读取配置文件文本内容
  5. 环境变量替换:解析配置文件中环境变量引用,比如 ${GRAPHRAG_API_KEY}
  6. 格式解析:根据文件类型将配置解析为字典,支持 YAML 和 JSON 两种方式
  7. 覆盖应用:应用命令行参数覆盖,比如 --output-o 参数会覆盖 output.base_dir 配置
  8. 对象创建:创建并验证最终的 GraphRagConfig 配置对象,使用 Pydantic 保证类型安全

这里有几个点比较有意思,可以展开介绍一下。

环境变量支持

GraphRAG 支持在配置文件中注入环境变量,这是使用 Python 的 Template 类实现的:

def _parse_env_variables(text: str) -> str:
  return Template(text).substitute(os.environ)

用户可以在配置文件中使用 ${VAR_NAME} 格式引用环境变量:

models:
  default_chat_model:
    type: openai_chat
    api_base: ${GRAPHRAG_API_BASE}
    auth_type: api_key
    api_key: ${GRAPHRAG_API_KEY}
    model: gpt-4o-mini

点号分隔覆盖机制

配置覆盖支持点号分隔的嵌套路径,比如当用户设置 --output-o 参数时,会覆盖下面三个配置:

cli_overrides = {}
if output_dir:
  cli_overrides["output.base_dir"] = str(output_dir)
  cli_overrides["reporting.base_dir"] = str(output_dir)
  cli_overrides["update_index_output.base_dir"] = str(output_dir)

这种点号分隔的路径语法允许精确覆盖嵌套配置项,为用户提供了灵活的配置管理能力,这块的实现比较通用,有类似需求的话,可以参考下 _apply_overrides() 的函数实现。

使用 Pydantic 创建配置

在配置加载的最后,通过 create_graphrag_config() 函数创建最终的 GraphRagConfig 配置对象:

def create_graphrag_config(
  values: dict[str, Any] | None = None,
  root_dir: str | None = None,
) -> GraphRagConfig:

  values = values or {}
  if root_dir:
    root_path = Path(root_dir).resolve()
    values["root_dir"] = str(root_path)
  return GraphRagConfig(**values)

这里使用了 Pydantic 库,其中 GraphRagConfig 类被定义为一个 Pydantic 模型,我们直接将 values 字典展开(两个星号 ** 用于解包字典)变成一个类型安全的配置对象。Pydantic 不仅能保证类型安全,还支持自定义参数验证,我们下面重点介绍一下它。

简单介绍 Pydantic 库

GraphRAG 使用 Pydantic 进行配置管理,这是一个基于类型提示的数据验证库。

pydantic.png

它的核心优势包括:

  • 类型安全:基于 Python 类型提示自动验证数据类型
  • 数据转换:自动进行数据类型转换和标准化
  • 验证规则:支持复杂的验证逻辑和自定义验证器
  • 错误报告:提供详细的验证错误信息
  • IDE 支持:完美的代码补全和类型检查支持

下面是一个简化的例子,展示如何使用 Pydantic 和 YAML 实现类似 GraphRAG 的配置管理。首先通过 Pydantic 定义配置类:

from enum import Enum
from pydantic import BaseModel, Field, field_validator

class StorageType(str, Enum):
  """存储类型枚举"""
  FILE = "file"
  AZURE_BLOB = "azure_blob"
  S3 = "s3"

class DatabaseConfig(BaseModel):
  """数据库配置模型"""
  host: str = Field(default="localhost", description="数据库主机")
  port: int = Field(default=5432, description="数据库端口")
  username: str = Field(description="用户名")
  password: str = Field(description="密码")
  
  @field_validator("port")
  @classmethod
  def validate_port(cls, v):
    if not 1 <= v <= 65535:
      raise ValueError("端口必须在 1-65535 范围内")
    return v

class AppConfig(BaseModel):
  """主配置模型"""
  app_name: str = Field(default="MyApp", description="应用名称")
  debug: bool = Field(default=False, description="调试模式")
  storage_type: StorageType = Field(default=StorageType.FILE, description="存储类型")
  database: DatabaseConfig = Field(description="数据库配置")
  
  # 自定义验证器
  @field_validator("app_name")
  @classmethod
  def validate_app_name(cls, v):
    if not v.strip():
      raise ValueError("应用名称不能为空")
    return v.strip()

然后创建一个配置文件 config.yaml 内容如下:

app_name: "Demo"
debug: true
storage_type: "file"
database:
  host: "localhost"
  port: 5432
  username: "admin"
  password: "secret"

接着我们就可以从 YAML 文件加载配置,并将其转换为强类型的配置对象:

import yaml
from pathlib import Path

def load_config_from_yaml(yaml_path: Path) -> AppConfig:
  """从 YAML 文件加载配置"""
  with open(yaml_path, 'r', encoding='utf-8') as f:
    config_data = yaml.safe_load(f)
  # Pydantic 自动验证和转换
  return AppConfig(**config_data)

# 使用示例
config_file = Path("config.yaml")
config = load_config_from_yaml(config_file)
print(f"应用名称: {config.app_name}")
print(f"数据库配置: {config.database.host}:{config.database.port}")

运行结果如下:

应用名称: Demo
数据库配置: localhost:5432

这个例子展示了 Pydantic 的核心特性,包括类型声明、默认值、验证器和自动数据转换等。

工作流引擎

配置加载之后,GraphRAG 调用 _run_index() 执行实际的索引构建,而它则是调用 API 层的 build_index() 函数:

async def build_index(
  config: GraphRagConfig,
  method: IndexingMethod | str = IndexingMethod.Standard,
  is_update_run: bool = False,
  ...
) -> list[PipelineRunResult]:
  
  outputs: list[PipelineRunResult] = []
  # 根据 method 创建对应的工作流管道
  method = _get_method(method, is_update_run)
  pipeline = PipelineFactory.create_pipeline(config, method)
  # 依次运行管道中的每个工作流
  async for output in run_pipeline(pipeline, config, is_update_run=is_update_run, additional_context=additional_context):
    outputs.append(output)
    logger.info("Workflow %s completed successfully", output.workflow)
  return outputs

GraphRAG 的索引构建采用了灵活的管道架构,其中 PipelineFactory 采用工厂设计模式,负责管理和创建处理工作流和管道:

class PipelineFactory:
  """工作流管道工厂类"""
  
  workflows: ClassVar[dict[str, WorkflowFunction]] = {}
  pipelines: ClassVar[dict[str, list[str]]] = {}
  
  @classmethod
  def register(cls, name: str, workflow: WorkflowFunction):
    """注册自定义工作流函数"""
    cls.workflows[name] = workflow

  @classmethod
  def register_all(cls, workflows: dict[str, WorkflowFunction]):
    """批量注册自定义工作流函数"""
    for name, workflow in workflows.items():
      cls.register(name, workflow)
  
  @classmethod
  def register_pipeline(cls, name: str, workflows: list[str]):
    """注册自定义管道,一个管道包含多个工作流函数"""
    cls.pipelines[name] = workflows

  @classmethod
  def create_pipeline(cls, config: GraphRagConfig, method: IndexingMethod) -> Pipeline:
    """根据 method 创建管道"""
    workflows = config.workflows or cls.pipelines.get(method, [])
    return Pipeline([(name, cls.workflows[name]) for name in workflows])

这里涉及 GraphRAG 中的两个核心概念:工作流(Workflow)管道(Pipeline)。工作流是一个个独立的模块,比如加载文档、文本分片、提取图谱等等,程序启动时会自动注册所有内置的工作流函数:

PipelineFactory.register_all({
  "load_input_documents": run_load_input_documents,
  "create_base_text_units": run_create_base_text_units,
  "extract_graph": run_extract_graph,
  "create_communities": run_create_communities,
  # ...
})

而管道则是由多个工作流串起来的一个集合,系统预定义了四个管道:

PipelineFactory.register_pipeline(
  IndexingMethod.Standard, ["load_input_documents", *_standard_workflows]
)
PipelineFactory.register_pipeline(
  IndexingMethod.Fast, ["load_input_documents", *_fast_workflows]
)
PipelineFactory.register_pipeline(
  IndexingMethod.StandardUpdate, ["load_update_documents", *_standard_workflows, *_update_workflows],
)
PipelineFactory.register_pipeline(
  IndexingMethod.FastUpdate, ["load_update_documents", *_fast_workflows, *_update_workflows],
)

分别对应四种不同的构建索引的方法,当我们执行 graphrag index 命令时,可以通过 --method-m 参数指定:

# 标准方法
$ graphrag index --root ./ragtest --method standard

# 快速方法
$ graphrag index --root ./ragtest --method fast

# 标准方法(用于更新)
$ graphrag index --root ./ragtest --method standard-update

# 快速方法(用于更新)
$ graphrag index --root ./ragtest --method fast-update

GraphRAG 通过 create_pipeline() 方法,根据 method 找到对应的管道,然后依次运行管道中的每个工作流函数。

索引构建方法

上面提到 GraphRAG 内置了四种索引构建方法,每种都有其特定的适用场景:

索引方法速度质量适用场景主要特点
Standard首次索引,追求高质量全 LLM 驱动的图构建
Fast快速原型,大数据集NLP + LLM 混合方式
StandardUpdate增量更新,保持高质量标准方法 + 增量更新
FastUpdate频繁更新,快速处理快速方法 + 增量更新

其中后两个 Update 方法是在前两个方法的基础上增加了增量更新的能力,能够在不重新构建整个索引的情况下,处理新增或修改的文档,大大提高了增量处理的效率。我们这里主要关注 StandardFast 这两个方法,它们的主要差异点在于:

  • Standard 方法的 LLM 驱动流程

    • extract_graph:使用大语言模型从文本中提取实体和关系
    • extract_covariates:使用 LLM 提取声明和协变量信息
    • create_community_reports:基于图上下文生成高质量社区报告
  • Fast 方法的混合流程

    • extract_graph_nlp:使用传统 NLP 技术(如 spaCy)进行实体识别
    • prune_graph:对提取的实体进行过滤和清理
    • create_community_reports_text:基于文本单元上下文生成报告(更快但质量稍低)

下图展示了两种方法的工作流执行顺序:

index-workflow.png

蓝色为 Standard 方法使用的工作流,红色为 Fast 方法使用的工作流,绿色为二者共用的工作流。

小结

我们今天学习了当执行 graphrag index 命令时的整个调用过程,主要内容包括:

  1. 命令行入口:详细分析了从 graphrag index 命令到核心逻辑的完整调用链;
  2. 配置加载:深入解读了 load_config() 函数的实现,包括环境变量支持和点号分隔覆盖机制;
  3. Pydantic 介绍:通过一个简单的示例演示如何使用 Pydantic 和 YAML 实现类似 GraphRAG 的配置管理;
  4. 工作流引擎:学习 PipelineFactory 的设计模式和工作流注册机制;
  5. 索引构建方法:对比了四种不同的索引构建方法及其适用场景,通过 Mermaid 图表展示了索引构建的完整处理流程;

在下一篇文章中,我们将深入索引构建的具体流程,先从文档处理阶段开始,详细分析 GraphRAG 如何从各种格式的原始文档中提取和预处理文本数据,为后续的知识提取做好准备。


剖析 GraphRAG 的项目结构

经过这几天的动手实践和可视化体验,我们已经对 GraphRAG 的核心功能有了一个基本的了解。今天,我们正式开始深入研究它的源码,首先从熟悉项目结构入手,为我们后续学习其核心工作流(比如索引和查询)打下基础。

项目概览

使用 tree 命令查看 GraphRAG 项目的项目结构如下:

$ tree -L 1 graphrag
├── __init__.py
├── __main__.py     # 模块入口
├── api             # API 层
├── cache
├── callbacks
├── cli             # CLI 入口
├── config          # 配置管理
├── data_model      # 数据模型
├── index           # 索引系统
├── language_model  # 语言模型
├── logger
├── prompt_tune     # 提示词调优
├── prompts         # 提示词系统
├── query           # 查询系统
├── storage         # 存储抽象层
├── utils
└── vector_stores   # 向量数据库

其中基础模块包括:

  • API 层(api:这里提供了高层次的编程接口,是外部调用 GraphRAG 功能的主要入口,包括索引构建 API(index.py)、查询 API(query.py)和提示词调优 API(prompt_tune.py)三大块;
  • CLI 入口(cli:通过 Typer 装饰器定义命令,解析命令行参数并调用 API 层;
  • 配置管理(config:使用 Pydantic 定义类型安全的配置模型,包括主配置、LLM 配置、向量配置、存储配置等;
  • 数据模型(data_model:定义核心数据结构,包括实体、关系、社区、社区报告、文本单元、文档等;

从入口处可以看出 GraphRAG 有三大功能:索引构建、查询、提示词调优,分别位于下面这些模块中:

  • 索引系统(index:这是 GraphRAG 的核心模块,负责从原始文档构建知识图谱,这里的内容也是最丰富的;比如 input 目录用于输入处理,支持 CSV、JSON、纯文本多种格式的文档;workflows 是 GraphRAG 的工作流引擎,定义了标准的数据处理工作流,使用工厂模式动态创建流水线,支持增量更新和全量重建;operations 目录实现了大量的工作流原子操作,包括文本处理、图谱构建、社区识别、总结分析等;下面是一些典型的工作流操作:

    • chunk_text - 文本分块,将长文档切分为可处理的片段;
    • embed_text - 文本向量化,支持多种嵌入模型;
    • extract_graph - 从文本提取实体和关系;
    • build_noun_graph - 构建名词图,识别重要概念;
    • create_graph.py - 创建完整的知识图谱;
    • cluster_graph.py - 图聚类,发现社区结构;
    • embed_graph - 图嵌入,支持 Node2Vec 等算法;
    • layout_graph - 图布局,支持 UMAP 降维可视化;
    • summarize_communities - 社区摘要生成;
    • summarize_descriptions - 实体描述摘要;
    • extract_covariates - 协变量提取(如声明检测);
  • 查询系统(query:为 GraphRAG 提供了多种检索策略,包括:

    • 全局搜索:基于社区报告的高层次查询,适合回答概括性或主题性问题;使用 Map-Reduce 模式处理大规模数据;
    • 本地搜索:基于实体和关系的细粒度查询,适合回答具体的事实性问题;混合向量检索和图遍历;
    • 漂移搜索:基于动态选择社区查询,在查询过程中调整搜索范围,平衡查询深度和广度;
  • 提示词系统(prompts:包括索引和查询所使用的所有提示词;
  • 提示词调优(prompt_tune:根据具体数据领域自动生成最适合的提示词,提高知识图谱构建质量;

此外,为实现索引构建和查询,大模型和存储服务也是必不可少的:

  • 语言模型抽象(language_model:提供统一的 LLM 和 Embedding 接口,主要通过 fnllm 库实现;
  • 存储抽象层(storage:提供统一的存储接口,支持多种后端,包括本地文件系统、Azure Blob、CosmosDB、内存存储等;
  • 向量数据库(vector_stores:支持多种向量数据库,包括:高性能向量数据库 LanceDB、Azure 认知搜索服务 Azure AI Search 以及 Azure 的 NoSQL 数据库 CosmosDB

技术栈分析

通过分析 pyproject.toml 文件,我们可以对 GraphRAG 的核心依赖和开发工作链有一个初步了解。

  • 大语言模型

    • fnllm[azure,openai] - 统一的大语言模型接口库
    • openai - OpenAI 官方提供的 API 客户端
    • tiktoken - 一款高效的 BPE 分词库,能实现文本分词,并计算出文本对应的 token 数量
  • 数据科学

    • pandas - 数据处理和分析
    • numpy - 数值计算
    • networkx - 图算法库,用于创建、操作和研究各种图结构,支持图的遍历、最短路径查找等多种图算法实现
    • graspologic - 图统计和机器学习,提供了图数据的统计分析方法和基于图的机器学习模型,助力从图结构数据中挖掘有价值的信息
    • umap-learn - 降维算法库,基于 UMAP 算法,能将高维数据映射到低维空间,同时保留数据的局部和全局结构,便于数据可视化和后续分析
  • 向量存储

    • lancedb - 高性能的向量数据库,具备高效的向量存储和检索能力
    • azure-search-documents - Azure 搜索服务的客户端库
  • 配置和 CLI

    • pydantic - 数据验证和配置管理,基于类型提示进行数据验证,能确保数据的完整性和正确性,同时便于配置信息的管理和解析
    • typer - 现代化的 CLI 框架,基于 Python 的类型提示构建,让开发者能快速、简洁地创建功能丰富的命令行工具
    • pyyaml - 用于处理 YAML 格式配置文件
  • 代码质量

    • ruff - 现代化的 Python 代码检查和格式化工具
    • pyright - 由微软开发的 Python 类型检查器
  • 测试框架

    • pytest - 测试框架
    • pytest-asyncio - 提供异步测试支持
    • coverage - 用于代码覆盖率分析
  • 文档和发布

    • mkdocs-material - 现代化的文档站点生成工具,基于 MkDocs 构建
    • semversioner - 用于语义化版本管理的工具,帮助开发者规范地管理项目的版本号,记录版本变更信息,简化版本发布流程

命令行入口

GraphRAG 是一个命令行程序,正如我们在入门篇里所体验的,使用 uv run poe init 命令初始化工作空间:

$ uv run poe init --root ./ragtest

其中 poePoe The Poet 的简称,它是一个任务管理工具,允许在 pyproject.toml 中定义和运行各种任务:

[project]
name = "graphrag"

[tool.poe.tasks]
...
index = "python -m graphrag index"
update = "python -m graphrag update"
init = "python -m graphrag init"
query = "python -m graphrag query"
prompt_tune = "python -m graphrag prompt-tune"

可以看出 poe 命令实际上是通过 python -m 执行的:

$ python -m graphrag init

包括 init 在内,GraphRAG 一共支持 5 个不同的子命令:

  • graphrag init - 初始化项目配置
  • graphrag index - 构建知识图谱索引
  • graphrag query - 执行查询操作
  • graphrag update - 增量更新索引
  • graphrag prompt-tune - 提示词调优

GraphRAG 采用了标准的 Python 包结构,__main__.py 文件是模块的入口:

from graphrag.cli.main import app

app(prog_name="graphrag")

这个文件允许用户通过 python -m graphrag 直接运行包,为开发和测试提供便利。

另外,如果你是通过 pip install 安装的,还可以直接运行 graphrag 命令:

$ graphrag init --root ./ragtest

要实现这一点,其秘密在于 pyproject.toml 文件中的下面这行配置:

[project.scripts]
graphrag = "graphrag.cli.main:app"

这里的 [project.scripts] 用于定义脚本入口,当执行 pip install 后,会在 Python 环境的 bin 目录下生成一个可执行文件,我们不妨看下这个文件内容:

# -*- coding: utf-8 -*-
import sys
from graphrag.cli.main import app
if __name__ == "__main__":
  if sys.argv[0].endswith("-script.pyw"):
    sys.argv[0] = sys.argv[0][:-11]
  elif sys.argv[0].endswith(".exe"):
    sys.argv[0] = sys.argv[0][:-4]
  sys.exit(app())

这个文件根据 [project.scripts] 定义,导入 graphrag.cli.main 包下的 app 方法并执行,因此我们可以直接运行 graphrag 命令。

命令行实现

GraphRAG 命令行是基于 Typer 库开发的,这是一个现代化的 Python CLI 库,基于类型提示构建命令行应用程序。

typer.png

它的核心特点如下:

  • 类型驱动:基于 Python 类型提示自动生成 CLI 接口
  • 易用性:简洁的 API,最少的样板代码
  • 自动化:自动生成帮助文档、参数验证和错误处理
  • 现代化:支持 Python 3.6+ 的现代特性
  • 高级功能:子命令组织、回调函数、进度条、颜色输出、确认提示等

通过寥寥几行代码,我们就可以使用 Typer 开发一个命令行程序:

import typer

app = typer.Typer()

@app.command()
def hello(name: str):
  print(f"Hello {name}")

if __name__ == "__main__":
  app()

再回到 GraphRAG 的代码,当我们执行 python -mgraphrag 命令时,实际上调用的是 __main__.pybin/graphrag 文件,它们都是指向 graphrag.cli.main 包下的 app 方法:

graphrag-typer-app.png

从这里可以看到,GraphRAG 基于 Typer 实现了多个选项和子命令的功能:

graphrag-cli.png

我们以 graphrag index 为例看下命令的调用链:

  1. 入口点__main__.pybin/graphraggraphrag.cli.main:app
  2. CLI 解析cli/main.py@app.command("index") 装饰器
  3. 配置加载cli/index.pyindex_cli() 函数
  4. API 调用api.build_index() 函数

小结

至此,我们对 GraphRAG 的项目结构有了大致的了解,并对 graphrag 命令行的运行原理有了一定的认识。接下来,我们将顺着 graphrag index 的调用链,展开聊聊 GraphRAG 索引构建的具体逻辑。


可视化探索 GraphRAG 的知识图谱

昨天,我们通过一个端到端的示例,快速体验了 GraphRAG 从数据索引到查询的完整流程。我们知道,GraphRAG 的核心是将非结构化文本转化为结构化的知识图谱。然而,单纯查看 Parquet 格式的输出文件很难直观地理解图谱的结构。今天,我们将学习如何将这个知识图谱可视化,从而更清晰地洞察数据中的实体、关系和社区结构。

准备工作

在我们昨天的 ragtest 示例中,uv run poe index 命令已经为我们生成了一系列 Parquet 文件,还有一个图描述文件 graph.graphml,包含了图谱的节点和边信息。为了生成这个文件,在构建索引之前,其实我对 settings.yaml 配置文件做了一点小小的修改,启用了 graphml 快照选项:

snapshots:
  graphml: true

为了支持其他可视化工具,还可以启用下面这些额外参数:

embed_graph:
  enabled: true  # 生成节点的 node2vec 嵌入
umap:
  enabled: true  # 生成 UMAP 嵌入,为实体提供 x/y 位置坐标

运行索引命令后,确保在 output 目录下能找到关键的 graph.graphml 文件,这是许多可视化工具支持的标准文件格式。

GraphML 文件格式

GraphML (Graph Markup Language) 是一种基于 XML 的通用图形描述格式,旨在表示结构化的图形数据,包括节点、边及其属性关系。它支持多种图类型,如有向图、无向图和混合图,并允许用户自定义属性,比如节点的权重和边的类型。GraphML 提供了一种灵活、可扩展的格式,在图论、网络分析和数据可视化等领域有着广泛的应用。

GraphML 文件以 XML 声明开头,核心标签嵌套关系如下:

<?xml version="1.0" encoding="UTF-8"?>
<graphml xmlns="http://graphml.graphdrawing.org/xmlns">
  <!-- 1. 属性定义(key):声明图形元素的属性类型 -->
  <key id="weight" for="edge" attr.name="weight" attr.type="double"/>
  <key id="label" for="node" attr.name="label" attr.type="string"/>
  
  <!-- 2. 图形定义(graph):包含节点和边 -->
  <graph id="example" edgedefault="directed">
    <!-- 节点(node) -->
    <node id="n1">
      <data key="label">Node 1</data>
    </node>
    <node id="n2">
      <data key="label">Node 2</data>
    </node>
    
    <!-- 边(edge) -->
    <edge id="e1" source="n1" target="n2">
      <data key="weight">3.5</data>
    </edge>
  </graph>
</graphml>

整个文件结构包含:

  • 根标签 <graphml>,必须包含 xmlns 命名空间声明,定义 GraphML 的语法规则;
  • 属性定义 <key>,用于预先声明图形元素(节点、边、图形本身)的属性,包括属性名称和数据类型,类似于数据字典;id 为属性的唯一标识,后续可通过 key 引用;
  • 图形标签 <graph>,表示一个具体的图形,可包含多个节点和边,edgedefault 为默认边类型,支持 directed(有向边)和 undirected(无向边);
  • 节点 <node>,表示图形中的节点,id 为唯一标识;
  • <edge>,表示节点间的关系,必须包含 source(起点)和 target(终点)属性;

在 Python 中,常常使用 NetworkX 库读写 GraphML 文件,GraphRAG 用的也是这个库:

import networkx as nx

# 创建无向图
G = nx.Graph()

# 添加节点
nodes = [
  ("n1", {"label": "Node 1"}),
  ("n2", {"label": "Node 2"}),
]

for name, attrs in nodes:
  G.add_node(name, **attrs)

# 添加边
relationships = [
  ("n1", "n2", {"weight": 3.5}),
]

for source, target, attrs in relationships:
  G.add_edge(source, target, **attrs)

# 将图保存为 GraphML 格式
nx.write_graphml(G, "demo.graphml", encoding='utf-8')

使用 Gephi 进行可视化

Gephi 是一款非常流行的开源图可视化和探索工具,非常适合用来分析 GraphRAG 生成的 GraphML 文件。

gephi.jpg

首先,从 Gephi 官网下载并安装最新版本,然后点击 “Open Graph File...” 导入 graph.graphml 文件,你将看到一个包含所有节点和边的基础图谱视图:

gephi-import.png

接着我们安装 Leiden Algorithm 插件,点击 工具插件,搜索 Leiden 关键词,找到并安装该插件,这是进行社区检测的关键工具:

gephi-plugin-leiden.png

然后在统计面板中找到 “平均度数” 和 “Leiden 算法”,点击右侧的 “Run” 运行分析,为后续的可视化提供数据基础。运行 “Leiden 算法” 时调整配置如下:

gephi-plugin-leiden-setting.png

有两点修改:

  • 使用 模块度(Modularity) 作为 质量函数(Quality function),在社区发现中,模块度是衡量社区划分质量的重要指标,其取值范围通常在 [-1, 1] 之间;
  • 分辨率(Resolution) 设置为 1,这个值用于控制社区检测的粒度,如果 > 1 倾向于发现更小、更细粒度的社区,如果 < 1 倾向于发现更大、更粗粒度的社区,如果 = 1 则使用标准模块度优化,产生自然大小的社区;

分析结果如下:

gephi-statistics-run.png

从图中可以看出平均度数为 2.771,模块度质量为 0.602。

度数表示每个节点连接的边数,而平均度数则表示所有节点度数的平均值,2.771 意味着平均每个实体连接到约 2.8 个其他实体,这是一个相对较低的平均度数,表明知识图谱相对稀疏,实体间连接不密集,低连接度有助于形成清晰的社区边界。

另外,0.602 是一个很好的模块度值,说明算法发现了清晰、良好分离的社区。模块度并非越高越好,通常 0.3 ~ 0.7 被广泛认为是较好的社区划分范围,表明网络中的节点确实呈现出明显的 社区内连接紧密、社区间连接稀疏 的特征,划分结果具有较强的合理性;过高的模块度(> 0.7)可能是算法过度优化的结果,甚至可能将本应属于同一社区的节点强行拆分,导致划分结果与实际结构偏离;过低的模块度(< 0.3)通常认为社区结构不明显,划分结果质量较差。

高级可视化技巧

有了上面两个统计值之后,我们就可以对图进行进一步的布局和美化。

首先,使用 Leiden 算法的结果对节点进行分区着色:

  • 通过外观面板选择 Nodes 然后 Partition,并点击右上角的调色板图标,从下拉菜单中选择 Cluster;
  • 点击 Palette 超链接,然后 Generate,取消选中 Limit number of colors ,点击 Generate ,然后 Ok,生成不限颜色数量的调色板,让每个社区都有独特的视觉标识;
  • 点击 Apply 按钮,这将根据 Leiden 发现的分区为图表着色;

gephi-cluster-coloring.png

然后,根据度数调整节点大小,这样可以直观地识别图谱中的关键实体和枢纽节点:

  • 通过外观面板选择 Nodes 然后 Ranking,并点击右上角的 Sizing 图标,从下拉菜单中选择 Degree;
  • 将最小值设置为 10,最大值设置为 50,点击 Apply 按钮;

gephi-degree-resizing.png

接着,使用布局算法对节点进行智能布局优化:

  • 在左下角的 Layout 标签中,选择 OpenORD 布局算法;
  • 将 Liquid 和 Expansion 设置为 50,其他全部设置为 0;
  • 点击 Run 进行初始排列;

gephi-openord-layout.png

  • 继续使用 Force Atlas 2 进行精细调整;
  • 缩放设置为 15,选中 Dissuade Hubs 和 Prevent Overlap;
  • 点击 Run 运行,当图节点看起来已经稳定且位置不再发生显著变化时,按下 Stop 停止;

gephi-force-atlas-2-layout.png

OpenORD 和 Force Atlas 2 都是基于 力引导(force-directed) 的图布局算法,通过模拟节点之间的作用力,基于力的作用对节点位置进行调整,如节点之间的斥力大于引力时,使节点远离,反之则使节点靠近。通过不断调整节点位置,最终使力达到平衡,从而得到图的布局。

最后,在底部的工具栏中,你可以点击黑色的 “T” 图标来显示节点标签,还可以通过旁边的滑块调整标签的大小,防止重叠。

经过这些步骤,你将获得一个结构清晰、层次分明的可视化知识图谱:使用颜色区分社区,不同颜色的节点群代表不同的主题或概念领域;大小表示重要性,较大的节点通常是连接多个概念的关键实体;位置反映关系,相近的节点在语义或功能上具有更强的关联性。

你可以通过缩放、平移来探索图的细节,识别出关键的实体和社区,并理解它们之间的复杂关系。

使用 Unified Search 探索知识图谱

GraphRAG 内置了一个可视化的 Web 应用程序 Unified Search App,它为我们提供了一个全新的交互式探索体验,与静态的 Gephi 可视化不同,Unified Search 允许用户通过直观的界面进行实时查询和探索。

其主要功能包括:

  • 搜索对比: 统一界面比较不同 GraphRAG 搜索方法的结果;
  • 多数据集支持: 通过 listing.json 管理多个 GraphRAG 索引;
  • 图探索: 可视化社区报告、实体图和选定报告;
  • 问题建议: 自动分析数据集生成重要问题;

注意,使用 Unified Search 需要预先运行 GraphRAG 索引,且必须启用图嵌入和 UMAP 配置参数。

首先,进入 unified-search-app 目录,安装所需依赖:

$ cd unified-search-app
$ uv sync --extra dev

在启动 Unified Search 之前,我们还需要创建一个 listing.json 配置文件:

[{
  "key": "ragtest-demo",
  "path": "ragtest",
  "name": "A Christmas Carol",
  "description": "Getting Started index of the novel A Christmas Carol",
  "community_level": 2
}]

Unified Search 支持多数据集,该配置文件包含每个数据集的名称、描述、位置等基本信息。注意 listing.json 文件和数据集目录处于同一级,如下:

- listing.json
- dataset_1
  - settings.yaml
  - .env
  - output
  - prompts
- dataset_2
  - settings.yaml
  - .env
  - output
  - prompts
- ...

然后通过下面的命令启动 Unified Search 应用(注意使用 DATA_ROOT 指定数据集根目录):

$ export DATA_ROOT=/the/path/to/graphrag/
$ uv run poe start

这是一个使用 Streamlit 开发的 Web 程序,启动成功后会自动打开 http://localhost:8501 页面:

unified-search-app.png

整个页面分为两个主面板。左侧为配置面板,提供了多项应用配置选项,该面板可被关闭:

  1. 数据集:以下拉菜单形式按顺序展示你在 listing.json 文件中定义的所有数据集;
  2. 建议问题数量:该选项允许用户设置生成建议问题的数量;
  3. 搜索选项:用于选择应用中启用的搜索方式,至少需要启用一种;

右侧为搜索面板,顶部显示所选数据集的常规信息,例如名称和描述。下方有一个 Suggest some questions 按钮和一个 Ask a question to compare the results 输入框,我们可以在输入框中填写待提交的问题,或者点击按钮通过全局搜索分析数据集并生成最关键的问题:

unified-search-app-suggest-questions.png

勾选问题左侧的复选框,将自动执行搜索和问答。下面有两个标签页,分别为搜索和图探索。搜索页同时展示不同搜索算法的结果,包括本地搜索、全局搜索等:

unified-search-app-search.png

图探索页分为三个模块,社区报告列表、实体图谱和已选报告详情:

unified-search-app-graph-explorer.png

结合 Unified Search 的这些功能,我们可以对图结构进行可视化分析,通过并行对比,用户可以理解不同搜索方法的优势和适用场景。

小结

今天,我们学习了如何将 GraphRAG 生成的知识图谱进行可视化。使用 Gephi 软件,我们可以进行深入的拓扑分析,通过布局、着色、调整大小等一系列操作,发现知识图谱的整体结构和关键模式;而 Unified Search 则提供了动态、交互式的探索体验,让用户能够实时查询和验证假设。

通过可视化,我们不仅能更好地理解数据的内在结构,还能发现隐藏在文本背后的模式和联系,帮助你从复杂的信息网络中提取真正有价值的洞察。


GraphRAG 快速入门

在我们之前的学习过程中,曾简单介绍过 RAGFlow 的一个高级特性 —— 提取知识图谱。通过运用 GraphRAG 和 LightRAG 技术,RAGFlow 可以从实体、关系的提取到子图的构建和合并,再到实体消歧和社区报告的生成,形成完整的知识图谱。因此,这种方法在处理涉及复杂关系与多个实体的文档时,尤其是在多跳问答场景中,表现得尤为卓越。

最早关于 GraphRAG 的概念实际上可以追溯到去年微软研究院发表的论文《From Local to Global: A Graph RAG Approach to Query-Focused Summarization》中:

随后,微软开源了 GraphRAG 项目,在 RAG 领域又掀起了一波新的浪潮:

传统的 RAG 技术通常依赖于基于向量相似度来检索原始文本块,然而,当面对需要整合离散信息以解答的复杂问题时,这种方法往往表现得力不从心。GraphRAG 则采取了一种创新的路径,首先利用大型语言模型从非结构化文本数据中提取出实体和关系,并据此构建一个知识图谱。接着,依托图谱的拓扑结构进行社区发现,对每个社区进行逐层总结,最终形成一个分层且结构化的知识网络。在进行查询时,GraphRAG 利用这一知识网络来强化信息检索,从而为模型提供更具启发性的上下文,以回答那些需要深度推理和全局视角的问题。

核心概念

GraphRAG 的流程主要分为 索引(Indexing)查询(Querying) 两个阶段:

  • 索引(Indexing):这是 GraphRAG 的数据处理阶段,它将非结构化文本转化为结构化知识。主要步骤包括:

    • 文本分块:将长文档切分成小的文本单元(TextUnit);
    • 图谱提取:利用大模型从文本单元中提取出实体(Entities)和关系(Relationships),构建知识图谱;
    • 社区发现:使用 Leiden 等算法对图谱进行层次化社区聚类;
    • 社区总结:利用大模型为每个层级的社区生成摘要报告;
  • 查询(Querying):这是 GraphRAG 的数据检索与问答阶段,利用构建好的知识图谱和摘要来回答问题。比较常见的查询方式有:

    • 全局搜索(Global Search):利用社区摘要,通过 Map-Reduce 的方式对整个数据集进行归纳总结,回答宏观问题;
    • 本地搜索(Local Search):当问题涉及特定实体时,从该实体出发,在知识图谱中向外扩展,聚合其邻近的实体、关系以及相关的原始文本块,为大模型提供精准的局部上下文;

以下是 GraphRAG 的整个流水线示意图:

graphrag-pipeline.png

今天,我们就来快速上手体验一下 GraphRAG 的基本使用。

安装

GraphRAG 是一个 Python 库,可以通过 pip 轻松安装:

$ pip install graphrag

也可以通过源码进行安装,这里我采用这种方法,这样可以一边体验 GraphRAG 一边研究它的源码。首先克隆代码仓库:

$ git clone https://github.com/microsoft/graphrag.git

进入项目目录:

$ cd graphrag

使用 uv 创建 Python 虚拟环境:

$ uv venv --python 3.10
$ source .venv/bin/activate

安装 Python 依赖:

$ uv sync --extra dev

uv 是一个极速的 Python 包和项目管理工具,uv sync 是它的一个命令,用于更新项目的环境,确保项目环境中的依赖与 uv.lock 文件一致。

快速上手

下面,我们就按照官方文档的步骤,通过一个完整的端到端示例,来体验 GraphRAG 的基本使用。首先,我们创建一个测试目录:

$ mkdir -p ./ragtest/input

该目录用于存放原始文档,GraphRAG 支持 txt、json 和 csv 三种格式的文档。

官网给的文档示例是查尔斯・狄更斯(Charles Dickens)创作的一部著名小说《圣诞颂歌》,讲述吝啬鬼斯克鲁奇(Scrooge)在圣诞节前夜经历的奇妙故事,通过三个幽灵的拜访,他的自私和冷酷逐渐崩塌,人性中的同情、仁慈、爱心及喜悦被唤醒,从此成为了一个乐善好施的人。我们可以从 古腾堡计划 下载这本书:

$ curl https://www.gutenberg.org/cache/epub/24022/pg24022.txt -o ./ragtest/input/book.txt

古腾堡计划是一个致力于创建和分发免费电子书的志愿者项目,它提供了大量版权已过期的经典文学作品,可以在上面找到很多免费的图书。

接着,使用 init 命令初始化 GraphRAG 工作空间:

$ uv run poe init --root ./ragtest

其中 poePoe The Poet 的简称,它是一个任务运行器工具,允许在 pyproject.toml 中定义和运行各种任务(类似 npm scriptsMakefile)。通过 poe 可以简化常见操作,如运行测试、启动服务、构建项目等。

如果你是通过 pip 安装的,可以直接使用 graphrag 命令:

$ graphrag init --root ./ragtest

该命令会在 ./ragtest 目录下创建两个核心配置文件以及一些提示词文件:

  • .env:用于存放环境变量,主要是 API Key;
  • settings.yaml:GraphRAG 的主配置文件,包含了数据输入、模型配置、工作流等所有设置;
  • prompts:运行过程中使用的一些提示词,用户可以对其进行微调;

GraphRAG 的核心流程严重依赖大模型,因此我们需要配置模型服务。打开刚刚生成的 .env 文件,将其中的 GRAPHRAG_API_KEY 替换成你自己的 OpenAI API Key:

GRAPHRAG_API_KEY="sk-..."

如果你使用的是 OpenAI 兼容接口,除了 API Key,还需要在 settings.yaml 文件中配置 api_base 参数;如果你使用的是 Azure OpenAI,可能还需要配置 api_versiondeployment_name 等参数:

models:
  default_chat_model:
    type: openai_chat # or azure_openai_chat
    api_base: ${GRAPHRAG_API_BASE}  # set this in the generated .env file
    # api_version: 2024-05-01-preview
    auth_type: api_key # or azure_managed_identity
    api_key: ${GRAPHRAG_API_KEY}    # set this in the generated .env file
    model: gpt-4o-mini
    # deployment_name: <azure_model_deployment_name>
  default_embedding_model:
    type: openai_embedding # or azure_openai_embedding
    api_base: ${GRAPHRAG_API_BASE}
    auth_type: api_key # or azure_managed_identity
    api_key: ${GRAPHRAG_API_KEY}
    model: text-embedding-3-small
    # deployment_name: <azure_model_deployment_name>

配置完成后,我们就可以开始构建索引了:

$ uv run poe index --root ./ragtest

这个过程会花费一些时间,具体取决于你的数据大小和模型性能。GraphRAG 会在后台执行一系列复杂的流程,包括:文本分块、实体与关系提取、图谱构建、社区发现、社区报告生成等。执行成功后,你会在 ./ragtest/output 目录下看到一系列 Parquet 格式的结果文件,这些就是我们构建好的知识图谱数据:

$ tree ./ragtest/output
├── communities.parquet        # 社区表
├── community_reports.parquet  # 社区报告
├── context.json
├── documents.parquet          # 文档表
├── embeddings.community.full_content.parquet
├── embeddings.entity.description.parquet
├── embeddings.text_unit.text.parquet
├── entities.parquet           # 实体表
├── graph.graphml              # 知识图谱
├── lancedb                    # 向量数据库
│   ├── default-community-full_content.lance
│   ├── default-entity-description.lance
│   └── default-text_unit-text.lance
├── relationships.parquet      # 关系表
├── stats.json
└── text_units.parquet         # 文本单元

GraphRAG 使用 Parquet 存储数据,这是一种列式存储的二进制文件格式,专为高效存储和处理大规模结构化数据而设计,广泛用于大数据处理和分析场景。另外,LanceDB 是一个为机器学习优化的向量数据库,使用 Apache Arrow 格式存储。GraphRAG 使用它来存储文本嵌入向量,用于相似性搜索。

索引构建完成后,我们就可以通过 query 命令来查询,GraphRAG 支持多种查询模式,我们来体验下最常用的两种。

  • 全局搜索(Global Search):适用于需要对整个数据集进行宏观理解和总结的问题:
$ uv run poe query \
    --root ./ragtest \
    --method global \
    --query "What are the top themes in this story? 用中文回答"

graphrag-query-1.png

  • 本地搜索(Local Search):适用于查询关于特定实体的具体信息:
$ uv run poe query \
    --root ./ragtest \
    --method local \
    --query "Who is Scrooge and what are his main relationships? 用中文回答"

graphrag-query-2.png

小结

今天,我们初步探索了 GraphRAG,通过动手实践,体验了其从安装、配置到索引构建和查询的完整流程。GraphRAG 通过将非结构化文本转化成结构化的知识图谱,提供了一种创新的 RAG 范式。这种方法不仅提高了复杂问题的解决能力,还为多跳问答和深度推理提供了强有力的支持。

当然,GraphRAG 的功能远不止于此,它还支持更高级的功能,如提示词自动调优、自定义工作流、图可视化等。

我们后面将结合其源码,继续探索 GraphRAG 的强大能力和实现原理。