Fork me on GitHub

2025年7月

学习 RAGFlow 的 DeepDoc 技术之解析器

我们昨天将任务执行器中的 do_handle_task() 函数从头到尾梳理了一遍,详细学习了 RAGFlow 的文件解析和分块逻辑。其中还遗漏了一些关键技术点,包括 DeepDoc 深度解析文档、RAPTOR 分块策略、GraphRAG 构建知识图谱,以及自动提取关键字、自动生成问题、标签集的构建和使用等高级配置。让我们一样一样来,先从 DeepDoc 技术学起。

在知识库的配置页面,当我们选择与 PDF 兼容的切片方法时,比如 GeneralManualPaperBookLawsPresentationOne 这几个,会出现 PDF 解析器下拉菜单:

ragflow-kb-configuration.png

默认支持 DeepDocNative 两种方式:

  • DeepDoc - 基于 OCR、表格识别、布局分析等视觉模型,深度分析 PDF 文档,有效地提取文档标题、文本块、图像和表格等内容;
  • Native - 仅提取 PDF 中的纯文本内容;当用户的 PDF 仅包含纯文本时,可以选择该选项,从而减少整体解析时间;

目前最新版本中还有个实验特性,如果用户配置了第三方视觉模型(也就是之前配置的 Img2txt 模型),也可以使用这些视觉模型来做 PDF 解析。

ragflow-pdf-parser.png

我们重点关注 DeepDoc 方式,因为这是 RAGFlow 的默认方式,也是最强大的方式。

注意 DeepDoc 选项暂时不适用于 Word 文档,若要使用该功能,可以将 Word 文档转换为 PDF 格式。

DeepDoc 简介

DeepDoc 是 RAGFlow 的核心特性之一,可以对来自不同领域、具有不同格式和不同检索要求的文档进行深度分析,它负责将各类复杂文档转换为结构化的文本格式,为后续的分块和向量化处理打下基础。从 v0.17.0 版本开始,RAGFlow 将数据提取任务与分块方法解耦,成为两个独立的模块。这种解耦的设计具有更高的灵活性,支持更深层次的定制,以适应更复杂的用例。用户能够自主选择适合的视觉模型,在速度和性能之间取得平衡,以满足具体的用例需求。

分块的逻辑位于 rag/app 目录:

$ tree rag/app
rag/app
├── __init__.py
├── audio.py
├── book.py
├── email.py
├── laws.py
├── manual.py
├── naive.py
├── one.py
├── paper.py
├── picture.py
├── presentation.py
├── qa.py
├── resume.py
├── table.py
└── tag.py

这些文件就对应我们之前学习过的十四种不同的 分块器(Chunker),RAGFlow 会根据不同的文档类型使用不同的分块器,对分块逻辑感兴趣的朋友可以研究下这里的代码。有很多分块器依赖于 DeepDoc 来解析文档,DeepDoc 的逻辑位于 deepdoc 目录:

$ tree deepdoc
deepdoc
├── __init__.py
├── parser
│   └── ...
└── vision
    ├── layout_recognizer.py
    ├── ocr.py
    └── table_structure_recognizer.py

从目录结构可以看出,DeepDoc 由两个部分组成:解析器(parser)视觉处理(vision),解析器提供了不同格式文档的通用解析方法,供分块器使用;而视觉处理部分则提供了 OCR、表格识别、布局分析等高级特性,可以对 PDF 和 图片等文件实现更好的识别效果。分块器、解析器和视觉处理这三者之间的关系如下:

ragflow-deepdoc.png

解析器概览

今天我们主要关注 DeepDoc 的解析器部分,视觉处理部分我们放到下一篇文章中学习。

$ tree deepdoc/parser
deepdoc/parser
├── __init__.py
├── docx_parser.py
├── excel_parser.py
├── figure_parser.py
├── html_parser.py
├── json_parser.py
├── markdown_parser.py
├── pdf_parser.py
├── ppt_parser.py
├── resume
│   ├── step_one.py
│   └── step_two.py
├── txt_parser.py
└── utils.py

deepdoc/parser 目录下的文件可以看到,DeepDoc 内置了 10 种不同的解析器,支持 Word、Excel、PPT、PDF、HTML、Markdown、JSON、图片等文件的处理和解析;此外,由于简历是一种非常复杂的文档,包含各种格式的非结构化文本,可以被解析为包含近百个字段的结构化数据,RAGFlow 还内置了一个专门的简历解析器。

DOCX 解析器

DOCX 解析器的实现类为 RAGFlowDocxParser,代码如下:

docx-parser.png

它使用 python-docx 读取 docx 文件,将其解析为结构化的文本和表格数据,主要包括两个部分:

  • 段落解析:遍历每个 paragraphrun(在 python-docx 中,paragraph 表示段落,run 表示具有相同样式的一段连续文本,它是段落的更细分单位);提取出文本和样式信息,格式为 (text, style_name)
  • 表格解析:遍历每个 tablerowscells,然后将表格转换为 pandas 的 DataFrame 对象;使用了一些规则来识别表头和数据类型,然后按"表头: 内容"的格式返回组织好的表格数据,比如 表头1:内容;表头2:内容;...

注意它只对 docx 文件生效,对于 doc 文件,在分块器代码里可以看到使用 tika 来读取,目前这个并没有放到解析器里。

假设 docx 文件中含有如下段落内容:

docx-parser-1.png

解析后的 secs 如下:

[
  ('标题一', 'Heading 1'), 
  ('这里是第一段正文。', 'Normal'), 
  ('标题二', 'Heading 1'), 
  ('这里是第二段正文。', 'Normal')
]

假设 docx 文件中含有如下表格内容:

docx-parser-2.png

解析后的 tbls 如下:

[
  [
    '姓名: 张三;学号: 001;年龄: 18;性别: 男;成绩: 100',
    '姓名: 李四;学号: 002;年龄: 19;性别: 女;成绩: 99',
    '姓名: 王五;学号: 003;年龄: 20;性别: 男;成绩: 98'
  ]
]

Excel 解析器

Excel 解析器的实现类为 RAGFlowExcelParser,代码如下:

excel-parser.png

它的实现比较简单,首先通过 _load_excel_to_workbook() 方法将 Excel 文件加载为 Workbook 对象,然后遍历每个 Worksheet 的表格数据,将其转换为类似上面 Word 表格解析后的格式,方便后续文本向量化处理。

解析器支持自动识别 Excel 或 CSV 格式:

if not (file_head.startswith(b'PK\x03\x04') or file_head.startswith(b'\xD0\xCF\x11\xE0')):
  
  # 不是 Excel 文件
  df = pd.read_csv(file_like_object)
  return RAGFlowExcelParser._dataframe_to_workbook(df)

return load_workbook(file_like_object,data_only= True)

它通过文件头检测技术判断文件是否为 Excel 文件,如果前 4 字节是 PK\x03\x04 表示是 XLSX 文件,如果是 \xD0\xCF\x11\xE0 表示是旧版 XLS 文件;对于 Excel 文件,程序使用 openpyxlload_workbook 来读取;对于 CSV 文件,则使用 pandas 的 read_csv 来读取,然后调用 _dataframe_to_workbook() 方法将其转换为 Workbook 对象,便于后续的统一处理。

PPT 解析器

PPT 解析器的实现类为 RAGFlowPptParser,代码如下:

ppt-parser.png

它使用 python-pptx 来读取 PPT 文件,通过遍历幻灯片中的所有形状,提取出所有的纯文本数据。形状(Shape) 是 PPT 文件中的一个重要概念,包含各种类型的元素:文本框、图片、表格、图表、图形、组合形状等。函数 __extract(shape) 从形状中提取内容,核心逻辑如下:

  • 文本框:提取段落文本,保留项目符号和缩进结构
  • 表格:转换为键值对格式(第一行作为表头)
  • 组合形状:按位置顺序递归处理内部的所有形状
  • 其他:返回空字符串

这里有一个细节,程序在遍历形状时,会按垂直位置和水平位置排序,确保文本顺序符合视觉阅读顺序。

未完待续

今天我们学习了 RAGFlow 的 DeepDoc 技术,它由解析器和视觉处理两部分组成,将各类复杂文档转换为结构化的文本数据,为分块和向量化打下基础。我们详细学习了其中的 3 种解析器,包括:DOCX 解析器、Excel 解析器 和 PPT 解析器。

今天的内容就这么多,我们将在下一篇继续学习剩下的解析器。


再学 RAGFlow 的文件解析逻辑

经过几天的学习,我们了解了 RAGFlow 的文件上传和解析流程,了解了解析任务是如何触发并放入 Redis Stream 消息队列中,等待任务执行器消费和处理的。今天我们将继续学习任务执行器中最重要的函数 do_handle_task() 的实现,看看 RAGFlow 是如何具体执行每个解析任务的。

do_handle_task 函数实现

do_handle_task 是 RAGFlow 系统中的任务处理函数,负责处理文档解析、分块、向量化和索引的完整流程。它的主要逻辑如下:

  1. 判断任务是否被取消,如果是,则直接返回;
  2. 根据任务配置绑定对应的嵌入模型,用于后续的向量化处理;
  3. 根据嵌入模型的向量维度,初始化知识库索引结构;
  4. 根据任务类型执行不同的处理流程:

    • 如果是 RAPTOR 类型的任务,则执行递归抽象处理;
    • 如果是 GraphRAG 类型的任务,则执行知识图谱构建;
    • 如果是标准分块类型的任务,则执行普通分块处理;
  5. 批量插入分块数据到知识库索引中;
  6. 更新文档统计信息,包括分块数量、token 数量等;

下面是 do_handle_task 函数的核心实现:

async def do_handle_task(task):

  # 过程回调,用于报告进度
  progress_callback = partial(set_progress, task_id, task_from_page, task_to_page)

  # 判断任务是否已取消
  task_canceled = TaskService.do_cancel(task_id)
  if task_canceled:
    progress_callback(-1, msg="Task has been canceled.")
    return

  # 绑定嵌入模型
  embedding_model = LLMBundle(task_tenant_id, LLMType.EMBEDDING, llm_name=task_embedding_id, lang=task_language)
  vts, _ = embedding_model.encode(["ok"])
  vector_size = len(vts[0])

  # 初始化知识库
  init_kb(task, vector_size)

  if task.get("task_type", "") == "raptor":
    # 使用 RAPTOR 分块策略
  elif task.get("task_type", "") == "graphrag":
    # 使用 GraphRAG 分块策略
  else:
    # 使用标准分块策略
    chunks = await build_chunks(task, progress_callback)
    # 计算每个分块的向量
    token_count, vector_size = await embedding(chunks, embedding_model, task_parser_config, progress_callback)

  # 批量插入分块数据
  for b in range(0, len(chunks), DOC_BULK_SIZE):
    doc_store_result = await trio.to_thread.run_sync(
      lambda: settings.docStoreConn.insert(
        chunks[b:b + DOC_BULK_SIZE], search.index_name(task_tenant_id), task_dataset_id))

  # 更新文档统计信息
  DocumentService.increment_chunk_num(task_doc_id, task_dataset_id, token_count, chunk_count, 0)

上面的代码中有几个需要注意的点,值得展开来学习下。

了解偏函数

progress_callback = partial(set_progress, task_id, task_from_page, task_to_page)

这段代码使用了 Python 中的 偏函数(Partial Function),一个比较冷门但有点意思的功能。偏函数是一种特殊的函数,它通过固定原函数的某些参数,从而创建出一个新的函数。偏函数的创建主要通过 functools.partial 实现,下面是一个简单的示例,固定加法函数的一个参数:

from functools import partial

# 定义一个加法函数
def add(a, b):
    return a + b

# 创建一个偏函数,固定 a=10
add_ten = partial(add, 10)

# 调用偏函数
print(add_ten(5))  # 输出 15 (相当于 10 + 5)
print(add_ten(10)) # 输出 20 (相当于 10 + 10)

回到前面的代码片段,我们使用偏函数创建了一个新的回调函数 progress_callback,它固定了 set_progress 函数的前三个参数:task_idtask_from_pagetask_to_page。当后续调用 progress_callback 时,只需提供剩余的参数(如进度值和错误消息)即可:

progress_callback(50, msg="Processing...")

progress_callback(-1, msg="Error occurred")

这种方式的好处是,在多次调用 progress_callback 时,不需要重复传递某些固定参数,减少函数调用时需要提供的参数数量。在 do_handle_task() 函数的实现中,会大量的调用 progress_callback 汇报任务进度,通过偏函数,可以让代码更加简洁和灵活,提高了代码的可读性和可维护性。

绑定嵌入模型

embedding_model = LLMBundle(task_tenant_id, LLMType.EMBEDDING, llm_name=task_embedding_id, lang=task_language)
vts, _ = embedding_model.encode(["ok"])
vector_size = len(vts[0])

