Fork me on GitHub

分类 RAGFlow 下的文章

学习 RAGFlow 的 DeepDoc 技术之解析器

我们昨天将任务执行器中的 do_handle_task() 函数从头到尾梳理了一遍,详细学习了 RAGFlow 的文件解析和分块逻辑。其中还遗漏了一些关键技术点,包括 DeepDoc 深度解析文档、RAPTOR 分块策略、GraphRAG 构建知识图谱,以及自动提取关键字、自动生成问题、标签集的构建和使用等高级配置。让我们一样一样来,先从 DeepDoc 技术学起。

在知识库的配置页面,当我们选择与 PDF 兼容的切片方法时,比如 GeneralManualPaperBookLawsPresentationOne 这几个,会出现 PDF 解析器下拉菜单:

ragflow-kb-configuration.png

默认支持 DeepDocNative 两种方式:

  • DeepDoc - 基于 OCR、表格识别、布局分析等视觉模型,深度分析 PDF 文档,有效地提取文档标题、文本块、图像和表格等内容;
  • Native - 仅提取 PDF 中的纯文本内容;当用户的 PDF 仅包含纯文本时,可以选择该选项,从而减少整体解析时间;

目前最新版本中还有个实验特性,如果用户配置了第三方视觉模型(也就是之前配置的 Img2txt 模型),也可以使用这些视觉模型来做 PDF 解析。

ragflow-pdf-parser.png

我们重点关注 DeepDoc 方式,因为这是 RAGFlow 的默认方式,也是最强大的方式。

注意 DeepDoc 选项暂时不适用于 Word 文档,若要使用该功能,可以将 Word 文档转换为 PDF 格式。

DeepDoc 简介

DeepDoc 是 RAGFlow 的核心特性之一,可以对来自不同领域、具有不同格式和不同检索要求的文档进行深度分析,它负责将各类复杂文档转换为结构化的文本格式,为后续的分块和向量化处理打下基础。从 v0.17.0 版本开始,RAGFlow 将数据提取任务与分块方法解耦,成为两个独立的模块。这种解耦的设计具有更高的灵活性,支持更深层次的定制,以适应更复杂的用例。用户能够自主选择适合的视觉模型,在速度和性能之间取得平衡,以满足具体的用例需求。

分块的逻辑位于 rag/app 目录:

$ tree rag/app
rag/app
├── __init__.py
├── audio.py
├── book.py
├── email.py
├── laws.py
├── manual.py
├── naive.py
├── one.py
├── paper.py
├── picture.py
├── presentation.py
├── qa.py
├── resume.py
├── table.py
└── tag.py

这些文件就对应我们之前学习过的十四种不同的 分块器(Chunker),RAGFlow 会根据不同的文档类型使用不同的分块器,对分块逻辑感兴趣的朋友可以研究下这里的代码。有很多分块器依赖于 DeepDoc 来解析文档,DeepDoc 的逻辑位于 deepdoc 目录:

$ tree deepdoc
deepdoc
├── __init__.py
├── parser
│   └── ...
└── vision
    ├── layout_recognizer.py
    ├── ocr.py
    └── table_structure_recognizer.py

从目录结构可以看出,DeepDoc 由两个部分组成:解析器(parser)视觉处理(vision),解析器提供了不同格式文档的通用解析方法,供分块器使用;而视觉处理部分则提供了 OCR、表格识别、布局分析等高级特性,可以对 PDF 和 图片等文件实现更好的识别效果。分块器、解析器和视觉处理这三者之间的关系如下:

ragflow-deepdoc.png

解析器概览

今天我们主要关注 DeepDoc 的解析器部分,视觉处理部分我们放到下一篇文章中学习。

$ tree deepdoc/parser
deepdoc/parser
├── __init__.py
├── docx_parser.py
├── excel_parser.py
├── figure_parser.py
├── html_parser.py
├── json_parser.py
├── markdown_parser.py
├── pdf_parser.py
├── ppt_parser.py
├── resume
│   ├── step_one.py
│   └── step_two.py
├── txt_parser.py
└── utils.py

deepdoc/parser 目录下的文件可以看到,DeepDoc 内置了 10 种不同的解析器,支持 Word、Excel、PPT、PDF、HTML、Markdown、JSON、图片等文件的处理和解析;此外,由于简历是一种非常复杂的文档,包含各种格式的非结构化文本,可以被解析为包含近百个字段的结构化数据,RAGFlow 还内置了一个专门的简历解析器。

DOCX 解析器

DOCX 解析器的实现类为 RAGFlowDocxParser,代码如下:

docx-parser.png

它使用 python-docx 读取 docx 文件,将其解析为结构化的文本和表格数据,主要包括两个部分:

  • 段落解析:遍历每个 paragraphrun(在 python-docx 中,paragraph 表示段落,run 表示具有相同样式的一段连续文本,它是段落的更细分单位);提取出文本和样式信息,格式为 (text, style_name)
  • 表格解析:遍历每个 tablerowscells,然后将表格转换为 pandas 的 DataFrame 对象;使用了一些规则来识别表头和数据类型,然后按"表头: 内容"的格式返回组织好的表格数据,比如 表头1:内容;表头2:内容;...

注意它只对 docx 文件生效,对于 doc 文件,在分块器代码里可以看到使用 tika 来读取,目前这个并没有放到解析器里。

假设 docx 文件中含有如下段落内容:

docx-parser-1.png

解析后的 secs 如下:

[
  ('标题一', 'Heading 1'), 
  ('这里是第一段正文。', 'Normal'), 
  ('标题二', 'Heading 1'), 
  ('这里是第二段正文。', 'Normal')
]

假设 docx 文件中含有如下表格内容:

docx-parser-2.png

解析后的 tbls 如下:

[
  [
    '姓名: 张三;学号: 001;年龄: 18;性别: 男;成绩: 100',
    '姓名: 李四;学号: 002;年龄: 19;性别: 女;成绩: 99',
    '姓名: 王五;学号: 003;年龄: 20;性别: 男;成绩: 98'
  ]
]

Excel 解析器

Excel 解析器的实现类为 RAGFlowExcelParser,代码如下:

excel-parser.png

它的实现比较简单,首先通过 _load_excel_to_workbook() 方法将 Excel 文件加载为 Workbook 对象,然后遍历每个 Worksheet 的表格数据,将其转换为类似上面 Word 表格解析后的格式,方便后续文本向量化处理。

解析器支持自动识别 Excel 或 CSV 格式:

if not (file_head.startswith(b'PK\x03\x04') or file_head.startswith(b'\xD0\xCF\x11\xE0')):
  
  # 不是 Excel 文件
  df = pd.read_csv(file_like_object)
  return RAGFlowExcelParser._dataframe_to_workbook(df)

return load_workbook(file_like_object,data_only= True)

它通过文件头检测技术判断文件是否为 Excel 文件,如果前 4 字节是 PK\x03\x04 表示是 XLSX 文件,如果是 \xD0\xCF\x11\xE0 表示是旧版 XLS 文件;对于 Excel 文件,程序使用 openpyxlload_workbook 来读取;对于 CSV 文件,则使用 pandas 的 read_csv 来读取,然后调用 _dataframe_to_workbook() 方法将其转换为 Workbook 对象,便于后续的统一处理。

PPT 解析器

PPT 解析器的实现类为 RAGFlowPptParser,代码如下:

ppt-parser.png

它使用 python-pptx 来读取 PPT 文件,通过遍历幻灯片中的所有形状,提取出所有的纯文本数据。形状(Shape) 是 PPT 文件中的一个重要概念,包含各种类型的元素:文本框、图片、表格、图表、图形、组合形状等。函数 __extract(shape) 从形状中提取内容,核心逻辑如下:

  • 文本框:提取段落文本,保留项目符号和缩进结构
  • 表格:转换为键值对格式(第一行作为表头)
  • 组合形状:按位置顺序递归处理内部的所有形状
  • 其他:返回空字符串

这里有一个细节,程序在遍历形状时,会按垂直位置和水平位置排序,确保文本顺序符合视觉阅读顺序。

未完待续

今天我们学习了 RAGFlow 的 DeepDoc 技术,它由解析器和视觉处理两部分组成,将各类复杂文档转换为结构化的文本数据,为分块和向量化打下基础。我们详细学习了其中的 3 种解析器,包括:DOCX 解析器、Excel 解析器 和 PPT 解析器。

今天的内容就这么多,我们将在下一篇继续学习剩下的解析器。


再学 RAGFlow 的文件解析逻辑

经过几天的学习,我们了解了 RAGFlow 的文件上传和解析流程,了解了解析任务是如何触发并放入 Redis Stream 消息队列中,等待任务执行器消费和处理的。今天我们将继续学习任务执行器中最重要的函数 do_handle_task() 的实现,看看 RAGFlow 是如何具体执行每个解析任务的。

do_handle_task 函数实现

do_handle_task 是 RAGFlow 系统中的任务处理函数,负责处理文档解析、分块、向量化和索引的完整流程。它的主要逻辑如下:

  1. 判断任务是否被取消,如果是,则直接返回;
  2. 根据任务配置绑定对应的嵌入模型,用于后续的向量化处理;
  3. 根据嵌入模型的向量维度,初始化知识库索引结构;
  4. 根据任务类型执行不同的处理流程:

    • 如果是 RAPTOR 类型的任务,则执行递归抽象处理;
    • 如果是 GraphRAG 类型的任务,则执行知识图谱构建;
    • 如果是标准分块类型的任务,则执行普通分块处理;
  5. 批量插入分块数据到知识库索引中;
  6. 更新文档统计信息,包括分块数量、token 数量等;

