Fork me on GitHub

2025年7月

再学 RAGFlow 的问答流程

昨天,我们对 RAGFlow 的问答流程进行了全面的学习,主要介绍了聊天助手的核心配置,并从源码的角度研究了从用户发起查询开始,到后端系统如何一步步处理并最终生成答案的过程。不过,问答过程中还有不少有趣的点可以展开学习,我们今天就继续深入这些细节。

SQL 检索优化

当知识库配置的 field_map 不为空时,RAGFlow 会使用 SQL 检索替代向量检索,来优化查询:

field_map = KnowledgebaseService.get_field_map(dialog.kb_ids)
if field_map:
  ans = use_sql(questions[-1], field_map, dialog.tenant_id, chat_mdl, prompt_config.get("quote", True))
  if ans:
    yield ans
    return

那么这个 field_map 是什么意思呢?要了解这一点,我们还需要复习下之前的 Table 分块器的代码:

def chunk(filename, binary=None, from_page=0, to_page=10000000000, lang="Chinese", callback=None, **kwargs):
  
  # 表格文件解析
  if re.search(r"\.xlsx?$", filename, re.IGNORECASE):
    # 处理 Excel 文件
    excel_parser = Excel()
    dfs = excel_parser(filename, binary, from_page=from_page, to_page=to_page, callback=callback)
  elif re.search(r"\.(txt|csv)$", filename, re.IGNORECASE):
    # 处理 TXT 或 CSV 文件
    dfs = [pd.DataFrame(np.array(rows), columns=headers)]
  else:
    raise NotImplementedError("file type not supported yet(excel, text, csv supported)")
  
  res = []
  # 组装返回结果
  for df in dfs:
    # ...
    # 更新 field_map
    KnowledgebaseService.update_parser_config(kwargs["kb_id"], {"field_map": {k: v for k, v in clmns_map}})
  return res

可以看到当知识库使用 Table 分块策略时,会在知识库的 parser_config 字段中维护一个 field_map 字典,表明 ES 中存储的是表格数据,并且记录了表格的所有列信息,可以打开 MySQL 数据库的 knowledgebase 表,parser_config 字段的值如下:

{
  "pages": [
    [
      1,
      1000000
    ]
  ],
  "field_map": {
    "xing_ming_tks": "姓名",
    "xue_hao_long": "学号",
    "nian_ling_long": "年龄",
    "xing_bie_tks": "性别",
    "cheng_ji_long": "成绩"
  }
}

在分块列表中可以看到,表格的每一行都是一个独立分块:

ragflow-table-chunks.png

而在 ES 中,每条记录不仅以文本的形式存储在 content_with_weight 字段中,而且每一列的值也都单独存储:

ragflow-table-chunks-es.png

这就给 SQL 检索提供了条件。后面的 use_sql() 函数逻辑如下:

def use_sql(question, field_map, tenant_id, chat_mdl, quota=True):

  # 表数据查询
  def get_table():

    # 根据表结构和用户问题,生成 SQL
    sql = chat_mdl.chat(sys_prompt, [{"role": "user", "content": user_prompt}], {"temperature": 0.06})
    
    # 调用 ES 的 SQL 查询接口
    return settings.retrievaler.sql_retrieval(sql, format="json"), sql

  tbl, sql = get_table()

  # 如果出错,则提供错误信息,重新生成 SQL
  if tbl.get("error") and tried_times <= 2:
    tbl, sql = get_table()

  # 组装 Markdown 表格,并返回引用块
  return {
    "answer": "\n".join([columns, line, rows]),
    "reference": {
      "chunks": [{"doc_id": r[docid_idx], "docnm_kwd": r[doc_name_idx]} for r in tbl["rows"]],
      "doc_aggs": [{"doc_id": did, "doc_name": d["doc_name"], "count": d["count"]} for did, d in doc_aggs.items()],
    },
    "prompt": sys_prompt,
  }

这里的核心有两点。第一点,我们要知道 ES 除了关键词检索和向量检索,也提供了 SQL 检索的功能,这里的 retrievaler.sql_retrieval() 就是调用 ES 的 SQL 接口,使用标准的 SQL 语句来查询 ES 数据库:

POST /_sql?format=json

{
  "query": "select doc_id, docnm_kwd, xing_ming_tks, xue_hao_long, nian_ling_long, xing_bie_tks, cheng_ji_long from ragflow_fb5e4b9e5ae211f0b4620242ac120006 where cheng_ji_long >= 80"
}

更多内容可以阅读 ES 的官方文档:

第二点,这里通过大模型生成 SQL 语句,表结构信息就是从 field_map 里取的,调用 ES 查询如果出错,会将出错信息丢给大模型重试一次,这是 Text to SQL 很常见的一种做法。

最后,将查询结果组装成 Markdown 格式,插入引用信息,并返回:

ragflow-table-chat.png

多轮对话优化

在 RAG 问答中,我们需要根据用户的问题检索知识库。但是在处理多轮对话时,用户的问题往往会不完整,比如用户第一个问题是 “张三的成绩怎么样?”,第二个问题是 “李四呢?”,如果只处理最近一轮对话,就会丢失很多上下文信息。

RAGFlow 支持开启多轮对话优化:

if len(questions) > 1 and prompt_config.get("refine_multiturn"):
  # 如果开启多轮优化,使用大模型分析对话,生成完整的问题
  questions = [full_question(dialog.tenant_id, dialog.llm_id, messages)]
else:
  # 如果未开启,取最后一轮用户的问题
  questions = questions[-1:]

多轮对话优化的实质就是通过大模型将多轮对话压缩成一个用户问题,比如上面的例子,转写后的问题可能是 “李四的成绩怎么样?”。

使用的提示词如下:

## 角色
一个乐于助人的助手。

## 任务和步骤
1. 生成一个符合对话逻辑的完整用户问题。
2. 如果用户的问题涉及相对日期,根据今天({{ today }})将其转换为绝对日期。
   - “昨天”={{ yesterday }},“明天”={{ tomorrow }}

## 要求和限制
- 如果用户最新的问题已经完整,无需进行任何操作,只需返回原始问题。
- 除了优化后的问题外,不要生成任何其他内容。
{% if language %}
- 生成的文本必须使用{{ language }}。
{% else %}
- 生成的文本必须与用户原始问题使用相同的语言。
{% endif %}

## 真实数据

**对话:**

{{ conversation }}

这里的 conversation 是完整的历史会话消息,其实可以再优化下,比如只取最近 5 条消息即可。

变量的使用

昨天我们学过,在聊天助手的配置中有一个自定义变量的功能,可以使用它来动态调整大模型的系统提示词,下面我来举一个例子。正常情况下,我们创建的助手是不知道当前日期的:

ragflow-normal-chat.png

如果希望助手回答这个问题,我们可以给它加一个 current_date 自定义变量:

ragflow-new-variable.png

然后调整默认的提示词:

ragflow-use-variable-in-prompt.png

不过这个自定义变量,在页面上问答是无法传值的,唯一可以传递其值的方法是调用助手的对话 API,比如 SDKHTTP 方式。

为了体验 RAGFlow 的 API 功能,我们必须先到 API 配置页面(点击右上角的头像)创建一个 API KEY:

ragflow-api.png

这个页面还列出了 RAGFlow 所有 API 接口,在调用 API 时可参考之。

有了 API KEY 之后,我们首先调用 /api/v1/chats/<chat-id>/sessions 接口,创建新会话:

$ curl --request POST \
   --url http://localhost/api/v1/chats/59c7a84e65d111f0872a0242ac120006/sessions \
   --header 'Content-Type: application/json' \
   --header 'Authorization: Bearer ragflow-YxN2QxNzFjNmFlNjExZjA5OWZhMDI0Mm' \
   --data '
   {
     "name": "demo"
   }'

这里的 <chat-id> 是聊天助手的 ID,可以随便进入该助手的一个聊天页面,网址中的 dialogId 就是。

调用结果如下:

ragflow-api-create-session.png

拿到会话 ID 后,就可以和助手进行对话了,调用 /api/v1/chats/<chat-id>/completions 接口:

$ curl --request POST \
   --url http://localhost/api/v1/chats/59c7a84e65d111f0872a0242ac120006/completions \
   --header 'Content-Type: application/json' \
   --header 'Authorization: Bearer ragflow-YxN2QxNzFjNmFlNjExZjA5OWZhMDI0Mm' \
   --data-binary '
   { 
     "question": "今天是星期几?", 
     "stream": true,
     "session_id": "63fc27aa6ae911f086620242ac120006"
   }'

这时没有带 current_date 参数,调用结果如下:

ragflow-api-chat.png

带上 current_date 参数,再次调用:

$ curl --request POST \
   --url http://localhost/api/v1/chats/59c7a84e65d111f0872a0242ac120006/completions \
   --header 'Content-Type: application/json' \
   --header 'Authorization: Bearer ragflow-YxN2QxNzFjNmFlNjExZjA5OWZhMDI0Mm' \
   --data-binary '
   { 
     "question": "今天是星期几?", 
     "stream": true,
     "session_id": "63fc27aa6ae911f086620242ac120006",
     "current_date": "2025年7月27号,星期日"
   }'

调用结果如下:

ragflow-api-chat-2.png

RAGFlow 的流式接口有点奇怪,为什么每一帧都包含前一帧的完整数据?这在输出长文本时不会有性能问题吗?

未完待续

我们今天深入学习了问答过程中几个有趣的点,比如:对于使用 Table 切片方法的知识库,RAGFlow 会使用 SQL 检索替代向量检索,优化查询效率;对于多轮对话,RAGFlow 通过大模型对用户问题进行改写,改写后的问题可以让查询更加精准;通过自定义用户变量,我们可以在 SDK 或 API 调用聊天助手时动态调整系统提示词,实现一些页面聊天做不到的功能。今天暂且先学这么多,剩下的明天继续。


学习 RAGFlow 的问答流程

昨天,我们详细学习了 RAGFlow 的检索测试功能,深入了解了其如何通过混合检索、重排序、知识图谱等多种策略从知识库中高效地召回相关信息。然而,检索只是整个 RAG 流程的第一步,召回的文本块并不能直接作为最终答案呈现给用户。为了生成连贯、精准的回答,RAGFlow 还需要将这些检索到的信息与用户的问题一起,通过精心设计的提示词提交给大语言模型进行综合处理。

今天,我们将深入探讨 RAGFlow 的问答流程,学习如何创建一个聊天助手,了解其背后的配置参数,并分析从接收用户问题、执行检索、组装上下文到最终生成回答的完整源码实现。此外,我们还会研究 RAGFlow 提供的独立 AI 搜索功能,看看它与检索测试或问答流程有何不同。

创建聊天助手

一旦你创建了知识库、完成了文件解析并进行了检索测试,就可以开始 AI 对话了。点击页面顶部中间的 “聊天” 标签,然后点击 “新建助理” 按钮,打开聊天配置对话框:

ragflow-chat-create.png

创建聊天助手时,我们需要配置一系列参数,这些参数共同决定了问答的效果,这些配置分为三大块:助理设置、提示引擎、模型设置。助理设置参数如下:

  • 助理姓名:为聊天助手取个合适的名称。
  • 助理描述:简单描述聊天助手的使用场景。
  • 助理头像:为聊天助手配个好看的头像。
  • 空回复:如果在知识库中没有检索到用户的问题,将统一使用它作为兜底回复;如果希望让大模型来回答,请将此留空。
  • 设置开场白:用户打开对话窗口时,系统自动发送的第一条消息。
  • 显示引文:默认启用,展示回答所依据的信息来源。
  • 关键词分析:使用大模型分析用户的问题,提取在相关性计算中要强调的关键词。我们昨天在学习检索测试时遇到了一个没用的参数 req.get("keyword"),看来就是这个功能了。
  • 文本转语音:将回答内容转换为语音,请先在设置里面选择 TTS 模型。
  • Tavily API Key:如果配置有效,将使用 Tavily 进行网络搜索作为知识库的补充。这个参数放在这感觉有点格格不入,改成 “开启联网搜索” 是不是更好点?
  • 知识库:将聊天助手与一个或多个知识库进行绑定。注意,如果这里选择多个知识库,请确保它们使用相同的嵌入模型,否则会发生错误。

切换到提示引擎配置页面:

ragflow-chat-create-2.png

这个页面的配置参数如下:

  • 系统提示词:大模型回答问题需要遵循的说明,比如角色设计、回复长度、回复语言等。如果模型原生支持在问答中推理,可以通过 //no_thinking 关闭自动推理。
  • 相似度阈值:如果查询和分块之间的相似度小于此阈值,则该分块将被过滤掉。默认设置为 0.2,也就是说文本块的混合相似度得分至少 20 才会被召回。
  • 关键字相似度权重:RAGFlow 使用混合相似度得分来评估两行文本之间的距离,它是加权关键词相似度和向量余弦相似度。两个权重的总和为 1.0。
  • Top N:该参数决定最终输入大模型的有多少分块。也就是说,并非所有相似度得分高于相似度阈值的分块都会被提供给大语言模型,大模型只能看到这些 Top N 分块。
  • 多轮对话优化:在多轮对话的中,对知识库查询的问题进行优化。会调用大模型额外消耗令牌。
  • 使用知识图谱:是否检索与所选知识库对应的知识图谱相关文本块,以处理复杂的多跳问题?这一过程将涉及对实体、关系和社区报告文本块的多次检索,会显著延长检索时间。
  • 推理:在回复时使用类似 Deepseek-R1 或 OpenAI o1 等模型的推理过程,启用后,该功能允许模型访问外部知识,并借助思维链推理等技术逐步解决复杂问题。通过将问题分解为可处理的步骤,这种方法增强了模型提供准确回答的能力,从而在需要逻辑推理和多步思考的任务上表现更优。
  • 重排序模型:非必选项,若不选择,系统将默认采用关键词相似度与向量余弦相似度相结合的混合查询方式;如果设置了重排序模型,则混合查询中的向量相似度部分将被重排序打分替代。
  • 跨语言搜索:选择一种或多种语言进行跨语言搜索。如果未选择任何语言,系统将使用原始查询进行搜索。
  • 变量:通过变量动态调整大模型的系统提示词,{knowledge} 为系统预留变量,代表从指定知识库召回的文本块,所有变量都必须用大括号 {} 括起来。如果你在这里自定义了其他变量,唯一可以传递其值的方法是调用助手的对话 API,比如 SDKHTTP 方式。

可以看到这里的大多数配置参数昨天在学习 “检索测试” 时都已经见过,今天重点学习那些新的参数,比如多轮对话优化、推理、变量等。

再切换到模型设置页面:

ragflow-chat-create-3.png

这里主要是大模型相关的参数:

  • 模型:选择用于生成最终答案的大语言模型。
  • 温度:控制生成文本的随机性,值越高,答案越具创造性;值越低,答案越确定。
  • Top P:一种采样策略,与温度类似,用于控制生成文本的多样性。
  • 存在处罚:这会通过惩罚对话中已经出现的单词来阻止模型重复相同的信息,更高的存在惩罚值意味着鼓励模型在回复中包含更多样化的标记。
  • 频率惩罚:与存在惩罚类似,这减少了模型频繁重复相同单词的倾向。
  • 最大令牌数:限制生成答案的长度。

下面这些参数不懂也没关系,可以通过 “自由度” 来统一调整,RAGFlow 内置了三种自由度:

  • 即兴发挥:温度值较高(0.8),意味着生成更有创意的回复。
  • 精确:温度值较低(0.2),意味着生成更为保守的回复。
  • 平衡:温度值居中(0.5),是谨慎与自由之间的平衡。

完成这些配置后,一个功能完备的聊天助手就创建好了,我们可以开始进行问答交互:

ragflow-chat.png

注意每次回复的上方有一个小灯泡,点击它可以看到送给大模型的完整提示词,以及各个步骤消耗的时间和令牌:

ragflow-chat-2.png

标准问答流程解析

当用户在聊天窗口输入问题并点击发送时,前端会调用 /v1/conversation/completion 接口,从而触发整个后端的问答逻辑。这个接口的入口位于 api/apps/conversation_app.py 文件:

@manager.route("/completion", methods=["POST"])
def completion():

    # 查询会话
    conv = ConversationService.get_by_id(req["conversation_id"])

    # 流式回复
    def stream():
        # 调用核心 chat 方法
        for ans in chat(dia, msg, True, **req):
            ans = structure_answer(conv, ans, message_id, conv.id)
            yield "data:" + json.dumps({"code": 0, "message": "", "data": ans}, ensure_ascii=False) + "\n\n"
        # 更新会话
        ConversationService.update_by_id(conv.id, conv.to_dict())
        
        yield "data:" + json.dumps({"code": 0, "message": "", "data": True}, ensure_ascii=False) + "\n\n"

    # 流式接口,使用 SSE 回复
    resp = Response(stream(), mimetype="text/event-stream")
    resp.headers.add_header("Cache-control", "no-cache")
    resp.headers.add_header("Connection", "keep-alive")
    resp.headers.add_header("X-Accel-Buffering", "no")
    resp.headers.add_header("Content-Type", "text/event-stream; charset=utf-8")
    return resp