RAGFlow 通过 LLMBundle 将所有大模型的操作统一封装在一个类里,方便使用。比如通过 LLMBundle.encode() 方法调用嵌入模型,LLMBundle.chat() 方法调用聊天模型,等等。

LLMType 枚举了所有支持的模型类型(这也就是我们之前在 “模型供应商” 页面配置的模型类型):

class LLMType(StrEnum):
  CHAT = 'chat'                # 聊天模型
  EMBEDDING = 'embedding'      # 嵌入模型
  SPEECH2TEXT = 'speech2text'  # 语音转文字
  IMAGE2TEXT = 'image2text'    # 图片转文字
  RERANK = 'rerank'            # 排序模型
  TTS    = 'tts'               # 文字转语音

这里创建的 embedding_model 是一个嵌入模型,它的具体实现位于 rag/llm/embedding_model.py 文件,该文件定义了大量的嵌入模型实现,包括:

  • OpenAI 系列

    • OpenAI - text-embedding-ada-002 / text-embedding-3-large (OpenAIEmbed)
    • Azure OpenAI (AzureEmbed)
  • 中国厂商

    • 阿里通义千问 - text_embedding_v2 (QWenEmbed)
    • 智谱 AI - embedding-2 / embedding-3 (ZhipuEmbed)
    • 百川智能 - Baichuan-Text-Embedding (BaiChuanEmbed)
    • 百度千帆 (BaiduYiyanEmbed)
  • 国外厂商

    • Cohere - embed-multilingual-v3.0 (CoHereEmbed)
    • Mistral - mistral-embed (MistralEmbed)
    • Jina - jina-embeddings-v3 (JinaEmbed)
    • Voyage AI - voyage-large-2 (VoyageEmbed)
    • Gemini - text-embedding-004 (GeminiEmbed)
    • Bedrock - amazon.titan-embed-text-v2:0 / cohere.embed-multilingual-v3 (BedrockEmbed)
  • 聚合平台

    • TogetherAI - togethercomputer/m2-bert-80M-8k-retrieval (TogetherAIEmbed)
    • SiliconFlow - 硅基流动 (SILICONFLOWEmbed)
    • VolcEngine - 火山引擎方舟平台 (VolcEngineEmbed)
    • PerfXCloud - 澎峰科技 (PerfXCloudEmbed)
    • Upstage - solar-embedding-1-large (UpstageEmbed)
    • NovitaAI - 兼容接口 (NovitaEmbed)
    • GiteeAI - 码云AI平台 (GiteeEmbed)
    • Replicate - 云端推理平台 (ReplicateEmbed)
  • 本地部署模型

    • BAAI/bge-large-zh-v1.5 (DefaultEmbedding)
    • BAAI/bge-small-en-v1.5 (FastEmbed)
    • maidalun1020/bce-embedding-base_v1 (YoudaoEmbed)
  • 本地部署框架

初始化知识库索引

创建嵌入模型实例之后,RAGFlow 通过 embedding_model.encode(["ok"]) 来验证模型的可用性,同时还能获取到嵌入模型的向量维度信息。接着,它会根据向量维度创建相应的索引库:

init_kb(task, vector_size)

init_kb() 函数的实现如下:

def init_kb(row, vector_size: int):
  idxnm = search.index_name(row["tenant_id"])
  return settings.docStoreConn.createIdx(idxnm, row.get("kb_id", ""), vector_size)

可以看到 RAGFlow 通过 docStoreConncreateIdx(tenant_id, kb_id, vector_size) 创建索引库,其中 docStoreConn 表示文档存储引擎,可以通过环境变量 DOC_ENGINE 进行切换,目前支持三种:

RAGFlow 默认使用 Elasticsearch 作为文档存储引擎。另外从 createIdx 的三个参数可以看出,RAGFlow 是支持多租户的,不同租户的索引库之间是隔离的。其实,只有 Infinity 是根据传入的三个参数创建索引库的,Elasticsearch 和 OpenSearch 则是根据 tenant_id 创建索引库的,kb_idvector_size 对它们来说没有用。

es-index.png

之所以 Elasticsearch 和 OpenSearch 创建索引时不用指定向量维度,是因为 RAGFlow 使用了一种比较讨巧的方法,它将常见的向量维度提前预定义在 Mapping 里了(参考 conf/mapping.jsonconf/os_mapping.json 文件):

{
  "dynamic_templates": [
    {
      "dense_vector": {
        "match": "*_512_vec",
        "mapping": {
          "type": "dense_vector",
          "index": true,
          "similarity": "cosine",
          "dims": 512
        }
      }
    },
    {
      "dense_vector": {
        "match": "*_768_vec",
        "mapping": {
          "type": "dense_vector",
          "index": true,
          "similarity": "cosine",
          "dims": 768
        }
      }
    },
    {
      "dense_vector": {
        "match": "*_1024_vec",
        "mapping": {
          "type": "dense_vector",
          "index": true,
          "similarity": "cosine",
          "dims": 1024
        }
      }
    },
    {
      "dense_vector": {
        "match": "*_1536_vec",
        "mapping": {
          "type": "dense_vector",
          "index": true,
          "similarity": "cosine",
          "dims": 1536
        }
      }
    }
  ]
}

如果你的嵌入模型不是 512、768、1024 或 1536 维的,那么在创建索引时可能会报错,需要修改 Mapping 文件来支持新的向量维度。

文档分块策略

昨天在学习知识库配置时,我们提到了两个高级配置:

  • 使用召回增强 RAPTOR 策略(use_raptor) - 为多跳问答任务启用 RAPTOR 提高召回效果;
  • 提取知识图谱(use_graphrag) - 在当前知识库的文件块上构建知识图谱,以增强涉及嵌套逻辑的多跳问答;

这两个配置其实对应着 RAGFlow 不同的文档分块策略:

if task.get("task_type", "") == "raptor":
  # 使用 RAPTOR 分块策略
elif task.get("task_type", "") == "graphrag":
  # 使用 GraphRAG 分块策略
else:
  # 使用标准分块策略
  chunks = await build_chunks(task, progress_callback)
  # 计算每个分块的向量
  token_count, vector_size = await embedding(chunks, embedding_model, task_parser_config, progress_callback)

关于 RAPTOR 和 GraphRAG 的实现,我们后面再详细学习,今天我们先来学习下标准分块策略的实现。它的核心流程分两步:

  • build_chunks() - 执行标准文档分块流程
  • embedding() - 对分块进行向量化处理

其中,build_chunks() 函数的实现大致如下:

async def build_chunks(task, progress_callback):

    # 从对象存储中读取文件
    bucket, name = File2DocumentService.get_storage_address(doc_id=task["doc_id"])
    binary = await get_storage_binary(bucket, name)

    # 调用分块器进行分块,通过 chunk_limiter 限制并发路数
    chunker = FACTORY[task["parser_id"].lower()]
    async with chunk_limiter:
        cks = await trio.to_thread.run_sync(lambda: chunker.chunk(...))

    # 将分块结果上传到对象存储
    async with trio.open_nursery() as nursery:
        for ck in cks:
            nursery.start_soon(upload_to_minio, doc, ck)

    return docs

它首先根据 doc_id 从数据库中查询出桶名和文件名,从对象存储中读取出文件内容;接着使用 parser_id 创建对应的分块器,然后调用它的 chunk() 方法对文件进行分块;最后将分块结果上传到对象存储。这里的 parser_id 我们昨天已经学习过了,它表示切片方法,有 General, Q&A, Resume, Manual, Table, Paper, Book, Laws, Presentation, One, Tag 共 11 种,对应的分块器实现定义在 FACTORY 工厂中:

FACTORY = {
  "general": naive,
  ParserType.NAIVE.value: naive,
  ParserType.PAPER.value: paper,
  ParserType.BOOK.value: book,
  ParserType.PRESENTATION.value: presentation,
  ParserType.MANUAL.value: manual,
  ParserType.LAWS.value: laws,
  ParserType.QA.value: qa,
  ParserType.TABLE.value: table,
  ParserType.RESUME.value: resume,
  ParserType.PICTURE.value: picture,
  ParserType.ONE.value: one,
  ParserType.AUDIO.value: audio,
  ParserType.EMAIL.value: email,
  ParserType.KG.value: naive,
  ParserType.TAG.value: tag
}

可以看到 FACTORY 工厂中除了上面的 11 种切片方法之外,还多出了 PICTURE, AUDIOEMAIL 三种切片方法,暂时没看到使用,估计 RAGFlow 后面会支持对图片、音频和邮件的处理吧。