下面是 do_handle_task 函数的核心实现:

async def do_handle_task(task):

  # 过程回调,用于报告进度
  progress_callback = partial(set_progress, task_id, task_from_page, task_to_page)

  # 判断任务是否已取消
  task_canceled = TaskService.do_cancel(task_id)
  if task_canceled:
    progress_callback(-1, msg="Task has been canceled.")
    return

  # 绑定嵌入模型
  embedding_model = LLMBundle(task_tenant_id, LLMType.EMBEDDING, llm_name=task_embedding_id, lang=task_language)
  vts, _ = embedding_model.encode(["ok"])
  vector_size = len(vts[0])

  # 初始化知识库
  init_kb(task, vector_size)

  if task.get("task_type", "") == "raptor":
    # 使用 RAPTOR 分块策略
  elif task.get("task_type", "") == "graphrag":
    # 使用 GraphRAG 分块策略
  else:
    # 使用标准分块策略
    chunks = await build_chunks(task, progress_callback)
    # 计算每个分块的向量
    token_count, vector_size = await embedding(chunks, embedding_model, task_parser_config, progress_callback)

  # 批量插入分块数据
  for b in range(0, len(chunks), DOC_BULK_SIZE):
    doc_store_result = await trio.to_thread.run_sync(
      lambda: settings.docStoreConn.insert(
        chunks[b:b + DOC_BULK_SIZE], search.index_name(task_tenant_id), task_dataset_id))

  # 更新文档统计信息
  DocumentService.increment_chunk_num(task_doc_id, task_dataset_id, token_count, chunk_count, 0)

上面的代码中有几个需要注意的点,值得展开来学习下。

了解偏函数

progress_callback = partial(set_progress, task_id, task_from_page, task_to_page)

这段代码使用了 Python 中的 偏函数(Partial Function),一个比较冷门但有点意思的功能。偏函数是一种特殊的函数,它通过固定原函数的某些参数,从而创建出一个新的函数。偏函数的创建主要通过 functools.partial 实现,下面是一个简单的示例,固定加法函数的一个参数:

from functools import partial

# 定义一个加法函数
def add(a, b):
    return a + b

# 创建一个偏函数,固定 a=10
add_ten = partial(add, 10)

# 调用偏函数
print(add_ten(5))  # 输出 15 (相当于 10 + 5)
print(add_ten(10)) # 输出 20 (相当于 10 + 10)

回到前面的代码片段,我们使用偏函数创建了一个新的回调函数 progress_callback,它固定了 set_progress 函数的前三个参数:task_idtask_from_pagetask_to_page。当后续调用 progress_callback 时,只需提供剩余的参数(如进度值和错误消息)即可:

progress_callback(50, msg="Processing...")

progress_callback(-1, msg="Error occurred")

这种方式的好处是,在多次调用 progress_callback 时,不需要重复传递某些固定参数,减少函数调用时需要提供的参数数量。在 do_handle_task() 函数的实现中,会大量的调用 progress_callback 汇报任务进度,通过偏函数,可以让代码更加简洁和灵活,提高了代码的可读性和可维护性。

绑定嵌入模型

embedding_model = LLMBundle(task_tenant_id, LLMType.EMBEDDING, llm_name=task_embedding_id, lang=task_language)
vts, _ = embedding_model.encode(["ok"])
vector_size = len(vts[0])

RAGFlow 通过 LLMBundle 将所有大模型的操作统一封装在一个类里,方便使用。比如通过 LLMBundle.encode() 方法调用嵌入模型,LLMBundle.chat() 方法调用聊天模型,等等。

LLMType 枚举了所有支持的模型类型(这也就是我们之前在 “模型供应商” 页面配置的模型类型):

class LLMType(StrEnum):
  CHAT = 'chat'                # 聊天模型
  EMBEDDING = 'embedding'      # 嵌入模型
  SPEECH2TEXT = 'speech2text'  # 语音转文字
  IMAGE2TEXT = 'image2text'    # 图片转文字
  RERANK = 'rerank'            # 排序模型
  TTS    = 'tts'               # 文字转语音

这里创建的 embedding_model 是一个嵌入模型,它的具体实现位于 rag/llm/embedding_model.py 文件,该文件定义了大量的嵌入模型实现,包括:

  • OpenAI 系列

    • OpenAI - text-embedding-ada-002 / text-embedding-3-large (OpenAIEmbed)
    • Azure OpenAI (AzureEmbed)
  • 中国厂商

    • 阿里通义千问 - text_embedding_v2 (QWenEmbed)
    • 智谱 AI - embedding-2 / embedding-3 (ZhipuEmbed)
    • 百川智能 - Baichuan-Text-Embedding (BaiChuanEmbed)
    • 百度千帆 (BaiduYiyanEmbed)
  • 国外厂商

    • Cohere - embed-multilingual-v3.0 (CoHereEmbed)
    • Mistral - mistral-embed (MistralEmbed)
    • Jina - jina-embeddings-v3 (JinaEmbed)
    • Voyage AI - voyage-large-2 (VoyageEmbed)
    • Gemini - text-embedding-004 (GeminiEmbed)
    • Bedrock - amazon.titan-embed-text-v2:0 / cohere.embed-multilingual-v3 (BedrockEmbed)
  • 聚合平台

    • TogetherAI - togethercomputer/m2-bert-80M-8k-retrieval (TogetherAIEmbed)
    • SiliconFlow - 硅基流动 (SILICONFLOWEmbed)
    • VolcEngine - 火山引擎方舟平台 (VolcEngineEmbed)
    • PerfXCloud - 澎峰科技 (PerfXCloudEmbed)
    • Upstage - solar-embedding-1-large (UpstageEmbed)
    • NovitaAI - 兼容接口 (NovitaEmbed)
    • GiteeAI - 码云AI平台 (GiteeEmbed)
    • Replicate - 云端推理平台 (ReplicateEmbed)
  • 本地部署模型

    • BAAI/bge-large-zh-v1.5 (DefaultEmbedding)
    • BAAI/bge-small-en-v1.5 (FastEmbed)
    • maidalun1020/bce-embedding-base_v1 (YoudaoEmbed)
  • 本地部署框架

初始化知识库索引

创建嵌入模型实例之后,RAGFlow 通过 embedding_model.encode(["ok"]) 来验证模型的可用性,同时还能获取到嵌入模型的向量维度信息。接着,它会根据向量维度创建相应的索引库:

init_kb(task, vector_size)

init_kb() 函数的实现如下:

def init_kb(row, vector_size: int):
  idxnm = search.index_name(row["tenant_id"])
  return settings.docStoreConn.createIdx(idxnm, row.get("kb_id", ""), vector_size)

可以看到 RAGFlow 通过 docStoreConncreateIdx(tenant_id, kb_id, vector_size) 创建索引库,其中 docStoreConn 表示文档存储引擎,可以通过环境变量 DOC_ENGINE 进行切换,目前支持三种:

RAGFlow 默认使用 Elasticsearch 作为文档存储引擎。另外从 createIdx 的三个参数可以看出,RAGFlow 是支持多租户的,不同租户的索引库之间是隔离的。其实,只有 Infinity 是根据传入的三个参数创建索引库的,Elasticsearch 和 OpenSearch 则是根据 tenant_id 创建索引库的,kb_idvector_size 对它们来说没有用。

es-index.png

之所以 Elasticsearch 和 OpenSearch 创建索引时不用指定向量维度,是因为 RAGFlow 使用了一种比较讨巧的方法,它将常见的向量维度提前预定义在 Mapping 里了(参考 conf/mapping.jsonconf/os_mapping.json 文件):

{
  "dynamic_templates": [
    {
      "dense_vector": {
        "match": "*_512_vec",
        "mapping": {
          "type": "dense_vector",
          "index": true,
          "similarity": "cosine",
          "dims": 512
        }
      }
    },
    {
      "dense_vector": {
        "match": "*_768_vec",
        "mapping": {
          "type": "dense_vector",
          "index": true,
          "similarity": "cosine",
          "dims": 768
        }
      }
    },
    {
      "dense_vector": {
        "match": "*_1024_vec",
        "mapping": {
          "type": "dense_vector",
          "index": true,
          "similarity": "cosine",
          "dims": 1024
        }
      }
    },
    {
      "dense_vector": {
        "match": "*_1536_vec",
        "mapping": {
          "type": "dense_vector",
          "index": true,
          "similarity": "cosine",
          "dims": 1536
        }
      }
    }
  ]
}

如果你的嵌入模型不是 512、768、1024 或 1536 维的,那么在创建索引时可能会报错,需要修改 Mapping 文件来支持新的向量维度。

文档分块策略

昨天在学习知识库配置时,我们提到了两个高级配置:

  • 使用召回增强 RAPTOR 策略(use_raptor) - 为多跳问答任务启用 RAPTOR 提高召回效果;
  • 提取知识图谱(use_graphrag) - 在当前知识库的文件块上构建知识图谱,以增强涉及嵌套逻辑的多跳问答;