可以看到,接口本身主要负责处理流式响应,真正的核心逻辑被封装在了 chat() 方法中。这个方法位于 api/db/services/dialog_service.py,是整个问答流程的中枢:

def chat(dialog, messages, stream=True, **kwargs):
    
  # 如果助手没有关联知识库且没有配置 Tavily API,则调用 chat_solo 进行纯大模型对话
  if not dialog.kb_ids and not dialog.prompt_config.get("tavily_api_key"):
    for ans in chat_solo(dialog, messages, stream):
      yield ans
    return

  # 绑定嵌入、重排序、聊天、TTS 等模型
  kbs, embd_mdl, rerank_mdl, chat_mdl, tts_mdl = get_models(dialog)

  # SQL 检索优化
  # 这里比较有意思,当 `field_map` 不为空时,直接使用 SQL 而非向量检索,优化检索性能
  field_map = KnowledgebaseService.get_field_map(dialog.kb_ids)
  if field_map:
    ans = use_sql(questions[-1], field_map, dialog.tenant_id, chat_mdl, prompt_config.get("quote", True))
    if ans:
      yield ans
      return

  # 多轮对话优化,对历史对话进行重写,生成便于检索的完整问题
  if len(questions) > 1 and prompt_config.get("refine_multiturn"):
    questions = [full_question(dialog.tenant_id, dialog.llm_id, messages)]
  else:
    questions = questions[-1:]

  # 跨语言处理,支持多语言查询转换
  if prompt_config.get("cross_languages"):
    questions = [cross_languages(dialog.tenant_id, dialog.llm_id, questions[0], prompt_config["cross_languages"])]

  # 关键词增强,提取关键词增强检索效果
  if prompt_config.get("keyword", False):
    questions[-1] += keyword_extraction(chat_mdl, questions[-1])

  
  kbinfos = {"total": 0, "chunks": [], "doc_aggs": []}
  if prompt_config.get("reasoning", False):
    # 深度推理模式
    reasoner = DeepResearcher(chat_mdl, prompt_config, ...)
    for think in reasoner.thinking(kbinfos, " ".join(questions)):
      # ...
  else:
    # 标准检索模式
    kbinfos = retriever.retrieval(" ".join(questions), embd_mdl, ...)
    if prompt_config.get("tavily_api_key"):
      # 集成 Tavily 网络搜索
      tav = Tavily(prompt_config["tavily_api_key"])
      tav_res = tav.retrieve_chunks(" ".join(questions))
      kbinfos["chunks"].extend(tav_res["chunks"])
      kbinfos["doc_aggs"].extend(tav_res["doc_aggs"])
    if prompt_config.get("use_kg"):
      # 知识图谱检索
      ck = settings.kg_retrievaler.retrieval(" ".join(questions), ...)
      if ck["content_with_weight"]:
        kbinfos["chunks"].insert(0, ck)

  # 如果检索结果为空,兜底回复
  if not knowledges and prompt_config.get("empty_response"):
    empty_res = prompt_config["empty_response"]
    yield {"answer": empty_res, "reference": kbinfos, "prompt": "\n\n### Query:\n%s" % " ".join(questions), "audio_binary": tts(tts_mdl, empty_res)}
    return {"answer": prompt_config["empty_response"], "reference": kbinfos}

  # 组装提示词,替换系统提示词中的变量,开启引用的提示词
  msg = [{"role": "system", "content": prompt_config["system"].format(**kwargs)}]
  prompt4citation = ""
  if knowledges and (prompt_config.get("quote", True) and kwargs.get("quote", True)):
    prompt4citation = citation_prompt()
  msg.extend([{"role": m["role"], "content": re.sub(r"##\d+\$\$", "", m["content"])} for m in messages if m["role"] != "system"])
  # 确保对话消息不超过模型的 token 限制
  used_token_count, msg = message_fit_in(msg, int(max_tokens * 0.95))
  prompt = msg[0]["content"]

  # 格式化返回结果
  def decorate_answer(answer):
    # 引用处理
    if knowledges and (prompt_config.get("quote", True) and kwargs.get("quote", True)):
      idx = set([])
      if embd_mdl and not re.search(r"\[ID:([0-9]+)\]", answer):
        answer, idx = retriever.insert_citations(answer, ...)
      else:
        for match in re.finditer(r"\[ID:([0-9]+)\]", answer):
          i = int(match.group(1))
          if i < len(kbinfos["chunks"]):
            idx.add(i)

      # 修复错误引用格式
      answer, idx = repair_bad_citation_formats(answer, kbinfos, idx)

    # 返回完整结果,包括引用,提示词,详细时间统计等
    return {"answer": think + answer, "reference": refs, "prompt": re.sub(r"\n", "  \n", prompt), "created_at": time.time()}

  # 调用大模型流式输出
  for ans in chat_mdl.chat_streamly(prompt + prompt4citation, msg[1:], gen_conf):
    # 流式输出,支持 TTS 文本转语音
    yield {"answer": thought + answer, "reference": {}, "audio_binary": tts(tts_mdl, delta_ans)}
  # 最终完整答案
  yield decorate_answer(thought + answer)

整个问答的逻辑有点复杂,但是主要脉络还是比较清晰的,主要分为四步:问题处理 -> 检索 -> 提示词组装 -> 答案生成,在上面的代码中基本上已经加了注释,感兴趣的同学可以结合源码进行研究。其中有一些检索相关的内容,我们昨天已经介绍过,可以结合上一篇的内容一起学习。

AI 搜索

除了在聊天助手中进行问答,RAGFlow 还在知识库页面提供了一个独立的 “AI 搜索” 功能:

ragflow-search.png

AI 搜索和聊天助手的区别在于:聊天助手是一个多轮的 AI 对话,我们可以自定义检索策略,自定义聊天模型;而 AI 搜索使用预定义的检索策略(结合加权关键词相似度和加权向量相似度的混合搜索)以及系统默认的聊天模型进行单轮 AI 对话,它不涉及知识图谱、关键词提取或问题提取等高级 RAG 策略。相关片段会按照相似度得分从高到低排列,显示在聊天模型回复的下方。

可以看出,它和 “检索测试” 很像,在调试聊天助手时,我们也可以使用 AI 搜索作为参考,以验证模型设置和检索策略是否生效。

AI 搜索旨在提供一种更接近于现代搜索引擎的体验:用户输入查询,系统不仅返回相关文档列表,还会基于这些文档生成一个精炼的摘要式答案。此外,它还提供了一个思维导图的功能,点击搜索结果右边的回形针按钮即可生成:

ragflow-search-mindmap.png

AI 搜索的接口为 /v1/conversation/ask,其后端实现位于 api/apps/conversation_app.py 文件中:

@manager.route("/ask", methods=["POST"])
def ask_about():

  # 流式输出
  def stream():
    for ans in ask(req["question"], req["kb_ids"], uid):
      yield "data:" + json.dumps({"code": 0, "message": "", "data": ans}, ensure_ascii=False) + "\n\n"   
    yield "data:" + json.dumps({"code": 0, "message": "", "data": True}, ensure_ascii=False) + "\n\n"

  # 使用 SSE 流式输出
  resp = Response(stream(), mimetype="text/event-stream")
  resp.headers.add_header("Cache-control", "no-cache")
  resp.headers.add_header("Connection", "keep-alive")
  resp.headers.add_header("X-Accel-Buffering", "no")
  resp.headers.add_header("Content-Type", "text/event-stream; charset=utf-8")
  return resp

相比于 chat() 方法,这里的 ask() 方法要简单的多:

def ask(question, kb_ids, tenant_id, chat_llm_name=None):
    
    # 使用普通检索 or 图谱检索
    retriever = settings.retrievaler if not is_knowledge_graph else settings.kg_retrievaler
    kbinfos = retriever.retrieval(question, ...)

    # 将检索结果拼接到内置提示词中
    knowledges = kb_prompt(kbinfos, max_tokens)
    prompt = DEFAULT_PROMPT % "\n".join(knowledges)
    msg = [{"role": "user", "content": question}]

    # 调用大模型流式回复
    answer = ""
    for ans in chat_mdl.chat_streamly(prompt, msg, {"temperature": 0.1}):
        answer = ans
        yield {"answer": answer, "reference": {}}

    # 返回最终的格式化回复
    yield decorate_answer(answer)

对比标准问答流程,AI 搜索的核心区别在于:

  • 无对话历史:它是一次性的请求,不考虑任何对话上下文。
  • 固定提示词:它使用一个内置的、专门为生成搜索摘要而优化的提示词,而不是聊天助手中可配置的提示词模板。
  • 目标是摘要:它的最终目的是生成一个简洁的摘要,而不是进行多轮的、交互式的对话。

总的来说,AI 搜索是一个轻量级的、以信息获取为目的的工具,而聊天助手则是一个功能更强大、以交互式问答为核心的应用。

小结

今天,我们对 RAGFlow 的问答与搜索流程进行了全面的学习,主要介绍了聊天助手的核心配置,并从源码的角度研究了从用户发起查询开始,到后端系统如何一步步处理并最终生成答案的过程。不过,问答过程中还有不少有趣的点可以展开学习:

  • SQL 检索优化:知识库 field_map 字段是什么意思?为什么要用 SQL 而非向量检索?
  • 多轮对话优化:如何对历史对话进行重写生成便于检索的完整问题?
  • 变量的使用:如何在系统提示词中使用变量,带有自定义变量的助手又该如何传值和调用?
  • 提示词的组装:使用什么样的策略确保对话消息不超过模型的 token 限制?
  • 引用处理:开启引用后的提示词有何变化,输出的引用如何处理?
  • 深度推理模式:什么是深度推理模式?如何借助思维链推理等技术解决复杂问题?

我们明天继续。


学习 RAGFlow 的检索流程

经过一段时间的学习,我们已经深入了解了 RAGFlow 从文件上传、解析、分块到知识库构建的全过程,并探索了 RAPTOR、知识图谱、标签集等高级功能。至此,知识库的构建阶段已经完成,接下来我们将进入 RAG 应用的核心环节:检索与问答

我们今天先学习 RAGFlow 的检索测试功能。

运行检索测试

当知识库构建完毕,我们需要验证下知识库的配置是否有效。RAGFlow 在知识库页面提供了一个 “检索测试” 的功能,我们可以在这里快速提问,检查是否能够检索到预期的文档块:

ragflow-retrieval-test.png

注:在运行检索测试之前,确保你的文件已上传并成功解析。

就像微调精密仪器一样,RAGFlow 需要仔细调试才能实现最佳问答性能,你的知识库设置、聊天助手配置以及所指定的模型配置都可能对最终结果产生重大影响。因此在继续配置聊天助手之前建议先运行一次检索测试,验证目标片段是否能够被检索出来,从而帮助你快速发现需要改进的地方或定位需要解决的问题。例如,在调试问答系统时,如果知道可以检索到正确的片段,就可以将精力集中在其他方面。

你可以通过调整下面这些参数来运行检索测试:

  • 相似度阈值(Similarity threshold):如果查询语句和分块之间的相似度小于此阈值,则该分块将被过滤掉,默认设置为 0.2,也就是说文本块的相似度得分至少 20 才会被召回;
  • 关键字相似度权重(Keyword similarity weight):RAGFlow 使用混合相似度得分来评估两行文本之间的距离,它结合了关键词相似度和向量相似度,这个参数用于设置关键词相似度在混合相似度得分中的权重,默认情况下,该值为 0.7,使得另一个组成部分的权重为 0.3,两个权重之和为 1.0;
  • 重排序模型(Rerank model):上面提到混合相似度得分结合了关键词相似度和向量相似度,这其实是没有配置重排序模型时的算法,当配置了重排序模型,那么混合相似度得分中的向量相似度将由重排序分数替代,混合相似度得分将结合关键词相似度和重排序分数;
  • 使用知识图谱(Use knowledge graph):启用该配置前,确保知识图谱已成功构建,它会检索知识图谱相关文本块,以处理复杂的多跳问题。在知识图谱中,实体描述、关系描述或社区报告各自作为独立的片段存在,因此启用该配置将涉及对实体、关系和社区报告文本块的多次检索,会显著延长检索时间;
  • 跨语言搜索(Cross-language search):选择一种或多种语言进行跨语言搜索,如果未选择任何语言,系统将使用原始查询进行搜索。

检索测试源码解读

检索测试调用的接口为 /v1/chunk/retrieval_test,入口代码位于 api/apps/chunk_app.py 文件:

@manager.route('/retrieval_test', methods=['POST'])
def retrieval_test():

  # 查看知识库详情
  kb = KnowledgebaseService.get_by_id(kb_ids[0])

  # 开启跨语言搜索,将用户问题翻译成其他语言
  if langs:
    question = cross_languages(kb.tenant_id, None, question, langs)

  # 从用户问题中提取关键词,拼接到用户问题上
  if req.get("keyword", False):
    chat_mdl = LLMBundle(kb.tenant_id, LLMType.CHAT)
    question += keyword_extraction(chat_mdl, question)

  # 如果知识库配置了标签集,给用户问题打上标签
  labels = label_question(question, [kb])

  # 检索的核心逻辑
  ranks = settings.retrievaler.retrieval(question, ..., rank_feature=labels)

  # 检索知识图谱,结果插到第一条
  if use_kg:
    ck = settings.kg_retrievaler.retrieval(question, ...)
    if ck["content_with_weight"]:
      ranks["chunks"].insert(0, ck)

  return get_json_result(data=ranks)

这里的 req.get("keyword") 这个参数比较奇怪,看起来应该是 “是否启用关键词” 之类的开关,但是我在页面上并没有找到对应的配置。

核心逻辑已有注释标出,下面对这里的几个关键函数分节讨论。

跨语言搜索

默认情况下,未选择目标语言,系统只会使用原始查询进行搜索,当知识库包含多种语言文档时,或者存在某些专业术语在不同语言中的表达差异时,单语言查询可能导致遗漏其他语言的相关信息。开启跨语言搜索后,RAGFlow 会将用户的单语言查询扩展为多语言版本,以提高跨语言文档检索的准确性。

跨语言搜索的第一步是将用户的问题翻译成目标语言,使用大模型完成,逻辑比较简单:

def cross_languages(tenant_id, llm_id, query, languages=[]):

  # 绑定大模型
  chat_mdl = LLMBundle(tenant_id, LLMType.CHAT, llm_id)

  # 组装提示词
  rendered_sys_prompt = PROMPT_JINJA_ENV.from_string(CROSS_LANGUAGES_SYS_PROMPT_TEMPLATE).render()
  rendered_user_prompt = PROMPT_JINJA_ENV.from_string(CROSS_LANGUAGES_USER_PROMPT_TEMPLATE).render(query=query, languages=languages)

  # 调用大模型,过滤 <think> 标签中思考的内容
  # 采用 temperature=0.2 低温度采样,确保翻译的一致性和准确性
  ans = chat_mdl.chat(rendered_sys_prompt, [{"role": "user", "content": rendered_user_prompt}], {"temperature": 0.2})
  ans = re.sub(r"^.*</think>", "", ans, flags=re.DOTALL)

  # 出错时返回原始查询
  if ans.find("**ERROR**") >= 0:
    return query

  # 返回多语言查询
  return "\n".join([a for a in re.sub(r"(^Output:|\n+)", "", ans, flags=re.DOTALL).split("===") if a.strip()])

这里感觉有个 BUG,看下面的提示词,RAGFlow 使用 ### 分隔不同语言版本,但是这里却使用 === 分隔。

使用提示词如下:

## 角色
一个高效的多语言翻译器。

## 行为规则
1. 接收以下格式的批量翻译请求:
   **输入:** `[文本]`
   **目标语言:** 逗号分隔的列表

2. 保持:
   - 原始格式(表格、列表、间距)
   - 专业术语的准确性
   - 文化背景的适宜性

3. 以下列格式输出翻译:

[语言1的翻译]
###
[语言2的翻译]

下面是一个输入和输出的示例:

**输入:**
Hello World! Let's discuss AI safety.
===
Chinese, French, Japanese

**输出:**
你好世界!让我们讨论人工智能安全问题。
###
Bonjour le monde ! Parlons de la sécurité de l'IA.
###
こんにちは世界!AIの安全性について話し合いましょう。