不同类型的文件使用不同的切片方法,这是 RAGFlow 的核心优势之一,官方将这种特性称为 基于模板的文本切片方法(Template-based chunking;实际上,在实施切片之前,我们还需要将各类文档转换为文本格式,这是基于 RAGFlow 的 深度文档理解(DeepDoc 技术实现的;DeepDoc 支持广泛的文件格式,能够处理各类复杂文档的布局和结构,确保从 PDF、Word、PPT 等文件中提取高质量、有价值的信息。

对分块进行向量化处理

通过调用 build_chunks() 方法,我们根据知识库配置,将文档切分成了一个个的分块数据,接着调用 embedding() 对分块进行向量化处理:

async def embedding(docs, mdl, parser_config=None, callback=None):

    # 准备标题和内容数据
    tts, cnts = [], []
    for d in docs:
        tts.append(d.get("docnm_kwd", "Title"))
        cnts.append(d["content_with_weight"])

    # 计算标题的向量(只计算第一个标题,然后复制到所有文档,这里的 docs 属于同一个文档,因此文件名是一样的)
    tk_count = 0
    if len(tts) == len(cnts):
        vts, c = await trio.to_thread.run_sync(
            lambda: mdl.encode(tts[0: 1]))
        tts = np.concatenate([vts for _ in range(len(tts))], axis=0)
        tk_count += c

    # 计算内容的向量(按批生成)
    cnts_ = np.array([])
    for i in range(0, len(cnts), EMBEDDING_BATCH_SIZE):
        vts, c = await trio.to_thread.run_sync(
            lambda: mdl.encode([truncate(c, mdl.max_length-10) for c in cnts[i: i + EMBEDDING_BATCH_SIZE]]))
        if len(cnts_) == 0:
            cnts_ = vts
        else:
            cnts_ = np.concatenate((cnts_, vts), axis=0)
        tk_count += c

    # 加权融合标题和内容向量
    cnts = cnts_
    filename_embd_weight = parser_config.get("filename_embd_weight", 0.1)
    title_w = float(filename_embd_weight)
    vects = (title_w * tts + (1 - title_w) * cnts) if len(tts) == len(cnts) else cnts

    # 将向量添加到每个文档中
    vector_size = 0
    for i, d in enumerate(docs):
        v = vects[i].tolist()
        vector_size = len(v)
        d["q_%d_vec" % len(v)] = v
    return tk_count, vector_size

这里有一个点值得注意,RAGFlow 在计算分块向量时综合考虑了标题(也就是文件名)和内容的,通过加权将标题和内容的向量进行融合,标题的权重默认为 0.1,内容的权重为 0.9,可以通过 filename_embd_weight 参数进行调整。最后计算出的向量会添加到每个文档的 q_N_vec 字段中,其中 N 表示向量的维度。

至此,我们就得到了文档的分块以及每个分块的向量,在 do_handle_task 函数的最后,通过批量插入将分块数据写入到知识库索引中。下面是写入到 ES 索引库中的一个分块示例:

es-doc.png

也可以点击知识库中的文件名称,对文件的分块数据进行浏览和编辑:

ragflow-kb-chunks.png

小结

今天我们详细学习了 RAGFlow 的文件解析逻辑,将任务执行器中的 do_handle_task() 函数从头到尾梳理了一遍,从任务进度的汇报,嵌入模型的选择,索引库的构建,到根据文档类型选择合适的文档分块策略,再到对分块后的内容进行向量化处理,到最后的批量写入。相信通过整个过程的学习,你对 RAGFlow 的文件解析逻辑已经有了更深入的了解,并对 RAGFlow 的工作原理有了更直观的感受。

不过,这里还有很多技术细节没有展开,比如 RAGFlow 是如何使用 DeepDoc 技术深度理解和解析文档的,它使用的 RAPTOR 分块策略是什么,它又是如何使用 GraphRAG 构建知识图谱的。还有昨天学习的一些高级配置参数也值得进一步研究,比如自动提取关键字,自动生成问题,标签集的构建和使用,等等。我们明天继续学习。


学习 RAGFlow 的知识库配置

书接上回,昨天我们深入学习了如何触发解析任务,如何通过 Redis Stream 作为消息队列投递任务,以及任务执行器如何利用 trio 异步框架和消费者组机制,消费和处理这些任务。我们可以用 Redis 客户端连接到 Redis,看看 rag_flow_svr_queue 队列中的消息是什么样的:

redis-stream-detail.png

任务消息结构

我们知道,每条 Redis Stream 的消息由 ID 和 Value 组成,Value 是一个字典,包含多对键值对;这里的 Value 只有一对键值对,键为 message,值为一个 JSON 字符串,表示任务的详细信息:

{
  "id": "58b8b5a65e5b11f0b6c20242ac120006",
  "doc_id": "677bfde25e5a11f09c890242ac120006",
  "progress": 0.0,
  "from_page": 0,
  "to_page": 100000000,
  "digest": "81e29dac5b568aca",
  "priority": 0,
  "create_time": 1752240687243,
  "create_date": "2025-07-11 21:31:27",
  "update_time": 1752240687243,
  "update_date": "2025-07-11 21:31:27"
}

很显然,这里的信息还不够完整,因此 collect() 继续通过任务 ID 查询数据库,获取了更详细的任务信息:

ragflow-task-executor-collect.png

详细的任务信息如下:

{
  "id": "58b8b5a65e5b11f0b6c20242ac120006",
  "doc_id": "677bfde25e5a11f09c890242ac120006",
  "from_page": 0,
  "to_page": 100000000,
  "retry_count": 0,
  "kb_id": "e5aa2dbc5b9711f0b0880242ac120006",
  "parser_id": "naive",
  "parser_config": {
    "layout_recognize": "DeepDOC",
    "chunk_token_num": 512,
    "delimiter": "\n",
    "auto_keywords": 0,
    "auto_questions": 0,
    "html4excel": false,
    "raptor": {
      "use_raptor": false
    },
    "graphrag": {
      "use_graphrag": false
    }
  },
  "name": "README.md",
  "type": "doc",
  "location": "README.md",
  "size": 9078,
  "tenant_id": "fb5e4b9e5ae211f0b4620242ac120006",
  "language": "English",
  "embd_id": "text-embedding-3-small@OpenAI",
  "pagerank": 0,
  "img2txt_id": "gpt-4.1-mini@OpenAI",
  "asr_id": "whisper-1@OpenAI",
  "llm_id": "gpt-4.1-mini@OpenAI",
  "update_time": 1752240687243,
  "task_type": ""
}

这里的 parser_idparser_config 是文件解析时用到的最为重要的两个参数,parser_id 表示切片方法,而 parser_config 则表示文件解析时的配置,包括解析策略、分块大小、分隔符、是否自动提取关键字和问题等。

在继续研究 do_handle_task() 函数的解析逻辑之前,我们需要先了解下 RAGFlow 的知识库配置都有哪些。

切片方法

RAGFlow 提供了多种切片方法,以便对不同布局的文件进行分块,并确保语义完整性。我们可以在知识库配置页面中进行选择:

ragflow-kb-configuration.png

正确选择知识库配置对于未来的召回和问答效果至关重要。下面是官方对每种切片方法的说明:

ragflow-chunk-method.png

General 分块方法

这是最简单的一种分块方法,也是最通用的一种,它支持众多的文件格式,包括 MD、MDX、DOCX、XLSX、XLS、PPT、PDF、TXT、JPEG、JPG、PNG、TIF、GIF、CSV、JSON、EML、HTML 等。它的处理逻辑如下:

  • 它首先使用视觉检测模型将连续文本分割成多个片段;
  • 接下来,这些连续的片段被合并成 Token 数不超过 chunk_token_num 的块。

下面是 General 分块方法的示例:

general-1.png

general-2.png

Q&A 分块方法

此分块方法专门用于问答对的处理,支持 Excel、CSV 和 TXT 文件格式:

  • 如果文件是 Excel 格式,则应由两列组成,第一列提出问题,第二列提供答案。文件不需要标题行,且支持多个工作表(Sheet);
  • 如果文件是 CSV 和 TXT 格式,必须是 UTF-8 编码且使用制表符(TAB)作为问题和答案的定界符;

注意,RAGFlow 在处理问答对文件时将自动忽略不满足上述规则的文本行。

下面是 Q&A 分块方法的示例:

qa-1.jpg

qa-2.jpg

Resume 分块方法

此分块方法专门用于处理简历文件,将各种形式的简历解析并整理成结构化数据,方便招聘人员搜索候选人。支持 DOCX、PDF 和 TXT 格式。

resume.png

Manual 分块方法

此分块方法专门用于处理手册文件,目前仅支持 PDF 格式。

RAGFlow 在处理手册文件时,假设手册具有分层的章节结构,将最低层级的章节标题作为文档分块的基本单位。因此,同一章节中的图表和表格不会被分开,这可能会导致分块的篇幅更大。

manual.png

Table 分块方法

此分块方法专门用于处理表格文件,支持 Excel、CSV 和 TXT 格式。

  • 表格文件的第一行必须包含列标题;
  • 列标题必须是有意义的术语,便于大模型理解;可以将同义词用斜线 / 隔开,比如 分块方法/切片方法;并使用括号列出所有的枚举值,例如:性别(男,女)颜色(黄,蓝,棕)尺寸(M,L,XL,XXL) 等;
  • 如果是 CSV 或 TXT 格式,列与列之间的分隔符必须是制表符(TAB);

table-1.jpg

table-2.jpg

Paper 分块方法

此分块方法专门用于处理论文文件,目前仅支持 PDF 格式。

论文将按章节进行分块,例如 摘要1.1 节1.2 节 等部分。这种方法使大模型能够更有效地总结论文,并提供更全面、易于理解的回答。然而,它也增加了对话的上下文,进而增加了大模型的计算成本。因此,在对话过程中,可考虑降低 topN 的值。

paper.png

Book 分块方法

此分块方法专门用于处理书籍文件,支持 DOCX、PDF 和 TXT 格式。对于 PDF 格式的书,请设置页面范围,以去除不必要的信息并缩短分析时间。

book-1.jpg

book-2.jpg

Laws 分块方法

此分块方法专门用于处理法律文书,支持 DOCX、PDF 和 TXT 格式。

在法律文书(如合同、宪法、国际条约、公司章程等)中,常常按 篇(Part) - 章(Chapter) - 节(Section) - 条(Article) - 款(Paragraph) - 项(Subparagraph) 这样的层级划分内容。其中 条(Article) 是构成法律文书的基本结构单元,它用于对特定主题或事项进行分点阐述,具有明确的逻辑层级和法律效力。例如《联合国宪章》中的 “Article 51”(第五十一条) 或者合同中的 “Article 3: Payment Terms”(第三条:付款条款)

RAGFlow 在处理法律文书时,将 条(Article) 作为分块的基本单位,确保所有上层文本都包含在该块中。

laws.jpg

Presentation 分块方法

此方法专门用于处理幻灯片文件,支持 PDF 和 PPTX 格式。

  • 幻灯片的每一页都被视为一个分块,并存储其缩略图;
  • 此分块方法会自动应用于所有上传的 PPT 文件,因此对于 PPT 文件来说无需手动指定;

ppt-1.png

ppt-2.png

One 分块方法

此分块方法将每个文档整体视为一个分块,当需要使用大模型对整个文档进行总结,且模型能够处理该上下文长度时适用。支持的文件格式包括 DOCX、XLSX、XLS、PDF、TXT 等。

Tag 分块方法

这是一种特殊的知识库配置,使用 Tag 作为分块方法的知识库不会参与 RAG 流程,而是充当标签集的角色。其他知识库可以使用它来标记自己的分块,对这些知识库的查询也将使用此标签集进行标记。

此知识库中的每个分块都是一个独立的 描述 - 标签 对,支持 Excel、CSV 和 TXT 文件格式。

  • 如果文件是 Excel 格式,它应包含两列:第一列用于标签描述,第二列用于标签名称;和 Q&A 分块方法一样,文件不需要标题行,且支持多个工作表(Sheet);
  • 如果文件是 CSV 和 TXT 格式,必须采用 UTF-8 编码,且使用制表符(TAB)作为分隔符来分隔描述和标签;
  • 在标签列中可以包含多个标签,使用逗号分隔;

不符合上述规则的文本行将被忽略。

tag-1.jpg

标签集构建完成后,可以在页面下方看到类似这样的标签云:

tag-2.jpg

关于标签集的用法,我们会在后面专门的文章中进行介绍,可以把它视作一种检索优化的手段,而不是分块方法。

其他配置参数

除了分块方法,知识库配置页面还提供了一些其他参数,包括:

  • PDF 解析器(layout_recognize) - 基于 PDF 布局分析的可视化模型,有效地定位文档标题、文本块、图像和表格;支持 DeepDocNative 两种方式;如果选择 Native 选项,将仅检索 PDF 中的纯文本;目前最新版本中还有个实验特性,使用大模型的多模态能力实现该功能;此选项仅适用于 PDF 文档;
  • 建议文本块大小(chunk_token_num) - 推荐的分块大小,如果一个片段的令牌数少于此阈值,它将与后续片段合并,直到令牌总数超过该阈值,此时才会创建一个分块。除非遇到分隔符,否则即使超过阈值也不会创建新的分块;
  • 文本分段标识符(delimiter) - 分隔符或分隔标识可以由一个或多个特殊字符组成。如果是多个字符,确保它们用反引号(``)括起来。例如,如果你像这样配置分隔符:`n`##`;`,那么你的文本将在行尾、双井号(##)和分号处进行分隔;
  • 嵌入模型(embd_id) - 知识库的默认嵌入模型。一旦知识库有了分块,该选择就无法更改。要切换不同的嵌入模型,必须删除知识库中所有现有的分块;

除此之外,还有一些高级配置,比如:

  • 页面排名(pagerank) - 你可以为特定的知识库分配更高的 PageRank 分数,这个分数会加到从这些知识库检索到的文本块的混合相似度分数上,从而提高它们的排名;
  • 自动关键词提取(auto_keywords) - 自动为每个文本块提取 N 个关键词,以提高包含这些关键词的查询的排名,可以在文本块列表中查看或更新为某个文本块添加的关键词;
  • 自动问题提取(auto_questions) - 自动为每个文本块提取 N 个问题,以提高包含这些问题的查询的排名,可以在文本块列表中查看或更新为某个文本块添加的问题;
  • 表格转 HTML(html4excel) - 与 General 分块方法一起使用。禁用时,知识库中的电子表格将被解析为键值对;而启用时,它们将被解析为 HTML 表格,按照每 12 行进行拆分;
  • 标签集 - 选择一个或多个标签知识库,以自动为你的知识库中的分块添加标签;用户查询也将自动添加标签;自动添加标签与自动提取关键词之间的区别:标签知识库是用户定义的封闭集合,而由大模型提取的关键词可被视为开放集合;在运行自动添加标签功能之前,你必须手动上传指定格式的标签集;而自动提取关键词功能依赖于大语言模型,并且会消耗大量的令牌;

ragflow-kbc-2.png

  • 使用召回增强 RAPTOR 策略(use_raptor) - 为多跳问答任务启用 RAPTOR 提高召回效果;

ragflow-kbc-3.png

  • 提取知识图谱(use_graphrag) - 在当前知识库的文件块上构建知识图谱,以增强涉及嵌套逻辑的多跳问答;

ragflow-kbc-4.png

小结

今天我们详细学习了 RAGFlow 的知识库配置。我们首先分析了任务消息的结构,然后重点探讨了 RAGFlow 提供的多种切片方法,如 GeneralQ&ATablePaper 等,并了解了如何根据不同的文档类型选择最合适的配置。此外,我们还介绍了 PDF 解析器、分块大小、嵌入模型以及 PageRank、RAPTOR 等高级设置。

了解这些配置是掌握 RAGFlow 的关键一步。在下一篇文章中,我们将深入 do_handle_task() 函数的实现,揭示 RAGFlow 是如何根据这些配置来具体执行文件解析任务的。


学习 RAGFlow 的文件解析逻辑

昨天我们已经学习了 RAGFlow 文件上传的相关逻辑,今天继续学习文件解析的逻辑。

触发文件解析

文件上传后,在文件列表中会有一个 “解析” 按钮,点击后会触发文件解析:

ragflow-file-list.png

调用接口为 /v1/document/run,其实现逻辑位于 api/apps/document_app.py 文件:

@manager.route("/run", methods=["POST"])
@login_required
def run():
  req = request.json
  for id in req["doc_ids"]:

    # 任务进度清零
    info = {"run": str(req["run"]), "progress": 0}
    DocumentService.update_by_id(id, info)

    if str(req["run"]) == TaskStatus.RUNNING.value:

      # 任务队列
      doc = DocumentService.get_by_id(id)
      bucket, name = File2DocumentService.get_storage_address(doc_id=doc["id"])
      queue_tasks(doc, bucket, name, 0)

  return get_json_result(data=True)

这里的代码做了简化,只保留了主要部分,其中 req["run"] 可能是 RUNNING(1)CANCEL(2),表示启动或取消任务。下面的 queue_tasks 函数是触发文件解析的入口,它的实现比较复杂,它根据不同的文件类型和配置,创建和排队文件处理任务,主要逻辑如下:

  1. 如果文件类型为 PDF 文件,默认每 12 页创建一个任务,可以通过 parser_config 中的 task_page_size 来修改;如果 paper_idpaper,表示使用论文分块方法,则按每 22 页创建任务;如果 paper_idoneknowledge_graph,表示不分块或提取知识图谱,则整个文件创建一个任务;
  2. 如果 paper_idtable,表示文件类型为 Excel、CSV 等表格文件,则每 3000 行创建一个任务;
  3. 对于其他类型,则整个文件创建一个任务;

此外,它会根据 doc_id 获取文档的分块配置,计算任务的摘要,并检查之前是否已经执行过一样的任务,如果有,则直接复用之前已经执行过的任务结果,提高处理效率。

最后将任务列表批量插入到数据库,并标记文档开始解析,然后从任务列表中筛选出未完成的任务,加入 Redis 队列:

def queue_tasks(doc: dict, bucket: str, name: str, priority: int):

  # 根据不同的文件类型和配置,创建任务
  parse_task_array = []
  if doc["type"] == FileType.PDF.value:
    # ...
  elif doc["parser_id"] == "table":
    # ...
  else:
    # ...

  # 批量插入任务到数据库
  bulk_insert_into_db(Task, parse_task_array, True)

  # 标记文档开始解析
  DocumentService.begin2parse(doc["id"])

  # 将未完成的任务加入 Redis 队列
  unfinished_task_array = [task for task in parse_task_array if task["progress"] < 1.0]
  for unfinished_task in unfinished_task_array:
    assert REDIS_CONN.queue_product(
      get_svr_queue_name(priority), message=unfinished_task
    ), "Can't access Redis. Please check the Redis' status."

投递解析任务

上面通过 REDIS_CONN.queue_product() 将任务加入 Redis 队列,队列名为 rag_flow_svr_queuerag_flow_svr_queue_1。RAGFlow 支持任务优先级,默认 priority 为 0,任务会被投递到 rag_flow_svr_queue,如果 priority 为 1,则会被投递到 rag_flow_svr_queue_1

函数 queue_product() 的实现如下:

def queue_product(self, queue, message) -> bool:
  for _ in range(3):
    try:
      payload = {"message": json.dumps(message)}
      self.REDIS.xadd(queue, payload)
      return True
    except Exception as e:
      logging.exception(
        "RedisDB.queue_product " + str(queue) + " got exception: " + str(e)
      )
  return False

它通过 Redis 的 XADD 命令将任务加入到队列中,重试 3 次。

学习 Redis 的 Stream 数据类型

Redis Stream 是 Redis 5.0 引入的一个强大的数据结构,专门用于处理流式数据和消息队列。它适用于下面这些场景:

  • 实时数据处理:适合处理传感器数据、日志流、用户行为事件等实时数据;
  • 消息队列:可替代传统的 List 作为消息队列,提供更强的功能和可靠性保证;
  • 事件溯源:支持回放历史消息,适合需要事件重放的业务场景;
  • 微服务通信:在微服务架构中作为轻量级的消息中间件使用;

Redis Stream 相比传统的 List 和 Pub/Sub,提供了更完善的消息处理机制,包括消息持久化、消费进度跟踪、消费者组协作等企业级特性:

  • 持久化日志结构:Stream 以只追加的方式存储数据,每条消息都有唯一的 ID 和时间戳,类似于 Apache Kafka 的日志结构;
  • 消息格式:每条消息由 ID 和 Value 组成,Value 是一个字典,包含多对键值对;
  • 消费者组(Consumer Groups):支持多个消费者协作处理消息,提供负载均衡和故障转移能力,每个消费者组独立跟踪消费进度;

redis-streams.png

下面我们简单学习下 Redis Stream 的基本用法。

使用 XADD 生产消息

首先是通过 XADD 命令 生产消息,该命令格式如下:

redis-streams-xadd.jpg

其中 mystream 是这个 Stream 的名称,* 表示自动生成 ID,后面的内容是 Stream 的值,由多个键值对组成。我们创建 Stream 并添加一些测试消息:

127.0.0.1:6379> XADD mystream * sensor_id temp_01 temperature 25.3
"1752285099426-0"
127.0.0.1:6379> XADD mystream * sensor_id temp_02 temperature 26.1
"1752285103416-0"
127.0.0.1:6379> XADD mystream * sensor_id temp_01 temperature 24.8
"1752285107329-0"

使用 XLEN 命令 获取 Stream 长度:

127.0.0.1:6379> XLEN mystream
(integer) 3

使用 XREAD 消费消息

有多种消费 Redis Stream 的方法,最简单的是使用 XREAD 命令

127.0.0.1:6379> XREAD COUNT 1 BLOCK 1000 STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) "1752285099426-0"
         2) 1) "sensor_id"
            2) "temp_01"
            3) "temperature"
            4) "25.3"