这两个配置其实对应着 RAGFlow 不同的文档分块策略:

if task.get("task_type", "") == "raptor":
  # 使用 RAPTOR 分块策略
elif task.get("task_type", "") == "graphrag":
  # 使用 GraphRAG 分块策略
else:
  # 使用标准分块策略
  chunks = await build_chunks(task, progress_callback)
  # 计算每个分块的向量
  token_count, vector_size = await embedding(chunks, embedding_model, task_parser_config, progress_callback)

关于 RAPTOR 和 GraphRAG 的实现,我们后面再详细学习,今天我们先来学习下标准分块策略的实现。它的核心流程分两步:

  • build_chunks() - 执行标准文档分块流程
  • embedding() - 对分块进行向量化处理

其中,build_chunks() 函数的实现大致如下:

async def build_chunks(task, progress_callback):

    # 从对象存储中读取文件
    bucket, name = File2DocumentService.get_storage_address(doc_id=task["doc_id"])
    binary = await get_storage_binary(bucket, name)

    # 调用分块器进行分块,通过 chunk_limiter 限制并发路数
    chunker = FACTORY[task["parser_id"].lower()]
    async with chunk_limiter:
        cks = await trio.to_thread.run_sync(lambda: chunker.chunk(...))

    # 将分块结果上传到对象存储
    async with trio.open_nursery() as nursery:
        for ck in cks:
            nursery.start_soon(upload_to_minio, doc, ck)

    return docs

它首先根据 doc_id 从数据库中查询出桶名和文件名,从对象存储中读取出文件内容;接着使用 parser_id 创建对应的分块器,然后调用它的 chunk() 方法对文件进行分块;最后将分块结果上传到对象存储。这里的 parser_id 我们昨天已经学习过了,它表示切片方法,有 General, Q&A, Resume, Manual, Table, Paper, Book, Laws, Presentation, One, Tag 共 11 种,对应的分块器实现定义在 FACTORY 工厂中:

FACTORY = {
  "general": naive,
  ParserType.NAIVE.value: naive,
  ParserType.PAPER.value: paper,
  ParserType.BOOK.value: book,
  ParserType.PRESENTATION.value: presentation,
  ParserType.MANUAL.value: manual,
  ParserType.LAWS.value: laws,
  ParserType.QA.value: qa,
  ParserType.TABLE.value: table,
  ParserType.RESUME.value: resume,
  ParserType.PICTURE.value: picture,
  ParserType.ONE.value: one,
  ParserType.AUDIO.value: audio,
  ParserType.EMAIL.value: email,
  ParserType.KG.value: naive,
  ParserType.TAG.value: tag
}

可以看到 FACTORY 工厂中除了上面的 11 种切片方法之外,还多出了 PICTURE, AUDIOEMAIL 三种切片方法,暂时没看到使用,估计 RAGFlow 后面会支持对图片、音频和邮件的处理吧。