检索的核心逻辑

检索的核心逻辑位于 rag/nlp/search.py 中的 retrieval() 方法,它通过结合向量相似度和全文检索执行混合语义搜索,同时提供重排序和分页功能,返回相关文档片段及相似度评分:

def retrieval(self, question, ...):
    
  # 通过 “关键词加权 + 语义向量匹配” 双重方式进行 ES 检索
  sres = self.search(req, [index_name(tid) for tid in tenant_ids], kb_ids, embd_mdl, highlight, rank_feature=rank_feature)

  if rerank_mdl and sres.total > 0:
    # 使用外部模型重排序
    sim, tsim, vsim = self.rerank_by_model(rerank_mdl, sres, question, 1 - vector_similarity_weight, vector_similarity_weight, rank_feature=rank_feature)
  else:
    # 使用内置重排序算法
    sim, tsim, vsim = self.rerank(sres, question, 1 - vector_similarity_weight, vector_similarity_weight, rank_feature=rank_feature)

  # 组装返回结果
  idx = np.argsort(sim * -1)[(page - 1) * page_size:page * page_size]
  for i in idx:
    d = {
      "chunk_id": id,
      "content_ltks": chunk["content_ltks"],
      ...
    }
    ranks["chunks"].append(d)
  return ranks

整体逻辑比较清晰,首先通过 ES 的关键词和向量检索,然后对检索结果进行重排序。第一步其实就是调用 ES 的查询接口,感兴趣的可以将 ES 查询语句打印出来看看,类似于下面这样:

{
  "query": {
    "bool": {
      "must": [
        {
          "query_string": {
            "fields": [
              "title_tks^10",
              "title_sm_tks^5",
              "important_kwd^30",
              "important_tks^20",
              "question_tks^20",
              "content_ltks^2",
              "content_sm_ltks"
            ],
            "type": "best_fields",
            "query": "((毕业)^0.3859298493710235 (\"哪 所\")^0.22073633367349388 (詹姆斯 OR \"詹姆斯\" OR (\"詹姆斯\"~2)^0.5)^0.2003688922699708 (高中)^0.19296492468551174 (\"詹姆斯 毕业 于 哪 所 高中\"~2)^1.5)",
            "minimum_should_match": "30%",
            "boost": 1
          }
        }
      ],
      "filter": [
        {
          "terms": {
            "kb_id": [
              "d580f9bc65b911f0a5d20242ac120006"
            ]
          }
        },
        {
          "bool": {
            "must_not": [
              {
                "range": {
                  "available_int": {
                    "lt": 1
                  }
                }
              }
            ]
          }
        }
      ],
      "boost": 0.050000000000000044
    }
  },
  "knn": {
    "field": "q_1536_vec",
    "k": 1024,
    "num_candidates": 2048,
    "query_vector": [
      -0.018787164241075516,
      -0.016176467761397362,
      0.0048667327500879765,
      0.007344431709498167,
      ...
    ],
    "filter": {
      // 和上面的 `query` 部分一致
    },
    "similarity": 0.2
  },
  "from": 0,
  "size": 60
}

整个查询分为 query 核心查询部分和 knn 向量检索部分。其中 query 部分比较有意思,通过 query_string 实现多字段加权检索,具体规则如下:

  • 检索字段及权重:对多个字段设置不同权重,优先匹配重要性高的字段,例如:重要关键词权重最高(important_kwd^30important_tks^20),问题和标题字段次之(question_tks^20,title_tks^10),内容相关字段权重较低(content_ltks^2)等;
  • 检索关键词及权重:

    • 核心词:毕业(权重0.386)、哪所(0.221)、詹姆斯(0.200)、高中(0.193)
    • 短语匹配:“詹姆斯 毕业 于 哪 所 高中”~2(允许短语中词语间隔不超过2个词,权重1.5,优先匹配连贯短语)
  • 匹配阈值:minimum_should_match: "30%",确保至少匹配 30% 的关键词,避免结果过于宽泛。

这里多字段加权检索是通过 FulltextQueryer 中的 question() 生成的,感兴趣的可以深入看下这个函数的逻辑。另外,对查询语法感兴趣的同学也可以参考 ES 的 Query DSL 文档:

返回检索结果后,RAGFlow 接着会计算每条结果的综合得分,对其进行重排序。如果没有配置重排序模型,综合得分由关键词相似度和向量相似度加权计算,如果配置了重排序模型,则使用关键词相似度和重排序得分加权计算。此外,还需要综合考虑标签的特征相似度和 PageRank 分数等。

使用重排序模型会显著增加检索耗时,建议使用 SaaS 服务,譬如 CohereJina 等,如果你倾向于使用本地部署,请务必使用 GPU 推理。

检索知识图谱

如果开启了知识图谱,在检索的最后阶段,还会检索知识图谱相关文本块,这块的逻辑比较复杂,其核心代码如下:

def retrieval(self, question: str, ...):
    
  # 使用大模型分析用户问题,从用户问题中提取实体类型(ty_kwds)以及实体(ents)
  # 提示词参考 [MiniRAG](https://github.com/HKUDS/MiniRAG)
  ty_kwds, ents = self.query_rewrite(llm, qst, [index_name(tid) for tid in tenant_ids], kb_ids)

  # 根据提取实体的向量,从图中查找相似的实体
  ents_from_query = self.get_relevant_ents_by_keywords(ents, filters, idxnms, kb_ids, emb_mdl, ent_sim_threshold)

  # 根据提取的实体类型,从图中查询实体
  ents_from_types = self.get_relevant_ents_by_types(ty_kwds, filters, idxnms, kb_ids, 10000)

  # 根据用户问题的向量,从图中查询相似的关系
  rels_from_txt = self.get_relevant_relations_by_txt(qst, filters, idxnms, kb_ids, emb_mdl, rel_sim_threshold)

  # 这里还有很多细碎的计算得分的逻辑,包括:
  # 1. N 跳路径分析,遍历每个实体的多跳邻居路径,相似度按 1/(2+跳数) 衰减,多条路径的相似度累加;
  # 2. 相似度增强策略,包括实体增强和关系增强,比如同时出现在类型检索和关键词检索中的实体,相似度翻倍,等等;

  # 将这些检索到的实体和关系的 PageRank 值与其对查询的相似度分数相乘进行排序,返回 topN 默认前 6 个
  ents_from_query = sorted(ents_from_query.items(), key=lambda x: x[1]["sim"] * x[1]["pagerank"], reverse=True)[:ent_topn]
  rels_from_txt = sorted(rels_from_txt.items(), key=lambda x: x[1]["sim"] * x[1]["pagerank"], reverse=True)[:rel_topn]

  # 返回最终结果,检索涉及最终检索中实体最多的社区的报告
  return {
      "chunk_id": get_uuid(),
      "content_with_weight": ents + relas + self._community_retrival_(...),
    }

检索知识图谱的过程涉及对实体、关系和社区报告文本块的多次检索,大体流程包括下面几个步骤:

  1. 使用大模型从用户的查询中提取实体及实体类型;
  2. 根据提取的实体,计算它们的嵌入向量,从图中查找相似实体及其 N 跳关系;
  3. 根据提取的实体类型,基于它们的 PageRank 值从图中检索排名前 N 的实体;
  4. 计算用户的查询嵌入,从图中检索相似的关系;
  5. 将这些检索到的实体和关系的 PageRank 值与其对查询的相似度分数相乘进行排序,返回前 N 个作为最终检索结果;
  6. 检索涉及最终结果中实体最多的社区的报告;

知识图谱的检索结果最终全部拼接在一起,插入到返回结果的第一条,如下所示:

ragflow-retrieval-test-graph.png

小结

今天我们详细学习了 RAGFlow 的检索测试功能,这是验证和调试知识库配置的关键步骤。我们深入探讨了其背后的核心检索逻辑和多种高级功能,主要包括:

  • 跨语言搜索: 将用户的单语言查询扩展为多语言版本,以提高跨语言文档检索的准确性;
  • 混合检索策略:结合关键词全文检索(query_string)和向量相似度(knn)在 Elasticsearch 中实现混合搜索;
  • 多字段加权:通过为不同字段(如标题、关键词、内容)设置不同权重来优化检索;
  • 重排序:使用内置算法和外部重排序模型对检索结果进行排序;
  • 知识图谱检索:检索知识图谱中和用户问题相关的实体、关系和社区报告等;

通过今天的学习,我们对 RAGFlow 的检索机制有了全面的了解。接下来,我们将把目光投向最终的问答环节,看看 RAGFlow 是如何利用这些检索到的信息,结合大语言模型生成精准回答的。


构建和使用 RAGFlow 的标签集

检索准确性是衡量生产级 RAG 框架的试金石。除了自动关键词提取、自动问题提取、知识图谱等提升检索效果的方法外,RAGFlow 还引入了自动提取标签的功能,它会根据每个知识块的相似性,自动将用户自定义标签集中的标签映射到知识库中的相关块。这些标签为现有的数据集增加了额外的层级领域知识,特别适用于各个片段彼此非常相似,以至于无法将目标片段与其他片段区分开的情况。例如,当你有少量关于 iPhone 的片段,而大多数是关于 iPhone 手机壳或配件时,如果没有额外信息,就很难检索到那些关于 iPhone 的片段。

要使用此功能,你必须先创建一个标签集,然后在知识库的配置页面中指定该标签集。我们今天就来学习如何构建和使用 标签集(Tag Set)

构建标签集

在前面学习的 11 种切片方法中有一种叫 Tag 分块,这是一种特殊的知识库配置,使用 Tag 作为切片方法的知识库不会参与 RAG 的检索和问答流程,而是充当标签集的角色。其他知识库可以使用它来标记自己的分块,对这些知识库的查询也将使用此标签集进行标记。

首先我们创建一个知识库,选择 Tag 作为切片方法:

ragflow-build-tag-set.jpg

然后上传提前准备好的 Excel 标签文件,文件内容如下:

ragflow-tag-file.png

标签文件可以是 Excel、CSV 或 TXT 格式,文件内容需满足下面的格式要求:

  • 如果文件是 Excel 格式,它应包含两列:第一列用于标签描述,第二列用于标签名称;不需要标题行,且支持多个工作表(Sheet);
  • 标签描述可以是示例分块或示例查询;
  • 如果文件是 CSV 和 TXT 格式,必须采用 UTF-8 编码,且使用制表符(TAB)作为分隔符来分隔描述和标签;
  • 在标签列中可以包含多个标签,使用逗号分隔;

然后点击 “解析” 按钮,解析完成后查看分块列表:

ragflow-tag-set-chunks.png

可以看到,标签集的第一列,也就是描述部分变成了知识库的分块,而第二列变成了该分块对应的标签列表。

在知识库的配置页面底部,我们可以以标签云的形式查看整个标签集:

ragflow-tag-cloud.png

标签越大代表出现的频次越高,或者以表格形式查看:

ragflow-tag-table.png

构建标签集的实现

构建标签集的逻辑主要位于 Tag 分块器,也就是 rag/app/tag.py 文件:

def chunk(filename, binary=None, lang="Chinese", callback=None, **kwargs):
    
  # 分块基础信息
  res = []
  doc = {
    # 文件名
    "docnm_kwd": filename,
    # 对文件名进行分词
    "title_tks": rag_tokenizer.tokenize(re.sub(r"\.[a-zA-Z]+$", "", filename))
  }

  # 处理 Excel 文件
  if re.search(r"\.xlsx?$", filename, re.IGNORECASE):
    excel_parser = Excel()
    for ii, (q, a) in enumerate(excel_parser(filename, binary, callback)):
      res.append(beAdoc(deepcopy(doc), q, a, eng, ii))
    return res

  # 处理 TXT 文件
  elif re.search(r"\.(txt)$", filename, re.IGNORECASE):
    # ...

  # 处理 CSV 文件
  elif re.search(r"\.(csv)$", filename, re.IGNORECASE):
    # ...

其中比较关键的部分是 beAdoc() 函数,对每个标签描述和标签统一进行处理,组装文本块信息:

def beAdoc(d, q, a, eng, row_num=-1):
  # 标签描述的原始内容
  d["content_with_weight"] = q
  # 对标签描述进行分词
  d["content_ltks"] = rag_tokenizer.tokenize(q)
  # 对标签描述再进行细粒度分词
  d["content_sm_ltks"] = rag_tokenizer.fine_grained_tokenize(d["content_ltks"])
  # 标签处理
  d["tag_kwd"] = [t.strip().replace(".", "_") for t in a.split(",") if t.strip()]
  # 行号记录
  if row_num >= 0:
    d["top_int"] = [row_num]
  return d

其中 tokenize() 对原始文本进行首次分词,感兴趣的可以看看里面的实现,包含完整的文本预处理,比如全角转半角、繁体转简体、去除非词字符、大小写转换等,然后使用 正向/反向最大匹配算法 进行中文分词,对有歧义的部分使用 DFS 进一步处理;fine_grained_tokenize() 对已分词的结果进行二次细分,它只处理 3-10 字符长度且中文词汇占比超过 20% 的文本,输出更细粒度的分词结果。

可以在 ES 中看到完整的文本块信息:

ragflow-tag-set-chunks-es.png

使用标签集

构建好标签集之后,我们就可以在其他知识库中使用它。接下来,我们创建一个新知识库,选择刚刚创建的标签集以及 Top-N 标签数量:

ragflow-use-tag-set.png

注意这里可以选择多个标签集。通常来讲一个标签集就足够了,当使用多个标签集时,确保它们彼此独立,否则,建议合并你的标签集。

然后在新知识库上传文件,等待解析完成后,查看分块详情,每个分块上都打上了用户自定义的标签:

ragflow-use-tag-set-chunks.png

在解析过程中,知识库中的每个分块都会与标签集中的每个条目进行比较,并根据相似度自动应用标签。可以看到,自动添加标签和昨天我们学习的自动提取关键词很像,这两个功能都增强了 RAGFlow 中的检索能力,它们之间的区别是:标签集是用户定义的封闭集合,而提取关键词则是开放集合。在使用自动添加标签功能之前,你必须手动构建好指定格式的标签集;自动提取关键依赖于大语言模型,并且会消耗大量的令牌。

自动提取标签的实现

当启用标签集功能时,文档分块之后会增加一些后处理逻辑,如下:

async def build_chunks(task, progress_callback):

  # ...
  if task["kb_parser_config"].get("tag_kb_ids", []):
    
    # 从指定知识库中检索所有的标签,返回每个标签出现的频率
    kb_ids = task["kb_parser_config"]["tag_kb_ids"]
    all_tags = settings.retrievaler.all_tags_in_portion(tenant_id, kb_ids, S)

    # 基于统计的打标
    for d in docs:
      # 遍历所有的文本块,先用统计方法快速生成标签(基于相似文档的标签)
      if settings.retrievaler.tag_content(tenant_id, kb_ids, d, all_tags, topn_tags=topn_tags, S=S) and len(d[TAG_FLD]) > 0:
        # 成功生成标签的文档作为示例样本
        examples.append({"content": d["content_with_weight"], TAG_FLD: d[TAG_FLD]})
      else:
        # 统计方法失败的文档加入待处理列表
        docs_to_tag.append(d)

    # 绑定大模型
    chat_mdl = LLMBundle(task["tenant_id"], LLMType.CHAT, llm_name=task["llm_id"], lang=task["language"])

    # 基于大模型的智能打标
    async def doc_content_tagging(chat_mdl, d, topn_tags):

      # 缓存机制
      cached = get_llm_cache(chat_mdl.llm_name, d["content_with_weight"], all_tags, {"topn": topn_tags})
      if not cached:

        # 为每个文档准备 2 个随机示例
        picked_examples = random.choices(examples, k=2) if len(examples)>2 else examples
        if not picked_examples:
          picked_examples.append({"content": "This is an example", TAG_FLD: {'example': 1}})

        # 限制大模型并发路数,默认 10 路
        async with chat_limiter:
          cached = await trio.to_thread.run_sync(
            lambda: content_tagging(chat_mdl, d["content_with_weight"], all_tags, picked_examples, topn=topn_tags))
        if cached:
          cached = json.dumps(cached)
      if cached:
        set_llm_cache(chat_mdl.llm_name, d["content_with_weight"], cached, all_tags, {"topn": topn_tags})
        d[TAG_FLD] = json.loads(cached)

    # 启动并发任务,为每个文本块打标
    async with trio.open_nursery() as nursery:
      for d in docs_to_tag:
        nursery.start_soon(doc_content_tagging, chat_mdl, d, topn_tags)