其中 COUNT 表示一次消费的数量,不指定则消费所有消息;BLOCK 表示阻塞等待时间(毫秒),置为 0 表示无限等待;STREAMS 后面跟要消费的 Stream 名称;最后的 0 表示从头开始消费。从输出结果可以看到,我们成功拿到了队列中的第一条消息,要获取后续消息,可以拿着这个 ID 继续消费:

127.0.0.1:6379> XREAD COUNT 1 BLOCK 1000 STREAMS mystream 1752285099426-0
1) 1) "mystream"
   2) 1) 1) "1752285103416-0"
         2) 1) "sensor_id"
            2) "temp_02"
            3) "temperature"
            4) "26.1"

使用消费者组消费消息

Redis Stream 的一大特色是支持消费者组,每个消费者组都有自己的消费进度,互不影响,同一个消费者组允许多个消费者协作处理消息。通过 XGROUP CREATE 命令 创建消费者组:

127.0.0.1:6379> XGROUP CREATE mystream mygroup 0
OK

其中 mygroup 是消费者组的名称,0 表示从头开始消费。然后我们就可以使用 XREADGROUP 命令 消费消息了:

127.0.0.1:6379> XREADGROUP GROUP mygroup consumer1 COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) "1752285099426-0"
         2) 1) "sensor_id"
            2) "temp_01"
            3) "temperature"
            4) "25.3"

其中 consumer1 是消费者名称,后面指定要读取的 Stream 和起始位置:

  • > 表示读取新消息
  • 0 表示读取所有待确认消息
  • 具体 ID 表示从该 ID 之后读取

XREAD 方便的是,我们可以一直使用 > 来读取新消息,而不需要关心上一次读取的 ID,Redis 会自动记录消费进度:

127.0.0.1:6379> XREADGROUP GROUP mygroup consumer1 COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) "1752285103416-0"
         2) 1) "sensor_id"
            2) "temp_02"
            3) "temperature"
            4) "26.1"

Redis Stream 支持多消费者协作,我们可以换一个消费者 consumer2 继续读取:

127.0.0.1:6379> XREADGROUP GROUP mygroup consumer2 COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) "1752285107329-0"
         2) 1) "sensor_id"
            2) "temp_01"
            3) "temperature"
            4) "24.8"

处理待确认消息

和传统的消息队列一样,消费 Redis Stream 之后需要确认消息已经处理完成,否则消息会一直是未确认状态,通过 XPENDING 命令 查看待确认的消息:

127.0.0.1:6379> XPENDING mystream mygroup
1) (integer) 3
2) "1752285099426-0"
3) "1752285107329-0"
4) 1) 1) "consumer1"
      2) "2"
   2) 1) "consumer2"
      2) "1"

可以看到 consumer1 有 2 条未确认消息,consumer2 有 1 条未确认消息。使用 XACK 命令 确认消息:

127.0.0.1:6379> XACK mystream mygroup 1752285099426-0
(integer) 1

关于 Redis Stream 的更多内容,可以参考下面的官方文档:

了解了 Redis Stream 的基本知识,可以帮助我们更好的理解 RAGFlow 任务执行器的实现原理。

任务执行器

我们之前提过,RAGFlow 由 API 服务器(API Server)任务执行器(Task Executor) 两大核心组成,上面投递到 Redis 中的任务,就是由任务执行器来消费和处理的。任务执行器的入口位于 rag/svr/task_executor.py 文件,它是一个 Trio 异步程序:

import trio

# 控制并发任务数
MAX_CONCURRENT_TASKS = int(os.environ.get('MAX_CONCURRENT_TASKS', "5"))
task_limiter = trio.Semaphore(MAX_CONCURRENT_TASKS)

async def main():
  async with trio.open_nursery() as nursery:
    
    # 定时汇报任务状态
    nursery.start_soon(report_status)
    
    # 启动任务管理器
    while not stop_event.is_set():
      await task_limiter.acquire()
      nursery.start_soon(task_manager)
  logging.error("BUG!!! You should not reach here!!!")

程序首先通过 trio.open_nursery() 创建了一个 nursery 对象(中文翻译 “托儿所”),用于管理异步任务。然后使用 start_soon() 启动一个定时任务,每隔一段时间汇报一次任务状态;最后使用 start_soon() 启动任务管理器,通过信号量控制并发任务数,默认为 5,可以通过环境变量 MAX_CONCURRENT_TASKS 修改。

启动任务执行器时有一个参数 --workers=5 表示启动 5 个 worker 进程,每个进程内部都是一个 Trio 程序,由上面的代码可知,每个进程内部又会启动 5 个并发的任务,所以实际上一共有 25 个处理线程。

任务管理器的核心逻辑位于 handle_task() 函数中:

async def handle_task():

  # 从 Redis 中读取任务
  redis_msg, task = await collect()

  # 处理任务
  logging.info(f"handle_task begin for task {json.dumps(task)}")
  await do_handle_task(task)
  logging.info(f"handle_task done for task {json.dumps(task)}")

  # 确认任务完成
  redis_msg.ack()

这里的一系列操作就使用了上面我们学习的 Redis Stream 相关知识。它首先通过 collect() 函数从 Redis 中读取任务:

ragflow-task-executor-collect.png

这里的消费者组名称为 rag_flow_svr_task_broker,消费者名称为 "task_executor_" + CONSUMER_NO,其中 CONSUMER_NO 就是启动任务执行器时传入的 ${host_id}_${consumer_id} 参数,用于唯一标识每个消费者。

collect() 函数优先读取未确认的消息(调用 XREADGROUPID 传 0),当没有未确认消息时,再读取新消息(调用 XREADGROUPID>),读取消息之后,调用 do_handle_task() 函数处理任务,最后通过 redis_msg.ack() 确认任务完成。

小结

本文深入探讨了 RAGFlow 的文件解析流程。我们首先学习了如何通过 API 触发解析任务,这些任务随后被智能地拆分并放入 Redis Stream 消息队列中。接着,我们详细了解了 Redis Stream 的工作原理,并分析了任务执行器如何利用 trio 异步框架和消费者组机制,高效、可靠地消费和处理这些任务。

在下一篇文章中,我们将继续研究 do_handle_task() 函数的实现,深入了解 RAGFlow 是如何具体执行每个解析任务的。


学习 RAGFlow 的文件上传逻辑

在上一篇中,我们学习了 RAGFlow 的系统架构和启动流程,了解了 RAGFlow 的 API 服务器(API Server)任务执行器(Task Executor) 两大核心组件,一个负责提供外部接口和平台基本功能,另一个则负责文件的解析和切片处理。

从系统架构图中,我们可以看到 RAGFlow 的核心流程包括 文件上传 -> 文件解析 -> 文件分块 -> 多路召回 -> 重排序 -> 大模型回答 这些步骤,今天我们就从源码的角度,先来学习下文件上传的相关逻辑。

文件上传接口实现

文件上传接口为 /v1/document/upload,其实现逻辑位于 api/apps/document_app.py 文件:

@manager.route("/upload", methods=["POST"])
@login_required
def upload():
  kb_id = request.form.get("kb_id")
  file_objs = request.files.getlist("file")
  
  # 根据 kb_id 查询知识库
  e, kb = KnowledgebaseService.get_by_id(kb_id)

  # 上传文件到指定知识库
  err, files = FileService.upload_document(kb, file_objs, current_user.id)

  return get_json_result(data=files)

其中 FileServiceupload_document 实现如下:

@classmethod
@DB.connection_context()
def upload_document(self, kb, file_objs, user_id):
  
  # 初始化知识库目录结构:/.knowledgebase/kb_name
  root_folder = self.get_root_folder(user_id)
  pf_id = root_folder["id"]
  self.init_knowledgebase_docs(pf_id, user_id)
  kb_root_folder = self.get_kb_folder(user_id)
  kb_folder = self.new_a_file_from_kb(kb.tenant_id, kb.name, kb_root_folder["id"])

  err, files = [], []
  for file in file_objs:
    # 文件重名处理,a.pdf -> a(1).pdf
    filename = duplicate_name(DocumentService.query, name=file.filename, kb_id=kb.id)

    # 读取文件内容,如果是 PDF 尝试对格式错误的文件进行修复
    blob = file.read()
    if filetype == FileType.PDF.value:
      blob = read_potential_broken_pdf(blob)
    
    # 上传文件到对象存储
    location = filename
    STORAGE_IMPL.put(kb.id, location, blob)

    # 生成文件缩略图
    doc_id = get_uuid()
    img = thumbnail_img(filename, blob)
    thumbnail_location = ""
    if img is not None:
      thumbnail_location = f"thumbnail_{doc_id}.png"
      STORAGE_IMPL.put(kb.id, thumbnail_location, img)

    # 保存到 document 表
    doc = {
      "id": doc_id,
      "kb_id": kb.id,
      "parser_id": self.get_parser(filetype, filename, kb.parser_id),
      "parser_config": kb.parser_config,
      "created_by": user_id,
      "type": filetype,
      "name": filename,
      "location": location,
      "size": len(blob),
      "thumbnail": thumbnail_location,
    }
    DocumentService.insert(doc)

    # 保存到 file 表
    FileService.add_file_from_kb(doc, kb_folder["id"], kb.tenant_id)
    files.append((doc, blob))

  return err, files

这里有几个值得注意的点,我们来逐一分析下。

知识库目录结构

RAGFlow 除了知识库管理之外,还有一个文件管理功能:

ragflow-file-management.png

用户可以直接在这里创建目录和上传文件,然后点击右边的 “链接知识库” 将其挂到某个知识库下。和直接在知识库中上传文件相比,在这里管理文件的好处是,一份文件可有链接到多个知识库,且知识库删除时文件不会被删除。在知识库中上传的文件也会出现在文件管理里,每个知识库在 /.knowledgebase 目录下都有一个对应的文件夹,只不过该文件夹是只读的,不允许用户在其中创建子文件夹或对文件进行修改。

整个文件的目录结构如下所示:

/
├── Folder 1
│   ├── File 11
│   └── File 12
├── Folder 2
│   ├── File 21
│   └── File 22
└── .knowledgebase
    ├── kb1
    │   ├── File 11
    │   └── File 12
    └── kb2
        ├── File 21
        └── File 22

此外,从文件管理上传的文件会在 file 表中插入记录,而在知识库中上传的文件会同时在 filedocument 表中插入记录,并通过 file2document 表维护两者之间的关系,这也是上面最后几句代码的作用。

修复 PDF 格式错误

PDF 文件的处理是一件非常棘手的问题,RAGFlow 在上传 PDF 文件的时候,会检查文件能否正常打开,如果有问题,则尝试用 Ghostscript 对其进行修复:

def read_potential_broken_pdf(blob):
  def try_open(blob):
    try:
      with pdfplumber.open(BytesIO(blob)) as pdf:
        if pdf.pages:
          return True
    except Exception:
      return False
    return False

  if try_open(blob):
    return blob

  repaired = repair_pdf_with_ghostscript(blob)
  if try_open(repaired):
    return repaired

  return blob

修复的逻辑很简单,就是执行 Ghostscript 命令:

$ gs -o <outfile> -sDEVICE=pdfwrite -dPDFSETTINGS=/prepress <infile>

我们之前在学习 PDFMathTranslate 时,了解到它有一个兼容模式,通过 pikepdf 将 PDF 转换为 PDF/A 格式,可以提高 PDF 文件的兼容性。感觉这也算一种修复 PDF 文件的方式,只是不知道二者之间有何区别。

缩略图的生成

RAGFlow 支持为不同格式的文件生成缩略图,可以学习下它这里不同文件的实现。

比如使用 pdfplumberPage.to_image() 生成 PDF 文件的缩略图:

import pdfplumber
pdf = pdfplumber.open(BytesIO(blob))

buffered = BytesIO()
resolution = 32
pdf.pages[0].to_image(resolution=resolution).annotated.save(buffered, format="png")
img = buffered.getvalue()

pdf.close()
return img

使用 PILImage.thumbnail() 生成图片的缩略图:

from PIL import Image
image = Image.open(BytesIO(blob))

image.thumbnail((30, 30))
buffered = BytesIO()
image.save(buffered, format="png")

return buffered.getvalue()

使用 aspose-slidesSlide.get_thumbnail() 生成 PPT 文件的缩略图:

import aspose.pydrawing as drawing
import aspose.slides as slides

with slides.Presentation(BytesIO(blob)) as presentation:
  buffered = BytesIO()
  scale = 0.03
  img = None
  presentation.slides[0].get_thumbnail(scale, scale).save(buffered, drawing.imaging.ImageFormat.png)
  img = buffered.getvalue()
  return img

文件存储的实现

从代码可以看到,这里通过 STORAGE_IMPL.put(...) 将文件上传到对象存储,RAGFlow 默认使用 Minio 存储,可以在浏览器里输入 http://localhost:9001 来访问它:

minio-login.jpg

默认用户为 rag_flow,密码为 infini_rag_flow,登录进去后可以浏览 RAGFlow 的所有的文件:

minio-files.png

其中桶名就是知识库的 ID,Key 就是文件的名称。

RAGFlow 支持多种不同的文件存储实现,除了 Minio 还支持下面这些:

可以在 .env 文件中通过 STORAGE_IMPL 变量来切换其他的存储实现。比如使用阿里云的 OSS 存储,需要在 .env 文件中添加下面的配置:

STORAGE_IMPL=OSS

同时修改 service_conf.yaml.template 中对应的 oss 配置:

oss:
  access_key: 'access_key'
  secret_key: 'secret_key'
  endpoint_url: 'http://oss-cn-hangzhou.aliyuncs.com'
  region: 'cn-hangzhou'
  bucket: 'bucket_name'

小结

我们今天学习了 RAGFlow 的文件上传逻辑,了解了 RAGFlow 是如何组织知识库的目录结构、如何修复 PDF 格式错误、如何生成不同文件的缩略图、以及如何切换不同的文件存储等相关内容。文件上传之后,自然就要对其进行解析处理了,我们明天继续吧。


学习 RAGFlow 的系统架构

昨天,我们学习了 RAGFlow 的安装配置和基本使用,通过创建一个知识库并上传文档,完整地体验了 RAGFlow 从数据处理到智能问答的基本工作流程。作为一个 RAG 系统,这套流程也是 RAGFlow 的核心流程,下面是 RAGFlow 的系统架构图:

ragflow-architecture.png

上面的架构图中省略了中间件部分,包括 ES、MySQL、Redis 和 Minio 等,仅展示了 RAGFlow 的两个核心服务:API 服务器(API Server)任务执行器(Task Executor),其中 API 服务器负责提供外部接口,包括知识库管理、文件管理、搜索、聊天等功能,而任务执行器则负责文件的解析和切片任务,正所谓 Quality in, quality out,它的深度文档理解和智能文本切片是 RAGFlow 的关键特性。

今天我们就从物理部署的角度来看看 RAGFlow 的这两个服务。

深入 entrypoint.sh 脚本

我们昨天学习了构建 RAGFlow 镜像的过程,感兴趣的同学可以研究下 Dockerfile 文件,它通过 多阶段构建(Multi-stage builds) 技巧,将构建过程分成基础(base)、构建(builder)、生产(production)三个阶段,大概的文件结构如下:

# --------
# 基础阶段
# --------
FROM ubuntu:22.04 AS base
USER root
WORKDIR /ragflow

# 从资源镜像拷贝模型资源
# 安装所需的系统类库
# 安装 Python Git Nginx 等软件 ...
# 安装 JDK、Node.js 等 ...

# --------
# 构建阶段
# --------
FROM base AS builder

# 安装 Python 依赖...
# 编译 Web 页面 ...

# --------
# 生产阶段
# --------
FROM base AS production

# 拷贝 Python 包
# 拷贝 Web 页面 ...

ENTRYPOINT ["./entrypoint.sh"]

从最后的生产阶段可以看出,RAGFlow 镜像的入口文件为 /ragflow/entrypoint.sh,它的用法如下:

function usage() {
  echo "Usage: $0 [--disable-webserver] [--disable-taskexecutor] [--consumer-no-beg=<num>] [--consumer-no-end=<num>] [--workers=<num>] [--host-id=<string>]"
  echo
  echo "  --disable-webserver             Disables the web server (nginx + ragflow_server)."
  echo "  --disable-taskexecutor          Disables task executor workers."
  echo "  --enable-mcpserver              Enables the MCP server."
  echo "  --consumer-no-beg=<num>         Start range for consumers (if using range-based)."
  echo "  --consumer-no-end=<num>         End range for consumers (if using range-based)."
  echo "  --workers=<num>                 Number of task executors to run (if range is not used)."
  echo "  --host-id=<string>              Unique ID for the host (defaults to \`hostname\`)."
  echo
  echo "Examples:"
  echo "  $0 --disable-taskexecutor"
  echo "  $0 --disable-webserver --consumer-no-beg=0 --consumer-no-end=5"
  echo "  $0 --disable-webserver --workers=2 --host-id=myhost123"
  echo "  $0 --enable-mcpserver"
  exit 1
}

可以看到这个镜像可以以多种方式启动:

  • --disable-taskexecutor 禁用任务执行器,仅启动 API 服务器
  • --disable-webserver 禁用 API 服务器,仅启动任务执行器
  • --enable-mcpserver 启动 MCP 服务器

RAGFlow 默认会在一个容器中同时启动 API 服务器和任务执行器,便于开发和测试,但是在生产环境中我们可以灵活地根据需要选择启动方式,将两者分开部署。

仅启动 API 服务器

我们可以修改 docker/docker-compose.yml 文件中的启动参数来做到这一点:

services:
  ragflow:
    image: ${RAGFLOW_IMAGE}
    command:
      - --disable-taskexecutor
    container_name: ragflow-server
    # 其他配置 ...

entrypoint.sh 文件中,启动 API 服务器的代码如下:

if [[ "${ENABLE_WEBSERVER}" -eq 1 ]]; then
  echo "Starting nginx..."
  /usr/sbin/nginx

  echo "Starting ragflow_server..."
  while true; do
    "$PY" api/ragflow_server.py
  done &
fi

首先启动 Nginx,然后执行 ragflow_server.py 脚本,它是一个基于 Flask 开发的 Web 服务,默认监听 9380 端口。这里的 while true; do ... done & 的写法挺有意思,while true 表示无限循环,& 表示将脚本放入后台执行,这样做可以确保服务进程在崩溃或异常退出后能够自动重启,通过这种纯 Shell 的方式实现自动恢复机制,不依赖任何第三方进程管理器(如 systemdsupervisor)。

Nginx 用于托管 Web 前端页面以及透传 API 服务器的 HTTP 请求,它的配置位于 ragflow.conf 文件中,内容如下:

server {
  listen 80;
  server_name _;
  root /ragflow/web/dist;

  gzip on;

  location ~ ^/(v1|api) {
    proxy_pass http://ragflow:9380;
    include proxy.conf;
  }

  location / {
    index index.html;
    try_files $uri $uri/ /index.html;
  }

  # Cache-Control: max-age~@~AExpires
  location ~ ^/static/(css|js|media)/ {
    expires 10y;
    access_log off;
  }
}

如果要对外提供 HTTPS 服务,可以将 docker/docker-compose.yml 文件中的 ragflow.conf 替换成 ragflow.https.conf,并将证书文件挂到容器中:

services:
  ragflow:
    volumes:
      # 证书文件
      - /path/to/fullchain.pem:/etc/nginx/ssl/fullchain.pem:ro
      - /path/to/privkey.pem:/etc/nginx/ssl/privkey.pem:ro
      # 使用 ragflow.https.conf 替换 ragflow.conf
      - ./nginx/ragflow.https.conf:/etc/nginx/conf.d/ragflow.conf
      # 其他配置 ...

同时编辑 nginx/ragflow.https.conf 文件,将 my_ragflow_domain.com 替换成你真实的域名。然后重启服务即可:

$ docker-compose down
$ docker-compose up -d

仅启动任务执行器

当处理的文档数量很多时,将任务执行器单独部署多个实例可以提高文档解析的速度。我们可以修改 docker/docker-compose.yml 文件,将 ragflow 配置复制一份出来,仅启动任务执行器:

services:
  ragflow_task_executor:
    image: ${RAGFLOW_IMAGE}
    command:
      - --disable-webserver
      - --workers=5
    container_name: ragflow-task-executor
    # 其他配置 ...

我们可以通过 --workers 参数来指定启动的 worker 数量。启动任务执行器的代码如下:

if [[ "${ENABLE_TASKEXECUTOR}" -eq 1 ]]; then
    echo "Starting ${WORKERS} task executor(s) on host '${HOST_ID}'..."
    for (( i=0; i<WORKERS; i++ ))
    do
        task_exe "${i}" "${HOST_ID}" &
    done
fi

每个 worker 都会启动一个独立的进程,其中 task_exe() 函数定义如下:

function task_exe() {
    local consumer_id="$1"
    local host_id="$2"

    JEMALLOC_PATH="$(pkg-config --variable=libdir jemalloc)/libjemalloc.so"
    while true; do
        LD_PRELOAD="$JEMALLOC_PATH" \
        "$PY" rag/svr/task_executor.py "${host_id}_${consumer_id}"
    done
}