不同类型的文件使用不同的切片方法,这是 RAGFlow 的核心优势之一,官方将这种特性称为 基于模板的文本切片方法(Template-based chunking;实际上,在实施切片之前,我们还需要将各类文档转换为文本格式,这是基于 RAGFlow 的 深度文档理解(DeepDoc 技术实现的;DeepDoc 支持广泛的文件格式,能够处理各类复杂文档的布局和结构,确保从 PDF、Word、PPT 等文件中提取高质量、有价值的信息。

对分块进行向量化处理

通过调用 build_chunks() 方法,我们根据知识库配置,将文档切分成了一个个的分块数据,接着调用 embedding() 对分块进行向量化处理:

async def embedding(docs, mdl, parser_config=None, callback=None):

    # 准备标题和内容数据
    tts, cnts = [], []
    for d in docs:
        tts.append(d.get("docnm_kwd", "Title"))
        cnts.append(d["content_with_weight"])

    # 计算标题的向量(只计算第一个标题,然后复制到所有文档,这里的 docs 属于同一个文档,因此文件名是一样的)
    tk_count = 0
    if len(tts) == len(cnts):
        vts, c = await trio.to_thread.run_sync(
            lambda: mdl.encode(tts[0: 1]))
        tts = np.concatenate([vts for _ in range(len(tts))], axis=0)
        tk_count += c

    # 计算内容的向量(按批生成)
    cnts_ = np.array([])
    for i in range(0, len(cnts), EMBEDDING_BATCH_SIZE):
        vts, c = await trio.to_thread.run_sync(
            lambda: mdl.encode([truncate(c, mdl.max_length-10) for c in cnts[i: i + EMBEDDING_BATCH_SIZE]]))
        if len(cnts_) == 0:
            cnts_ = vts
        else:
            cnts_ = np.concatenate((cnts_, vts), axis=0)
        tk_count += c

    # 加权融合标题和内容向量
    cnts = cnts_
    filename_embd_weight = parser_config.get("filename_embd_weight", 0.1)
    title_w = float(filename_embd_weight)
    vects = (title_w * tts + (1 - title_w) * cnts) if len(tts) == len(cnts) else cnts

    # 将向量添加到每个文档中
    vector_size = 0
    for i, d in enumerate(docs):
        v = vects[i].tolist()
        vector_size = len(v)
        d["q_%d_vec" % len(v)] = v
    return tk_count, vector_size

这里有一个点值得注意,RAGFlow 在计算分块向量时综合考虑了标题(也就是文件名)和内容的,通过加权将标题和内容的向量进行融合,标题的权重默认为 0.1,内容的权重为 0.9,可以通过 filename_embd_weight 参数进行调整。最后计算出的向量会添加到每个文档的 q_N_vec 字段中,其中 N 表示向量的维度。

至此,我们就得到了文档的分块以及每个分块的向量,在 do_handle_task 函数的最后,通过批量插入将分块数据写入到知识库索引中。下面是写入到 ES 索引库中的一个分块示例:

es-doc.png

也可以点击知识库中的文件名称,对文件的分块数据进行浏览和编辑:

ragflow-kb-chunks.png

小结

今天我们详细学习了 RAGFlow 的文件解析逻辑,将任务执行器中的 do_handle_task() 函数从头到尾梳理了一遍,从任务进度的汇报,嵌入模型的选择,索引库的构建,到根据文档类型选择合适的文档分块策略,再到对分块后的内容进行向量化处理,到最后的批量写入。相信通过整个过程的学习,你对 RAGFlow 的文件解析逻辑已经有了更深入的了解,并对 RAGFlow 的工作原理有了更直观的感受。

不过,这里还有很多技术细节没有展开,比如 RAGFlow 是如何使用 DeepDoc 技术深度理解和解析文档的,它使用的 RAPTOR 分块策略是什么,它又是如何使用 GraphRAG 构建知识图谱的。还有昨天学习的一些高级配置参数也值得进一步研究,比如自动提取关键字,自动生成问题,标签集的构建和使用,等等。我们明天继续学习。


学习 RAGFlow 的知识库配置

书接上回,昨天我们深入学习了如何触发解析任务,如何通过 Redis Stream 作为消息队列投递任务,以及任务执行器如何利用 trio 异步框架和消费者组机制,消费和处理这些任务。我们可以用 Redis 客户端连接到 Redis,看看 rag_flow_svr_queue 队列中的消息是什么样的:

redis-stream-detail.png

任务消息结构

我们知道,每条 Redis Stream 的消息由 ID 和 Value 组成,Value 是一个字典,包含多对键值对;这里的 Value 只有一对键值对,键为 message,值为一个 JSON 字符串,表示任务的详细信息:

{
  "id": "58b8b5a65e5b11f0b6c20242ac120006",
  "doc_id": "677bfde25e5a11f09c890242ac120006",
  "progress": 0.0,
  "from_page": 0,
  "to_page": 100000000,
  "digest": "81e29dac5b568aca",
  "priority": 0,
  "create_time": 1752240687243,
  "create_date": "2025-07-11 21:31:27",
  "update_time": 1752240687243,
  "update_date": "2025-07-11 21:31:27"
}

很显然,这里的信息还不够完整,因此 collect() 继续通过任务 ID 查询数据库,获取了更详细的任务信息:

ragflow-task-executor-collect.png

详细的任务信息如下:

{
  "id": "58b8b5a65e5b11f0b6c20242ac120006",
  "doc_id": "677bfde25e5a11f09c890242ac120006",
  "from_page": 0,
  "to_page": 100000000,
  "retry_count": 0,
  "kb_id": "e5aa2dbc5b9711f0b0880242ac120006",
  "parser_id": "naive",
  "parser_config": {
    "layout_recognize": "DeepDOC",
    "chunk_token_num": 512,
    "delimiter": "\n",
    "auto_keywords": 0,
    "auto_questions": 0,
    "html4excel": false,
    "raptor": {
      "use_raptor": false
    },
    "graphrag": {
      "use_graphrag": false
    }
  },
  "name": "README.md",
  "type": "doc",
  "location": "README.md",
  "size": 9078,
  "tenant_id": "fb5e4b9e5ae211f0b4620242ac120006",
  "language": "English",
  "embd_id": "text-embedding-3-small@OpenAI",
  "pagerank": 0,
  "img2txt_id": "gpt-4.1-mini@OpenAI",
  "asr_id": "whisper-1@OpenAI",
  "llm_id": "gpt-4.1-mini@OpenAI",
  "update_time": 1752240687243,
  "task_type": ""
}

这里的 parser_idparser_config 是文件解析时用到的最为重要的两个参数,parser_id 表示切片方法,而 parser_config 则表示文件解析时的配置,包括解析策略、分块大小、分隔符、是否自动提取关键字和问题等。

在继续研究 do_handle_task() 函数的解析逻辑之前,我们需要先了解下 RAGFlow 的知识库配置都有哪些。

切片方法

RAGFlow 提供了多种切片方法,以便对不同布局的文件进行分块,并确保语义完整性。我们可以在知识库配置页面中进行选择:

ragflow-kb-configuration.png

正确选择知识库配置对于未来的召回和问答效果至关重要。下面是官方对每种切片方法的说明:

ragflow-chunk-method.png

General 分块方法

这是最简单的一种分块方法,也是最通用的一种,它支持众多的文件格式,包括 MD、MDX、DOCX、XLSX、XLS、PPT、PDF、TXT、JPEG、JPG、PNG、TIF、GIF、CSV、JSON、EML、HTML 等。它的处理逻辑如下:

  • 它首先使用视觉检测模型将连续文本分割成多个片段;
  • 接下来,这些连续的片段被合并成 Token 数不超过 chunk_token_num 的块。

下面是 General 分块方法的示例:

general-1.png

general-2.png

Q&A 分块方法

此分块方法专门用于问答对的处理,支持 Excel、CSV 和 TXT 文件格式:

  • 如果文件是 Excel 格式,则应由两列组成,第一列提出问题,第二列提供答案。文件不需要标题行,且支持多个工作表(Sheet);
  • 如果文件是 CSV 和 TXT 格式,必须是 UTF-8 编码且使用制表符(TAB)作为问题和答案的定界符;

注意,RAGFlow 在处理问答对文件时将自动忽略不满足上述规则的文本行。

下面是 Q&A 分块方法的示例:

qa-1.jpg

qa-2.jpg

Resume 分块方法

此分块方法专门用于处理简历文件,将各种形式的简历解析并整理成结构化数据,方便招聘人员搜索候选人。支持 DOCX、PDF 和 TXT 格式。

resume.png

Manual 分块方法

此分块方法专门用于处理手册文件,目前仅支持 PDF 格式。

RAGFlow 在处理手册文件时,假设手册具有分层的章节结构,将最低层级的章节标题作为文档分块的基本单位。因此,同一章节中的图表和表格不会被分开,这可能会导致分块的篇幅更大。

manual.png

Table 分块方法

此分块方法专门用于处理表格文件,支持 Excel、CSV 和 TXT 格式。

  • 表格文件的第一行必须包含列标题;
  • 列标题必须是有意义的术语,便于大模型理解;可以将同义词用斜线 / 隔开,比如 分块方法/切片方法;并使用括号列出所有的枚举值,例如:性别(男,女)颜色(黄,蓝,棕)尺寸(M,L,XL,XXL) 等;
  • 如果是 CSV 或 TXT 格式,列与列之间的分隔符必须是制表符(TAB);

table-1.jpg

table-2.jpg

Paper 分块方法

此分块方法专门用于处理论文文件,目前仅支持 PDF 格式。

论文将按章节进行分块,例如 摘要1.1 节1.2 节 等部分。这种方法使大模型能够更有效地总结论文,并提供更全面、易于理解的回答。然而,它也增加了对话的上下文,进而增加了大模型的计算成本。因此,在对话过程中,可考虑降低 topN 的值。

paper.png

Book 分块方法

此分块方法专门用于处理书籍文件,支持 DOCX、PDF 和 TXT 格式。对于 PDF 格式的书,请设置页面范围,以去除不必要的信息并缩短分析时间。

book-1.jpg

book-2.jpg

Laws 分块方法

此分块方法专门用于处理法律文书,支持 DOCX、PDF 和 TXT 格式。

在法律文书(如合同、宪法、国际条约、公司章程等)中,常常按 篇(Part) - 章(Chapter) - 节(Section) - 条(Article) - 款(Paragraph) - 项(Subparagraph) 这样的层级划分内容。其中 条(Article) 是构成法律文书的基本结构单元,它用于对特定主题或事项进行分点阐述,具有明确的逻辑层级和法律效力。例如《联合国宪章》中的 “Article 51”(第五十一条) 或者合同中的 “Article 3: Payment Terms”(第三条:付款条款)

RAGFlow 在处理法律文书时,将 条(Article) 作为分块的基本单位,确保所有上层文本都包含在该块中。

laws.jpg

Presentation 分块方法

此方法专门用于处理幻灯片文件,支持 PDF 和 PPTX 格式。

  • 幻灯片的每一页都被视为一个分块,并存储其缩略图;
  • 此分块方法会自动应用于所有上传的 PPT 文件,因此对于 PPT 文件来说无需手动指定;

ppt-1.png

ppt-2.png

One 分块方法

此分块方法将每个文档整体视为一个分块,当需要使用大模型对整个文档进行总结,且模型能够处理该上下文长度时适用。支持的文件格式包括 DOCX、XLSX、XLS、PDF、TXT 等。

Tag 分块方法

这是一种特殊的知识库配置,使用 Tag 作为分块方法的知识库不会参与 RAG 流程,而是充当标签集的角色。其他知识库可以使用它来标记自己的分块,对这些知识库的查询也将使用此标签集进行标记。

此知识库中的每个分块都是一个独立的 描述 - 标签 对,支持 Excel、CSV 和 TXT 文件格式。

  • 如果文件是 Excel 格式,它应包含两列:第一列用于标签描述,第二列用于标签名称;和 Q&A 分块方法一样,文件不需要标题行,且支持多个工作表(Sheet);
  • 如果文件是 CSV 和 TXT 格式,必须采用 UTF-8 编码,且使用制表符(TAB)作为分隔符来分隔描述和标签;
  • 在标签列中可以包含多个标签,使用逗号分隔;

不符合上述规则的文本行将被忽略。

tag-1.jpg

标签集构建完成后,可以在页面下方看到类似这样的标签云:

tag-2.jpg

关于标签集的用法,我们会在后面专门的文章中进行介绍,可以把它视作一种检索优化的手段,而不是分块方法。

其他配置参数

除了分块方法,知识库配置页面还提供了一些其他参数,包括:

  • PDF 解析器(layout_recognize) - 基于 PDF 布局分析的可视化模型,有效地定位文档标题、文本块、图像和表格;支持 DeepDocNative 两种方式;如果选择 Native 选项,将仅检索 PDF 中的纯文本;目前最新版本中还有个实验特性,使用大模型的多模态能力实现该功能;此选项仅适用于 PDF 文档;
  • 建议文本块大小(chunk_token_num) - 推荐的分块大小,如果一个片段的令牌数少于此阈值,它将与后续片段合并,直到令牌总数超过该阈值,此时才会创建一个分块。除非遇到分隔符,否则即使超过阈值也不会创建新的分块;
  • 文本分段标识符(delimiter) - 分隔符或分隔标识可以由一个或多个特殊字符组成。如果是多个字符,确保它们用反引号(``)括起来。例如,如果你像这样配置分隔符:`n`##`;`,那么你的文本将在行尾、双井号(##)和分号处进行分隔;
  • 嵌入模型(embd_id) - 知识库的默认嵌入模型。一旦知识库有了分块,该选择就无法更改。要切换不同的嵌入模型,必须删除知识库中所有现有的分块;

除此之外,还有一些高级配置,比如:

  • 页面排名(pagerank) - 你可以为特定的知识库分配更高的 PageRank 分数,这个分数会加到从这些知识库检索到的文本块的混合相似度分数上,从而提高它们的排名;
  • 自动关键词提取(auto_keywords) - 自动为每个文本块提取 N 个关键词,以提高包含这些关键词的查询的排名,可以在文本块列表中查看或更新为某个文本块添加的关键词;
  • 自动问题提取(auto_questions) - 自动为每个文本块提取 N 个问题,以提高包含这些问题的查询的排名,可以在文本块列表中查看或更新为某个文本块添加的问题;
  • 表格转 HTML(html4excel) - 与 General 分块方法一起使用。禁用时,知识库中的电子表格将被解析为键值对;而启用时,它们将被解析为 HTML 表格,按照每 12 行进行拆分;
  • 标签集 - 选择一个或多个标签知识库,以自动为你的知识库中的分块添加标签;用户查询也将自动添加标签;自动添加标签与自动提取关键词之间的区别:标签知识库是用户定义的封闭集合,而由大模型提取的关键词可被视为开放集合;在运行自动添加标签功能之前,你必须手动上传指定格式的标签集;而自动提取关键词功能依赖于大语言模型,并且会消耗大量的令牌;

ragflow-kbc-2.png

  • 使用召回增强 RAPTOR 策略(use_raptor) - 为多跳问答任务启用 RAPTOR 提高召回效果;

ragflow-kbc-3.png

  • 提取知识图谱(use_graphrag) - 在当前知识库的文件块上构建知识图谱,以增强涉及嵌套逻辑的多跳问答;

ragflow-kbc-4.png

小结

今天我们详细学习了 RAGFlow 的知识库配置。我们首先分析了任务消息的结构,然后重点探讨了 RAGFlow 提供的多种切片方法,如 GeneralQ&ATablePaper 等,并了解了如何根据不同的文档类型选择最合适的配置。此外,我们还介绍了 PDF 解析器、分块大小、嵌入模型以及 PageRank、RAPTOR 等高级设置。

了解这些配置是掌握 RAGFlow 的关键一步。在下一篇文章中,我们将深入 do_handle_task() 函数的实现,揭示 RAGFlow 是如何根据这些配置来具体执行文件解析任务的。


学习 RAGFlow 的文件解析逻辑

昨天我们已经学习了 RAGFlow 文件上传的相关逻辑,今天继续学习文件解析的逻辑。

触发文件解析

文件上传后,在文件列表中会有一个 “解析” 按钮,点击后会触发文件解析:

ragflow-file-list.png

调用接口为 /v1/document/run,其实现逻辑位于 api/apps/document_app.py 文件:

@manager.route("/run", methods=["POST"])
@login_required
def run():
  req = request.json
  for id in req["doc_ids"]:

    # 任务进度清零
    info = {"run": str(req["run"]), "progress": 0}
    DocumentService.update_by_id(id, info)

    if str(req["run"]) == TaskStatus.RUNNING.value:

      # 任务队列
      doc = DocumentService.get_by_id(id)
      bucket, name = File2DocumentService.get_storage_address(doc_id=doc["id"])
      queue_tasks(doc, bucket, name, 0)

  return get_json_result(data=True)

这里的代码做了简化,只保留了主要部分,其中 req["run"] 可能是 RUNNING(1)CANCEL(2),表示启动或取消任务。下面的 queue_tasks 函数是触发文件解析的入口,它的实现比较复杂,它根据不同的文件类型和配置,创建和排队文件处理任务,主要逻辑如下:

  1. 如果文件类型为 PDF 文件,默认每 12 页创建一个任务,可以通过 parser_config 中的 task_page_size 来修改;如果 paper_idpaper,表示使用论文分块方法,则按每 22 页创建任务;如果 paper_idoneknowledge_graph,表示不分块或提取知识图谱,则整个文件创建一个任务;
  2. 如果 paper_idtable,表示文件类型为 Excel、CSV 等表格文件,则每 3000 行创建一个任务;
  3. 对于其他类型,则整个文件创建一个任务;

此外,它会根据 doc_id 获取文档的分块配置,计算任务的摘要,并检查之前是否已经执行过一样的任务,如果有,则直接复用之前已经执行过的任务结果,提高处理效率。

最后将任务列表批量插入到数据库,并标记文档开始解析,然后从任务列表中筛选出未完成的任务,加入 Redis 队列:

def queue_tasks(doc: dict, bucket: str, name: str, priority: int):

  # 根据不同的文件类型和配置,创建任务
  parse_task_array = []
  if doc["type"] == FileType.PDF.value:
    # ...
  elif doc["parser_id"] == "table":
    # ...
  else:
    # ...

  # 批量插入任务到数据库
  bulk_insert_into_db(Task, parse_task_array, True)

  # 标记文档开始解析
  DocumentService.begin2parse(doc["id"])

  # 将未完成的任务加入 Redis 队列
  unfinished_task_array = [task for task in parse_task_array if task["progress"] < 1.0]
  for unfinished_task in unfinished_task_array:
    assert REDIS_CONN.queue_product(
      get_svr_queue_name(priority), message=unfinished_task
    ), "Can't access Redis. Please check the Redis' status."

投递解析任务

上面通过 REDIS_CONN.queue_product() 将任务加入 Redis 队列,队列名为 rag_flow_svr_queuerag_flow_svr_queue_1。RAGFlow 支持任务优先级,默认 priority 为 0,任务会被投递到 rag_flow_svr_queue,如果 priority 为 1,则会被投递到 rag_flow_svr_queue_1

函数 queue_product() 的实现如下:

def queue_product(self, queue, message) -> bool:
  for _ in range(3):
    try:
      payload = {"message": json.dumps(message)}
      self.REDIS.xadd(queue, payload)
      return True
    except Exception as e:
      logging.exception(
        "RedisDB.queue_product " + str(queue) + " got exception: " + str(e)
      )
  return False

它通过 Redis 的 XADD 命令将任务加入到队列中,重试 3 次。

学习 Redis 的 Stream 数据类型

Redis Stream 是 Redis 5.0 引入的一个强大的数据结构,专门用于处理流式数据和消息队列。它适用于下面这些场景:

  • 实时数据处理:适合处理传感器数据、日志流、用户行为事件等实时数据;
  • 消息队列:可替代传统的 List 作为消息队列,提供更强的功能和可靠性保证;
  • 事件溯源:支持回放历史消息,适合需要事件重放的业务场景;
  • 微服务通信:在微服务架构中作为轻量级的消息中间件使用;

Redis Stream 相比传统的 List 和 Pub/Sub,提供了更完善的消息处理机制,包括消息持久化、消费进度跟踪、消费者组协作等企业级特性:

  • 持久化日志结构:Stream 以只追加的方式存储数据,每条消息都有唯一的 ID 和时间戳,类似于 Apache Kafka 的日志结构;
  • 消息格式:每条消息由 ID 和 Value 组成,Value 是一个字典,包含多对键值对;
  • 消费者组(Consumer Groups):支持多个消费者协作处理消息,提供负载均衡和故障转移能力,每个消费者组独立跟踪消费进度;

redis-streams.png

下面我们简单学习下 Redis Stream 的基本用法。

使用 XADD 生产消息

首先是通过 XADD 命令 生产消息,该命令格式如下:

redis-streams-xadd.jpg

其中 mystream 是这个 Stream 的名称,* 表示自动生成 ID,后面的内容是 Stream 的值,由多个键值对组成。我们创建 Stream 并添加一些测试消息:

127.0.0.1:6379> XADD mystream * sensor_id temp_01 temperature 25.3
"1752285099426-0"
127.0.0.1:6379> XADD mystream * sensor_id temp_02 temperature 26.1
"1752285103416-0"
127.0.0.1:6379> XADD mystream * sensor_id temp_01 temperature 24.8
"1752285107329-0"

使用 XLEN 命令 获取 Stream 长度:

127.0.0.1:6379> XLEN mystream
(integer) 3

使用 XREAD 消费消息

有多种消费 Redis Stream 的方法,最简单的是使用 XREAD 命令

127.0.0.1:6379> XREAD COUNT 1 BLOCK 1000 STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) "1752285099426-0"
         2) 1) "sensor_id"
            2) "temp_01"
            3) "temperature"
            4) "25.3"