核心代码我都加了注释,主要逻辑可以分为两部分:

  1. 基于统计方法的快速打标:首先通过 all_tags_in_portion() 函数,从标签集中检索所有的标签,返回每个标签出现的频率;然后遍历所有的文本块,调用 tag_content() 函数检索相似分块,根据统计算法快速生成标签;
  2. 基于大模型的智能打标:从统计打标成功的分块中随机抽两条作为示例样本,让大模型对统计打标失败的分块进行处理;

通过这种先用统计方法快速匹配、失败后再调用大模型的两阶段策略,RAGFlow 实现了高效的自动提取标签流程。

其中,在统计方法快速打标的实现中,有两个函数比较有意思,可以深入看一下:

def all_tags_in_portion(self, tenant_id: str, kb_ids: list[str], S=1000):
  # 在指定知识库中搜索所有文档的 tag_kwd 字段
  res = self.dataStore.search([], [], {}, [], OrderByExpr(), 0, 0, index_name(tenant_id), kb_ids, ["tag_kwd"])
  # 统计每个标签的出现次数
  res = self.dataStore.getAggregation(res, "tag_kwd")
  # 计算所有标签的总出现次数
  total = np.sum([c for _, c in res])
  # 返回每个标签的出现频率
  return {t: (c + 1) / (total + S) for t, c in res}

这个函数从标签集中检索所有的标签,并返回每个标签出现的频率,在计算每个标签出现的频率时使用了 拉普拉斯平滑(Laplace Smoothing) 公式:

laplace-smoothing.png

拉普拉斯平滑也称为 加一平滑(Add-One Smoothing),是一种在概率估计中用于解决 “零概率问题” 的技术,广泛应用于自然语言处理和机器学习等领域。这里通过拉普拉斯平滑将每个标签出现的频率归一化,为后续的相似度计算提供权重,用于标签相关性评分。

def tag_content(self, tenant_id: str, kb_ids: list[str], doc, all_tags, topn_tags=3, keywords_topn=30, S=1000):
  # 合并文档标题和内容的分词结果,提取关键词,构造查询条件 MatchTextExpr
  match_txt = self.qryr.paragraph(doc["title_tks"] + " " + doc["content_ltks"], doc.get("important_kwd", []), keywords_topn)
  # 用构造的查询条件搜索相似文档,只返回 tag_kwd 字段
  res = self.dataStore.search([], [], {}, [match_txt], OrderByExpr(), 0, 0, idx_nm, kb_ids, ["tag_kwd"])
  # 获取搜索结果中标签的出现频次
  aggs = self.dataStore.getAggregation(res, "tag_kwd")
  # 如果没有找到相关标签则返回 False
  if not aggs:
    return False
  # 总标签数
  cnt = np.sum([c for _, c in aggs])
  # 计算相关性得分,按得分降序排序,取前 3 个
  tag_fea = sorted([(a, round(0.1*(c + 1) / (cnt + S) / max(1e-6, all_tags.get(a, 0.0001)))) for a, c in aggs], key=lambda x: x[1] * -1)[:topn_tags]
  # 将计算出的标签得分保存到文档的 TAG_FLD 字段
  doc[TAG_FLD] = {a.replace(".", "_"): c for a, c in tag_fea if c > 0}
  return True

这个函数针对当前分块,从标签库中检索出相似分块,根据标签的相关性得分为分块生成标签。这里最复杂的是计算相关性得分这一步,不过将公式展开可以发现它其实就是 TF-IDF 公式:

tf-idf.png

TF-IDF(Term Frequency-Inverse Document Frequency) 是一种常用于信息检索和文本挖掘的统计方法,用于评估一个词在文档集合中的重要程度。它通过计算词的 词频(TF)逆文档频率(IDF) 的乘积,突出对特定文档有区分性的词,抑制常见词的影响:

  • 词频(TF):一个词在文档中出现的频率越高,通常越重要;上面的 (c + 1) / (cnt + S) 就是在计算词频(某个标签出现的频率);
  • 逆文档频率(IDF):一个词在整个文档集合中出现的文档数越少,越稀有,其携带的信息量越大,权重越高;上面的 1 / max(1e-6, all_tags.get(a, 0.0001)) 就是逆文档频率,max(1e-6, ...) 防止除零错误;值得注意的是,在经典的 TF-IDF 公式里,IDF = log(总文档数 / 包含该词的文档数),这里采用倒数算是一种变体吧,好处是简单高效,易于实现和计算。

通过计算 TF-IDF 相关性得分,为输入文档自动标注最相关的 3 个标签,这就是基于统计的打标原理。而基于大模型的智能打标就比较简单了,使用的提示词如下:

## 角色
你是一名文本分析员。

## 任务
根据示例和完整的标签集,为给定的一段文本内容添加标签(标记)。

## 步骤
- 查看标签集。
- 查看示例,所有示例均包含文本内容和带有相关度评分的已分配标签(JSON格式)。
- 总结文本内容,并从标签集中选取与文本最相关的前{{ topn }}个标签及其相应的相关度评分进行标记。

## 要求
- 标签必须来自标签集。
- 输出必须仅为JSON格式,键为标签,值为其相关度评分。
- 相关度评分必须在1到10之间。
- 仅输出关键词。

# 标签集
{{ all_tags | join(', ') }}

{% for ex in examples %}

# 示例 {{ loop.index0 }}
### 文本内容
{{ ex.content }}

输出:
{{ ex.tags_json }}

{% endfor %}

# 真实数据
### 文本内容
{{ content }}

小结

今天我们详细学习了 RAGFlow 中标签集的构建和使用方法。通过创建一个专门的 Tag 类型知识库,我们可以为 RAG 系统引入一套自定义的、封闭的领域知识标签,从而显著提升检索的精准度。

我们探讨了以下核心内容:

  • 构建标签集:如何创建 Tag 类型的知识库,并上传符合格式要求的标签描述文件。
  • 使用标签集:如何在其他知识库中启用标签集,实现对文本块的自动标注。
  • 两阶段打标策略:深入分析了 RAGFlow 如何结合高效的统计方法和精准的大模型方法为文本块自动分配最相似的标签。

通过合理利用标签集,我们可以为知识库增加更丰富的语义层次,尤其是在处理内容相似度高、难以区分的文档时,标签能够提供关键的区分信息,是优化 RAG 检索效果的重要手段。

至此,我们已经全面学习了 RAGFlow 知识库的所有配置选项。从基础的分块方法到高级的 RAPTOR、知识图谱和标签集,掌握这些配置将使我们能够根据不同场景,构建出更加强大和智能的 RAG 应用。接下来,我们将开始探索 RAGFlow 的另一核心部分:检索与问答。


学习 RAGFlow 知识库高级配置

目前为止,我们已经学习了很多关于 RAGFlow 的知识库配置,包括分块方法,PDF 解析器,嵌入模型,RAPTOR 策略,提取知识图谱等,除此之外,还剩下一些高级配置,我们今天一起来看下:

ragflow-kbc-left.png

页面排名(pagerank)

当我们从多个指定的知识库中检索知识时,可能希望某些知识库的知识优先被检索到。比如我们有两个知识库:知识库 A 用于 2024 年新闻,知识库 B 用于 2025 年新闻,但希望优先显示 2025 年的新闻,那么提高知识库 B 的页面排名将非常有用。

PageRank 最初被设计为一种对网页进行排名的算法,RAGFlow 中的这个参数用于为特定的知识库分配更高的 PageRank 分数,这个分数会在文档分块和解析时加到每个文本块上,处理逻辑位于任务执行器的 build_chunks() 方法中:

async def build_chunks(task, progress_callback):

  # ...
  if task["pagerank"]:
    doc[PAGERANK_FLD] = int(task["pagerank"])

需要注意的是,这个参数是针对整个知识库的,而不是针对单个文档。当我们修改知识库的 pagerank 值时,整个知识库的文本块都会同步更新:

@manager.route('/update', methods=['post'])
def update():

  # 当 pagerank 参数有变动时
  if kb.pagerank != req.get("pagerank", 0):

    # 该参数只支持 ES 文档引擎
    if os.environ.get("DOC_ENGINE", "elasticsearch") != "elasticsearch":
      return get_data_error_result(message="'pagerank' can only be set when doc_engine is elasticsearch")
    
    if req.get("pagerank", 0) > 0:
      # 更新知识库中的所有文本块的 PAGERANK_FLD
      settings.docStoreConn.update({"kb_id": kb.id}, {PAGERANK_FLD: req["pagerank"]}, search.index_name(kb.tenant_id), kb.id)
    else:
      # 由于 ES 中的 PAGERANK_FLD 不能为 0,因此删除该字段
      settings.docStoreConn.update({"exists": PAGERANK_FLD}, {"remove": PAGERANK_FLD}, search.index_name(kb.tenant_id), kb.id)

可以在 ES 中查看文本块详情,会多一个 pagerank_fea 字段:

ragflow-pagerank.png

自动关键词提取(auto_keywords)

这个参数的作用是自动为每个文本块提取 N 个关键词,当用户问题包含这些关键词时,可以提高文本块的排名。一般情况下,如果分块大小在 1000 字符左右,那么推荐提取 3-5 个关键词即可;如果你的分块较大,可以适当提高这个值,但是请注意,随着数值的增加,边际收益会递减,最多是 30 个。

提取关键词的逻辑同样位于任务执行器的 build_chunks() 方法中:

async def build_chunks(task, progress_callback):

  # ...
  if task["parser_config"].get("auto_keywords", 0):
    
    # 绑定大模型
    chat_mdl = LLMBundle(task["tenant_id"], LLMType.CHAT, llm_name=task["llm_id"], lang=task["language"])

    # 调用大模型,为指定文本块生成 N 个关键词
    async def doc_keyword_extraction(chat_mdl, d, topn):
      
      # 缓存机制
      cached = get_llm_cache(chat_mdl.llm_name, d["content_with_weight"], "keywords", {"topn": topn})
      if not cached:
        # 限制大模型并发路数,默认 10 路
        async with chat_limiter:
          cached = await trio.to_thread.run_sync(lambda: keyword_extraction(chat_mdl, d["content_with_weight"], topn))
        set_llm_cache(chat_mdl.llm_name, d["content_with_weight"], cached, "keywords", {"topn": topn})
      if cached:
        d["important_kwd"] = cached.split(",")
        d["important_tks"] = rag_tokenizer.tokenize(" ".join(d["important_kwd"]))
      return

    # 启动并发任务,为每个文本块生成关键词
    async with trio.open_nursery() as nursery:
      for d in docs:
        nursery.start_soon(doc_keyword_extraction, chat_mdl, d, task["parser_config"]["auto_keywords"])

关键词提取的提示词比较简单,如下:

### 角色
你是一名文本分析员。

### 任务
提取给定文本内容中最重要的关键词/短语。

### 要求
- 总结文本内容,并给出排名前 {{ topn }} 的重要关键词/短语。
- 关键词必须与给定文本内容的语言一致。
- 关键词之间用英文逗号分隔。
- 仅输出关键词。

---

## 文本内容
{{ content }}

生成的关键词可以在文本块列表中查看或更新(双击文本块):

ragflow-auto-keyword.png

自动问题提取(auto_questions)

这个参数的作用是自动为每个文本块提取 N 个问题,当用户问题和这些问题比较类似时,可以提高文本块的排名。一般情况下,如果分块大小在 1000 字符左右,那么推荐提取 1-2 个问题即可;如果你的分块较大,可以适当提高这个值,最多 10 个。

问题提取逻辑和关键词提取几乎一模一样,同样位于任务执行器的 build_chunks() 方法中:

async def build_chunks(task, progress_callback):

  # ...
  if task["parser_config"].get("auto_questions", 0):
    
    # 绑定大模型
    chat_mdl = LLMBundle(task["tenant_id"], LLMType.CHAT, llm_name=task["llm_id"], lang=task["language"])

    # 调用大模型,为指定文本块生成 N 个问题
    async def doc_question_proposal(chat_mdl, d, topn):

      # 缓存机制
      cached = get_llm_cache(chat_mdl.llm_name, d["content_with_weight"], "question", {"topn": topn})
      if not cached:
        # 限制大模型并发路数,默认 10 路
        async with chat_limiter:
          cached = await trio.to_thread.run_sync(lambda: question_proposal(chat_mdl, d["content_with_weight"], topn))
        set_llm_cache(chat_mdl.llm_name, d["content_with_weight"], cached, "question", {"topn": topn})
      if cached:
        d["question_kwd"] = cached.split("\n")
        d["question_tks"] = rag_tokenizer.tokenize("\n".join(d["question_kwd"]))

    # 启动并发任务,为每个文本块生成问题
    async with trio.open_nursery() as nursery:
      for d in docs:
        nursery.start_soon(doc_question_proposal, chat_mdl, d, task["parser_config"]["auto_questions"])

问题提取的提示词如下:

角色
你是一名文本分析员。

任务
就给定的一段文本内容提出{{ topn }}个问题。

要求
- 理解并总结文本内容,提出最重要的{{ topn }}个问题。
- 问题之间不应有含义重叠。
- 问题应尽可能涵盖文本的主要内容。
- 问题必须与给定文本内容的语言一致。
- 每行一个问题。
- 仅输出问题。

---

文本内容
{{ content }}

生成的问题可以在文本块列表中查看或更新(双击文本块):

ragflow-auto-question.png

自动关键词提取和自动问题提取的数值和知识库分块大小密切相关,如果你是第一次使用此功能且不确定从哪个数值开始,可以参考下面这份从社区收集的推荐值:

ragflow-auto-keyword-question-sample.png

表格转 HTML(html4excel)

该参数只对 Excel 或 CSV 表格文件生效,并与 General 分块方法一起使用。当禁用时,文件将被解析为键值对;当启用时,文件将被解析为 HTML 表格,按照每 12 行进行拆分;实现代码位于 rag/app/naive.py 文件中:

def chunk(filename, binary=None, from_page=0, to_page=100000, ...):
    
  # ...
  elif re.search(r"\.(csv|xlsx?)$", filename, re.IGNORECASE):
    excel_parser = ExcelParser()
    if parser_config.get("html4excel"):
      sections = [(_, "") for _ in excel_parser.html(binary, 12) if _]
    else:
      sections = [(_, "") for _ in excel_parser(binary) if _]

该参数默认禁用,表格内容被解析成 “表头1: 内容1;表头2:内容2;” 的键值对格式,如下所示:

ragflow-html4excel-disable.png

这种格式适用于一些比较简单的表格,当表格比较复杂时,比如层次结构标题、合并单元格和投影行标题等,建议解析成 HTML 格式。比如下面这样的表格:

ragflow-html4excel-test.png

开启表格转 HTML 之后,解析结果如下所示:

ragflow-html4excel-enable.png

页面上显示的是渲染后的结果,实际上存储的是 HTML 代码。

可以看到,HTML 代码还不是那么完美,并没有完全保留原表格的样式,比如这里无法体现出合并单元格。另外,值得注意的是,RAGFlow 在处理 DOCX 和 PDF 文件中的表格时,并没有按这个参数来,而是各自处理的,默认都是解析成 HTML 格式,我觉得这块的逻辑可以统一。

小结

今天我们学习了 RAGFlow 知识库的几个高级配置选项,这些功能可以帮助我们进一步优化 RAG 应用的检索效果和处理能力。主要包括:

  • 页面排名:通过为知识库设置不同的权重,实现对检索结果的优先级排序。
  • 自动关键词提取:利用大模型为每个文本块自动提取核心关键词,提升相关查询的召回率。
  • 自动问题提取:同样利用大模型,为文本块生成可能的问题,增强对问答式查询的理解和匹配。
  • 表格转 HTML:为 Excel 和 CSV 文件提供两种不同的解析方式,通过转换为 HTML 格式更好地处理复杂表格。

至此,我们已经系统地学习了 RAGFlow 知识库的大部分配置。掌握这些配置,可以让我们根据不同的业务场景和文档类型,灵活地构建出高质量的 RAG 应用。在知识库的高级配置中,还剩下一个标签集的功能,我们明天再来学习它。


学习 RAGFlow 的知识图谱功能

昨天我们学习了 RAGFlow 的 RAPTOR 分块策略,今天我们将继续学习另一种高级配置 —— 提取知识图谱(use_graphrag)

ragflow-graph.png

该特性自 v0.16.0 起引入,开启该配置后,RAGFlow 会在当前知识库的分块上构建知识图谱,构建步骤位于数据抽取和索引之间,如下所示:

ragflow-construct-kb-graph.png

知识图谱在涉及嵌套逻辑的多跳问答中尤其有用,当你在对书籍或具有复杂实体和关系的作品进行问答时,知识图谱的表现优于传统的抽取方法。