这里也用了 while true 的技巧,防止 worker 进程异常退出,每个 worker 进程执行 task_executor.py 脚本,并将 ${host_id}_${consumer_id} 作为参数传入。任务执行器是一个基于 Trio 异步库开发的命令行程序,它通过监听 Redis 消息队列,对用户上传的文件进行解析处理。这里的 ${host_id} 是当前的主机名,${consumer_id} 是指 worker 的序号,拼接起来用于区分不同的消费者。

启动 MCP 服务器

RAGFlow 还支持 MCP 服务器,开启方法很简单,只需将 docker/docker-compose.yml 文件中 services.ragflow.command 部分的注释去掉即可:

services:
  ragflow:
    image: ${RAGFLOW_IMAGE}
    command:
      - --enable-mcpserver
      - --mcp-host=0.0.0.0
      - --mcp-port=9382
      - --mcp-base-url=http://127.0.0.1:9380
      - --mcp-script-path=/ragflow/mcp/server/server.py
      - --mcp-mode=self-host
      - --mcp-host-api-key=ragflow-xxxxxxx

关于 RAGFlow MCP 服务器的使用,我们今天暂且跳过,后面单开一篇介绍。

小结

通过今天的学习,我们了解了 RAGFlow 的系统架构,以及如何通过 entrypoint.sh 脚本启动不同的服务。接下来,我们将继续剖析 RAGFlow 的源码,探索 API 服务器和任务执行器的实现原理。


RAGFlow 快速入门

在构建高级 AI 应用时,检索增强生成(RAG)已成为一项关键技术,它能让大语言模型(LLM)利用外部知识库,提供更准确、更具上下文的回答。然而,如何高效地处理和理解格式各异的复杂文档(如 PDF、Word、PPT 等),并从中提取高质量信息,一直是 RAG 应用落地的一大挑战。

今天,我们将介绍一款强大的开源 RAG 引擎 —— RAGFlow,它专为解决这一难题而生。RAGFlow 基于深度文档理解技术,能够为企业和个人提供一套精简、高效的 RAG 工作流程,让 AI 应用能够从海量复杂数据中高质量地提取信息,真正做到 Quality in, quality out

ragflow-logo.png

RAGFlow 的核心特性如下:

  • 深度文档理解:不仅仅是提取文本,RAGFlow 能够深入理解各类复杂文档的布局和结构,确保从 PDF、Word、PPT 等文件中提取高质量、有价值的信息;
  • 智能文本切片:提供基于模板的文本切片方法,不仅智能,而且整个过程清晰可控,方便解释和调整;
  • 有理有据的回答:生成的回答都附带关键引用的快照,并支持追根溯源,最大限度地减少了 AI 幻觉;
  • 广泛的异构数据支持:兼容各类数据源,包括 Word 文档、PPT、Excel 表格、PDF、图片、网页,甚至是扫描件;
  • 自动化的 RAG 工作流:提供从数据处理、多路召回到融合重排序的全自动化 RAG 工作流,并支持灵活配置大语言模型和向量模型,提供易用的 API,方便与现有系统集成;

本文将带你快速入门 RAGFlow,学习如何安装、配置并使用它来构建你自己的 RAG 应用。

安装与上手

安装 RAGFlow 最简单的方法是使用 Docker 和 Docker Compose。首先检查我们的电脑上已经安装了它们:

$ docker --version
Docker version 24.0.2, build cb74dfc

$ docker compose version
Docker Compose version 2.38.1

确保 Docker 的版本在 24.0.0 以上,Docker Compose 的版本在 v2.26.1 以上。然后克隆 RAGFlow 仓库:

$ git clone https://github.com/infiniflow/ragflow.git

进入 docker 文件夹:

$ cd ragflow/docker

这个文件夹下有几个比较重要的文件:

  • docker-compose.yml - 定义了 RAGFlow 的镜像和配置,这个文件通过 Docker Compose 的 include 语法引用了 docker-compose-base.yml 文件,因此启动时只需指定这个入口文件即可
  • docker-compose-base.yml 定义了 RAGFlow 依赖的中间件的镜像和配置,包括 ES、MySQL、Redis 和 Minio 等
  • .env - 通过环境变量修改启动配置,比如调整各组件的端口,用户名和密码,默认镜像等,在 macOS 电脑上可以将 MACOS=1 打开,如果访问不了 huggingface.co 可以开启 HF_ENDPOINT=https://hf-mirror.com 参数

配置确认无误后,使用 Docker Compose 一键启动:

$ docker compose -f docker-compose.yml up -d

启动时默认会拉取官方构建好的 infiniflow/ragflow:v0.19.1-slim 镜像,该镜像比较大,下载要花点时间。启动成功后如下:

ragflow-containers.png

构建 ARM64 镜像

目前官方提供的 Docker 镜像均基于 x86 架构构建,并不提供基于 ARM64 的 Docker 镜像,比如在我的 macOS 上启动后,容器的下面会显示一个 AMD64 的标签。如果你的 Docker 和我一样,开启了 QEMU 或 Apple 的 Virtualization Framework 虚拟化技术,在 ARM64 机器上也可以跑 x86 的镜像,就是速度有点慢。

docker-desktop-setting.png

当然你也可以自行构建 ARM64 架构的镜像,顺便也能看看镜像中隐藏的一些细节,参考这篇文档:

首先,下载构建镜像所需的资源:

$ uv run download_deps.py

下载的资源包括:

  • 几个库文件

    • libssl
    • tika-server-standard.jar
    • cl100k_base.tiktoken
    • chrome 和 chromedriver
  • 几个 nltk_data 资源

    • wordnet
    • punkt,
    • punkt_tab
  • 几个 huggingface 模型

    • InfiniFlow/text_concat_xgb_v1.0
    • InfiniFlow/deepdoc
    • InfiniFlow/huqie
    • BAAI/bge-large-zh-v1.5
    • maidalun1020/bce-embedding-base_v1

然后构建资源镜像(就是将刚刚下载的资源拷贝到基础镜像里):

$ docker build -f Dockerfile.deps -t infiniflow/ragflow_deps .

然后基于资源镜像构建 RAGFlow 镜像:

$ docker build --build-arg LIGHTEN=1 -f Dockerfile -t infiniflow/ragflow:nightly-slim .

构建完成后,打开 docker/.env 文件,找到 RAGFLOW_IMAGE 配置,将其修改为 infiniflow/ragflow:nightly-slim。最后,使用 Docker Compose 一键启动:

$ cd docker
$ docker compose -f docker-compose-macos.yml up -d

RAGFlow 登录

启动后,查看 RAGFlow 容器的日志,当显示如下的文字 LOGO 时,说明启动成功:

ragflow-start-log.png

RAGFlow 默认监听本地 80 端口,直接用浏览器打开 http://localhost 即可,进入 RAGFlow 的登录页面:

ragflow-login.jpg

吐槽下 RAGFlow 的登录页面,这背景图选的,文字都看不清。

第一次使用需要注册一个新账号,注册完成后使用新账号登录即可:

ragflow-login-success.png

RAGFlow 初体验

进入 RAGFlow 的第一件事是配置模型,点击右上角的头像,然后进入 “模型供应商” 页面:

ragflow-model-setting.png

从下面的列表中选择并添加自己的模型,根据不同的模型,需要配置 API Key 等不同的参数。然后设置默认模型:

ragflow-model-setting-default.png

RAGFlow 支持大量的模型供应商,这些模型按功能被划分成几类:

  • 聊天模型
  • 嵌入模型
  • Img2txt模型
  • Speech2txt模型
  • Rerank模型
  • TTS模型

根据需要配置这些模型,一般来讲,除了聊天模型和嵌入模型是必填的,其他的可以不填;配置完默认模型后,就可以体验 RAGFlow 的功能了。进入 “知识库” 页面,创建一个新知识库:

ragflow-new-kb.png

然后点击 “新增文件” 按钮,从本地上传一个文件,上传后点击解析按钮,只有解析成功后的文件才可以对其问答,文件解析完成后如下所示:

ragflow-kb-files.png

我们再进入 “聊天” 页面,点击 “新建助理” 创建一个聊天助手:

ragflow-new-chat.png

下面的知识库选择我们刚刚创建的知识库,创建成功后,就可以和它进行对话了:

ragflow-kb-chat.png

小结

在本文中,我们对 RAGFlow 进行了快速入门。我们学习了 RAGFlow 的核心特性,讲解了如何通过 Docker Compose 进行安装部署,并为 ARM64 用户提供了详细的镜像构建指南。在完成模型供应商的配置后,我们通过创建一个知识库并上传文档,完整地体验了 RAGFlow 从数据处理到智能问答的基本工作流程。

通过今天的学习,我们对 RAGFlow 已经有了初步的了解。在后续的文章中,我们将结合源码深入其核心,探索更多高级功能,例如深度文档理解、智能文本切片、自动化 RAG 工作流等。


Gemini CLI vs. Claude Code 功能对比

经过两周多的深度学习和体验,我们对 Claude Code 的基本功能和特性已经有了基本了解。而正在我们学习 Claude Code 的期间,Google 开源了另一款终端 AI 助手 ———— Gemini CLI,作为 Claude Code 的开源平替,它接入了 Google 顶级的 Gemini 2.5 Pro 大模型,而且使用个人账号登录即可免费使用,支持每分钟最多 60 次、每日最多 1000 次请求,引起了业界的广泛关注,仅用几天时间就获得了 52k+ 的星标。

刚开始学习 Claude Code 的时候,我本来想着充 5 美刀体验体验,到后来又充了 10 美刀,直到最后,两周时间一共花了我 35 美刀,实在是贵的肉疼。相对来说,开源且免费的 Gemini CLI 用起来是真香,所以 Gemini CLI 发布后的第一时间,我也赶紧试用了一番,这篇文章就对两者的功能做一个全面的对比。

安装与上手

Gemini CLI 的安装和 Claude Code 一样便捷,都依赖于 npm 进行全局安装:

$ npm install -g @google/gemini-cli

安装后,只需在终端输入 gemini,首次运行会引导用户进行 Google 账户授权和主题选择,整个过程和 Claude Code 几乎没有区别,完成后进入交互模式:

gemini-cli.png

下面将从几个重要的维度对 Gemini CLI 和 Claude Code 的功能进行对比。

代码理解与交互

Gemini CLI 和 Claude Code 这两款工具都具备强大的代码理解能力,支持超长的上下文窗口,能够轻松应对大型项目。它们都允许用户通过 @ 符号引用文件和目录,并提供自动补全功能,使得与代码库的对话变得异常简单。

Gemini CLI 在与代码交互时,更侧重于直接的问答和指令执行;Claude Code 则更进一步,它通过与 IDE 的深度集成,能够获取更丰富的上下文信息,例如当前打开的文件、光标选中的代码、甚至是 IDE 的诊断信息。这使得 Claude Code 在进行代码修复或重构时,能够提供更精准、更贴合当前开发场景的建议。

上手第一个问题,必定是让它 “介绍下这个项目”:

gemini-cli-first-q.png

可以看到 Gemini CLI 也能很好的完成这个工作,不过相对于 Claude Code 感觉还是差点意思,缺少了中间读取文件的过程:

claude-code-summarize.png

内置工具与扩展性

Gemini CLI 内置了一系列实用的工具,涵盖了文件查找 (FindFiles/ReadFolder)、内容搜索 (SearchText)、文件读取(ReadFile/ReadManyFiles)、文件编辑 (Edit)、文件创建 (WriteFile)、Shell 命令执行 (Shell) 、网页抓取 (WebFetch) 、联网搜索(GoogleSearch)和保存记忆(Save Memory)等常用功能。

可以在交互模式下输入 /tools 斜杠命令查看:

gemini-cli-tools.png

而 Claude Code 的内置工具集更为丰富和专业,多达 16 个。除了 Gemini CLI 包含的基础功能外,它还提供了:

  • 更精细的文件操作: MultiEdit 用于批量修改,NotebookRead/Edit 用于原生支持 Jupyter Notebook 操作;
  • 更高效的文件检索Grep 工具通过 ripgrep 替代 grep 在检索大型代码库时效率更高;
  • 任务管理与规划: TodoWrite/Read 用于生成和跟踪任务列表,Task (子智能体) 用于执行深度搜索和研究,exit_plan_mode (规划模式) 用于在执行复杂任务前制定详细计划;
  • Git/GitHub 集成: 内置了对 gitgh (GitHub CLI) 的深度支持,能够理解 PR、审查代码、处理评论等;