其中 COUNT 表示一次消费的数量,不指定则消费所有消息;BLOCK 表示阻塞等待时间(毫秒),置为 0 表示无限等待;STREAMS 后面跟要消费的 Stream 名称;最后的 0 表示从头开始消费。从输出结果可以看到,我们成功拿到了队列中的第一条消息,要获取后续消息,可以拿着这个 ID 继续消费:

127.0.0.1:6379> XREAD COUNT 1 BLOCK 1000 STREAMS mystream 1752285099426-0
1) 1) "mystream"
   2) 1) 1) "1752285103416-0"
         2) 1) "sensor_id"
            2) "temp_02"
            3) "temperature"
            4) "26.1"

使用消费者组消费消息

Redis Stream 的一大特色是支持消费者组,每个消费者组都有自己的消费进度,互不影响,同一个消费者组允许多个消费者协作处理消息。通过 XGROUP CREATE 命令 创建消费者组:

127.0.0.1:6379> XGROUP CREATE mystream mygroup 0
OK

其中 mygroup 是消费者组的名称,0 表示从头开始消费。然后我们就可以使用 XREADGROUP 命令 消费消息了:

127.0.0.1:6379> XREADGROUP GROUP mygroup consumer1 COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) "1752285099426-0"
         2) 1) "sensor_id"
            2) "temp_01"
            3) "temperature"
            4) "25.3"

其中 consumer1 是消费者名称,后面指定要读取的 Stream 和起始位置:

  • > 表示读取新消息
  • 0 表示读取所有待确认消息
  • 具体 ID 表示从该 ID 之后读取

XREAD 方便的是,我们可以一直使用 > 来读取新消息,而不需要关心上一次读取的 ID,Redis 会自动记录消费进度:

127.0.0.1:6379> XREADGROUP GROUP mygroup consumer1 COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) "1752285103416-0"
         2) 1) "sensor_id"
            2) "temp_02"
            3) "temperature"
            4) "26.1"

Redis Stream 支持多消费者协作,我们可以换一个消费者 consumer2 继续读取:

127.0.0.1:6379> XREADGROUP GROUP mygroup consumer2 COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) "1752285107329-0"
         2) 1) "sensor_id"
            2) "temp_01"
            3) "temperature"
            4) "24.8"

处理待确认消息

和传统的消息队列一样,消费 Redis Stream 之后需要确认消息已经处理完成,否则消息会一直是未确认状态,通过 XPENDING 命令 查看待确认的消息:

127.0.0.1:6379> XPENDING mystream mygroup
1) (integer) 3
2) "1752285099426-0"
3) "1752285107329-0"
4) 1) 1) "consumer1"
      2) "2"
   2) 1) "consumer2"
      2) "1"