请注意,构建知识图谱将消耗大量 token 和时间。

开启 GraphRAG 任务

GraphRAG 的逻辑位于任务执行器的 do_handle_task() 函数中:

async def do_handle_task(task):
    
  # ...
  elif task.get("task_type", "") == "graphrag":
    
    # 绑定聊天模型
    chat_model = LLMBundle(task_tenant_id, LLMType.CHAT, llm_name=task_llm_id, lang=task_language)

    # 运行 GraphRAG 逻辑
    graphrag_conf = task["kb_parser_config"].get("graphrag", {})
    with_resolution = graphrag_conf.get("resolution", False)
    with_community = graphrag_conf.get("community", False)
    async with kg_limiter:
      await run_graphrag(task, task_language, with_resolution, with_community, chat_model, embedding_model, progress_callback)
    return

这个函数我们之前已经详细学习过,但是跳过了 GraphRAG 相关的逻辑,今天我们就继续来看下这个 run_graphrag() 的实现细节:

async def run_graphrag(row: dict, language, with_resolution: bool, with_community: bool, chat_model, embedding_model, callback):
    
  # 检索原始分块列表
  chunks = []
  for d in settings.retrievaler.chunk_list(...):
    chunks.append(d["content_with_weight"])

  # 使用 LightKGExt 或 GeneralKGExt 生成子图
  subgraph = await generate_subgraph(
    LightKGExt if row["kb_parser_config"]["graphrag"]["method"] != "general" else GeneralKGExt,
    ...
    row["kb_parser_config"]["graphrag"]["entity_types"],
    ...
  )

  # 将子图合并到知识图谱中
  subgraph_nodes = set(subgraph.nodes())
  new_graph = await merge_subgraph(...)

  # 实体消歧
  if with_resolution:
    await resolve_entities(new_graph, subgraph_nodes, ...)

  # 社区报告
  if with_community:
    await extract_community(new_graph, ...)

整体的逻辑还是比较清晰的,首先通过 retrievaler.chunk_list() 检索出该文档原始的分块列表,然后基于配置的实体类型生成子图,然后将子图合并到知识图谱中,最后进行实体消歧和社区报告的生成。

和 RAPTOR 任务一样,开启知识图谱也需要先执行一次标准的分块策略,生成原始的分块列表,在完成第一个任务后,会再生成一个知识图谱类型的任务,执行上面的代码逻辑。

提取知识图谱涉及的配置参数如下:

  • 实体类型(entity_types) - 指定要提取的实体类型,默认类型包括:组织(organization)、人物(person)、事件(event)和类别(category),可根据具体的知识库内容添加或删除类型;
  • 方法(method) - 用于构建知识图谱的方法,RAGFlow 支持两种方法:

    • 通用(general):使用 GraphRAG 提供的提示词提取实体和关系。
    • 轻量(light):使用 LightRAG 提供的提示词来提取实体和关系。此选项消耗更少的 tokens、更少的内存和更少的计算资源。
  • 实体消歧(resolution) - 是否启用实体消歧。启用后,解析过程会将具有相同含义的实体合并在一起,从而使知识图谱更简洁、更准确。例如 “2025” 和 “2025 年” 或 “IT” 和 “信息技术”,“特朗普总统” 和 “唐纳德·特朗普” 等。
  • 社区报告生成(community) - 是否生成社区报告。在知识图谱中,社区是由关系连接的实体簇,可以让大模型为每个社区生成摘要,这被称为 社区报告

构建子图

构建子图的逻辑位于 generate_subgraph() 函数:

async def generate_subgraph(...):

  # 检查 doc_id 是否已经构建过子图
  contains = await does_graph_contains(tenant_id, kb_id, doc_id)
  if contains:
    return None

  # 创建提取器实例,提取实体和关系
  ext = extractor(llm_bdl, language=language, entity_types=entity_types)
  ents, rels = await ext(doc_id, chunks, callback)

  # 将实体和关系构建成 NetworkX 子图
  subgraph = nx.Graph()
  for ent in ents:
    ent["source_id"] = [doc_id]
    subgraph.add_node(ent["entity_name"], **ent)
  for rel in rels:
    rel["source_id"] = [doc_id]
    subgraph.add_edge(
      rel["src_id"],
      rel["tgt_id"],
      **rel,
    )

  # 将子图序列化为 JSON 字符串,作为分块存到文档库中
  subgraph.graph["source_id"] = [doc_id]
  chunk = {
    "content_with_weight": json.dumps(
      nx.node_link_data(subgraph, edges="edges"), ensure_ascii=False
    ),
    "knowledge_graph_kwd": "subgraph",
    "kb_id": kb_id,
    "source_id": [doc_id],
    "available_int": 0,
    "removed_kwd": "N",
  }
  cid = chunk_id(chunk)

  # 首先根据 doc_id 删除旧的子图
  await trio.to_thread.run_sync(
    lambda: settings.docStoreConn.delete(
      {"knowledge_graph_kwd": "subgraph", "source_id": doc_id}, search.index_name(tenant_id), kb_id
    )
  )

  # 然后插入新的子图
  await trio.to_thread.run_sync(
    lambda: settings.docStoreConn.insert(
      [{"id": cid, **chunk}], search.index_name(tenant_id), kb_id
    )
  )

  return subgraph

关键步骤已经由注释标出,这里不再赘述。主要关注三点:

  1. 支持两种提取器,GeneralKGExtLightKGExt,提取的步骤差不多(都是经过三步:首次抽取 -> 二次抽取 -> 判断是否抽取完毕),只是使用的提示词不一样而已;
  2. 子图是通过 NetworkX 库构建的,这是一种 Python 中常用的图论库,可以方便地创建、操作和分析图结构;
  3. 子图会序列化为 JSON 字符串,作为分块存到文档库中,可以在 ES 中通过 "knowledge_graph_kwd": "subgraph" 条件检索出来:

ragflow-es-search-subgraph.png

感兴趣的可以看下这个 content_with_weight 字段,里面包含从文档中抽取出来的完整子图。

合并子图

上面一步生成的是文档级别的子图,接下来,将该子图合并到全局知识图谱中:

async def merge_subgraph(tenant_id: str, kb_id: str, subgraph: nx.Graph, ...):

  # 检索旧的全局知识图谱
  change = GraphChange()
  old_graph = await get_graph(tenant_id, kb_id, subgraph.graph["source_id"])
  if old_graph is not None:
    # 如果旧图谱存在,则将文档子图合并到全局图谱中
    new_graph = graph_merge(old_graph, subgraph, change)
  else:
    # 如果旧图谱不存在,则直接使用文档子图作为新的全局图谱
    new_graph = subgraph
    change.added_updated_nodes = set(new_graph.nodes())
    change.added_updated_edges = set(new_graph.edges())

  # 计算 PageRank
  pr = nx.pagerank(new_graph)
  for node_name, pagerank in pr.items():
    new_graph.nodes[node_name]["pagerank"] = pagerank

  # 保存新的全局图谱
  await set_graph(tenant_id, kb_id, embedding_model, new_graph, change, callback)
  
  return new_graph

合并的逻辑比较简单,就是遍历文档子图中的所有节点和边,判断是否已经存在于全局图谱中,如果存在,就将 descriptionkeywordssource_id 等属性拼接到全局图谱中。此外,还会使用 NetworkX 的 pagerank() 方法 对合并后的图谱计算 PageRank 值,为每个节点添加 pagerank 属性,用于衡量节点的重要性。

PageRank 最初被设计为一种对网页进行排名的算法,在 NetworkX 中,是根据指向该节点的边的个数来计算节点的排名,表示该实体在知识图谱中的重要性。

开启实体消歧

实体消歧的逻辑位于 graphrag/entity_resolution.py 文件:

class EntityResolution(Extractor):

  async def __call__(self, graph: nx.Graph, subgraph_nodes: set[str], ...) -> EntityResolutionResult:
    
    # 将节点按照实体类型分组
    nodes = sorted(graph.nodes())
    entity_types = sorted(set(graph.nodes[node].get('entity_type', '-') for node in nodes))
    node_clusters = {entity_type: [] for entity_type in entity_types}
    for node in nodes:
      node_clusters[graph.nodes[node].get('entity_type', '-')].append(node)

    # 在同类型实体中生成所有可能的配对组合
    candidate_resolution = {entity_type: [] for entity_type in entity_types}
    for k, v in node_clusters.items():
      candidate_resolution[k] = [
        (a, b) for a, b in itertools.combinations(v, 2) 
        if (a in subgraph_nodes or b in subgraph_nodes) and self.is_similarity(a, b)
      ]
    
    # 并发调用大模型进行批量消歧,大模型针对每一对实体输出明确的 Yes/No 判断
    # 默认一批 100 对实体,最多并发 5 个任务
    resolution_result = set()
    async with trio.open_nursery() as nursery:
      for candidate_resolution_i in candidate_resolution.items():
        for i in range(0, len(candidate_resolution_i[1]), resolution_batch_size):
          candidate_batch = candidate_resolution_i[0], candidate_resolution_i[1][i:i + resolution_batch_size]
          nursery.start_soon(limited_resolve_candidate, candidate_batch, resolution_result, resolution_result_lock)

    # 将消歧结果构建成新的图谱
    change = GraphChange()
    connect_graph = nx.Graph()
    connect_graph.add_edges_from(resolution_result)
    async with trio.open_nursery() as nursery:
      for sub_connect_graph in nx.connected_components(connect_graph):
        merging_nodes = list(sub_connect_graph)
        nursery.start_soon(limited_merge_nodes, graph, merging_nodes, change)

    return EntityResolutionResult(
      graph=graph,
      change=change,
    )

实体消歧所使用的提示词核心部分如下,主要是输出部分使用的一些特殊符号,方便程序解析结果:

问题:
在判断两个产品是否相同时,你应该只关注关键属性,忽略噪声因素。

演示 1: 产品A的名称是:"电脑",产品B的名称是:"手机" 不,产品A和产品B是不同的产品。
问题 1: 产品A的名称是:"电视机",产品B的名称是:"电视"
问题 2: 产品A的名称是:"杯子",产品B的名称是:"马克杯"
问题 3: 产品A的名称是:"足球",产品B的名称是:"橄榄球"
问题 4: 产品A的名称是:"钢笔",产品B的名称是:"橡皮擦"

使用产品的领域知识来帮助理解文本,并按以下格式回答上述4个问题:对于问题i,是的,产品A和产品B是同一个产品。或者 不,产品A和产品B是不同的产品。对于问题i+1,(重复上述程序)
################
输出:
(对于问题 <|>1<|>,&&是&&,产品A和产品B是同一个产品。)##
(对于问题 <|>2<|>,&&是&&,产品A和产品B是同一个产品。)##
(对于问题 <|>3<|>,&&不&&,产品A和产品B是不同的产品。)##
(对于问题 <|>4<|>,&&不&&,产品A和产品B是不同的产品。)##

生成社区报告

生成社区报告的逻辑位于 graphrag/general/community_reports_extractor.py 文件:

class CommunityReportsExtractor(Extractor):

  async def __call__(self, graph: nx.Graph, callback: Callable | None = None):

    # 使用 Leiden 算法来发现图中的社区结构
    # 将社区组织成一个多层级的树形结构,每个层级包含多个社区
    communities: dict[str, dict[str, list]] = leiden.run(graph, {})

    # 遍历每一个社区,从图中提取当前社区中的所有实体和关系的描述,调用大模型生成社区报告
    async with trio.open_nursery() as nursery:
      for level, comm in communities.items():
        logging.info(f"Level {level}: Community: {len(comm.keys())}")
        for community in comm.items():
          nursery.start_soon(extract_community_report, community)

    return CommunityReportsResult(
      structured_output=res_dict,
      output=res_str,
    )

整个流程比较简单,分为两步。第一步,使用 Leiden 算法 发现图中的社区结构。

在网络科学或图论中,社区(Community) 是指网络中的一组节点,其核心特征是:社区内部的节点之间连接紧密,而与社区外部节点的连接相对稀疏,这种 “内密外疏” 的结构是社区的核心标志,反映了网络中节点的聚类性和关联性。Leiden 算法是一种在图数据中识别社区结构的高效算法,由 Traag 等人在莱顿大学于 2018 年提出。它在经典的 Louvain 算法 基础上进行了改进,解决了 Louvain 算法中可能出现的 “分辨率限制” 和社区划分不精确的问题,因此在复杂网络分析中被广泛应用。

这里,RAGFlow 使用的是 graspologic 库的 hierarchical_leiden() 方法。

第二步,调用大模型为每个社区生成摘要,这被称为 社区报告(Community Report),报告以 JSON 格式输出:

{
  "title": <报告标题>,
  "summary": <执行摘要>,
  "rating": <影响严重性评级>,
  "rating_explanation": <评级说明>,
  "findings": [
    {
      "summary":<洞察1摘要>,
      "explanation": <洞察1解释>
    },
    {
      "summary":<洞察2摘要>,
      "explanation": <洞察2解释>
    }
  ]
}

包括以下几个部分:

  • 标题:代表其关键实体的社区名称,标题应简短但具体,如果可能,在标题中包含代表性的命名实体;
  • 摘要:社区整体结构的执行摘要,其实体如何相互关联,以及与其实体相关的重要信息;
  • 影响严重性评级:0-10 之间的浮点分数,表示社区内实体造成的影响严重程度;
  • 评级说明:对影响严重性评级给出一句话解释;
  • 详细发现:关于社区的 5-10 个关键洞察列表,每个洞察应有一个简短摘要,然后是根据下面的基础规则进行的多段解释性文本,要全面;

生成的社区报告可以在 ES 中通过 "knowledge_graph_kwd": "community_report" 条件检索出来:

ragflow-es-search-commutity-report.png

小结

在今天的学习中,我们深入探讨了 RAGFlow 中的知识图谱功能,我们详细了解了提取知识图谱的流程,包括:实体和关系的提取,子图的构建和合并,实体消歧和社区报告生成等。图谱生成成功后,知识库的配置页面会多出一个 “知识图谱” 的菜单项:

ragflow-kb-graph.png

通过引入知识图谱,RAGFlow 能在复杂多跳问答场景中表现得更加出色,特别是在分析具有复杂关系和实体的文档时。和昨天学习的 RAPTOR 一样,启用知识图谱功能需要大量的内存、计算资源和令牌,在使用时需要权衡利弊,建议提前在少量测试集上进行验证,只有当效果提升明显,才具有足够的性价比,才建议开启该功能。


学习 RAGFlow 的 RAPTOR 分块策略

在学习知识库配置时,我们提到了一个高级配置 —— 使用召回增强 RAPTOR 策略(use_raptor)

ragflow-raptor.png

开启该配置后,RAGFlow 会使用 RAPTOR 分块策略,这是去年提出的一种增强型文档预处理技术,旨在解决多跳问答问题,通过对文档片段进行递归聚类和摘要,构建分层树状结构,使得在长文档中实现更具上下文感知的检索。RAGFlow 从 v0.6.0 版本开始集成 RAPTOR 策略,用于文档聚类,作为其数据预处理流程中数据提取与索引之间的一部分,如下所示:

ragflow-document-clustering.jpg

使用这种新方法,RAGFlow 在复杂多步推理的问答任务上展示了 SOTA 结果,通过将 RAPTOR 与其他内置的分块或检索方法结合使用,可以进一步提升问答的准确性。

RAPTOR 简介

RAPTOR 是一种新型的信息检索方法,全称为 用于树形组织检索的递归抽象处理,由 Parth Sarthi 等人于 2024 年发表的论文 《RAPTOR: Recursive Abstractive Processing for Tree-Organized Retrieval》 中提出:

传统的 RAG 方法仅从检索语料库中检索短的连续片段,这限制了大模型对整体文档上下文的全面理解,导致其在总结型、整合型的问题上表现堪忧;而 RAPTOR 则通过递归地对文本片段进行嵌入、聚类和摘要,从下到上构建具有不同摘要层级的树结构,从而提高检索的效率和问答的效果。

RAPTOR 的本质是树的构建和检索。树的构建流程如下图所示:

raptor-tree-construction.png

  1. 首先,将原始文本切分成块(chunks),然后计算每个分块的向量,这一步和传统 RAG 没有区别,这些分块构成树的叶节点层(Leaf layer);
  2. 接着,对分块进行聚类,将相似的分块归为一类,使用语言模型对每一类分块进行摘要,生成新的分块,这些分块构成树的中间层;
  3. 重复执行上述步骤,直到分块不可再分为止,即树的根节点层(Root layer);

对于每个非叶子节点,都存储着自己的索引、子节点、总结后的文本以及对应的向量。