关于 Claude Code 的这些功能,可以参考我之前的文章。尽管如此,Gemini CLI 提供的这些基础工具已经可以完成绝大多数工作了,其中,保存记忆(Save Memory 是 Gemini CLI 的一个比较有特色的工具,我们可以通过自然语言让 Gemini CLI 记住我们的个人偏好或项目设置等信息:

gemini-cli-save-memory.png

也可以通过斜杠命令 /memory add 手动添加:

> /memory add "your fact here"

添加的记忆内容将保存在用户目录下的 ~/.gemini/GEMINI.md 文件中,查看该文件的内容如下:

$ cat ~/.gemini/GEMINI.md

## Gemini Added Memories
- My best programming language is Python.

后续对话时 Gemini CLI 会自动填充记忆内容。我们也可以在项目的根目录手动创建记忆文件,供整个项目团队共用。

很显然,这个功能和 Claude Code 的 CLAUDE.md 非常类似,只不过 Claude Code 并没有把它抽象成工具,而是通过 /init 命令初始化记忆以及 # 快捷键来手动添加记忆。

此外,在扩展性方面,Gemini CLI 也支持 MCP 协议,允许用户集成外部工具和服务,这和 Claude Code 基本上差不多。

与开发环境的集成

Claude Code 目前在外部集成方面功能强大,它可以与 IDE 无缝集成,能直接在 IDE 的差异视图中展示代码变更,自动将 IDE 的上下文共享给大模型,甚至可以通过快捷键在 IDE 和 CLI 之间切换。这种“沉浸式”的体验,使得 Claude Code 更像是一个嵌入在开发环境中的智能伙伴,而不是一个外部工具。

此外,Claude Code 还支持以 GitHub Actions 的方式集成到 CI/CD 流程中。你可以在 PR 或 Issue 的评论中 @claude,让它自动分析代码、创建 PR 或修复错误,将 AI 的能力贯穿于整个软件开发生命周期。

显然,Gemini CLI 在这方面要薄弱地多,它更多地被定位为一个独立的终端工具,虽然可以和 VS Code 中的 Gemini Code Assist 配合使用,但两者之间的联动相对松散。

权限控制与安全性

Claude Code 在权限控制和安全性方面考虑得非常周全,它提供了一套非常完善和精细的权限控制机制,用户可以通过配置文件或命令行参数,精确控制每个工具的权限,例如:

  • 允许或禁止某个 Bash 命令 (Bash(npm run build))
  • 限制文件的读写范围 (Edit(docs/**))
  • 设置不同的权限模式(如 plan 模式只读不写,yolo 模式则跳过所有确认)
  • 在企业环境下,系统管理员还可以强制执行用户无法覆盖的安全策略
  • 最近还引入了 钩子 (Hooks) 功能,允许在工具执行前后运行自定义脚本,实现日志记录、代码格式化、自定义校验等高级功能

而 Gemini CLI 在这方面相对简单,主要依赖于执行前的用户确认。不过 Gemini CLI 有一个 检查点(Checkpointing) 功能,比较有意思。如果开启了该功能,Gemini CLI 的工具对文件进行修改之前,会自动保存项目状态的快照,这样用户可以放心地尝试和应用代码,我们可以通过 --checkpointing 参数开启:

$ gemini --checkpointing

然后让 Gemini CLI 对代码稍作修改:

gemini-cli-seo.png

修改完成后输入 /restore 命令,可以查看生成的检查点:

gemini-cli-restore.png

输入 /restore <checkpoint-file> 可以将项目还原到特定的检查点:

gemini-cli-restore-2.png

运行该命令后,你的文件和对话将立即恢复到创建检查点时的状态,工具调用的提示也会重新出现。

此外,Gemini CLI 的 沙盒模式 也是其一大特色,它支持两种实现方式:

  1. 基于 macOS 的 Seatbelt 功能实现轻量级沙盒,使用 sandbox-exec 命令,默认限制为禁止项目目录之外的写操作;也可以通过 SEATBELT_PROFILE 环境变量调整限制策略,比如严格限制、禁止联网等;
  2. 基于 Docker 或 Podman 容器实现完全进程隔离的跨平台沙盒;
# 使用默认的沙盒机制
$ gemini --sandbox

# 通过环境变量指定沙盒机制
$ GEMINI_SANDBOX=docker gemini

下面是以 Docker 容器方式运行,会自动拉取沙箱镜像:

gemini-cli-sandbox.png

小结

经过一番研究和对比,我们对 Gemini CLI 也有了基本的认识,简单说,两款工具都很棒,但定位不同:

  • Gemini CLI 开源、免费、够用,适合个人开发者或想省钱的用户;
  • Claude Code 更加专业、功能更强、集成更好,适合追求极致效率的团队;

所以,想免费尝鲜,用 Gemini CLI;不差钱,追求专业高效,就选 Claude Code。同时,我们也要及时关注开源社区的动态,相信在社区的共同努力下,Gemini CLI 也会越来越好用,超越 Claude Code 也不无可能。


将 Claude Code 集成到更多地方

我们已经熟悉了在终端中与 Claude Code 进行会话,并了解了其强大的功能。为了进一步提升开发效率,本文将介绍如何将 Claude Code 无缝集成到我们日常使用的 IDE 和 GitHub 工作流中,使其成为开发过程中的得力助手。

将 Claude Code 添加到 IDE 中

Claude Code 可以无缝地集成到一些流行的 IDE 中,这样我们在使用 IDE 编码的同时也能享受 Claude Code 的功能。它目前支持两个主要的 IDE 系列:VS Code(包括 Cursor、Windsurf 等分支)和 JetBrains IDEs(包括 PyCharm、WebStorm、IntelliJ 和 GoLand)。

我们以 VS Code 为例,它的安装非常简单,直接在集成终端中运行 claude 命令,即可自动安装扩展。

注意 VS Code 版本必须是 1.98 以上。

如果没有自动安装,我们也可以手动在应用商店中找到 Claude Code 扩展,点击安装即可:

vscode-plugin.png

安装成功后,在 VS Code 右上角会显示一个 “Run Claude Code” 的按钮:

vscode-run-cc.png

吐槽一句,必须得随便打开一个文件才能看到这个按钮。

点击这个按钮或使用 Cmd + Esc 快捷键,可以快速打开 Claude Code 交互界面;或者直接在集成终端中运行 claude 命令也可以自动连上 VS Code;甚至可以在任何外部终端中使用 claude --ide/ide 命令也能连接到 VS Code。

连接到 IDE 之后,我们可以享受下面这些功能:

  • 差异查看:代码更改可以直接在 IDE 中显示,而不是在终端中,代码差异更加直观;
  • 选择上下文:你在 IDE 中选择的文本和打开的文件会自动填充到 Claude Code 上下文中;
  • 文件引用快捷键:使用 Cmd + Option + K 插入文件引用(比如 @File#L1-99);
  • 诊断共享:在 IDE 中出现的诊断错误会自动与 Claude Code 共享;

在 IDE 中查看代码差异:

claude-code-in-vscode-diff.png

在 Claude Code 引用选择的文本:

claude-code-in-vscode-select.png

让 Claude Code 操作 Github

之前在学习 Bash 工具时,我们曾介绍过 Claude Code 是如何操作 Git 的。其实,当时还有一部分内容没讲,除了 Git 操作,Claude Code 还可以处理许多 GitHub 操作,我们今天补下这部分内容。

开始之前,你需要提前安装 Github CLI 命令行工具:

$ brew install gh

然后登录你的 Github 账号:

gh-auth-login.png

接下来就可以让 Claude Code 操作 Github 仓库了,比如:

  • 创建拉取请求(Pull Request):自动查看你的更改和最近历史,生成符合上下文的提交消息,并使用 gh pr create 创建 PR;
  • 代码审查(Code Review):对用户提交的 PR 进行审查,提出优化或修复建议;
  • 处理代码审查(Code Review)评论:根据 PR 上的评论来修复问题,并在完成后推送回 PR 分支;
  • 修复构建失败或 linter 警告:根据构建报错信息或 linter 警告来修复问题;
  • 分类和筛选未解决的问题:让 Claude 循环处理未解决的 GitHub Issues;

创建拉取请求(Pull Request)

和 Git 提交一样,Claude Code 会自动查看你的更改和最近历史,生成符合上下文的提交消息;Claude 理解 PR 简写,所以可以直接这样说:

claude-code-gh-pr.png

Bash 的工具描述中,有一份创建 PR 的规范,可以对照着上面的截图,了解下 Claude Code 是如何创建 PR 的:

对于所有 GitHub 相关任务,包括处理 issues、拉取请求、检查和发布,请使用 Bash 工具通过 gh 命令。
如果给出了 GitHub URL,请使用 gh 命令获取所需信息。

重要提示:当用户要求您创建拉取请求时,请仔细遵循以下步骤:

1. 始终使用 Bash 工具并行运行以下 bash 命令,以了解分支自从与主分支分离以来的当前状态:
   - 运行 git status 查看所有未跟踪的文件
   - 运行 git diff 查看将要提交的已暂存和未暂存更改
   - 检查当前分支是否跟踪远程分支并与远程保持最新,这样你就知道是否需要推送到远程
   - 运行 git log 命令和 `git diff [base-branch]...HEAD` 来了解当前分支的完整提交历史(从它与基础分支分离的时间开始)

2. 分析将包含在拉取请求中的所有更改,确保查看所有相关提交(不仅仅是最新提交,而是将包含在拉取请求中的所有提交!!!),并起草拉取请求摘要

3. 始终并行运行以下命令:
   - 如果需要,创建新分支
   - 如果需要,使用 -u 标志推送到远程
   - 使用 gh pr create 创建 PR,格式如下,使用 HEREDOC 传递正文以确保正确格式化。

<example>
gh pr create --title "PR 标题" --body "$(cat <<'EOF'
## 摘要
<1-3 个要点>
## 测试计划
[测试拉取请求的待办事项清单...]
.... 由 [Claude Code](https://claude.ai/code) 生成
EOF
)"
</example>

创建 PR 成功后,可以在 Github 页面看到对应的信息:

github-pr.png

代码审查(Code Review)

可以向 Claude Code 请求代码审查,它会对你提交的代码进行审查,提出优化或修复建议;Claude Code 甚至内置了一个斜杠命令来做这个:

> /review    

还可以让 Claude Code 根据 PR 上的评论来修复问题,并在完成后推送回 PR 分支;Claude Code 也内置了一个斜杠命令来做这个:

> /pr_comments

Claude Code 会调用 gh api 命令获取 PR 评论:

$ gh api repos/aneasystone/sudoku/pulls/1/comments

比如我对这个提交不满意,可以在这里加一些评论:

github-pr-comments.png

调用 gh api 命令的返回如下:

gh-api-pr-comments.png

从命令的返回中 Claude Code 可以知道代码所在行以及评论内容,然后根据这些信息对代码进行完善。

将 Claude Code 集成到 Github Actions 中

Claude Code 还支持以 GitHub Actions 的方式集成到你的 Github 工作流中,然后你就可以在任何 PR 或 Issue 评论中 @claude 让 Claude Code 分析你的代码、创建拉取请求、实现功能和修复错误。

首先启动 Claude Code 进入交互模式,运行 /install-github-app 斜杠命令:

claude-code-install-github-app.png

选择你的仓库后,会打开浏览器,跳到 Claude Github App 的安装页面:

claude-code-install-github-app-2.png

点击安装按钮,可以选择为所有仓库安装还是为指定仓库安装,安装结束后返回交互模式确认,提示要安装下面两个工作流:

claude-code-install-workflows.png

其中 @Claude Code 让我们可以在 PR 或 Issue 评论中 @claude 来帮我们做事,而 Claude Code Review 会自动对新 PR 进行代码审查。确认之后,还需要配置 Claude Code 的 API Key,再接着 Claude Code 就会自动完成后面的动作:

claude-code-install-success.png

直到最后,它会打开一个创建新 PR 的页面,标题和内容都已经填好:

claude-code-open-a-pull-request.png

我们只需要点击创建按钮确认即可,再将该 PR 合并到你的项目中就大功告成了。

随便找一个 PR ,在评论中 @claude 试试:

claude-code-at.png

可以去我的这个仓库 PR 看完整的运行结果:

由于安装的工作流里有一个是自动对新 PR 进行代码审查的,所以其实刚刚 Claude Code 为我们创建 PR 的时候已经触发过一次了,感兴趣的可以看这个 PR 的运行结果:

小结

今天,我们探索了如何将 Claude Code 从终端扩展到更广阔的开发场景中。通过将其无缝集成到 VS Code 等主流 IDE,我们能直接在编码环境中利用其智能,极大地提升了编码效率和体验。

更进一步,我们学习了如何授权 Claude Code 操作 GitHub,借助 gh 命令行工具,它能像一位经验丰富的同事一样,帮助我们处理创建 PR、审查代码、响应评论等日常任务。

最令人兴奋的是,通过集成 GitHub Actions,Claude Code 能够化身为代码仓库的智能守护者。它不仅可以在我们召唤时(@claude)及时出现,还能自动为新的 PR 提供代码审查,将 AI 的力量深度融入到团队协作和 CI/CD 流程中。

至此,我们已经将 Claude Code 集成到了开发的方方面面。如果这些还不能满足你的需求,Claude Code 甚至提供了 SDK,让你可以将其能力嵌入到任何自定义的工具或自动化流程中,开启无限可能。


给 Claude Code 的工具加上钩子

就在前几天,Claude Code 发布了 v1.0.38 版本,推出了一项有趣的功能:钩子(Hooks),它允许用户注册自己的 Shell 脚本,在 Claude Code 生命周期的各个阶段执行,以此来自定义和扩展 Claude Code 的行为。

claude-code-hooks.png

我们上周刚学习了 Claude Code 的权限控制机制,它为工具的调用提供了全方位的安全保障,现在有了钩子机制,我们可以实现更加个性化的控制。当然,除了权限控制,钩子的想象空间还是很大的,以下是一些示例用例:

  • 通知:当 Claude Code 等待用户确认时,可以用钩子自定义通知用户的方式
  • 自动格式化:每次编辑文件后,对文件进行格式化,比如对 .ts 文件运行 prettier,对 .go 文件运行 gofmt
  • 日志记录:跟踪和统计所有执行的命令,以满足合规性或调试需求
  • 反馈:当 Claude Code 生成的代码不符合代码库规范时,提供自动反馈
  • 自定义权限:对工具调用实现更加个性化的权限控制,比如阻止对生产文件或敏感目录的修改

钩子配置

钩子配置和权限配置一样,保存在配置文件里,该配置文件可以放在下面这些位置:

  • 用户设置(~/.claude/settings.json):用户私有,适用于所有项目
  • 共享项目设置(.claude/settings.json):提交到 Git 仓库,团队成员共用
  • 本地项目设置(.claude/settings.local.json):不提交到 Git 仓库,用户私有,只适用于当前项目
  • 企业管理策略(/path/to/policies.json):在企业环境下,系统管理员可以强制执行用户无法覆盖的安全策略,优先级最高

钩子配置的结构如下:

{
  "hooks": {
    "EventName": [
      {
        "matcher": "ToolPattern",
        "hooks": [
          {
            "type": "command",
            "command": "your-command-here"
          }
        ]
      }
    ]
  }
}

其中 EventName 表示事件名称,Claude Code 目前支持下面这些钩子事件:

  • PreToolUse - 在 Claude Code 生成工具参数后、执行工具调用前运行
  • PostToolUse - 在 Claude Code 执行工具调用成功结束后运行
  • Notification - 在 Claude Code 发送通知时运行
  • Stop - 在主 Claude Code 完成响应时运行
  • SubagentStop - 在 Claude Code 子智能体完成响应时运行(也就是 Task 工具调用)

下面的 matcher 叫做匹配器,表示匹配工具名称的模式。它可以是简单的字符串完全匹配,比如 Write 仅匹配 Write 工具;也可以是正则表达式,比如 Edit|WriteNotebook.*;如果省略或为空字符串,钩子将匹配所有工具;注意这个参数仅适用于 PreToolUsePostToolUse 事件。

匹配器也支持匹配 MCP 工具,MCP 工具名称遵循 mcp__<server>__<tool> 格式。匹配器可以完全匹配,如 mcp__filesystem__read_file;也可以正则匹配,如 mcp__memory__.*mcp__.*__write.* 等。

每个匹配器可以配多个钩子,每个钩子支持三个配置参数:

  • type:当前仅支持 "command" 类型
  • command:要运行的 Bash 命令,运行环境为 Claude Code 所在目录,如果匹配多个钩子,所有钩子并行运行
  • timeout:(可选)命令运行超时时间,以秒为单位,默认 60 秒;只要有一个命令运行超时,所有进行中的钩子都会被取消

你的第一个钩子

这是官方文档中展示的一个钩子示例:

{
  "hooks": {
    "PreToolUse": [
      {
        "matcher": "Bash",
        "hooks": [
          {
            "type": "command",
            "command": "jq -r '\"\\(.tool_input.command) - \\(.tool_input.description // \"No description\")\"' >> ~/.claude/bash-command-log.txt"
          }
        ]
      }
    ]
  }
}

请提前安装用于 JSON 处理的 jq 命令。

这个示例用于记录 Claude Code 运行的 Shell 命令,它创建了一个监听 PreToolUse 事件的钩子,在调用 Bash 工具之前,先运行 jq -r ... 这串命令。这个 jq -r ... 命令从 STDIO 读取一个 JSON 数据,将 Bash 要执行的命令 .tool_input.command 和该命令的描述 .tool_input.description 写入 ~/.claude/bash-command-log.txt 文件中。

其中 // 是 jq 内置的运算符,其功能是当字段为空时提供默认值,感兴趣的可以查阅 jq 的官方手册

将上面的钩子示例保存到你的配置文件 ~/.claude/settings.json 中,这样可以对所有项目生效,启动 Claude Code 进入交互模式,输入 /hooks 斜杠命令:

claude-code-add-hooks.png

在这里可以检查你的钩子是否生效。然后随便问一个需要调用 Bash 工具的问题,比如这里我们让 Claude Code 查看最近一次提交记录:

claude-code-bash-log.png

可以看到 Claude Code 运行了两次 git 命令,并将命令信息写到了 ~/.claude/bash-command-log.txt 文件中。

钩子输入

在上面的钩子示例中,我们使用 jq -r ... 命令从 STDIO 读取一个 JSON 数据,这个 JSON 数据就是钩子的输入,它的格式如下,包含了会话信息和特定事件信息:

{
  // 公共字段
  session_id: string       // 会话 ID
  transcript_path: string  // 对话 JSON 的路径

  // 特定事件字段
  ...
}

不同的钩子事件对应不同的 JSON 格式,比如 PreToolUse 输入:

{
  "session_id": "abc123",
  "transcript_path": "~/.claude/projects/.../00893aaf-19fa-41d2-8238-13269b9b3ca0.jsonl",
  "tool_name": "Write",
  "tool_input": {
    "file_path": "/path/to/file.txt",
    "content": "file content"
  }
}

PostToolUse 输入:

{
  "session_id": "abc123",
  "transcript_path": "~/.claude/projects/.../00893aaf-19fa-41d2-8238-13269b9b3ca0.jsonl",
  "tool_name": "Write",
  "tool_input": {
    "file_path": "/path/to/file.txt",
    "content": "file content"
  },
  "tool_response": {
    "filePath": "/path/to/file.txt",
    "success": true
  }
}

这里的 tool_inputtool_response 会根据工具的不同而不同。

Notification 输入:

{
  "session_id": "abc123",
  "transcript_path": "~/.claude/projects/.../00893aaf-19fa-41d2-8238-13269b9b3ca0.jsonl",
  "message": "Task completed successfully",
  "title": "Claude Code"
}

StopSubagentStop 输入:

{
  "session_id": "abc123",
  "transcript_path": "~/.claude/projects/.../00893aaf-19fa-41d2-8238-13269b9b3ca0.jsonl",
  "stop_hook_active": true
}

钩子输出

钩子有两种输出方式,Claude Code 根据钩子的输出决定是阻止还是继续运行,或将信息反馈给用户。

第一种是简单方式,钩子通过退出代码、标准输出 stdout 和标准错误 stderr 传达状态:

  • 退出代码 0:成功。在对话模式中向用户显示 stdout,注意该信息并不会反馈给 Claude Code
  • 退出代码 2:阻止错误。stderr 会自动反馈给 Claude Code 进行处理,注意不同的钩子事件对退出代码 2 的行为不一样,参见下面的表格
  • 其他退出代码:非阻止错误。向用户显示 stderr,并继续执行

不同的钩子事件对退出代码 2 的行为如下:

钩子事件行为
PreToolUse阻止工具调用,向 Claude Code 显示错误
PostToolUse向 Claude Code 显示错误(工具已运行)
Notification不适用,仅向用户显示 stderr
Stop阻止停止,向 Claude Code 显示错误
SubagentStop阻止停止,向 Claude Code 子代理显示错误

第二种是高级方式,钩子可以在 stdout 中返回结构化 JSON,以实现更复杂的控制:

{
  "continue": true, // Claude 在钩子执行后是否应继续(默认:true)
  "stopReason": "string", // 当 continue 为 false 时显示的消息
  "suppressOutput": true, // 从对话模式隐藏 stdout(默认:false)
}

如果 continue 为 false,Claude 在钩子运行后会立即停止,并将停止原因 stopReason 显示给用户。

如果 continue 为 true,Claude 会继续根据不同的钩子事件来进一步控制,比如 PreToolUsePostToolUse 钩子都可以控制是否继续工具调用,可以多返回两个字段:

{
  "decision": "approve" | "block" | undefined,
  "reason": "决策解释"
}

Claude Code 根据返回的 decision 来决定下一步的动作:

  • approve - 绕过权限系统,并将 reason 显示给用户,但不返回给 Claude。该值对 PostToolUse 不适用,因为工具已经调用过了
  • block - 阻止调用当前工具,并将 reason 返回给 Claude,以便 Claude 选择切换其他工具
  • undefined - 进入现有权限流程,忽略 reason 字段

StopSubagentStop 钩子可以控制 Claude 是否停止,返回的字段和上面一样:

{
  "decision": "block" | undefined,
  "reason": "当阻止 Claude 停止时必须提供"
}

Claude Code 根据返回的 decision 来决定下一步的动作:

  • block - 阻止 Claude 停止,必须返回 reason 字段,以便 Claude 知道如何继续
  • undefined - 允许 Claude 停止,忽略 reason 字段

一个复杂点的例子

下面是一个稍微复杂点的例子,首先添加如下钩子配置:

{
  "hooks": {
    "PreToolUse": [
      {
        "matcher": "Bash",
        "hooks": [
          {
            "type": "command",
            "command": "python3 ./validate_command.py"
          }
        ]
      }
    ]
  }
}

然后在当前目录下创建一个 validate_command.py 文件:

#!/usr/bin/env python3
import json
import re
import sys

# 验证规则
VALIDATION_RULES = [
    (
        r"\bgrep\b(?!.*\|)",
        "使用 'rg'(ripgrep)代替 'grep' 以获得更好的性能和功能",
    ),
    (
        r"\bfind\s+\S+\s+-name\b",
        "使用 'rg --files | rg pattern' 或 'rg --files -g pattern' 代替 'find -name' 以获得更好的性能",
    ),
]

# 验证命令是否匹配规则
def validate_command(command: str) -> list[str]:
    issues = []
    for pattern, message in VALIDATION_RULES:
        if re.search(pattern, command):
            issues.append(message)
    return issues

# 从 stdin 读取 JSON 输入
try:
    input_data = json.load(sys.stdin)
except json.JSONDecodeError as e:
    print(f"错误:无效的 JSON 输入:{e}", file=sys.stderr)
    sys.exit(1)

tool_name = input_data.get("tool_name", "")
tool_input = input_data.get("tool_input", {})
command = tool_input.get("command", "")

# 只处理 Bash 工具
if tool_name != "Bash" or not command:
    sys.exit(1)

# 对命令进行核查
issues = validate_command(command)
if issues:
    for message in issues:
        print(f"• {message}", file=sys.stderr)
    # 退出代码 2 阻止工具调用并向 Claude 显示 stderr
    sys.exit(2)

这段 Python 脚本通过退出代码、标准输出和标准错误向 Claude Code 传达消息;它对 Bash 工具执行的命令进行核查,阻止 Claude Code 执行 grepfind 命令,而是替换成性能更高的 rg 命令。

小结

本文详细介绍了 Claude Code 新增的钩子(Hooks)功能,它为用户提供了一种强大的机制,通过在工具生命周期的关键节点执行自定义 Shell 脚本来扩展和定制 Claude Code 的行为。

我们学习了钩子的核心概念和功能,包括:

  • 钩子配置:如何通过配置文件,在不同作用域(用户、项目、本地)下定义钩子。
  • 生命周期事件:学习了 PreToolUsePostToolUse 等多种事件类型,以及如何使用匹配器精确触发钩子。
  • 输入与输出:了解了钩子如何通过标准输入(STDIN)接收 JSON 格式的上下文数据,并通过退出码和标准输出(STDOUT)与 Claude Code 交互,实现批准、阻止或提供反馈等高级控制。
  • 实际应用:通过日志记录和命令验证等示例,展示了钩子的实际应用场景。

钩子功能在赋予我们高度灵活性的同时,也带来了重要的安全问题。钩子会在未经用户确认的情况下,以当前用户的完整权限执行 Shell 命令。因此,确保钩子脚本的安全性和可靠性至关重要。在部署任何钩子之前,请务必遵循安全最佳实践,如验证输入、使用绝对路径、避免处理敏感文件,并在受控环境中进行充分测试。

总而言之,钩子机制极大地增强了 Claude Code 的可扩展性,使其能够更好地融入现有的开发工作流和规范。善用钩子,我们能将 Claude Code 打造成更智能、更贴合团队需求的自动化编程伙伴。