可以看到 consumer1 有 2 条未确认消息,consumer2 有 1 条未确认消息。使用 XACK 命令 确认消息:

127.0.0.1:6379> XACK mystream mygroup 1752285099426-0
(integer) 1

关于 Redis Stream 的更多内容,可以参考下面的官方文档:

了解了 Redis Stream 的基本知识,可以帮助我们更好的理解 RAGFlow 任务执行器的实现原理。

任务执行器

我们之前提过,RAGFlow 由 API 服务器(API Server)任务执行器(Task Executor) 两大核心组成,上面投递到 Redis 中的任务,就是由任务执行器来消费和处理的。任务执行器的入口位于 rag/svr/task_executor.py 文件,它是一个 Trio 异步程序:

import trio

# 控制并发任务数
MAX_CONCURRENT_TASKS = int(os.environ.get('MAX_CONCURRENT_TASKS', "5"))
task_limiter = trio.Semaphore(MAX_CONCURRENT_TASKS)

async def main():
  async with trio.open_nursery() as nursery:
    
    # 定时汇报任务状态
    nursery.start_soon(report_status)
    
    # 启动任务管理器
    while not stop_event.is_set():
      await task_limiter.acquire()
      nursery.start_soon(task_manager)
  logging.error("BUG!!! You should not reach here!!!")

程序首先通过 trio.open_nursery() 创建了一个 nursery 对象(中文翻译 “托儿所”),用于管理异步任务。然后使用 start_soon() 启动一个定时任务,每隔一段时间汇报一次任务状态;最后使用 start_soon() 启动任务管理器,通过信号量控制并发任务数,默认为 5,可以通过环境变量 MAX_CONCURRENT_TASKS 修改。

启动任务执行器时有一个参数 --workers=5 表示启动 5 个 worker 进程,每个进程内部都是一个 Trio 程序,由上面的代码可知,每个进程内部又会启动 5 个并发的任务,所以实际上一共有 25 个处理线程。

任务管理器的核心逻辑位于 handle_task() 函数中:

async def handle_task():

  # 从 Redis 中读取任务
  redis_msg, task = await collect()

  # 处理任务
  logging.info(f"handle_task begin for task {json.dumps(task)}")
  await do_handle_task(task)
  logging.info(f"handle_task done for task {json.dumps(task)}")

  # 确认任务完成
  redis_msg.ack()

这里的一系列操作就使用了上面我们学习的 Redis Stream 相关知识。它首先通过 collect() 函数从 Redis 中读取任务:

ragflow-task-executor-collect.png

这里的消费者组名称为 rag_flow_svr_task_broker,消费者名称为 "task_executor_" + CONSUMER_NO,其中 CONSUMER_NO 就是启动任务执行器时传入的 ${host_id}_${consumer_id} 参数,用于唯一标识每个消费者。

collect() 函数优先读取未确认的消息(调用 XREADGROUPID 传 0),当没有未确认消息时,再读取新消息(调用 XREADGROUPID>),读取消息之后,调用 do_handle_task() 函数处理任务,最后通过 redis_msg.ack() 确认任务完成。

小结

本文深入探讨了 RAGFlow 的文件解析流程。我们首先学习了如何通过 API 触发解析任务,这些任务随后被智能地拆分并放入 Redis Stream 消息队列中。接着,我们详细了解了 Redis Stream 的工作原理,并分析了任务执行器如何利用 trio 异步框架和消费者组机制,高效、可靠地消费和处理这些任务。

在下一篇文章中,我们将继续研究 do_handle_task() 函数的实现,深入了解 RAGFlow 是如何具体执行每个解析任务的。


学习 RAGFlow 的文件上传逻辑

在上一篇中,我们学习了 RAGFlow 的系统架构和启动流程,了解了 RAGFlow 的 API 服务器(API Server)任务执行器(Task Executor) 两大核心组件,一个负责提供外部接口和平台基本功能,另一个则负责文件的解析和切片处理。

从系统架构图中,我们可以看到 RAGFlow 的核心流程包括 文件上传 -> 文件解析 -> 文件分块 -> 多路召回 -> 重排序 -> 大模型回答 这些步骤,今天我们就从源码的角度,先来学习下文件上传的相关逻辑。

文件上传接口实现

文件上传接口为 /v1/document/upload,其实现逻辑位于 api/apps/document_app.py 文件:

@manager.route("/upload", methods=["POST"])
@login_required
def upload():
  kb_id = request.form.get("kb_id")
  file_objs = request.files.getlist("file")
  
  # 根据 kb_id 查询知识库
  e, kb = KnowledgebaseService.get_by_id(kb_id)

  # 上传文件到指定知识库
  err, files = FileService.upload_document(kb, file_objs, current_user.id)

  return get_json_result(data=files)

其中 FileServiceupload_document 实现如下:

@classmethod
@DB.connection_context()
def upload_document(self, kb, file_objs, user_id):
  
  # 初始化知识库目录结构:/.knowledgebase/kb_name
  root_folder = self.get_root_folder(user_id)
  pf_id = root_folder["id"]
  self.init_knowledgebase_docs(pf_id, user_id)
  kb_root_folder = self.get_kb_folder(user_id)
  kb_folder = self.new_a_file_from_kb(kb.tenant_id, kb.name, kb_root_folder["id"])

  err, files = [], []
  for file in file_objs:
    # 文件重名处理,a.pdf -> a(1).pdf
    filename = duplicate_name(DocumentService.query, name=file.filename, kb_id=kb.id)

    # 读取文件内容,如果是 PDF 尝试对格式错误的文件进行修复
    blob = file.read()
    if filetype == FileType.PDF.value:
      blob = read_potential_broken_pdf(blob)
    
    # 上传文件到对象存储
    location = filename
    STORAGE_IMPL.put(kb.id, location, blob)

    # 生成文件缩略图
    doc_id = get_uuid()
    img = thumbnail_img(filename, blob)
    thumbnail_location = ""
    if img is not None:
      thumbnail_location = f"thumbnail_{doc_id}.png"
      STORAGE_IMPL.put(kb.id, thumbnail_location, img)

    # 保存到 document 表
    doc = {
      "id": doc_id,
      "kb_id": kb.id,
      "parser_id": self.get_parser(filetype, filename, kb.parser_id),
      "parser_config": kb.parser_config,
      "created_by": user_id,
      "type": filetype,
      "name": filename,
      "location": location,
      "size": len(blob),
      "thumbnail": thumbnail_location,
    }
    DocumentService.insert(doc)

    # 保存到 file 表
    FileService.add_file_from_kb(doc, kb_folder["id"], kb.tenant_id)
    files.append((doc, blob))

  return err, files

这里有几个值得注意的点,我们来逐一分析下。

知识库目录结构

RAGFlow 除了知识库管理之外,还有一个文件管理功能:

ragflow-file-management.png

用户可以直接在这里创建目录和上传文件,然后点击右边的 “链接知识库” 将其挂到某个知识库下。和直接在知识库中上传文件相比,在这里管理文件的好处是,一份文件可有链接到多个知识库,且知识库删除时文件不会被删除。在知识库中上传的文件也会出现在文件管理里,每个知识库在 /.knowledgebase 目录下都有一个对应的文件夹,只不过该文件夹是只读的,不允许用户在其中创建子文件夹或对文件进行修改。

整个文件的目录结构如下所示:

/
├── Folder 1
│   ├── File 11
│   └── File 12
├── Folder 2
│   ├── File 21
│   └── File 22
└── .knowledgebase
    ├── kb1
    │   ├── File 11
    │   └── File 12
    └── kb2
        ├── File 21
        └── File 22

此外,从文件管理上传的文件会在 file 表中插入记录,而在知识库中上传的文件会同时在 filedocument 表中插入记录,并通过 file2document 表维护两者之间的关系,这也是上面最后几句代码的作用。

修复 PDF 格式错误

PDF 文件的处理是一件非常棘手的问题,RAGFlow 在上传 PDF 文件的时候,会检查文件能否正常打开,如果有问题,则尝试用 Ghostscript 对其进行修复:

def read_potential_broken_pdf(blob):
  def try_open(blob):
    try:
      with pdfplumber.open(BytesIO(blob)) as pdf:
        if pdf.pages:
          return True
    except Exception:
      return False
    return False

  if try_open(blob):
    return blob

  repaired = repair_pdf_with_ghostscript(blob)
  if try_open(repaired):
    return repaired

  return blob

修复的逻辑很简单,就是执行 Ghostscript 命令:

$ gs -o <outfile> -sDEVICE=pdfwrite -dPDFSETTINGS=/prepress <infile>

我们之前在学习 PDFMathTranslate 时,了解到它有一个兼容模式,通过 pikepdf 将 PDF 转换为 PDF/A 格式,可以提高 PDF 文件的兼容性。感觉这也算一种修复 PDF 文件的方式,只是不知道二者之间有何区别。

缩略图的生成

RAGFlow 支持为不同格式的文件生成缩略图,可以学习下它这里不同文件的实现。

比如使用 pdfplumberPage.to_image() 生成 PDF 文件的缩略图:

import pdfplumber
pdf = pdfplumber.open(BytesIO(blob))