构建好树形结构之后,就是对树的检索。RAPTOR 提供了两种检索树的方法,树遍历(Tree Traversal Retrieval)压缩树(Collapsed Tree Retrieval)

raptor-tree-retrieval.png

树遍历的流程如下:

  1. 从根节点层出发,选取跟问题最相似的 topK 个节点,记为 S1;
  2. 再在 S1 的子节点中选取跟问题最相似的 topK 个节点,记为 S2;
  3. 重复这个过程,一直执行到叶节点层为止,构成 S1、S2、S3... 等;
  4. 将 S1、S2、S3... 合并,组装为上下文用于大模型回答;

而压缩树就要简单的多了,它直接将整个树的所有节点(包括根节点和叶子节点)都放在一个集合里,然后走传统 RAG 一样的检索方法,选出 topK 个相似节点作为上下文。

RAPTOR 的实现在 Github 上开源了,可以参考其官方仓库:

开启 RAPTOR 任务

RAPTOR 的逻辑位于任务执行器的 do_handle_task() 函数中:

async def do_handle_task(task):

  # ...
  if task.get("task_type", "") == "raptor":

    # 绑定聊天模型
    chat_model = LLMBundle(task_tenant_id, LLMType.CHAT, llm_name=task_llm_id, lang=task_language)

    # 运行 RAPTOR 逻辑
    async with kg_limiter:
      chunks, token_count = await run_raptor(task, chat_model, embedding_model, vector_size, progress_callback)

这个函数我们之前已经详细学习过,但是跳过了 RAPTOR 相关的逻辑,今天我们就继续来看下这个 run_raptor() 的实现细节:

async def run_raptor(row, chat_mdl, embd_mdl, vector_size, callback=None):
    
  # 检索原始分块列表
  chunks = []
  vctr_nm = "q_%d_vec"%vector_size
  for d in settings.retrievaler.chunk_list(row["doc_id"], ...):
    chunks.append((d["content_with_weight"], np.array(d[vctr_nm])))

  # 初始化 RAPTOR 实例
  raptor = Raptor(
    row["parser_config"]["raptor"].get("max_cluster", 64),
    chat_mdl,
    embd_mdl,
    row["parser_config"]["raptor"]["prompt"],
    row["parser_config"]["raptor"]["max_token"],
    row["parser_config"]["raptor"]["threshold"]
  )

  # 执行 RAPTOR 分块
  chunks = await raptor(chunks, ...)
  # ...

首先通过 retrievaler.chunk_list() 检索出该文档原始的分块列表,然后使用配置参数初始化 RAPTOR 实例,并执行 RAPTOR 分块,最后返回新的分块列表。这里的几个初始化参数可以在上面的知识库配置里进行修改,解释如下:

  • 最大聚类数(max_cluster) - 要创建的最大聚类数,默认为 64,最大限制为 1024;
  • 提示词(prompt) - 使用该 Prompt 对聚类内容生成摘要,其中 {cluster_content} 作为内部参数,表示聚类的内容;
  • 最大 token 数(max_token) - 每个生成摘要的最大 token 数,默认为 256,最大限制为 2048;
  • 阈值(threshold) - 在 RAPTOR 中,分块根据其语义相似度进行聚类,该参数设置了分块被归为一组所需的最小相似度。默认值为 0.1,最大限制为 1。阈值越高,每个聚类中的分块就越少,阈值越低,则分块越多;

看到这里,细心的同学可能会有个疑问,第一次执行到这里时,还没有原始的分块列表,怎么对其进行 RAPTOR 聚类呢?

其实,对于开启了 RAPTOR 的知识库,RAGFlow 会生成两个任务。第一个任务会先执行一次标准的分块策略,生成原始的分块列表,在完成第一个任务后,会再生成一个 RAPTOR 类型的任务,执行上面的代码逻辑。我们可以在 DocumentServiceupdate_progress() 更新任务进度这里看到:

ragflow-queue-raptor-task.png

RAPTOR 的实现

RAPTOR 的实现位于 rag/raptor.py 文件中,其核心代码如下:

ragflow-raptor-deep.png

通过一个 while 循环不断迭代,其中 chunks[start:end] 表示当前层需要处理的分块范围,当 end - start > 1 时,表示当前层的分块数量大于 1,需要继续进行聚类和摘要,从而生成上一层的分块;当 end - start <= 1 时,表示当前层的分块数量小于等于 1,不需要继续进行聚类和摘要,此时 while 循环结束,表示树的构建完成。

在迭代的过程中,有几个地方值得重点关注。第一个地方是对 Embedding 向量进行 UMAP 降维:

n_neighbors = int((len(embeddings) - 1) ** 0.8)
reduced_embeddings = umap.UMAP(
  n_neighbors=max(2, n_neighbors),
  n_components=min(12, len(embeddings) - 2),
  metric="cosine",
).fit_transform(embeddings)

现代嵌入模型(如 OpenAI、BGE 等)通常生成高维向量,高维向量虽然在表征文本语义方面表现出色,但也存在一些问题:

  • 距离失效:在高维空间中,所有点之间的距离趋于相等;
  • 聚类困难:高斯混合模型在高维空间中表现不佳;
  • 计算复杂度:高维数据的协方差矩阵计算成本极高;
  • 稀疏性:高维空间中数据点变得稀疏,难以找到有意义的聚类;

简单说就是高维向量不适合聚类,因此需要进行降维处理,将高维向量映射到低维空间中,这样可以提高聚类效果。RAPTOR 使用 UMAP 算法进行降维处理,UMAP 是一种非线性降维算法,可以较好地保留数据的局部结构和全局结构,适用于嵌入向量的降维处理。

第二个地方是 _get_optimal_clusters(...) 函数,它的核心作用是自动确定最优的聚类数量:


def _get_optimal_clusters(self, embeddings: np.ndarray, random_state: int):
  max_clusters = min(self._max_cluster, len(embeddings))
  n_clusters = np.arange(1, max_clusters)
  bics = []
  for n in n_clusters:
    gm = GaussianMixture(n_components=n, random_state=random_state)
    gm.fit(embeddings)
    bics.append(gm.bic(embeddings))
  optimal_clusters = n_clusters[np.argmin(bics)]
  return optimal_clusters

n_clusters = self._get_optimal_clusters(reduced_embeddings, random_state)

这是 RAPTOR 算法中的关键步骤,它解决了聚类分析中的一个经典问题:如何确定数据应该被分成多少个聚类?

该函数通过穷举搜索,从 1 个聚类到最大聚类数逐一尝试,使用 BIC(Bayesian Information Criterion,贝叶斯信息准则) 来选择最优的聚类数量。BIC 是一种用于模型选择的统计方法,它考虑了模型的复杂度和拟合数据的程度,倾向于选择在解释力和复杂度之间取得平衡的模型。

第三个地方是使用上面确定的聚类数量,使用 高斯混合模型(Gaussian Mixture Models,GMM) 进行聚类:

from sklearn.mixture import GaussianMixture

# 创建 GMM 模型
gm = GaussianMixture(n_components=n_clusters, random_state=random_state)

# 学习每个聚类的高斯分布参数(均值、协方差、权重)
gm.fit(reduced_embeddings)

# 返回概率矩阵
probs = gm.predict_proba(reduced_embeddings)

# 找出所有概率超过阈值的聚类索引
lbls = [np.where(prob > self._threshold)[0] for prob in probs]
lbls = [lbl[0] if isinstance(lbl, np.ndarray) else lbl for lbl in lbls]

这里直接使用 scikit-learn 内置的 GaussianMixture 创建 GMM 模型,对此感兴趣的同学可以参考下官方文档:

注意,这里的 lbl 只取了第一个,也就是一个分块只放到一个聚类里,而 RAPTOR 论文指出,一个分块是可以隶属于多个聚类的,因此 RAGFlow 这里算是 RAPTOR 的一种简化版实现。

最后,针对同一个聚类中的分块,使用语言模型进行摘要,生成新的分块:

async with trio.open_nursery() as nursery:
  for c in range(n_clusters):
    ck_idx = [i + start for i in range(len(lbls)) if lbls[i] == c]
    nursery.start_soon(summarize, ck_idx)

深入分析上面的实现代码可以发现,整个实现过程中并没有看到构建树形结构,这是因为 RAGFlow 只实现了 RAPTOR 中的 压缩树 方式,所有的分块是扁平的,这种方式实现起来简单,存储和检索都方便,相比于 树遍历 方式,性价比更高。

我们可以在完成 RAPTOR 任务之后,点击文件名称,进入分块列表:

ragflow-raptor-chunks.png

可以看到,RAPTOR 分块的结果和普通分块的结果没有任何区别,都是混在一起的。

小结

今天我们深入学习了 RAPTOR 的实现,它为解决多跳问答任务提供了一个非常有趣的方法。通过递归性地聚类和摘要文档片段,创造树状的检索结构,RAPTOR 让复杂语境的检索更具效率。

RAGFlow 采用了 RAPTOR 的简化版实现,并且选择了更轻巧的压缩树方法。这种选择在性能和效果间找到了一种平衡:它既保持了检索的简单性,又充分发挥了多层次语境理解的优势。RAPTOR 一般用在长文档上,短文档就没必要用了,启用 RAPTOR 需要大量的内存、计算资源和令牌,在使用时需要权衡利弊。

另外,知识图谱也可用于多跳问答任务,这也是知识库配置中的一个高级选项,我们明天就来看看它。


学习 RAGFlow 的 DeepDoc 技术之视觉处理

我们之前已经学过,DeepDoc解析器(parser)视觉处理(vision) 两个部分组成。解析器提供了不同格式文档的通用解析方法,我们花了两天时间,对这 10 个解析器的源码做了深入分析;今天我们将学习 DeepDoc 的视觉处理部分,包括 光学字符识别(OCR,Optical Character Recognition)文档布局识别(DLR,Document Layout Recognition)表格结构识别(TSR,Table Structure Recognition) 等高级特性,可以对 PDF 文件实现更好的识别效果。

测试脚本

为了更直观的体验 OCR、DLR 和 TSR 的效果,DeepDoc 提供了两个测试脚本,位于 deepdoc/vision 目录下:

  • t_ocr.py - 调用 文本检测模型 (det.onnx)文本识别模型 (rec.onnx) 测试 OCR 功能;
  • t_recognizer.py - 调用 布局分析模型 (layout.onnx)表格结构识别模型 (tsr.onnx) 测试 DLR 和 TSR 功能;

拿一个简单的 PDF 文档测试一下 OCR 功能:

$ python deepdoc/vision/t_ocr.py --inputs /path/to/demo.pdf

运行之后会在当前目录下生成一个 ./ocr_outputs 目录,里面包含了 OCR 的结果。每个页面生成两个文件:一个是 jpg 图片,对应文本检测结果,使用黑框将检测到的文本区域框了起来;另一个是 txt 文本,包含文本识别的结果:

ocr-result-1.png

表格页面的识别结果:

ocr-result-2.png

继续使用下面的命令测试一下 DLR 功能(注意后面的 --mode layout 参数,表示布局分析模式):

$ python deepdoc/vision/t_recognizer.py --inputs /path/to/demo.pdf --mode layout

--inputs 输入可以是 PDF 文件,也可以是图片文件,还可以是包含 PDF 或图片的目录。另外,它还有一个 --output_dir OUTPUT_DIR 参数,可以指定输出位置。

运行之后会在当前目录下生成一个 ./layout_outputs 目录,里面包含了 DLR 的结果。每个页面生成一个 jpg 图片,识别到区域使用不同颜色的框框了出来:

![](./images/dlr-result-1.png)

可以注意到框的左上角还打上了标签,比如 title 标题,text 正文。下面是表格页面的识别结果:

dlr-result-2.png

左上角的标签为 table 表格,表示模型成功识别出了表格区域。

最后将上面的表格区域抠出来,另存成一个张独立的图片,测试一下 TSR 功能(注意后面的 `--mode tsr 参数,表示表格结构识别模式):

$ python deepdoc/vision/t_recognizer.py --inputs /path/to/table.png --mode tsr

运行之后的结果同样保存在 ./layout_outputs 目录,里面有一个 jpg 图片,标识出了更加详细的表格区域:

tsr-result.png

上面绿色的框左上角标签为 table row 表示行,红色的框左上角标签为 table column 表示列。另外,该目录下还生成了一个 html 文件,以 HTML 形式展示了表格的结构:

<html>
<head>
<style>
  ...
</style>
</head>
<body>
<table>
<tr><th  >姓名</th><th  >学号</th><th  >年龄</th><th  >性别</th><th  >成绩</th></tr>
<tr><td  >张三</td><td  >001</td><td  >18</td><td  >男</td><td  >100</td></tr>
<tr><td  >李四</td><td  >002</td><td  >19</td><td  >女</td><td  >99</td></tr>
<tr><td  >王五</td><td  >003</td><td  >20</td><td  >男</td><td  >98</td></tr>
</table>
</body>
</html>

文本检测

文本检测是 OCR 流水线的第一步,为后续的文本识别提供准确的文本区域定位。它的代码逻辑位于 deepdoc/vision/ocr.py 文件中,实现类 TextDetector 如下:

text-detector.png

它的核心流程分为三个部分:预处理阶段,模型推理阶段,后处理阶段。

预处理阶段

data = transform(data, self.preprocess_op)

这里的 self.preprocess_op 有点意思,它是通过下面这个 create_operators() 函数创建:

def create_operators(op_param_list, global_config=None):
  ops = []
  for operator in op_param_list:
    op_name = list(operator)[0]
    param = {} if operator[op_name] is None else operator[op_name]
    if global_config is not None:
      param.update(global_config)
    op = getattr(operators, op_name)(**param)
    ops.append(op)
  return ops

这个函数接受一个 JSON 数组,数组中的每个元素表示一个操作(这些操作定义在 deepdoc/vision/operators.py 文件中),操作的名称为 JSON 对象的键,操作的参数为 JSON 对象的值。比如下面这个 JSON 数组,表示了四个预处理操作:

pre_process_list = [{
    # 第一步:图像缩放
    # 将图像最大边长限制为 960 像素,确保输入尺寸适合模型处理
    'DetResizeForTest': {
        'limit_side_len': 960,
        'limit_type': "max",
    }
}, {
    # 第二步:图像归一化
    # 将像素值缩放到 [0,1] 范围(也就是 'scale': '1./255.')
    # 使用 ImageNet 预训练参数进行标准化(这里的 std 和 mean 这组参数并非随机设定,而是在 ImageNet 数据集上统计得到的)
    'NormalizeImage': {
        'std': [0.229, 0.224, 0.225],
        'mean': [0.485, 0.456, 0.406],
        'scale': '1./255.',
        'order': 'hwc'
    }
}, {
    # 第三步:通道转换
    # 将图像的通道顺序从 HWC(高、宽、通道)转换为 CHW(通道、高、宽),符合模型的输入要求
    'ToCHWImage': None
}, {
    # 第四步:数据筛选
    # 只保留 'image' 和 'shape' 两个关键字段
    'KeepKeys': {
        'keep_keys': ['image', 'shape']
    }
}]

模型推理阶段

for i in range(100000):
  try:
    outputs = self.predictor.run(None, input_dict, self.run_options)
    break
  except Exception as e:
    if i >= 3:
      raise e
    time.sleep(5)

调用 self.predictor.run() 进行推理,包含重试机制,最多重试 3 次。这里的 self.predictor 是通过 load_model() 函数加载的 det.onnx 文本检测模型:

self.predictor, self.run_options = load_model(model_dir, 'det', device_id)

它会根据机器上是否安装 CUDA 库来选择使用 GPU 还是 CPU 进行推理:

import onnxruntime as ort

def load_model(model_dir, nm, device_id: int | None = None):
  # ...
  run_options = ort.RunOptions()
  if cuda_is_available():
    cuda_provider_options = {
      "device_id": device_id, # Use specific GPU
      "gpu_mem_limit": 512 * 1024 * 1024, # Limit gpu memory
      "arena_extend_strategy": "kNextPowerOfTwo",  # gpu memory allocation strategy
    }
    sess = ort.InferenceSession(
      model_file_path,
      options=options,
      providers=['CUDAExecutionProvider'],
      provider_options=[cuda_provider_options]
      )
    run_options.add_run_config_entry("memory.enable_memory_arena_shrinkage", "gpu:" + str(device_id))
    logging.info(f"load_model {model_file_path} uses GPU")
  else:
    sess = ort.InferenceSession(
      model_file_path,
      options=options,
      providers=['CPUExecutionProvider'])
    run_options.add_run_config_entry("memory.enable_memory_arena_shrinkage", "cpu")
    logging.info(f"load_model {model_file_path} uses CPU")
  loaded_model = (sess, run_options)
  return loaded_model

可以看出,模型推理使用的是 ONNX Runtime 库。

后处理阶段

post_result = self.postprocess_op({"maps": outputs[0]}, shape_list)
dt_boxes = post_result[0]['points']
dt_boxes = self.filter_tag_det_res(dt_boxes, ori_im.shape)

这里的 self.postprocess_op 和上面的预处理操作类似,也是通过一个 JSON 配置创建的:

postprocess_params = {
  "name": "DBPostProcess",
  # 二值化阈值,用于将概率图转换为二值图
  "thresh": 0.3,
  # 文本框置信度阈值,过滤低质量检测框
  "box_thresh": 0.5,
  # 最大候选框数量
  "max_candidates": 1000,
  # 文本框扩展比例,防止文本被裁切
  "unclip_ratio": 1.5,
  "use_dilation": False,
  # 评分模式选择,支持 fast 和 slow 两种
  "score_mode": "fast",
  "box_type": "quad"
}

这个后处理操作 DBPostProcess 定义在 deepdoc/vision/postprocess.py 文件中。DeepDoc 的文本检测模型使用了 DB(Differentiable Binarization)算法,最终输出文本区域的 概率图 (Probability Map),表示每个像素是文本的概率。因此后处理阶段的任务就是从概率图中提取出文本区域的坐标框。

DB(Differentiable Binarization)算法介绍

DB(Differentiable Binarization)算法即可微分二值化算法,由百度的 PaddleOCR 团队提出,主要用于场景文本检测领域。以下是具体介绍:

  • 算法背景:基于分割的场景文本检测需将分割产生的概率图转化为边界框和文字区域,二值化是其中关键的后处理过程。常规二值化通过设定固定阈值,难以适应复杂多变的检测场景,且因不可微分无法在训练阶段随分割网络优化,DB 算法则旨在解决这些问题。
  • 核心原理:DB 算法将传统图像分割流程中的二值化操作引入到网络中,并使之可微分。它利用一个可微分的近似替代,这种处理方式既在前向推理时近似于二值化,又允许梯度传递,从而能在训练过程中优化网络参数,实现端到端的训练。
  • 网络结构:整体采用 全卷积网络(FCN) 架构,由 Backbone、FPN 和 DB Head 组成。首先通过 Backbone(如 MobileNetV3 或 ResNet50)提取图像特征,再利用 FPN 模块融合多尺度特征,最后由 DB Head 同时输出文本区域概率图和阈值图,用于后续的可微分二值化处理。
  • 算法优势:一是结构简单,基于 FCN 框架,易于实现和优化;二是高效推理,轻量级结构适合移动端及实时应用;三是检测效果好,能够自适应学习最佳阈值,将文字区域与背景更好地区分开,还能把相近的实例分离开来,提升文本检测的准确性。
  • 应用场景:适用于对实时性有较高要求且背景复杂的任务,如自然场景文本检测、移动端 OCR 应用等,许多手机 OCR 应用采用 DB 算法实现低延迟的实时文字检测。

文本识别

文本检测之后,我们得到了图片中各个文本区域的坐标框,然后我们可以根据这些坐标,将文本区域从原图中裁剪出来,这些裁剪出来的子图,就可以丢给文本识别模型进行识别了。

文本识别的代码逻辑同样位于 deepdoc/vision/ocr.py 文件中,实现类 TextRecognizer 如下:

text-recognizer.png

它接受的参数是子图列表,根据 batch_num 按批次进行识别,默认 16 张图一个批次。针对每个批次,它的核心流程也可以分为三个部分:

  • 预处理阶段:通过 resize_norm_img() 对批次中的每张图片进行预处理,主要包括尺寸调整、数据类型转换、通道调整、归一化等步骤,然后将预处理后的图片拼接成一个大图,作为模型的输入;
  • 模型推理阶段:和文本检测一样,使用 ONNX Runtime 执行推理,加载的模型为 rec.onnx 文本识别模型,该模型基于 CTC(Connectionist Temporal Classification)算法 实现;
  • 后处理阶段:从 ocr.res 文件加载字符映射表,通过 CTC 解码得到文本和置信度,并基于置信度阈值过滤低质量结果。

CTC(Connectionist Temporal Classification)算法介绍

CTC(Connectionist Temporal Classification)算法即联结主义时序分类算法,是一种常用于语音识别、文本识别等领域的算法,主要用于解决输入和输出序列长度不一、无法对齐的问题。以下是其详细介绍:

  • 算法背景:在语音识别、文本识别等序列标注任务中,输入序列(如语音信号帧序列、文本图像特征序列)和输出序列(如文字序列)长度往往是可变的,且它们之间难以找到严格的对齐关系。传统的序列标注算法需要每一时刻输入与输出符号完全对齐,这就导致在处理此类问题时面临巨大挑战,标注成本极高且难度大。CTC 算法由 DeepMind 研究员 Alex Graves 等人在 ICML 2006 上提出,旨在解决这些问题,实现端到端的序列学习。
  • 核心原理:CTC 引入了一个特殊的空白字符 “blank”,输出序列中的每个元素可以是正常字符或空白字符。对于给定的输入序列,模型输出一系列概率分布,每个时间步的输出表示该时刻出现不同字符(包括空白字符)的概率。通过合并相邻重复字符并删除空白字符,将包含空白字符的输出序列映射到最终的目标标签序列。CTC 的目标是最大化给定输入序列下,正确目标标签序列的概率,通常通过计算负对数似然损失函数来进行模型训练,利用动态规划的前向后向算法高效计算损失函数及其导数,从而更新模型参数。
  • 应用场景

    • 语音识别:可将连续的语音信号映射到文字序列,实现自动语音转写,无需对语音信号进行复杂的帧级对齐标注,广泛应用于语音助手、语音转文字软件等。
    • 文本识别:常用于光学字符识别(OCR),将图像中的文字映射到文字序列,能处理不同长度的文字图像,提高识别效率和准确性。
    • 其他序列标注任务:还可应用于动态手势识别、声音事件检测、动作标注、唇读等领域,只要是涉及序列输入输出且存在对齐困难的任务,都可以考虑使用 CTC 算法。

文档布局分析

来自不同领域的文档可能有不同的布局,如报纸、杂志、书籍和简历在布局方面往往是不同的。只有当机器有准确的布局分析时,它才能决定这些文本部分是连续的还是不连续的,或者这个部分需要使用表结构识别来处理,或者这个部分是一个配图并用这个标题来描述。DeepDoc 支持 10 个基本布局组件,涵盖了大多数情况:

  • 文本(Text)
  • 标题(Title)
  • 配图(Figure)
  • 配图标题(Figure caption)
  • 表格(Table)
  • 表格标题(Table caption)
  • 页头(Header)
  • 页尾(Footer)
  • 参考引用(Reference)
  • 公式(Equation)

布局分析的代码逻辑位于 deepdoc/vision/layout_recognizer.py 文件中,实现类 LayoutRecognizer4YOLOv10 如下:

layout-recognizer.png

从类名可以看出,它是一个基于 YOLOv10 模型的文档布局识别器,通过加载 layout.onnx 模型文件,对输入图片进行推理,得到检测到的布局区域坐标和类型。

表结构识别

表格是一种常用的结构,经常出现在各类文档中,它的结构可能非常复杂,比如层次结构标题、跨单元格和投影行标题等。在 RAG 领域,对表格的处理和理解,直接影响着对表格内数据的问答效果。我们一般先通过布局分析,定位到文档中的表格区域,然后将其裁剪出来,使用专门的表结构识别模型进行识别。DeepDoc 支持 5 中不同的表结构:

  • 列(table column)
  • 行(table row)
  • 列标题(table column header)
  • 行标题(table projected row header)
  • 合并单元格(table spanning cell)

表结构识别的代码逻辑位于 deepdoc/vision/table_structure_recognizer.py 文件中,实现类 TableStructureRecognizer 如下:

table-structure-recognizer.png

它的调用流程和之前的模型基本一致,前处理,模型推理,后处理,这里使用的是 tsr.onnx 这个表结构识别模型,最后返回的是表格区域的坐标和类型。

小结

本文深入探讨了 DeepDoc 中的视觉处理部分,包括光学字符识别(OCR)、文档布局识别(DLR)和表格结构识别(TSR)的实现原理及技术细节。这几个类的实现都比较复杂,特别是调用每个模型的前处理和后处理部分,非常烧脑,有不少逻辑我也没看懂,但是这并不妨碍我们了解 RAGFlow 解析 PDF 文件的大致思路,权当是扩展下课外知识吧。

本文介绍的四个视觉模型都可以从 HuggingFace 下载

huggingface-deepdoc-files.png

而且可以看到,这几个模型都很轻量级,最大的布局检测模型也就只有 75MB 而已。另外,官方还提供了几个领域定制的布局识别模型,比如 layout.laws.onnx 用于法律文书,layout.manual.onnx 用于手册,layout.paper.onnx 用于论文,感兴趣的朋友可以自己尝试下。


学习 RAGFlow 的 DeepDoc 技术之解析器(二)

在上一篇文章中,我们学习了 RAGFlow 的 DeepDoc 技术,并对 DeepDoc 的 10 大解析器做了个概览,目前我们已经学习了其中的 3 种解析器,包括:DOCX 解析器、Excel 解析器 和 PPT 解析器。今天我们继续学习剩下的解析器。

TXT 解析器

TXT 解析器的实现类为 RAGFlowTxtParser,代码如下:

txt-parser.png

它通过 get_text() 直接读取文本,并使用 parser_txt() 对文本进行分块处理,默认分块大小为 128,分隔符为 "\n!?;。;!?"。具体的分块逻辑如下:

  • delimiter 中提取出所有的分割符;支持多字符分隔符,确保用反引号(``)把它们括起来;比如 `n`##`;`,表示在行尾、双井号(##)和分号处进行分隔;
  • 使用 | 将所有的分割符拼接(这是一个合法的正则表达式),并使用 re.split() 对原文本进行分割得到 secs 数组;
  • 遍历 secs 数组,如果数据和分隔符一样,忽略之,否则加入分块;这一点感觉有点粗暴,比如 hello!world! 分块后的结果为 helloworld,分割符号会丢掉;
  • 使用 tiktokencl100k_base 编码器计算分块的 token 数量,如果没有超过 chunk_token_num,则进行合并,最终得到 cks 数组;

注意解析器最后返回的 [[c, ""] for c in cks] 格式,这个 [c, ""] 是为了统一不同解析器的返回,第一个元素表示文本内容,第二个元素表示位置信息或其他元数据。可以在 native 分块器代码里看到其他的解析器的处理:

txt-parser-native-chunk.png

吐槽一句,RAGFlow 中解析器返回的格式有点乱,有的是在解析器里返回这个格式,有的是在分块器中拼这个格式。

HTML 解析器

HTML 解析器的实现类为 RAGFlowHtmlParser,代码如下:

html-parser.png

这个解析器的实现比较简单,直接使用 readability-lxmlhtml-text 两个库来读取 HTML 标题和内容,最后返回使用换行符分割后的数组。

可以看到,这个返回格式和 TXT 解析器就不一样,因此要在分块器中进行适配。

这里比较有意思的是它对 HTML 文件编码的处理,我们可以参考下,使用 chardet 来检测文件编码:

import chardet

all_codecs = [
  'utf-8', 'gb2312', 'gbk', 'utf_16', 'ascii', 'big5', 'big5hkscs',
  # 这里省略一堆编码 ...
]

def find_codec(blob):
  detected = chardet.detect(blob[:1024])
  if detected['confidence'] > 0.5:
    if detected['encoding'] == "ascii":
      return "utf-8"

  for c in all_codecs:
    try:
      blob[:1024].decode(c)
      return c
    except Exception:
      pass
    try:
      blob.decode(c)
      return c
    except Exception:
      pass

  return "utf-8"

Markdown 解析器

Markdown 解析器的实现类为 RAGFlowMarkdownParser,代码如下:

markdown-parser.png

和之间的解析器不一样的是,这里只有一个 extract_tables_and_remainder() 函数,用于从 Markdown 文本中提出表格数据和剩余的文本内容。从代码中可以看到它支持三种不同的表格形式:

  • 标准 Markdown 表格
  • 无边框表格
  • HTML 表格

下面是三种表格的示例:

# 标准 Markdown 表格

| 姓名 | 年龄 |
| ---- | ---- |
| 张三 | 18   |
| 李四 | 19   |

# 无边框表格

姓名 | 年龄 
---- | ---- 
张三 | 18   
李四 | 19   

# HTML 表格

<table>
  <tr>
    <th>姓名</th>
    <th>年龄</th>
  </tr>
  <tr>
    <td>张三</td>
    <td>18</td>
  </tr>
  <tr>
    <td>李四</td>
    <td>19</td>
  </tr>
</table>

光看这里的代码可能有点蒙,这个解析器咋就一个函数,入口都没有。很显然,这又是 RAGFlow 解析器不规范的一个例子。实际上,在 native 分块器里可以看到 Markdown 解析器的完整实现:

markdown-parser-native-chunk.png

从 Markdown 提取出表格内容和剩余文本后,文本段落按标题进行分块,表格部分使用 markdown 库将表格转换为 HTML 格式;最后返回两部分:文本段落(sections) 和 表格(tbls),用于后续的 RAG 处理。

看来,解析器和分块器的代码耦合还是很多啊!

JSON 解析器

JSON 解析器的实现类为 RAGFlowJsonParser,代码如下:

json-parser.png

这是一个挺有意思的解析器,实现了 JSON 的分块算法,它通过递归的方式将 JSON 对象切分成多个小块,确保每个小块的大小不超过 max_chunk_size 限制,并且在切分过程中保持 JSON 的层次结构,确保每个小块都是有效的 JSON 对象。

分块算法的核心逻辑如下:

  • 首先,使用 json.loads 将 JSON 解析为 Python 对象;
  • 接着,将对象中的所有数组转换为字典格式,比如 ["a", "b", "c"] 转换为 {"0": "a", "1": "b", "2": "c"}
  • 然后遍历对象中的每个键值对,根据键值对的类型进行不同的处理:

    • 如果值不是字典(字符串、数字等),直接添加到当前分块;
    • 如果值是字典,则遍历字典中的每个键值对;判断当前键值对的大小是否超过限制,没有的话,将键值对加到当前分块;如果超出限制,则创建新分块,递归处理;
  • 最后返回分块数组,数组里的每一项转化为 JSON 字符串;

下面是一个简单的 JSON 示例:

{
  "user": {"name": "Alice", "age": 30},
  "tags": ["python", "json"]
}

首先将数组转换为字典格式:

{
  "user": {"name": "Alice", "age": 30},
  "tags": {"0": "python", "1": "json"}
}

假设分块大小限制为 30,那么分块后,变成两个 JSON 字符串,如下:

[
  "{'user': {'name': 'Alice', 'age': 30}}",
  "{'tags': {'0': 'python', '1': 'json'}}"
]

图片解析器

图片解析器的实现类为 VisionFigureParser,代码如下:

figure-parser.png

这个解析器的实现很简单,直接使用视觉大模型对图片进行描述,Prompt 如下:

## 角色
你是一位专业的视觉数据分析师。

## 目标
分析图像并对其内容进行全面描述,重点在于识别视觉数据呈现的类型(例如柱状图、饼图、折线图、表格、流程图)、其结构以及图像中包含的任何文字说明或标签。

## 任务
1. 描述视觉呈现的整体结构,明确它是图表、图形、表格还是示意图。
2. 识别并提取图像中存在的任何轴、图例、标题或标签。在可获得的情况下提供确切文本。
3. 从视觉元素中提取数据点(例如柱高、折线图坐标、饼图扇区、表格行和列)。
4. 分析并解释数据中显示的任何趋势、对比或模式。
5. 捕捉任何注释、说明或脚注,并解释它们与图像的相关性。
6. 只包含图像中明确存在的细节。如果某个元素(例如轴、图例或说明)不存在或不可见,请勿提及。

## 输出格式(只包含与图像内容相关的部分)
- 视觉类型:[类型]
- 标题:[标题文本,如有]
- 轴/图例/标签:[细节,如有]
- 数据点:[提取的数据]
- 趋势/见解:[分析和解读]
- 说明/注释:[文本及相关性,如有]

> 确保分析的准确性、清晰度和完整性,只包含图像中存在的信息。避免对缺失元素做出不必要的陈述。

简历解析器

简历解析器是 RAGFlow 中的一类特殊解析器,它专门针对简历文件而打造,为 HR 和招聘场景提供了强大的简历处理能力,将非结构化的简历文档转换为可搜索、可分析的结构化数据。

个人感觉将简历处理逻辑放在分块器模块不是更合适么?解析器里都是一些通用的文档处理逻辑。

这个解析器的实现位于 deepdoc/parser/resume,目录结构如下:

tree deepdoc/parser/resume 
deepdoc/parser/resume
├── __init__.py
├── entities
│   ├── __init__.py
│   ├── corporations.py         - 公司处理
│   ├── degrees.py              - 学历处理
│   ├── industries.py           - 行业处理
│   ├── regions.py              - 地区处理
│   ├── res                     - 一些资源数据
│   │   ├── corp.tks.freq.json
│   │   ├── corp_baike_len.csv
│   │   ├── corp_tag.json
│   │   ├── good_corp.json
│   │   ├── good_sch.json
│   │   ├── school.rank.csv
│   │   └── schools.csv
│   └── schools.py             学校处理
├── step_one.py                - 第一步,数据处理和基础字段提取
└── step_two.py                - 第二步,深度解析和特征提取

整个简历的处理分为两步:

  • step_one.py - 第一步,数据处理和基础字段提取,包括姓名、性别、出生日期、联系方式、公司、教育背景、工作经历等基础信息;
  • step_two.py - 第二步,深度解析和特征提取,对基础信息进行补充,这一部分的处理非常细致,很有参考价值。比如对学校的处理:识别985/211/双一流等标签,获取学校的排名信息,海外知名学校识别等等;比如对公司的处理:公司名称标准化,好公司识别和标签生成,公司规模和行业分类等;

简历解析器整体结构比较散乱,代码写的也是一言难尽,整个模块没有明确的调用入口。不过,我们可以在简历分块器的代码(rag/app/resume.py)中找到它的调用逻辑:

resume-chunk.png

可以看到,这里调用了一个 http://127.0.0.1:61670/tog 接口,简历解析器处理的是这个接口返回的 JSON 数据。不过,我在源码里搜了一圈,都没找到 61670 这个服务的相关说明,后来在官方的 Issues 中看到也有人提过这个疑问,官方答复是暂不计划开源 o(╥﹏╥)o

PDF 解析器

PDF 解析器作为压轴出场的最后一个解析器,也是最重要、最复杂的一个,包括 RAGFlowPdfParserPlainParserVisionParser 三种不同实现;其中 RAGFlowPdfParser 是默认实现,依赖 DeepDoc 的视觉处理部分,使用 OCR、表格识别、布局分析等视觉模型,深度分析 PDF 文档,有效地提取文档标题、文本块、图像和表格等内容;PlainParser 则是直接使用 pypdf 提取 PDF 文本,不进行任何布局分析;VisionParser 则是将页面转成图片,然后使用视觉大模型提取文档内容。

VisionParser 的实现如下:

pdf-parser-version.png

它首先通过 __images__ 方法将 PDF 中的每一页转换成图片(通过 pdfplumberPage.to_image() 生成),然后遍历每一页的图片,调用视觉模型对图片进行描述,提取出图片中的文字内容。使用的 Prompt 如下:

## 说明
将提供的PDF页面图像中的内容转录为清晰的Markdown格式。

- 仅输出从图像中转录的内容。
- 不要输出本说明或任何其他解释。

## 规则
1. 不要生成示例、演示或模板。
2. 不要输出任何额外文本,如“示例”、“示例输出”等类似内容。
3. 不要生成任何表格、标题或图像中未明确呈现的内容。
4. 逐字转录内容。不要修改、翻译或遗漏任何内容。
5. 不要解释Markdown或提及你正在使用Markdown。
6. 不要将输出包裹在```markdown或```块中。
7. 仅根据图像的布局,对标题、段落、列表和表格应用Markdown结构。除非图像中确实存在表格,否则不要创建表格。
8. 严格按照图像所示,保留原始语言、信息和顺序。

{% if page %}
在转录的末尾,添加页面分隔符:`--- Page {{ page }} ---`。
{% endif %}

> 如果你在图像中未检测到有效内容,请返回空字符串。

PlainParser 的实现如下:

pdf-parser-plain.png

它使用 pypdf 库的 PdfReader 读取 PDF 文件,通过 page.extract_text() 提取页面上的文本内容,通过 pdf.outline 递归解析 PDF 的书签或大纲结构。

RAGFlowPdfParser 的实现如下:

pdf-parser.png

别看这里只有短短 8 行代码,但是它是整个 PDF 解析的核心,它通过 OCR、布局分析、表格识别等多个模型协同工作,高质量地提取 PDF 文档中的各种内容。整个 PDF 解析的流程如下:

  1. __images__(fnm, zoomin) - 这是解析流程的第一步和核心基础,负责将 PDF 页面转换为图像并执行 OCR 识别,为后续的所有处理步骤提供基础数据。它首先使用 pdfplumber 将 PDF 页面转换为图像;然后异步并发调用 self.__ocr() 对每个页面进行 OCR 处理;处理流程包括三步:

    • 文本检测:调用 self.ocr.detect() 函数使用深度学习模型检测图像中的文本区域;
    • 字符合并:找到与 PDF 原生字符重叠的 OCR 文本框,检查字符高度与文本框高度的匹配度(阈值70%),将匹配的字符分配到对应的文本框中;接着对文本框中的字符垂直排序,并将字符按顺序拼接成完整文本;
    • 文本识别:使用 get_rotate_crop_image() 从原图中裁剪出文本区域,然后使用 self.ocr.recognize_batch() 对所有裁剪图像进行文本识别,将识别结果赋值给对应的文本框;
  2. _layouts_rec(zoomin) - 使用 LayoutRecognizer 对页面进行文档布局分析,识别和标注不同类型的文档元素(如标题、正文、表格、图片等);
  3. _table_transformer_job(zoomin) - 这一步对已识别的表格区域进行更进一步的结构分析,识别表格的行、列、表头、合并单元格等细粒度结构信息;它首先从上一步的结果中筛选出所有表格区域,然后根据区域裁剪出表格图像,再调用 TableStructureRecognizer 对所有表格图像进行批量结构识别;
  4. _text_merge() - 合并水平方向上相邻的文本框。通过 self._y_dis() 方法计算两个文本框中心点的垂直距离,如果距离小于平均行高的 1/3,则认为它们是在同一行;
  5. _concat_downward() - 对文档中的文本框进行排序。首先按照 top 坐标从上到下排序,当两个文本框的垂直位置差距小于阈值 threshold 时,按照 x0 坐标从左到右排序,模拟人类的阅读顺序;
  6. _filter_forpages() - 去除对内容理解没有帮助的目录和前言页面,让最终提取的文本更加干净和有用。首先检测包含目录标题的文本框(如 "contents"、"目录" 等),删除该标题以及后续相关的目录条目;如果没有找到明确的目录标题,函数会检测包含大量省略号的页面,这些通常是目录页面的特征(章节标题和页码之间用省略号连接);
  7. _extract_table_figure() - 提取和处理文档中的表格和图片。首先遍历所有文本框,根据 layout_type 识别表格和图片区域,然后找到距离表格或图片最近的标题与之关联;另外还支持将跨页的表格合并为完整的表格,调用 self.tbl_det.construct_table() 进行表格结构识别,以 HTML 格式返回表格内容;
  8. __filterout_scraps() - 这是解析流程的最后一步,负责过滤掉无用的文本碎片并生成最终的文本输出。它根据文本块是否包含项目符号或标题,且有足够的高度和宽度来决定是否保留;最后通过 _line_tag() 方法为每行文本添加位置标签,用于后续的位置追踪和引用。

整个解析的逻辑是非常复杂的,而且 RAGFlow 的代码写得相当晦涩难懂,几乎没有注释,强烈建议大家结合大模型来阅读。

小结

本文深入学习了 RAGFlow 的 DeepDoc 技术中剩余的 7 种解析器,包括 TXT、HTML、Markdown、JSON、图片、简历和 PDF 解析器。其中 PDF 解析器最为复杂,包含三种不同实现方式,RAGFlowPdfParser 是默认实现,依赖视觉模型实现高质量的 PDF 解析。它在处理过程中使用了下面这些 InfiniFlow/deepdoc 模型:

  • 文本检测模型 (det.onnx)
  • 文本识别模型 (rec.onnx)
  • 布局分析模型 (layout.onnx)
  • 表格结构识别模型 (tsr.onnx)

下篇文章我们将学习 DeepDoc 技术中的视觉处理部分,瞅瞅这些模型的具体实现和工作原理。


学习 RAGFlow 的 DeepDoc 技术之解析器

我们昨天将任务执行器中的 do_handle_task() 函数从头到尾梳理了一遍,详细学习了 RAGFlow 的文件解析和分块逻辑。其中还遗漏了一些关键技术点,包括 DeepDoc 深度解析文档、RAPTOR 分块策略、GraphRAG 构建知识图谱,以及自动提取关键字、自动生成问题、标签集的构建和使用等高级配置。让我们一样一样来,先从 DeepDoc 技术学起。

在知识库的配置页面,当我们选择与 PDF 兼容的切片方法时,比如 GeneralManualPaperBookLawsPresentationOne 这几个,会出现 PDF 解析器下拉菜单:

ragflow-kb-configuration.png

默认支持 DeepDocNative 两种方式:

  • DeepDoc - 基于 OCR、表格识别、布局分析等视觉模型,深度分析 PDF 文档,有效地提取文档标题、文本块、图像和表格等内容;
  • Native - 仅提取 PDF 中的纯文本内容;当用户的 PDF 仅包含纯文本时,可以选择该选项,从而减少整体解析时间;

目前最新版本中还有个实验特性,如果用户配置了第三方视觉模型(也就是之前配置的 Img2txt 模型),也可以使用这些视觉模型来做 PDF 解析。

ragflow-pdf-parser.png

我们重点关注 DeepDoc 方式,因为这是 RAGFlow 的默认方式,也是最强大的方式。

注意 DeepDoc 选项暂时不适用于 Word 文档,若要使用该功能,可以将 Word 文档转换为 PDF 格式。

DeepDoc 简介

DeepDoc 是 RAGFlow 的核心特性之一,可以对来自不同领域、具有不同格式和不同检索要求的文档进行深度分析,它负责将各类复杂文档转换为结构化的文本格式,为后续的分块和向量化处理打下基础。从 v0.17.0 版本开始,RAGFlow 将数据提取任务与分块方法解耦,成为两个独立的模块。这种解耦的设计具有更高的灵活性,支持更深层次的定制,以适应更复杂的用例。用户能够自主选择适合的视觉模型,在速度和性能之间取得平衡,以满足具体的用例需求。

分块的逻辑位于 rag/app 目录:

$ tree rag/app
rag/app
├── __init__.py
├── audio.py
├── book.py
├── email.py
├── laws.py
├── manual.py
├── naive.py
├── one.py
├── paper.py
├── picture.py
├── presentation.py
├── qa.py
├── resume.py
├── table.py
└── tag.py

这些文件就对应我们之前学习过的十四种不同的 分块器(Chunker),RAGFlow 会根据不同的文档类型使用不同的分块器,对分块逻辑感兴趣的朋友可以研究下这里的代码。有很多分块器依赖于 DeepDoc 来解析文档,DeepDoc 的逻辑位于 deepdoc 目录:

$ tree deepdoc
deepdoc
├── __init__.py
├── parser
│   └── ...
└── vision
    ├── layout_recognizer.py
    ├── ocr.py
    └── table_structure_recognizer.py

从目录结构可以看出,DeepDoc 由两个部分组成:解析器(parser)视觉处理(vision),解析器提供了不同格式文档的通用解析方法,供分块器使用;而视觉处理部分则提供了 OCR、表格识别、布局分析等高级特性,可以对 PDF 和 图片等文件实现更好的识别效果。分块器、解析器和视觉处理这三者之间的关系如下:

ragflow-deepdoc.png

解析器概览

今天我们主要关注 DeepDoc 的解析器部分,视觉处理部分我们放到下一篇文章中学习。

$ tree deepdoc/parser
deepdoc/parser
├── __init__.py
├── docx_parser.py
├── excel_parser.py
├── figure_parser.py
├── html_parser.py
├── json_parser.py
├── markdown_parser.py
├── pdf_parser.py
├── ppt_parser.py
├── resume
│   ├── step_one.py
│   └── step_two.py
├── txt_parser.py
└── utils.py

deepdoc/parser 目录下的文件可以看到,DeepDoc 内置了 10 种不同的解析器,支持 Word、Excel、PPT、PDF、HTML、Markdown、JSON、图片等文件的处理和解析;此外,由于简历是一种非常复杂的文档,包含各种格式的非结构化文本,可以被解析为包含近百个字段的结构化数据,RAGFlow 还内置了一个专门的简历解析器。

DOCX 解析器

DOCX 解析器的实现类为 RAGFlowDocxParser,代码如下:

docx-parser.png

它使用 python-docx 读取 docx 文件,将其解析为结构化的文本和表格数据,主要包括两个部分:

  • 段落解析:遍历每个 paragraphrun(在 python-docx 中,paragraph 表示段落,run 表示具有相同样式的一段连续文本,它是段落的更细分单位);提取出文本和样式信息,格式为 (text, style_name)
  • 表格解析:遍历每个 tablerowscells,然后将表格转换为 pandas 的 DataFrame 对象;使用了一些规则来识别表头和数据类型,然后按"表头: 内容"的格式返回组织好的表格数据,比如 表头1:内容;表头2:内容;...

注意它只对 docx 文件生效,对于 doc 文件,在分块器代码里可以看到使用 tika 来读取,目前这个并没有放到解析器里。

假设 docx 文件中含有如下段落内容:

docx-parser-1.png

解析后的 secs 如下:

[
  ('标题一', 'Heading 1'), 
  ('这里是第一段正文。', 'Normal'), 
  ('标题二', 'Heading 1'), 
  ('这里是第二段正文。', 'Normal')
]

假设 docx 文件中含有如下表格内容:

docx-parser-2.png

解析后的 tbls 如下:

[
  [
    '姓名: 张三;学号: 001;年龄: 18;性别: 男;成绩: 100',
    '姓名: 李四;学号: 002;年龄: 19;性别: 女;成绩: 99',
    '姓名: 王五;学号: 003;年龄: 20;性别: 男;成绩: 98'
  ]
]

Excel 解析器

Excel 解析器的实现类为 RAGFlowExcelParser,代码如下:

excel-parser.png

它的实现比较简单,首先通过 _load_excel_to_workbook() 方法将 Excel 文件加载为 Workbook 对象,然后遍历每个 Worksheet 的表格数据,将其转换为类似上面 Word 表格解析后的格式,方便后续文本向量化处理。

解析器支持自动识别 Excel 或 CSV 格式:

if not (file_head.startswith(b'PK\x03\x04') or file_head.startswith(b'\xD0\xCF\x11\xE0')):
  
  # 不是 Excel 文件
  df = pd.read_csv(file_like_object)
  return RAGFlowExcelParser._dataframe_to_workbook(df)

return load_workbook(file_like_object,data_only= True)

它通过文件头检测技术判断文件是否为 Excel 文件,如果前 4 字节是 PK\x03\x04 表示是 XLSX 文件,如果是 \xD0\xCF\x11\xE0 表示是旧版 XLS 文件;对于 Excel 文件,程序使用 openpyxlload_workbook 来读取;对于 CSV 文件,则使用 pandas 的 read_csv 来读取,然后调用 _dataframe_to_workbook() 方法将其转换为 Workbook 对象,便于后续的统一处理。

PPT 解析器

PPT 解析器的实现类为 RAGFlowPptParser,代码如下:

ppt-parser.png

它使用 python-pptx 来读取 PPT 文件,通过遍历幻灯片中的所有形状,提取出所有的纯文本数据。形状(Shape) 是 PPT 文件中的一个重要概念,包含各种类型的元素:文本框、图片、表格、图表、图形、组合形状等。函数 __extract(shape) 从形状中提取内容,核心逻辑如下:

  • 文本框:提取段落文本,保留项目符号和缩进结构
  • 表格:转换为键值对格式(第一行作为表头)
  • 组合形状:按位置顺序递归处理内部的所有形状
  • 其他:返回空字符串

这里有一个细节,程序在遍历形状时,会按垂直位置和水平位置排序,确保文本顺序符合视觉阅读顺序。

未完待续

今天我们学习了 RAGFlow 的 DeepDoc 技术,它由解析器和视觉处理两部分组成,将各类复杂文档转换为结构化的文本数据,为分块和向量化打下基础。我们详细学习了其中的 3 种解析器,包括:DOCX 解析器、Excel 解析器 和 PPT 解析器。

今天的内容就这么多,我们将在下一篇继续学习剩下的解析器。