buffered = BytesIO()
resolution = 32
pdf.pages[0].to_image(resolution=resolution).annotated.save(buffered, format="png")
img = buffered.getvalue()

pdf.close()
return img

使用 PILImage.thumbnail() 生成图片的缩略图:

from PIL import Image
image = Image.open(BytesIO(blob))

image.thumbnail((30, 30))
buffered = BytesIO()
image.save(buffered, format="png")

return buffered.getvalue()

使用 aspose-slidesSlide.get_thumbnail() 生成 PPT 文件的缩略图:

import aspose.pydrawing as drawing
import aspose.slides as slides

with slides.Presentation(BytesIO(blob)) as presentation:
  buffered = BytesIO()
  scale = 0.03
  img = None
  presentation.slides[0].get_thumbnail(scale, scale).save(buffered, drawing.imaging.ImageFormat.png)
  img = buffered.getvalue()
  return img

文件存储的实现

从代码可以看到,这里通过 STORAGE_IMPL.put(...) 将文件上传到对象存储,RAGFlow 默认使用 Minio 存储,可以在浏览器里输入 http://localhost:9001 来访问它:

minio-login.jpg

默认用户为 rag_flow,密码为 infini_rag_flow,登录进去后可以浏览 RAGFlow 的所有的文件:

minio-files.png

其中桶名就是知识库的 ID,Key 就是文件的名称。

RAGFlow 支持多种不同的文件存储实现,除了 Minio 还支持下面这些:

可以在 .env 文件中通过 STORAGE_IMPL 变量来切换其他的存储实现。比如使用阿里云的 OSS 存储,需要在 .env 文件中添加下面的配置:

STORAGE_IMPL=OSS

同时修改 service_conf.yaml.template 中对应的 oss 配置:

oss:
  access_key: 'access_key'
  secret_key: 'secret_key'
  endpoint_url: 'http://oss-cn-hangzhou.aliyuncs.com'
  region: 'cn-hangzhou'
  bucket: 'bucket_name'

小结

我们今天学习了 RAGFlow 的文件上传逻辑,了解了 RAGFlow 是如何组织知识库的目录结构、如何修复 PDF 格式错误、如何生成不同文件的缩略图、以及如何切换不同的文件存储等相关内容。文件上传之后,自然就要对其进行解析处理了,我们明天继续吧。


学习 RAGFlow 的系统架构

昨天,我们学习了 RAGFlow 的安装配置和基本使用,通过创建一个知识库并上传文档,完整地体验了 RAGFlow 从数据处理到智能问答的基本工作流程。作为一个 RAG 系统,这套流程也是 RAGFlow 的核心流程,下面是 RAGFlow 的系统架构图:

ragflow-architecture.png

上面的架构图中省略了中间件部分,包括 ES、MySQL、Redis 和 Minio 等,仅展示了 RAGFlow 的两个核心服务:API 服务器(API Server)任务执行器(Task Executor),其中 API 服务器负责提供外部接口,包括知识库管理、文件管理、搜索、聊天等功能,而任务执行器则负责文件的解析和切片任务,正所谓 Quality in, quality out,它的深度文档理解和智能文本切片是 RAGFlow 的关键特性。

今天我们就从物理部署的角度来看看 RAGFlow 的这两个服务。

深入 entrypoint.sh 脚本

我们昨天学习了构建 RAGFlow 镜像的过程,感兴趣的同学可以研究下 Dockerfile 文件,它通过 多阶段构建(Multi-stage builds) 技巧,将构建过程分成基础(base)、构建(builder)、生产(production)三个阶段,大概的文件结构如下:

# --------
# 基础阶段
# --------
FROM ubuntu:22.04 AS base
USER root
WORKDIR /ragflow

# 从资源镜像拷贝模型资源
# 安装所需的系统类库
# 安装 Python Git Nginx 等软件 ...
# 安装 JDK、Node.js 等 ...

# --------
# 构建阶段
# --------
FROM base AS builder

# 安装 Python 依赖...
# 编译 Web 页面 ...

# --------
# 生产阶段
# --------
FROM base AS production

# 拷贝 Python 包
# 拷贝 Web 页面 ...

ENTRYPOINT ["./entrypoint.sh"]

从最后的生产阶段可以看出,RAGFlow 镜像的入口文件为 /ragflow/entrypoint.sh,它的用法如下:

function usage() {
  echo "Usage: $0 [--disable-webserver] [--disable-taskexecutor] [--consumer-no-beg=<num>] [--consumer-no-end=<num>] [--workers=<num>] [--host-id=<string>]"
  echo
  echo "  --disable-webserver             Disables the web server (nginx + ragflow_server)."
  echo "  --disable-taskexecutor          Disables task executor workers."
  echo "  --enable-mcpserver              Enables the MCP server."
  echo "  --consumer-no-beg=<num>         Start range for consumers (if using range-based)."
  echo "  --consumer-no-end=<num>         End range for consumers (if using range-based)."
  echo "  --workers=<num>                 Number of task executors to run (if range is not used)."
  echo "  --host-id=<string>              Unique ID for the host (defaults to \`hostname\`)."
  echo
  echo "Examples:"
  echo "  $0 --disable-taskexecutor"
  echo "  $0 --disable-webserver --consumer-no-beg=0 --consumer-no-end=5"
  echo "  $0 --disable-webserver --workers=2 --host-id=myhost123"
  echo "  $0 --enable-mcpserver"
  exit 1
}

可以看到这个镜像可以以多种方式启动:

  • --disable-taskexecutor 禁用任务执行器,仅启动 API 服务器
  • --disable-webserver 禁用 API 服务器,仅启动任务执行器
  • --enable-mcpserver 启动 MCP 服务器

RAGFlow 默认会在一个容器中同时启动 API 服务器和任务执行器,便于开发和测试,但是在生产环境中我们可以灵活地根据需要选择启动方式,将两者分开部署。

仅启动 API 服务器

我们可以修改 docker/docker-compose.yml 文件中的启动参数来做到这一点:

services:
  ragflow:
    image: ${RAGFLOW_IMAGE}
    command:
      - --disable-taskexecutor
    container_name: ragflow-server
    # 其他配置 ...

entrypoint.sh 文件中,启动 API 服务器的代码如下:

if [[ "${ENABLE_WEBSERVER}" -eq 1 ]]; then
  echo "Starting nginx..."
  /usr/sbin/nginx

  echo "Starting ragflow_server..."
  while true; do
    "$PY" api/ragflow_server.py
  done &
fi

首先启动 Nginx,然后执行 ragflow_server.py 脚本,它是一个基于 Flask 开发的 Web 服务,默认监听 9380 端口。这里的 while true; do ... done & 的写法挺有意思,while true 表示无限循环,& 表示将脚本放入后台执行,这样做可以确保服务进程在崩溃或异常退出后能够自动重启,通过这种纯 Shell 的方式实现自动恢复机制,不依赖任何第三方进程管理器(如 systemdsupervisor)。

Nginx 用于托管 Web 前端页面以及透传 API 服务器的 HTTP 请求,它的配置位于 ragflow.conf 文件中,内容如下:

server {
  listen 80;
  server_name _;
  root /ragflow/web/dist;

  gzip on;

  location ~ ^/(v1|api) {
    proxy_pass http://ragflow:9380;
    include proxy.conf;
  }

  location / {
    index index.html;
    try_files $uri $uri/ /index.html;
  }

  # Cache-Control: max-age~@~AExpires
  location ~ ^/static/(css|js|media)/ {
    expires 10y;
    access_log off;
  }
}

如果要对外提供 HTTPS 服务,可以将 docker/docker-compose.yml 文件中的 ragflow.conf 替换成 ragflow.https.conf,并将证书文件挂到容器中:

services:
  ragflow:
    volumes:
      # 证书文件
      - /path/to/fullchain.pem:/etc/nginx/ssl/fullchain.pem:ro
      - /path/to/privkey.pem:/etc/nginx/ssl/privkey.pem:ro
      # 使用 ragflow.https.conf 替换 ragflow.conf
      - ./nginx/ragflow.https.conf:/etc/nginx/conf.d/ragflow.conf
      # 其他配置 ...

同时编辑 nginx/ragflow.https.conf 文件,将 my_ragflow_domain.com 替换成你真实的域名。然后重启服务即可:

$ docker-compose down
$ docker-compose up -d

仅启动任务执行器

当处理的文档数量很多时,将任务执行器单独部署多个实例可以提高文档解析的速度。我们可以修改 docker/docker-compose.yml 文件,将 ragflow 配置复制一份出来,仅启动任务执行器:

services:
  ragflow_task_executor:
    image: ${RAGFLOW_IMAGE}
    command:
      - --disable-webserver
      - --workers=5
    container_name: ragflow-task-executor
    # 其他配置 ...

我们可以通过 --workers 参数来指定启动的 worker 数量。启动任务执行器的代码如下:

if [[ "${ENABLE_TASKEXECUTOR}" -eq 1 ]]; then
    echo "Starting ${WORKERS} task executor(s) on host '${HOST_ID}'..."
    for (( i=0; i<WORKERS; i++ ))
    do
        task_exe "${i}" "${HOST_ID}" &
    done
fi

每个 worker 都会启动一个独立的进程,其中 task_exe() 函数定义如下:

function task_exe() {
    local consumer_id="$1"
    local host_id="$2"

    JEMALLOC_PATH="$(pkg-config --variable=libdir jemalloc)/libjemalloc.so"
    while true; do
        LD_PRELOAD="$JEMALLOC_PATH" \
        "$PY" rag/svr/task_executor.py "${host_id}_${consumer_id}"
    done
}

这里也用了 while true 的技巧,防止 worker 进程异常退出,每个 worker 进程执行 task_executor.py 脚本,并将 ${host_id}_${consumer_id} 作为参数传入。任务执行器是一个基于 Trio 异步库开发的命令行程序,它通过监听 Redis 消息队列,对用户上传的文件进行解析处理。这里的 ${host_id} 是当前的主机名,${consumer_id} 是指 worker 的序号,拼接起来用于区分不同的消费者。

启动 MCP 服务器

RAGFlow 还支持 MCP 服务器,开启方法很简单,只需将 docker/docker-compose.yml 文件中 services.ragflow.command 部分的注释去掉即可:

services:
  ragflow:
    image: ${RAGFLOW_IMAGE}
    command:
      - --enable-mcpserver
      - --mcp-host=0.0.0.0
      - --mcp-port=9382
      - --mcp-base-url=http://127.0.0.1:9380
      - --mcp-script-path=/ragflow/mcp/server/server.py
      - --mcp-mode=self-host
      - --mcp-host-api-key=ragflow-xxxxxxx

关于 RAGFlow MCP 服务器的使用,我们今天暂且跳过,后面单开一篇介绍。

小结

通过今天的学习,我们了解了 RAGFlow 的系统架构,以及如何通过 entrypoint.sh 脚本启动不同的服务。接下来,我们将继续剖析 RAGFlow 的源码,探索 API 服务器和任务执行器的实现原理。


RAGFlow 快速入门

在构建高级 AI 应用时,检索增强生成(RAG)已成为一项关键技术,它能让大语言模型(LLM)利用外部知识库,提供更准确、更具上下文的回答。然而,如何高效地处理和理解格式各异的复杂文档(如 PDF、Word、PPT 等),并从中提取高质量信息,一直是 RAG 应用落地的一大挑战。

今天,我们将介绍一款强大的开源 RAG 引擎 —— RAGFlow,它专为解决这一难题而生。RAGFlow 基于深度文档理解技术,能够为企业和个人提供一套精简、高效的 RAG 工作流程,让 AI 应用能够从海量复杂数据中高质量地提取信息,真正做到 Quality in, quality out

ragflow-logo.png

RAGFlow 的核心特性如下:

  • 深度文档理解:不仅仅是提取文本,RAGFlow 能够深入理解各类复杂文档的布局和结构,确保从 PDF、Word、PPT 等文件中提取高质量、有价值的信息;
  • 智能文本切片:提供基于模板的文本切片方法,不仅智能,而且整个过程清晰可控,方便解释和调整;
  • 有理有据的回答:生成的回答都附带关键引用的快照,并支持追根溯源,最大限度地减少了 AI 幻觉;
  • 广泛的异构数据支持:兼容各类数据源,包括 Word 文档、PPT、Excel 表格、PDF、图片、网页,甚至是扫描件;
  • 自动化的 RAG 工作流:提供从数据处理、多路召回到融合重排序的全自动化 RAG 工作流,并支持灵活配置大语言模型和向量模型,提供易用的 API,方便与现有系统集成;

本文将带你快速入门 RAGFlow,学习如何安装、配置并使用它来构建你自己的 RAG 应用。

安装与上手

安装 RAGFlow 最简单的方法是使用 Docker 和 Docker Compose。首先检查我们的电脑上已经安装了它们:

$ docker --version
Docker version 24.0.2, build cb74dfc

$ docker compose version
Docker Compose version 2.38.1

确保 Docker 的版本在 24.0.0 以上,Docker Compose 的版本在 v2.26.1 以上。然后克隆 RAGFlow 仓库:

$ git clone https://github.com/infiniflow/ragflow.git

进入 docker 文件夹:

$ cd ragflow/docker

这个文件夹下有几个比较重要的文件:

  • docker-compose.yml - 定义了 RAGFlow 的镜像和配置,这个文件通过 Docker Compose 的 include 语法引用了 docker-compose-base.yml 文件,因此启动时只需指定这个入口文件即可
  • docker-compose-base.yml 定义了 RAGFlow 依赖的中间件的镜像和配置,包括 ES、MySQL、Redis 和 Minio 等
  • .env - 通过环境变量修改启动配置,比如调整各组件的端口,用户名和密码,默认镜像等,在 macOS 电脑上可以将 MACOS=1 打开,如果访问不了 huggingface.co 可以开启 HF_ENDPOINT=https://hf-mirror.com 参数

配置确认无误后,使用 Docker Compose 一键启动:

$ docker compose -f docker-compose.yml up -d

启动时默认会拉取官方构建好的 infiniflow/ragflow:v0.19.1-slim 镜像,该镜像比较大,下载要花点时间。启动成功后如下:

ragflow-containers.png

构建 ARM64 镜像

目前官方提供的 Docker 镜像均基于 x86 架构构建,并不提供基于 ARM64 的 Docker 镜像,比如在我的 macOS 上启动后,容器的下面会显示一个 AMD64 的标签。如果你的 Docker 和我一样,开启了 QEMU 或 Apple 的 Virtualization Framework 虚拟化技术,在 ARM64 机器上也可以跑 x86 的镜像,就是速度有点慢。

docker-desktop-setting.png

当然你也可以自行构建 ARM64 架构的镜像,顺便也能看看镜像中隐藏的一些细节,参考这篇文档:

首先,下载构建镜像所需的资源:

$ uv run download_deps.py

下载的资源包括:

  • 几个库文件

    • libssl
    • tika-server-standard.jar
    • cl100k_base.tiktoken
    • chrome 和 chromedriver
  • 几个 nltk_data 资源

    • wordnet
    • punkt,
    • punkt_tab
  • 几个 huggingface 模型

    • InfiniFlow/text_concat_xgb_v1.0
    • InfiniFlow/deepdoc
    • InfiniFlow/huqie
    • BAAI/bge-large-zh-v1.5
    • maidalun1020/bce-embedding-base_v1

然后构建资源镜像(就是将刚刚下载的资源拷贝到基础镜像里):

$ docker build -f Dockerfile.deps -t infiniflow/ragflow_deps .

然后基于资源镜像构建 RAGFlow 镜像:

$ docker build --build-arg LIGHTEN=1 -f Dockerfile -t infiniflow/ragflow:nightly-slim .

构建完成后,打开 docker/.env 文件,找到 RAGFLOW_IMAGE 配置,将其修改为 infiniflow/ragflow:nightly-slim。最后,使用 Docker Compose 一键启动:

$ cd docker
$ docker compose -f docker-compose-macos.yml up -d

RAGFlow 登录

启动后,查看 RAGFlow 容器的日志,当显示如下的文字 LOGO 时,说明启动成功:

ragflow-start-log.png

RAGFlow 默认监听本地 80 端口,直接用浏览器打开 http://localhost 即可,进入 RAGFlow 的登录页面:

ragflow-login.jpg

吐槽下 RAGFlow 的登录页面,这背景图选的,文字都看不清。

第一次使用需要注册一个新账号,注册完成后使用新账号登录即可:

ragflow-login-success.png

RAGFlow 初体验

进入 RAGFlow 的第一件事是配置模型,点击右上角的头像,然后进入 “模型供应商” 页面:

ragflow-model-setting.png

从下面的列表中选择并添加自己的模型,根据不同的模型,需要配置 API Key 等不同的参数。然后设置默认模型:

ragflow-model-setting-default.png

RAGFlow 支持大量的模型供应商,这些模型按功能被划分成几类:

  • 聊天模型
  • 嵌入模型
  • Img2txt模型
  • Speech2txt模型
  • Rerank模型
  • TTS模型

根据需要配置这些模型,一般来讲,除了聊天模型和嵌入模型是必填的,其他的可以不填;配置完默认模型后,就可以体验 RAGFlow 的功能了。进入 “知识库” 页面,创建一个新知识库:

ragflow-new-kb.png

然后点击 “新增文件” 按钮,从本地上传一个文件,上传后点击解析按钮,只有解析成功后的文件才可以对其问答,文件解析完成后如下所示:

ragflow-kb-files.png

我们再进入 “聊天” 页面,点击 “新建助理” 创建一个聊天助手:

ragflow-new-chat.png

下面的知识库选择我们刚刚创建的知识库,创建成功后,就可以和它进行对话了:

ragflow-kb-chat.png

小结

在本文中,我们对 RAGFlow 进行了快速入门。我们学习了 RAGFlow 的核心特性,讲解了如何通过 Docker Compose 进行安装部署,并为 ARM64 用户提供了详细的镜像构建指南。在完成模型供应商的配置后,我们通过创建一个知识库并上传文档,完整地体验了 RAGFlow 从数据处理到智能问答的基本工作流程。

通过今天的学习,我们对 RAGFlow 已经有了初步的了解。在后续的文章中,我们将结合源码深入其核心,探索更多高级功能,例如深度文档理解、智能文本切片、自动化 RAG 工作流等。