Fork me on GitHub

分类 Coze Studio 下的文章

学习 Coze Studio 的知识库检索逻辑

经过前面几天的学习,我们已经深入探讨了 Coze Studio 从知识库的创建到文档入库的完整流程。今天,我们将继续这一探索之旅,深入研究知识库的最后一个核心环节 —— 检索(Retrieval),看看在 Coze Studio 中,智能体是如何从知识库中精准高效地找到与用户问题最相关的信息的。Coze Studio 在这方面设计了一套颇为完善的检索流水线,它融合了查询重写、多路并行检索(向量、全文、NL2SQL)以及重排序等多种技术。下面就让我们一起看看它背后的实现细节。

知识库检索配置

在 Coze Studio 中,我们可以为智能体提供特定领域的背景知识,包括文本、表格和图片,让其可以回答领域内的问题:

coze-agent-kb-settings.png

我们还可以对知识库进行一些配置,以便智能体更好地对知识库进行检索:

coze-agent-kb-settings-2.png

这些配置参数包括:

  • 调用方式:分为 自动调用按需调用 两种,自动调用指的是每次对话时都会发起一次检索,将检索结果放置在内置的系统提示词中,而按需调用则是将知识库检索封装成一个工具供智能体调用,只有当用户问题确实需要检索知识库时才触发调用。实际上,在开源版本中暂时只支持自动调用,我们之前在学习智能体执行逻辑时已经看到了,通过 Graph API 编排出来的智能体图,知识库检索是必然执行的节点之一,而且在源码中搜索 recallKnowledge 工具,也找不到其他地方用到;
  • 搜索策略:分为 语义全文混合 三种,语义表示基于向量的文本相关性查询,推荐在需要理解语义关联度和跨语言查询的场景使用,全文表示依赖于关键词的全文搜索,推荐在搜索具有特定名称、缩写词、短语或 ID 的场景使用,混合表示结合全文检索与语义检索的优势,并对结果进行综合排序。
  • 最大召回数量:从知识库中返回给大模型的最大段落数,注意默认是 1,可以适当调大一点;
  • 最小匹配度:根据设置的匹配度选取段落返回给大模型,低于设定匹配度的内容不会被召回;
  • 表格 SQL 查询:同时将查询的自然语言转为 SQL 语句进行查询,SQL 执行结果与 RAG 召回段落一同输入给模型;该选项仅对表格知识库有效;
  • 查询改写:结合对话上下文,对用户的问题进行改写,使得改写后的查询更适合检索;
  • 结果重排:通过分析用户查询的意图,对召回结果重新排序,使得最相关的内容排在前面;

知识库检索流程

回顾之前学习的智能体执行逻辑,其中有一路负责知识库的检索:

// 新建知识库检索器
kr, err := newKnowledgeRetriever(ctx, &retrieverConfig{
  knowledgeConfig: conf.Agent.Knowledge,
})

// 第一个节点检索
_ = g.AddLambdaNode(keyOfKnowledgeRetriever,
  compose.InvokableLambda[*AgentRequest, []*schema.Document](kr.Retrieve),
  compose.WithNodeName(keyOfKnowledgeRetriever))

// 第二个节点组装成字符串
_ = g.AddLambdaNode(keyOfKnowledgeRetrieverPack,
  compose.InvokableLambda[[]*schema.Document, string](kr.PackRetrieveResultInfo),
  compose.WithOutputKey(placeholderOfKnowledge),
)

首先新建一个知识库检索器,将其 Retrieve() 方法作为第一个节点,知识库检索后,该方法返回 []*schema.Document 文档列表,因此还需要第二个节点,调用 PackRetrieveResultInfo 将其组装成字符串,替换系统提示词模版中的 {{ knowledge }} 占位符。

可以看出只要智能体关联了知识库,用户每次对话时总是会触发一次知识库检索,检索的逻辑位于 knowledge 领域层,如下:

func (k *knowledgeSVC) Retrieve(ctx context.Context, request *RetrieveRequest) (response *RetrieveResponse, err error) {

  // 检索上下文
  retrieveContext, err := k.newRetrieveContext(ctx, request)

  // 查询重写
  rewriteNode := compose.InvokableLambda(k.queryRewriteNode)
  // 向量化召回
  vectorRetrieveNode := compose.InvokableLambda(k.vectorRetrieveNode)
  // ES召回
  EsRetrieveNode := compose.InvokableLambda(k.esRetrieveNode)
  // Nl2Sql召回
  Nl2SqlRetrieveNode := compose.InvokableLambda(k.nl2SqlRetrieveNode)
  // 用户查询透传
  passRequestContextNode := compose.InvokableLambda(k.passRequestContext)
  // 重排序
  reRankNode := compose.InvokableLambda(k.reRankNode)
  // 打包查询结果
  packResult := compose.InvokableLambda(k.packResults)

  // 多路召回
  parallelNode := compose.NewParallel().
    AddLambda("vectorRetrieveNode", vectorRetrieveNode).
    AddLambda("esRetrieveNode", EsRetrieveNode).
    AddLambda("nl2SqlRetrieveNode", Nl2SqlRetrieveNode).
    AddLambda("passRequestContext", passRequestContextNode)

  // 编排检索链
  chain := compose.NewChain[*RetrieveContext, []*knowledgeModel.RetrieveSlice]()
  r, err := chain.
    AppendLambda(rewriteNode).
    AppendParallel(parallelNode).
    AppendLambda(reRankNode).
    AppendLambda(packResult).
    Compile(ctx)
  // 调用链
  output, err := r.Invoke(ctx, retrieveContext)
  return &RetrieveResponse{
    RetrieveSlices: output,
  }, nil
}

可以看到,这里使用了 Eino 的编排(Compose)技术,通过 Chain API 实现了多路检索 + 重排序的知识库检索流程:

coze-studio-kb-retriever-flow.png

下面将依次学习下检索流程中各个节点的实现逻辑。

查询重写

该节点只有在启用查询重写且有聊天历史时才会执行,其作用为对用户原始查询进行改写优化,结合聊天上下文,将用户的当前查询改写成更适合检索的查询语句。查询重写的目的主要有两个:

  1. 使模糊问题更明确:比如用户的问题是 听说最近武汉的樱花挺不错,打算下周二带家人去看看,不知道那天天气怎么样?,需要把问题改成更明确的 武汉下周二天气怎么样?
  2. 多轮对话中的指代消解:比如用户先问 合肥明天的天气怎么样?,系统回答之后,用户接着又问 那后天呢?,这时要把问题改写成 合肥后天的天气怎么样?

其核心就一段 Prompt,位于 backend/conf/prompt/messages_to_query_template_jinja2.json 配置文件:

# 角色:
你是一名专业的查询重构工程师,擅长根据用户提供的上下文信息重写查询语句,使其更清晰、更完整,并与用户的意图相符。你应使用与用户输入相同的语言进行回复。

## 输出格式:
输出内容应为重构后的查询语句,以纯文本格式呈现。

## 示例:
示例 1:
输入:
[
  {
    "role": "user",
    "content": "世界上最大的沙漠在哪里?"
  },
  {
    "role": "assistant",
    "content": "世界上最大的沙漠是撒哈拉沙漠。"
  },
  {
    "role": "user",
    "content": "怎么去那里?"
  }
]
输出:怎么去撒哈拉沙漠?

示例 2:
输入:
[
  {
    "role": "user",
    "content": "分析当前网红欺骗公众以获取流量对当今社会的影响。"
  }
]
输出:当前网红欺骗公众以获取流量,分析这一现象对当今社会的影响。

值得注意的是,开启查询重写功能需要配置 AI 生成模型,可以打开 .env 文件,配置如下:

export BUILTIN_CM_TYPE="openai"

或者单独配置 M2Q 模型:

export M2Q_BUILTIN_CM_TYPE="openai"

由于修改的是环境变量,通过 --force-recreate 重启服务:

$ docker compose --profile '*' up -d --force-recreate --no-deps coze-server

多路检索策略

检索流程中的第二部分包含四个并行执行的子节点:

  • 向量检索节点:进行语义相似度检索,使用向量数据库,基于语义相似度查找相关文档;当搜索策略设置为纯全文搜索时跳过;
  • ES 检索节点:进行全文文本检索,使用 Elasticsearch,基于关键词匹配查找相关文档;当搜索策略设置为纯语义搜索时跳过;
  • NL2SQL 检索节点:专门处理表格数据的检索,将自然语言查询转换为 SQL 语句,查询结构化表格数据;只有在启用 NL2SQL 且存在表格文档时执行;
  • 上下文传递节点:传递原始请求上下文,确保后续节点能够访问完整的检索上下文信息;由于这里使用的是 Chain API,它是基于 Graph API 实现的,不支持跨节点传递参数;

这里的向量检索和 ES 检索都没什么特别的,主要看下 NL2SQL 检索的实现:

func (k *knowledgeSVC) nl2SqlExec(ctx context.Context, doc *model.KnowledgeDocument, retrieveCtx *RetrieveContext, opts []nl2sql.Option) (retrieveResult []*schema.Document, err error) {
    
  // 调用大模型,将自然语言查询转换为 SQL 语句
  sql, err := k.nl2Sql.NL2SQL(ctx, retrieveCtx.ChatHistory, []*document.TableSchema{packNL2SqlRequest(doc)}, opts...)

  // 对 SQL 中表名和列名进行替换
  parsedSQL, err := sqlparser.NewSQLParser().ParseAndModifySQL(sql, replaceMap)

  // 执行 SQL
  resp, err := k.rdb.ExecuteSQL(ctx, &rdb.ExecuteSQLRequest{
    SQL: parsedSQL,
  })

  return retrieveResult, nil
}

第一步仍然是调用 AI 生成大模型,因此也需要配置 AI 生成模型:

export BUILTIN_CM_TYPE="openai"

或单独配置 NL2SQL 模型:

export NL2SQL_BUILTIN_CM_TYPE="openai"

其核心也是一段 Prompt,位于 backend/conf/prompt/nl2sql_template_jinja2.json 配置文件:

角色:NL2SQL顾问

目标:
将自然语言陈述转换为MySQL标准的SQL查询。遵循约束条件,且仅返回JSON格式。

格式:
- 仅JSON格式。JSON包含字段“sql”(用于生成的SQL)、字段“err_code”(用于原因类型)、字段“err_msg”(用于详细原因,最好超过10个字)
- 不要使用“```json”标记格式

技能:
- 擅长将自然语言陈述转换为MySQL标准的SQL查询。

定义:
“err_code”原因类型定义:
- 0表示已生成SQL
- 3002表示因超时无法生成SQL
- 3003表示因缺少表结构无法生成SQL
- 3005表示因某些术语不明确无法生成SQL

示例:
问:帮我实现NL2SQL。
表结构描述:CREATE TABLE `sales_records` (
  `sales_id` bigint(20) unsigned NOT NULL COMMENT '销售员ID',
  `product_id` bigint(64) COMMENT '产品ID',
  `sale_date` datetime(3) COMMENT '销售日期和时间',
  `quantity_sold` int(11) COMMENT '销售量',
  PRIMARY KEY (`sales_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='销售记录表';
SQL需求的自然语言描述:查询上月的销量总额第一名的销售员和他的销售总额
答:{
  "sql":"SELECT sales_id, SUM(quantity_sold) AS total_sales FROM sales_records WHERE MONTH(sale_date) = MONTH(CURRENT_DATE - INTERVAL 1 MONTH) AND YEAR(sale_date) = YEAR(CURRENT_DATE - INTERVAL 1 MONTH) GROUP BY sales_id ORDER BY total_sales DESC LIMIT 1",
  "err_code":0,
  "err_msg":"SQL查询生成成功"
}

值得一提的是,给大模型的表名和列名是原始的名称:

SELECT 成绩 FROM 学生信息 WHERE 姓名 = '胡阳';

而我们之前学习表格知识库入库逻辑时提到过,Coze Studio 会动态生成一个表名和列名,都是 ID 格式,如下:

create-table-db.png

因此还需要将大模型生成的 SQL 语句中的表名和列名做一个替换,这里使用的是 TiDB 的 SQL Parser 对 SQL 语句进行解析和替换,如果有类似需求的话,可以参考下这块的逻辑。

重排序

多路召回后,为了使得和用户查询最相关的内容排在前面,我们还需要对召回结果重新排序。Coze Studio 提供了两种重排序实现:

  • RRF 实现:基于倒序排名融合算法,通过计算每个文档在各个检索结果中的排名来综合评分;
  • VikingDB 实现:调用火山引擎的重排序 API 接口;

不过看代码当前只使用了 RRF 重排序,RRF 全称为 Reciprocal Rank Fusion(倒数排名融合),是滑铁卢大学和谷歌合作开发的一种算法,它可以将具有不同相关性指标的多个结果集组合成单个结果集,感兴趣的同学可以看下它的论文:

其中最关键的部分就是下面这个公式:

rrf-score.png

其中,D 表示文档集,R 是从 1 到 |D| 的排列,k 是一个常量,默认值为 60,r(d) 表示文档在某个检索结果中的排名。从 RRF 分数的计算中,我们可以看出,RRF 不依赖于每次检索分配的绝对分数,而是依赖于相对排名,这使得它非常适合组合来自可能具有不同分数尺度或分布的查询结果。

小结

今天,我们学习了 Coze Studio 知识库检索的完整流程,通过 Eino 框架编排出一套包含查询重写、多路并行检索以及结果重排的检索流水线。其核心技术点可总结如下:

  • 查询重写:利用大模型结合对话上下文,对用户的原始问题进行优化和消歧,生成更适合机器检索的查询语句;
  • 多路并行检索:同时发起向量、全文和 NL2SQL 三种检索方式,最大化地召回相关信息,并通过并行处理提升检索效率;
  • 结果重排:采用先进的 RRF 算法,对来自不同检索通路的结果进行智能融合与排序,确保最相关的内容能够优先呈现给大模型;

至此,我们对 Coze Studio 的学习之旅也要告一段落了。从快速上手、基本功能实战,到深入后端源码,我们系统地剖析了其智能体、插件、工作流、知识库等核心模块的设计与实现。希望这一系列的文章能帮助大家对 Coze Studio 有一个全面而深入的理解,并为大家利用其构建自己的 AI 应用提供有价值的参考。


学习 Coze Studio 的知识库入库逻辑(续)

书接上文,当用户在知识库上传文档、表格或图片,然后对知识库进行设置和确认后,Coze Studio 通过发送 IndexDocuments 事件开始了文档的异步处理。这个事件被发送到消息队列后,由 knowledge 领域的事件处理器消费,我们今天就来看下这块的逻辑。

事件处理器

事件处理器的代码位于 backend/domain/knowledge/service/event_handle.go 文件,这是一个 switch 结构,负责分发知识库相关的各类事件,比如批量文档处理(IndexDocuments)、单个文档处理(IndexDocument)、修改文档分片(IndexSlice)、删除知识库数据(DeleteKnowledgeData)、文档预览(DocumentReview)等:

func (k *knowledgeSVC) HandleMessage(ctx context.Context, msg *eventbus.Message) (err error) {

  event := &entity.Event{}
  err = sonic.Unmarshal(msg.Body, event)

  // 事件处理器
  switch event.Type {
  case entity.EventTypeIndexDocuments:
    // 批量文档处理
    k.indexDocuments(ctx, event)
  case entity.EventTypeIndexDocument:
    // 单个文档处理
    k.indexDocument(ctx, event)
  case entity.EventTypeIndexSlice:
    // 修改文档分片
    k.indexSlice(ctx, event)
  case entity.EventTypeDeleteKnowledgeData:
    // 删除知识库数据
    k.deleteKnowledgeDataEventHandler(ctx, event)
  case entity.EventTypeDocumentReview:
    // 文档预览
    k.documentReviewEventHandler(ctx, event)
  }
  return nil
}

其中批量文档处理(IndexDocuments)事件的逻辑很简单,就是遍历所有文档,为每个文档单独发送一个单文档处理(IndexDocument)事件。

文档处理流程

单文档处理(IndexDocument)事件的代码位于 indexDocument() 函数,这个处理函数是整个知识库入库流程的核心,其执行流程可分为以下几个关键步骤:

  1. 状态检查与清理

    • 首先检查知识库和文档是否处于可写入状态,防止并发操作导致数据不一致;
    • 如果不是追加模式,会先清除该文档在数据库(knowledge_document_slice 表)和向量数据库中的旧数据;
  2. 解析与分片

    • 将文档状态设置为分片中(Chunking);
    • 从对象存储(TOS)下载文档的原始文件;
    • 根据文档类型(文本、图片、表格)获取对应的解析器(Parser),对文件内容进行解析和分片,生成一系列文档分片(schema.Document);
  3. 数据持久化

    • 为每个文档分片生成唯一的 ID;
    • 将文档分片批量存入 knowledge_document_slice 数据表中;
    • 对于表格类型,还会将解析出的结构化数据行插入到之前动态创建的物理数据表中;
  4. 向量化与索引

    • 将持久化后的文档分片转换为向量数据库要求的格式;
    • 调用 searchstoreStore 方法,将文档分片进行向量化,并存入 Milvus 等向量数据库中。这个过程会根据文档 ID 进行分区(Partition),便于后续的高效检索和管理;
  5. 状态更新

    • 所有文档分片成功存入向量数据库后,将其在 knowledge_document_slice 表中的状态更新为 Done
    • 最后,将 knowledge_document 表中对应文档的状态更新为 Enable,表示该文档已处理完成并可供检索;

接下来,我们详细看看每一步的实现逻辑。

状态检查与清理

// 检查知识库和文档状态
if valid, err := k.isWritableKnowledgeAndDocument(ctx, doc.KnowledgeID, doc.ID); err != nil {
  return err
} else if !valid {
  return errorx.New(errno.ErrKnowledgeNonRetryableCode, ...)
}

// 清除旧数据
collectionName := getCollectionName(doc.KnowledgeID)
if !doc.IsAppend {
  ids, err := k.sliceRepo.GetDocumentSliceIDs(ctx, []int64{doc.ID})
  if len(ids) > 0 {
    // 删除分片记录
    err = k.sliceRepo.DeleteByDocument(ctx, doc.ID)
    for _, manager := range k.searchStoreManagers {
      // 删除 search store 中的数据
      s, err := manager.GetSearchStore(ctx, collectionName)
      s.Delete(ctx, slices.Transform(event.SliceIDs, func(id int64) string {
        return strconv.FormatInt(id, 10)
      }))
    }
  }
}

首先通过 isWritableKnowledgeAndDocument() 函数分别检查知识库和文档的状态是否可写(不是被禁用或被删除),只有两者都可写时才继续后面的流程。然后如果是非追加模式,查询当前文档在 knowledge_document_slice 表中是否已存在分片记录,如果存在,则删除该文档的所有分片,同时删除该文档在搜索存储中的数据。

这里的 搜索存储(Search Store) 是 Coze Studio 对搜索引擎的统一抽象层,为不同的数据库(Milvus、Elasticsearch、VikingDB)提供统一的操作接口,它继承 indexer.Indexerretriever.Retriever,支持向量或文本的存储和索引构建,并提供基于向量或文本的相似性搜索能力。要注意的是,搜索存储要通过 Manager 的 GetSearchStore() 来获取,Manager 是搜索存储的管理器,负责管理搜索存储的生命周期,比如创建、删除、获取等。目前支持全文搜索和向量搜索两种 Manager 如下:

var sManagers []searchstore.Manager

// es full text search
sManagers = append(sManagers, sses.NewManager(&sses.ManagerConfig{Client: c.ES}))

// vector search
mgr, err := getVectorStore(ctx)
sManagers = append(sManagers, mgr)

其中向量搜索根据 VECTOR_STORE_TYPE 环境变量可选 Milvus 或 VikingDB,默认是 Milvus 数据库。

解析与分片

// 将文档状态设置为分片中
k.documentRepo.SetStatus(ctx, doc.ID, int32(entity.DocumentStatusChunking), "")

// 从对象存储下载文档的原始文件
bodyBytes, err := k.storage.GetObject(ctx, doc.URI)

// 根据文档类型(文本、图片、表格)获取对应的解析器
docParser, err := k.parseManager.GetParser(convert.DocumentToParseConfig(doc))

// 对文件内容进行解析和分片,生成一系列文档分片
parseResult, err := docParser.Parse(ctx, bytes.NewReader(bodyBytes), parser.WithExtraMeta(map[string]any{
  document.MetaDataKeyCreatorID: doc.CreatorID,
  document.MetaDataKeyExternalStorage: map[string]any{
    "document_id": doc.ID,
  },
}))

接下来,从对象存储下载文档的原始文件,然后对文件内容进行解析和分片。根据不同的文档类型,Coze Studio 提供了不同的解析器:

func (m *manager) GetParser(config *parser.Config) (parser.Parser, error) {
  var pFn parseFn
  switch config.FileExtension {
  case parser.FileExtensionPDF:
    pFn = parseByPython(config, m.storage, m.ocr, goutil.GetPython3Path(), goutil.GetPythonFilePath("parse_pdf.py"))
  case parser.FileExtensionTXT:
    pFn = parseText(config)
  case parser.FileExtensionMarkdown:
    pFn = parseMarkdown(config, m.storage, m.ocr)
  case parser.FileExtensionDocx:
    pFn = parseByPython(config, m.storage, m.ocr, goutil.GetPython3Path(), goutil.GetPythonFilePath("parse_docx.py"))
  case parser.FileExtensionCSV:
    pFn = parseCSV(config)
  case parser.FileExtensionXLSX:
    pFn = parseXLSX(config)
  case parser.FileExtensionJSON:
    pFn = parseJSON(config)
  case parser.FileExtensionJsonMaps:
    pFn = parseJSONMaps(config)
  case parser.FileExtensionJPG, parser.FileExtensionJPEG, parser.FileExtensionPNG:
    pFn = parseImage(config, m.model)
  default:
    return nil, fmt.Errorf("[Parse] document type not support, type=%s", config.FileExtension)
  }
  return &p{parseFn: pFn}, nil
}

解析器做的比较通用,将文本、表格、图片知识库的所有配置参数混在一起,不用区分知识库是什么类型。这里对这些解析器的实现流程做个大概的总结:

  • PDF 解析器:运行 Python 脚本 parse_pdf.py 从 PDF 文件中提取出文本、图片、表格等内容,主要使用的是 pdfplumber 库;文本内容调用 chunkCustom 进行分块;图片转为 base64 存储到对象存储,如果开启了 OCR 功能,还调用 OCR 接口将图片转换为文本;表格数据则转为 CSV 格式处理;
  • TXT 解析器:直接读取文件内容,调用 chunkCustom 进行文本分块;
  • Markdown 解析器:使用 yuin/goldmark 将 Markdown 文件解析成 AST,遍历文本节点,按分隔符和块大小分块,支持自动下载图片并保存到对象存储,如果开启了 OCR 功能,也会调用 OCR 接口做文字识别;
  • Docx 解析器:通过运行 Python 脚本 parse_docx.py 从 DOCX 文件中提取文本、图片和表格内容,主要使用的是 python-docx 库;之后的逻辑和 PDF 解析器一样;
  • CSV 解析器:通过 encoding/csv 逐行读取 CSV 数据,支持自动处理 BOM 头,转换为行迭代器统一处理;
  • XLSX 解析器:通过 qax-os/excelize 库打开 Excel 文件,逐行读取,并自动补齐列数,转换为行迭代器统一处理;
  • JSON 解析器:解析 JSON 数组为 map 切片,支持动态提取表头,即首个对象的 key,支持自定义列配置,转换为行迭代器统一处理;
  • 图片解析器:读取图片,转为 base64,并调用多模态大模型生成图片描述,支持手动标注模式;

可以看出,Coze Studio 对常见的文件类型都提供了很好的支持:对于 PDF 和 Word 等复杂文档,使用 Python 来处理,通过 Go 与 Python 脚本的管道通信机制;对于表格数据,都使用了行迭代器统一处理;对于图像数据,使用 OCR 进行文字识别,或调用多模态大模型生成图片描述。

我们之前在介绍 Eino 组件时学习过,Eino Ext 内置了一些文档处理类的组件,支持解析 TXT、HTML、DOCX、XLSX、PDF 等格式的文件。但是很显然,Go 在这方面的生态相比于 Python 来说还不够成熟,因此 Coze Studio 采用了 Go + Python 这种折中的方式。

数据持久化

// 批量生成所有文档分片的 ID
allIDs := make([]int64, 0, len(parseResult))
for l := 0; l < len(parseResult); l += 100 {
    ids, err := k.idgen.GenMultiIDs(ctx, batchSize)
    allIDs = append(allIDs, ids...)
}

// 将 parseResult 转换为 sliceModels
sliceModels := make([]*model.KnowledgeDocumentSlice, 0, len(parseResult))
for i, src := range parseResult {
    sliceModel := &model.KnowledgeDocumentSlice{
        ID:          allIDs[i],
        KnowledgeID: doc.KnowledgeID,
        DocumentID:  doc.ID,
        Content:     parseResult[i].Content,
        // 将分片状态设置为处理中(Processing)
        Status:      int32(model.SliceStatusProcessing),
    }
    // 表格类型的分片 特殊处理
    if doc.Type == knowledge.DocumentTypeTable {
        sliceEntity, err := convertFn(src, doc.KnowledgeID, doc.ID, doc.CreatorID)
        sliceModel.Content = sliceEntity.GetSliceContent()
    }
    sliceModels = append(sliceModels, sliceModel)
}

// 批量保存文档分片
err = k.sliceRepo.BatchCreate(ctx, sliceModels)

// 保存表格类型的数据
if doc.Type == knowledge.DocumentTypeTable {
    err = k.upsertDataToTable(ctx, &doc.TableInfo, sliceEntities)
}

经过上一步后,我们得到了一系列的文档分片 parseResult,接下来的逻辑就是将其批量存入 knowledge_document_slice 数据表中。首先通过 GenMultiIDs 为每个分片生成唯一的 ID,这里使用批量生成的方式,一次生成 100 个;然后将 parseResult 转换为 KnowledgeDocumentSlice 类型,并将分片状态设置为处理中(SliceStatusProcessing);最后调用 sliceRepo.BatchCreate 批量保存。

这里值得注意的一点是对表格数据的处理,表格按行分片,内容是一行数据的 JSON 格式,Key 为列名,Value 为单元格的值,如下:

table-slice.png

此外,我们回顾下昨天学习文档处理器的内容:对于表格知识库,首次插入文档时,除了会新建一条 knowledge_document 记录,还会根据 Excel 的列结构动态地创建一个物理数据表;这里在解析完表格之后,表格的分片数据还会插入到这个物理数据表中:

create-table-db.png

向量化与索引

// 字段列表和索引字段
fields, err := k.mapSearchFields(doc)
indexingFields := getIndexingFields(fields)

for _, manager := range k.searchStoreManagers {
  // 为每个知识库创建一个独立的集合或索引
  manager.Create(ctx, &searchstore.CreateRequest{
    CollectionName: collectionName,
    Fields:         fields,
    CollectionMeta: nil,
  })
  // 将文档分片保存到搜索存储中(Milvus、Elasticsearch、VikingDB)
  ss, err := manager.GetSearchStore(ctx, collectionName)
  _, err = ss.Store(ctx, ssDocs,
    searchstore.WithIndexerPartitionKey(fieldNameDocumentID),
    searchstore.WithPartition(strconv.FormatInt(doc.ID, 10)),
    searchstore.WithIndexingFields(indexingFields),
  )
}

接下来,我们还需要将文档分片保存到搜索存储(Milvus、Elasticsearch、VikingDB)中。首先,使用 Manager 的 Create 方法为每个知识库创建一个独立的集合,集合的名字为 opencoze_<kb_id>,每种搜索存储的概念不太一样,创建集合的方式也是不一样的。比如在 Milvus 中,分为创建集合、创建索引和加载集合三步,确保 Milvus 集合加载到内存中,使其可用于搜索操作:

func (m *milvusManager) Create(ctx context.Context, req *searchstore.CreateRequest) error {
  // 创建集合
  if err := m.createCollection(ctx, req); err != nil {
    return fmt.Errorf("[Create] create collection failed, %w", err)
  }
  // 创建索引
  if err := m.createIndexes(ctx, req); err != nil {
    return fmt.Errorf("[Create] create indexes failed, %w", err)
  }
  // 加载集合
  if exists, err := m.loadCollection(ctx, req.CollectionName); err != nil {
    return fmt.Errorf("[Create] load collection failed, %w", err)
  } else if !exists {
    return fmt.Errorf("[Create] load collection failed, collection=%v does not exist", req.CollectionName)
  }
  return nil
}

在 Milvus 向量数据库中,load_collection 是一个非常重要的操作,其主要作用是将指定的集合(Collection)从磁盘加载到内存中,以便进行高效的向量检索和查询操作,刚创建或刚插入数据的集合需要先执行 load_collection 后才能进行查询操作。

而在 Elasticsearch 中,直接创建索引即可:

func (e *esManager) Create(ctx context.Context, req *searchstore.CreateRequest) error {
  cli := e.config.Client
  index := req.CollectionName
  // 创建索引
  err = cli.CreateIndex(ctx, index, properties)
  return err
}

在 Milvus 中,每个知识库对应一个集合:

milvus-indexes.png

根据文档类型创建对应的字段和索引:

milvus-indexes-schema.png

而在 Elasticsearch 中,每个知识库对应一个索引:

es-indexes.png

然后,将文档分片依次保存到搜索存储中,对于 Milvus 来说,会有一个向量列,通过 Embedding 计算:

milvus-indexes-data.png

而 Elasticsearch 则直接存储文档分片:

es-indexes-data.png

整个过程中规中矩,并没有多少需要特别关注的地方。其中 Embedding 我们之前介绍过,支持三种接入方式:

  • OpenAI - 兼容 OpenAI 协议的 Embedding 接口;
  • ARK - 火山引擎提供的 Embedding 服务,支持多种模型,如 doubao-embeddingdoubao-embedding-largedoubao-embedding-visionbge-large-zhbge-m3bge-visualized-m3 等;其中 bge-m3 比较特殊,它是唯一一个 同时支持稠密向量和稀疏向量 的模型,如果使用这个模型,在 Milvus 集合中不仅会创建 dense_text_content 稠密向量列,还会创建一个 sparse_text_content 稀疏向量列;
  • HTTP - 调用本地部署的模型服务,需要满足 Coze 自己的一套 接口协议,暂不支持 Ollama 或 Xinference 协议;

状态更新

// 将分片状态设置为已完成(Done)
err = k.sliceRepo.BatchSetStatus(ctx, allIDs, int32(model.SliceStatusDone), "")

// 将文档状态设置为启用(Enable)
err = k.documentRepo.SetStatus(ctx, doc.ID, int32(entity.DocumentStatusEnable), "")

// 更新文档的分片信息
// 1. 统计文档的分片数量
// 2. 计算文档所有分片内容的总大小
err = k.documentRepo.UpdateDocumentSliceInfo(ctx, event.Document.ID)

最后一步,将分片状态设置为已完成(Done),将文档状态设置为启用(Enable),同时更新文档的分片信息,包括分片数量和分片大小,对应 knowledge_document 表中的 slice_countsize 字段。

小结

今天我们接续昨天的内容,深入探究了 Coze Studio 知识库文档入库的异步处理全流程。整个过程由消息队列驱动,其核心逻辑总结如下:

  • 事件驱动架构:文档上传后,系统通过发送 IndexDocuments 事件触发异步处理流程,该事件再分发为针对单个文档的 IndexDocument 事件,由专门的事件处理器进行消费和处理;
  • 核心处理流水线indexDocument 函数是整个流程的核心,它编排了从文档解析到最终索引完成的五个关键步骤:状态检查与清理、解析与分片、数据持久化、向量化与索引,以及最终的状态更新;
  • 多格式文档解析:系统内置了针对不同文件类型(PDF, DOCX, Markdown, 表格, 图片等)的精细化解析器。特别地,对于 PDF、DOCX 等复杂格式,结合了 Python 脚本进行处理,弥补了 Go 生态在文档解析方面的不足;
  • 双重存储策略:解析后的文档分片不仅被持久化到 MySQL 数据库(knowledge_document_slice 表)中,同时也被向量化并存入由 Search Store 抽象层管理的 Milvus 或 Elasticsearch 中,实现了元数据与向量索引的分离存储;

至此,一份用户上传的文档就完成了从解析、切片、存储到向量化的完整入库流程,可以被智能体检索和使用了。


学习 Coze Studio 的知识库入库逻辑

经过前面几天的学习,我们已经深入探讨了 Coze Studio 的智能体、插件、工作流等核心功能的实现原理。今天,我们将继续探索 Coze Studio 的知识库功能,学习当我们在前端页面上点击 “创建知识库” 并上传文档时,后端服务是如何处理这些入库请求的。

知识库创建接口

我们知道,创建知识库是构建 RAG 应用的第一步,Coze Studio 支持 文本表格图片 三种不同类型的知识库:

coze-kb-create.png

这三种知识库的创建流程基本类似,都分为两步:

  1. 输入名称、描述、图标和导入类型,点击 “完成创建” 或 “创建并导入”,这一步调用 /api/knowledge/create 接口,创建的知识库实例对应 knowledge 数据表;
  2. 上传文档、表格或图片,对知识库进行设置,预览,然后开始对文档进行处理,这一步调用 /api/knowledge/document/create 接口,上传的文档对应 knowledge_document 数据表;

下面我们着重看下第二个接口的实现逻辑,接口层代码位于 backend/api/handler/coze/knowledge_service.go 文件:

// @router /api/knowledge/document/create [POST]
func CreateDocument(ctx context.Context, c *app.RequestContext) {

  // 绑定入参
  var req dataset.CreateDocumentRequest
  err = c.BindAndValidate(&req)

  // 调用 knowledge 应用层
  resp := new(dataset.CreateDocumentResponse)
  resp, err = application.KnowledgeSVC.CreateDocument(ctx, &req)
  c.JSON(consts.StatusOK, resp)
}

然后,调用 knowledge 应用层:

func (k *KnowledgeApplicationService) CreateDocument(ctx context.Context, req *dataset.CreateDocumentRequest) (*dataset.CreateDocumentResponse, error) {

  // 调用 knowledge 领域层
  resp := dataset.NewCreateDocumentResponse()
  createResp, err := k.DomainSVC.CreateDocument(ctx, &service.CreateDocumentRequest{
    Documents: documents,
  })
  return resp, nil
}

紧接着,做一些参数的转换,再调用 knowledge 领域层,这里是文档处理的核心:

func (k *knowledgeSVC) CreateDocument(ctx context.Context, request *CreateDocumentRequest) (response *CreateDocumentResponse, err error) {

  // 根据不同的知识库类型,创建对应的文档处理器
  docProcessor := impl.NewDocProcessor(ctx, &impl.DocProcessorConfig{
    ...
  })

  // 1. 前置的动作,上传 tos 等
  err = docProcessor.BeforeCreate()
  // 2. 构建 落库
  err = docProcessor.BuildDBModel()
  // 3. 插入数据库
  err = docProcessor.InsertDBModel()
  // 4. 发起索引任务
  err = docProcessor.Indexing()
  // 5. 返回处理后的文档信息
  docs := docProcessor.GetResp()
  return &CreateDocumentResponse{
    Documents: docs,
  }, nil
}

文档处理器

Coze Studio 使用工厂模式根据文档来源和类型创建不同的文档处理器:

  • customDocProcessor: 自定义内容 + 非表格类型
  • customTableProcessor: 自定义内容 + 表格类型
  • baseDocProcessor: 本地文件 + 非表格类型 或 默认
  • localTableProcessor: 本地文件 + 表格类型

其中,本地文件指的是用户上传文件,自定义内容指的是手动录入文档内容。所有处理器都实现相同的接口:BeforeCreateBuildDBModelInsertDBModelIndexingGetResp,流程图如下:

create-document-flow.png

前置动作

前置动作 BeforeCreate 是文档创建流程中的第一个步骤,主要是对自定义内容和表格进行处理,比如在 customDocProcessor 中,将用户输入的文本内容上传到对象存储:

func (c *customDocProcessor) BeforeCreate() error {
  for i := range c.Documents {
    // 文件扩展名
    c.Documents[i].FileExtension = getFormatType(c.Documents[i].Type)
    // 根据当前时间为文档生成唯一的存储路径
    uri := getTosUri(c.UserID, string(c.Documents[i].FileExtension))
    // 把用户直接输入的文本内容持久化到存储系统
    _ := c.storage.PutObject(c.ctx, uri, []byte(c.Documents[i].RawContent))
    c.Documents[i].URI = uri
  }
  return nil
}

其中文件扩展名只支持 .txt.json 两种,分别对应文本知识库和表格知识库:

func getFormatType(tp knowledge.DocumentType) parser.FileExtension {
  docType := parser.FileExtensionTXT
  if tp == knowledge.DocumentTypeTable {
    docType = parser.FileExtensionJSON
  }
  return docType
}

生成的文件位于 FileBizType.Knowledge 目录下,文件名由用户 ID 和当前时间组成:

func getTosUri(userID int64, fileType string) string {
  fileName := fmt.Sprintf("FileBizType.Knowledge/%d_%d.%s", userID, time.Now().UnixNano(), fileType)
  return fileName
}

可以在 Minio 的 opencoze 桶下找到对应的文件:

minio-file-biz-type.png

构建数据模型

第二步 BuildDBModel 比较简单,构建文档的数据模型,为后面插入 knowledge_document 数据表做准备。这里有两点提一下:

  1. 文档 ID 使用 idgen.GenMultiIDs() 批量生成,通过 Redis 实现,参考 backend/infra/impl/idgen/idgen.go 文件;
  2. 对于表格型知识库,上传的表格文件必须具备相同的数据结构,如果往已存在的表格知识库中添加文档,也就是追加模式,这时会复用之前表格的数据结构,直接跳过插入数据表这一步;这也意味着,每个表格知识库里只会有一条文档记录,这条记录的 table_info 字段至关重要,参见下一节;

插入数据

第三步 InsertDBModel 将上一步构建好的文档记录批量插入到 knowledge_document 数据表中:

func (p *baseDocProcessor) InsertDBModel() (err error) {
  
  // 如果是表格知识库,且不是追加模式
  // 说明是首次创建,使用 CREATE TABLE 创建一个数据表
  if !isTableAppend(p.Documents) {
    err = p.createTable()
  }

  // 开启事务
  tx, err := p.knowledgeRepo.InitTx()
  
  defer func() {
    if err != nil {
      // 出错时回滚
      tx.Rollback()
    } else {
      // 正常时提交
      tx.Commit()
    }
  }()

  // 批量插入文档记录
  err = p.documentRepo.CreateWithTx(ctx, tx, p.docModels)
  
  // 更新知识库时间
  err = p.knowledgeRepo.UpdateWithTx(ctx, tx, p.Documents[0].KnowledgeID, map[string]interface{}{
    "updated_at": time.Now().UnixMilli(),
  })
  return nil
}

这里的逻辑也有两点值得提一下:

  1. 支持批量插入多条文档记录,使用数据库事务,确保一批数据全部插入成功或失败;
  2. 对于表格知识库,首次插入文档时,除了会新建一条 knowledge_document 记录,还会根据 Excel 的列结构动态地创建一个数据表;

比如下面这个 Excel 文件,包含五列:

create-table-excel.png

查看新建的 knowledge_document 记录,有一个 table_info 字段,内容如下:

{
  "columns": [
    {
      "ID": 7533389719145545728,
      "Name": "姓名",
      "Type": 1,
      "Indexing": true,
      "Sequence": 0,
      "Description": ""
    },
    {
      "ID": 7533389719145562112,
      "Name": "学号",
      "Type": 2,
      "Indexing": false,
      "Sequence": 1,
      "Description": ""
    },
    {
      "ID": 7533389719145578496,
      "Name": "年龄",
      "Type": 2,
      "Indexing": false,
      "Sequence": 2,
      "Description": ""
    },
    {
      "ID": 7533389719145594880,
      "Name": "性别",
      "Type": 1,
      "Indexing": false,
      "Sequence": 3,
      "Description": ""
    },
    {
      "ID": 7533389719145611264,
      "Name": "成绩",
      "Type": 2,
      "Indexing": false,
      "Sequence": 4,
      "Description": ""
    },
    {
      "ID": 7533389719145627648,
      "Name": "_knowledge_document_slice_id",
      "Type": 2,
      "Indexing": false,
      "Sequence": -1,
      "Description": "主键ID"
    }
  ],
  "table_desc": "",
  "virtual_table_name": "学生信息",
  "physical_table_name": "table_7533389719149740032"
}

这里的 physical_table_name 就是对应创建的数据表,注意表里的列名都变成了 ID 格式:

create-table-db.png

后面在查询表格知识库时,实际上就是在查询这个表。

计算索引

经过前面的几步,我们完成了自定义内容的处理、知识库和文档的插入、数据表的创建等等,不过这些都还只是开胃小菜,接下来开始才是重头戏。在索引计算 Indexing 中,我们通过 MQ 发送 IndexDocuments 事件,开启异步处理文件:

func (p *baseDocProcessor) Indexing() error {

  // 创建 IndexDocuments 事件
  event := events.NewIndexDocumentsEvent(p.Documents[0].KnowledgeID, p.Documents)
  body, err := sonic.Marshal(event)
  
  // 通过 MQ 发送事件
  err = p.producer.Send(p.ctx, body)
  return nil
}

Coze Studio 支持多种 MQ 实现:

事件 IndexDocuments 发送之后,再通过 GetResp 返回创建文档的响应,整个文档上传的接口就结束了。

未完待续

今天,我们简单过了一遍 Coze Studio 知识库创建和文档上传的后端逻辑。整个过程从用户上传文档触发 CreateDocument 接口开始,在完成文档元信息入库后,通过发送 IndexDocuments 事件,将耗时的文档处理转为异步执行任务。尽管文档上传的接口结束了,但是这只是 Coze Studio 文档处理流程的冰山一角,我们明天将继续研读 Coze Studio 关于文档处理流程的代码。


学习 Coze Studio 的工作流执行逻辑

今天,我们来学习下 Coze Studio 中工作流的执行逻辑。我们知道,Eino 框架支持以 编排(Compose) 的方式对各种原子组件进行组合和串联,并且它支持 图(Graph)链(Chain)工作流(Workflow) 三种编排方式。其中,链基于图实现,是最简单的;工作流和图处于同一级别,他们之间的区别在于,工作流提供了字段级别的映射能力以及控制流与数据流分离等特性。在 Coze Studio 中,智能体是通过 Graph API 实现的,工作流是通过 Workflow API 实现的。

工作流 vs. 图

工作流和图的区别在于,它提供了字段级别的映射能力,节点的输入可以由任意前驱节点的任意输出字段组合而成。比如下面这个例子:

workflow-field-mapping.png

假设 f1 和 f2 是两个现有的业务函数,f1 的输出为 F1 和 F2,而 f2 的输入为 F3,很显然两者并不匹配,如果使用 Graph API 编排,由于类型对齐的要求,我们有两种做法:

  1. 定义一个公共结构体,将 f1 的输出类型和 f2 的输入类型改成这个公共结构体。不仅有开发成本,而且对业务逻辑造成了入侵;
  2. 将 f1 的输出类型和 f2 的输入类型都改成 map。丢失了强类型对齐的特性。

这时就可以使用 Workflow API 编排,将 f1 的输出字段 F1 直接映射到 f2 的输入字段 F3,同时保留 f1 和 f2 的原始函数签名。每个节点由业务场景决定输入输出,不需要考虑谁给我输入,以及谁用我的输出。

工作流的另一个特点是控制流与数据流分离,看下面这个场景:

workflow-data-control-separate.png

节点 D 同时引用了 A、B、C 的某些输出字段。其中 A->D 的这条虚线,是单纯的 数据流,不传递 控制 信息,即 A 执行完成与否,不决定 D 是否开始执行。

节点 D 到 E 之间的粗箭头,代表节点 E 不引用节点 D 的任何输出,是单纯的 控制流,不传递 数据,即 D 执行完成与否,决定 E 是否开始执行,但是 D 的输出不影响 E 的输入。

图中其他的线,是控制流与数据流合一的。

需要注意的是,数据流能传递的前提,是一定有一条控制流存在,比如 A->D 的数据流,依赖 A->branch->B->D 或者 A->branch->C->D 的控制流存在。即数据流只能引用前驱节点的输出。

学习 Workflow API

下面我们通过一个简单例子,演示下 Workflow API 的基本用法。假设我们现在有一个现成的业务函数 add(),实现 AB 两个整数的相加:

type AddParam struct {
  A int
  B int
}

func add(ctx context.Context, param AddParam) (int, error) {
  return param.A + param.B, nil
}

我们要使用这个函数,编排出一个任务链,实现 X+Y+Z 三个整数的加法:

type Request struct {
  X int
  Y int
  Z int
}

示例代码如下:

func main() {

  // 创建工作流
  wf := compose.NewWorkflow[Request, int]()

  // 第一个 add 节点
  wf.AddLambdaNode("add1", compose.InvokableLambda(add)).
    AddInput(
      compose.START,
      compose.MapFields("X", "A"),
      compose.MapFields("Y", "B"),
    )

  // 第二个 add 节点
  wf.AddLambdaNode("add2", compose.InvokableLambda(add)).
    AddInput(compose.START, compose.MapFields("Z", "A")).
    AddInput("add1", compose.ToField("B"))

  wf.End().AddInput("add2")

  // 编译工作流
  run, err := wf.Compile(context.Background())
  if err != nil {
    panic(err)
  }

  // 调用工作流
  result, _ := run.Invoke(context.Background(), Request{
    X: 1,
    Y: 2,
    Z: 3,
  })
  println(result)
}

Workflow API 和 Graph API 的语法类似,整个编排流程还是很清晰的:

  1. 使用 compose.NewWorkflow() 创建工作流,和 compose.NewGraph() 基本一致;
  2. 使用 AddLambdaNode() 将用户自定义函数添加为工作流节点,除此之外,之前学习过的 Eino 组件都可以作为节点,比如 AddChatModelNode()AddToolsNode() 等;
  3. 使用 compose.InvokableLambda() 将一个普通函数转为为 Lambda 节点所需的 *compose.Lambda 类型;函数的入参必须是 ctx context.Context, input I,出参必须是 output O, err error
  4. 使用 AddInput() 为节点添加输入字段映射,可以从任意多个前驱节点的输出中引用任意多个字段,可以链式连续调用;字段映射有多种不同的场景:

    • 顶层字段到顶层字段:MapFields(string, string),非常简单的一对一映射,比如上面 add1 节点的 X->AY->B 的映射;
    • 全部输出到顶层字段:ToField(string),比如上面 add2 节点的 B 参数,直接引用 add1 的完整输出;
    • 顶层字段到全部输入:FromField(string)
    • 嵌套字段到嵌套字段:MapFieldPaths(FieldPath, FieldPath),只要上游或下游有一方是嵌套的,就需要用;
    • 全部输出到嵌套字段:ToFieldPath(FieldPath)
    • 嵌套字段到全部输入:FromFieldPath(FieldPath)
    • 全部输出到全部输入:直接使用 AddInput(),不需要传字段映射,比如上面的结束节点;

工作流如下图所示:

workflow-sample.png

关于 Workflow API 还有一些高级技巧,比如实现只有数据流没有控制流或者只有控制流没有数据流的场景、在工作流中使用分支、使用静态值等,感兴趣的同学可以参考官方的文档:

工作流的试运行接口

接下来,我们大致过一遍 Coze Studio 工作流的运行流程,当用户在页面点击 “试运行” 按钮时,触发接口 /api/workflow_api/test_run,其代码入口位于 backend/api/handler/coze/workflow_service.go 文件:

// @router /api/workflow_api/test_run [POST]
func WorkFlowTestRun(ctx context.Context, c *app.RequestContext) {
  
  // 绑定参数
  var req workflow.WorkFlowTestRunRequest
  err = c.BindAndValidate(&req)

  // 调用 workflow 应用层
  resp, err := appworkflow.SVC.TestRun(ctx, &req)
  
  c.JSON(consts.StatusOK, resp)
}

直接调用 workflow 应用层的 TestRun() 方法:

func (w *ApplicationService) TestRun(ctx context.Context, req *workflow.WorkFlowTestRunRequest) (_ *workflow.WorkFlowTestRunResponse, err error) {

  // 构造执行配置,调用 workflow 领域层
  exeCfg := vo.ExecuteConfig{
    ID:           mustParseInt64(req.GetWorkflowID()),
    From:         vo.FromDraft,
    // ...
  }
  exeID, err := GetWorkflowDomainSVC().AsyncExecute(ctx, exeCfg, maps.ToAnyValue(req.Input))
  
  // 返回异步执行的 ID
  return &workflow.WorkFlowTestRunResponse{
    Data: &workflow.WorkFlowTestRunData{
      WorkflowID: req.WorkflowID,
      ExecuteID:  fmt.Sprintf("%d", exeID),
    },
  }, nil
}

接着再调用 workflow 领域层的 AsyncExecute() 方法:

func (i *impl) AsyncExecute(ctx context.Context, config vo.ExecuteConfig, input map[string]any) (int64, error) {
  
  // 查询工作流
  wfEntity, err = i.Get(ctx, &vo.GetPolicy{...})

  // Canvas 前端画布
  c := &vo.Canvas{}
  sonic.UnmarshalString(wfEntity.Canvas, c)

  // 将 Canvas 转换为 Workflow Schema
  workflowSC, err := adaptor.CanvasToWorkflowSchema(ctx, c)

  // 创建工作流
  //  1. 使用 Eino 的 `NewWorkflow()` 创建工作流
  //  2. 获取所有复合节点并添加到工作流
  //  3. 添加普通节点,根据节点类型,封装不同的节点
  wf, err := compose.NewWorkflow(ctx, workflowSC, wfOpts...)

  // 将输入转换为工作流的入参
  convertedInput, ws, err := nodes.ConvertInputs(ctx, input, wf.Inputs(), cOpts...)
  inStr, err := sonic.MarshalString(input)
  
  // 工作流运行准备工作:
  //  1. 生成 executeID 并创建执行记录
  //  2. 处理中断执行、恢复执行等逻辑
  //  3. 使用 Eino 的 Callbacks 机制为工作流和节点注入回调
  //  4. 创建一个 goroutine 处理回调事件,更新工作流或节点执行状态
  cancelCtx, executeID, opts, _, err := compose.NewWorkflowRunner(wfEntity.GetBasic(), workflowSC, config,
    compose.WithInput(inStr)).Prepare(ctx)

  // 异步执行工作流
  wf.AsyncRun(cancelCtx, convertedInput, opts...)

  // 返回 executeID
  return executeID, nil
}

这里省略了不少无关代码,整体逻辑还是比较复杂的,大致可分为四步:

第一步,结构转换:将前端画布的 Canvas 结构转换为后端工作流的 Workflow Schema 结构;处理父节点和子节点的层次关系,支持嵌套的子工作流(如循环、批处理节点);

第二步,创建工作流:使用 Eino 的 NewWorkflow() 创建工作流,然后根据节点类型,通过 NodeSchema.New() 向工作流中添加对应的节点实例,每个节点类型都有专门的实现包,参考 backend/domain/workflow/internal/compose/node_schema.go 文件:

node-schema-new.png

说实话,这里的实现有点 low,让我感到有点意外,所有的节点实现全部都是写死的逻辑,没什么扩展性,如果要新增一个节点类型,就得在这里加一个 case 分支。

第三步,工作流预处理:使用 Eino 的 Callbacks 机制为工作流和节点注入回调,同时创建一个 goroutine 监听回调事件,包括工作流开始、结束、出错、节点开始、结束等等:

event-handler.png

将工作流或节点的执行状态更新到 workflow_executionnode_execution 数据表中。另外,它会通过 Redis 生成一个全局唯一的 executeID 并创建执行记录。

第四步,异步执行:对工作流进行异步执行,并返回上一步生成的 executeID,调用方可以拿着这个 executeID 查询执行进度。

其实,Coze Studio 中的工作流有多种执行方式,除了这里的 AsyncExecute 异步执行之外,还支持 SyncExecute 同步执行,StreamExecute 流式执行,AsyncExecuteNode 单节点执行等。另外,Coze Studio 的工作流还支持中断,比如遇到问答节点、输入节点、或需要鉴权的工具节点时,这时会等待用户反馈,反馈后可以通过 AsyncResumeStreamResume 从断点处恢复执行。

小结

今天,我们学习了 Coze Studio 工作流的执行原理。我们首先研究了 Eino 框架提供的 Workflow API,并将其与 Graph API 进行了对比,重点理解了其强大的字段映射能力以及控制流与数据流分离的特性。

接着,我们通过分析工作流 “试运行” 功能的后端源码,详细拆解了其完整的执行流程。该流程可分为四步:首先将前端画布的 Canvas 结构转换为 Eino 可识别的 Workflow Schema,然后基于 Schema 动态创建工作流实例,接着通过回调机制注入事件监听以追踪执行状态,最后异步触发整个工作流的运行。

前端调用 “试运行” 接口后,紧接着会轮询 /api/workflow_api/get_process 接口,获取工作流的执行进度,包括每个节点的入参和出参,如下所示:

coze-workflow-amap-weather.png

这个接口就比较简单了,就是查询 workflow_executionnode_execution 数据表,获取工作流或节点的执行状态,感兴趣的朋友可以自行研究。


学习 Coze Studio 的工具使用

昨天,我们详细拆解了 Coze Studio 智能体的核心执行逻辑,了解到其本质是一个基于 Eino 框架构建的智能体图。在分析的最后我们提到,其中的一些关键实现细节,例如 ReAct Agent 与工具的交互逻辑,仍有待深入。今天,我们就来填上这个坑,学习下 Coze Studio 是如何实现工具调用的。

我们知道,Eino 提供了很多可复用的原子组件,通过 Graph API 可以对其进行编排,实现出复杂的业务逻辑。不过有些业务逻辑存在一定的通用性,Eino 将这些场景进行抽象,提供了一些构建大模型应用的模式,被称为 Flow。目前 Eino 提供了两种常用的 Agent 模式:

除此之外还提供了一些常用的召回策略,比如 MultiQueryRetrieverParentIndexer 等。

ReAct Agent

我们今天主要关注 ReAct Agent 部分。ReAct 这个词出自 Shunyu Yao 等人的这篇论文 《ReAct: Synergizing Reasoning and Acting in Language Models》,它是由 ReasonAct 两个词组合而成,表示一种将 推理行动 与大模型相结合的通用范式:

react.png

传统的 Reason Only 型应用(如 Chain-of-Thought Prompting)具备很强的语言能力,擅长通用文本的逻辑推断,但由于不会和外部环境交互,因此它的认知非常受限;而传统的 Act Only 型应用(如 WebGPTSayCanACT-1)能和外界进行交互,解决某类特定问题,但它的行为逻辑较简单,不具备通用的推理能力。

ReAct 的思想,旨在将这两种应用的优势结合起来。针对一个复杂问题,首先使用大模型的推理能力制定出解决该问题的行动计划,这好比人的大脑,可以对问题进行分析思考;然后使用行动能力与外部源(例如知识库或环境)进行交互,以获取额外信息,这好比人的五官和手脚,可以感知世界,并执行动作;大模型对行动的结果进行跟踪,并不断地更新行动计划,直到问题被解决。

通过这种模式,我们能基于大模型构建更为强大的 AI 应用,可以阅读原始论文了解更多关于 ReAct 的信息:

回顾 Function Call 原理

原始的 ReAct 论文是通过 提示工程(Prompt Engineering) 实现的。后来,OpenAI 对 GPT 模型进行了一项重大更新,推出了 Function Call 功能,在 Chat Completions API 中添加了函数调用能力,这一功能推出后,迅速风靡世界,开发者可以通过 API 的方式实现类似于 ChatGPT 插件的数据交互能力,将大模型集成到自己的业务和应用中。随后,越来越多的大模型也都开始支持 Function Call 功能,目前已经成为大模型调用工具的事实规范。

简单来说,Function Call 就是在调用大模型时提供几个工具选项,大模型判断用户的问题是否需要调用某个工具。如果不需要则直接回复,这和传统的调用没有区别;如果需要调用则返回合适的工具和对应的参数给用户,用户拿到后调用对应的工具,再将调用结果送给大模型,最后,大模型根据工具的调用结果来回答用户的问题。OpenAI 官方文档 中有一张详细的流程图:

function-calling-diagram.png

其中要注意的是,第二次调用大模型时,可能仍然会返回 tool_calls 响应,这时可以循环处理。

定义 Eino 工具

了解了 ReAct Agent 的原理后,接下来,我们再来看下 Eino 中 ReAct Agent 的用法。为了让大模型能调用我们的工具,我们需要使用 Eino 提供的 ToolsNode 组件,该组件提供了三个层次的接口:

// 基础工具接口,提供工具信息,包括工具名称、描述和参数等
type BaseTool interface {
  Info(ctx context.Context) (*schema.ToolInfo, error)
}

// 可调用的工具接口,支持同步调用
type InvokableTool interface {
  BaseTool
  InvokableRun(ctx context.Context, argumentsInJSON string, opts ...Option) (string, error)
}

// 支持流式输出的工具接口
type StreamableTool interface {
  BaseTool
  StreamableRun(ctx context.Context, argumentsInJSON string, opts ...Option) (*schema.StreamReader[string], error)
}

在 Eino 中,BaseTool 接口要求任何一个工具都要实现 Info() 接口返回工具信息,包括工具名称、描述和参数等。而根据一个工具被调用后的返回结构是否是流式的,可以分为 InvokableToolStreamableTool,两个接口实现其一即可。

下面是我写的一个简单示例,通过 Info()InvokableRun() 方法实现了天气查询工具:

type ToolGetWeather struct {
}

type ToolGetWeatherParam struct {
  City string `json:"city"`
  Date string `json:"date"`
}

func (t *ToolGetWeather) Info(ctx context.Context) (*schema.ToolInfo, error) {
  return &schema.ToolInfo{
    Name: "get_weather",
    Desc: "查询天气",
    ParamsOneOf: schema.NewParamsOneOfByParams(map[string]*schema.ParameterInfo{
      "city": {
        Type:     "string",
        Desc:     "城市名称",
        Required: true,
      },
      "date": {
        Type:     "string",
        Desc:     "日期",
        Required: true,
      },
    }),
  }, nil
}

func (t *ToolGetWeather) InvokableRun(ctx context.Context, argumentsInJSON string, opts ...tool.Option) (string, error) {

  // 解析参数
  p := &ToolGetWeatherParam{}
  err := json.Unmarshal([]byte(argumentsInJSON), p)
  if err != nil {
    return "", err
  }

  res := p.City + p.Date + "天气晴,气温30摄氏度"
  return res, nil
}

这里有两点值得注意:

  1. Info() 函数中,我们使用了工具方法 schema.NewParamsOneOfByParams() 返回工具参数信息,这个方法生成 params map[string]*ParameterInfo 类型的参数约束;其实,Eino 还支持 OpenAPI 定义的 JSON Schema 方式,返回 *openapi3.Schema 类型的参数约束;
  2. InvokableRun() 的入参和出参都是字符串,需要开发者自行处理参数的反序列化和序列化。

除此之外,Eino 还提供了多种定义工具的方式:

  • 使用 NewTool()InferTool() 方法将本地函数转为工具;
  • 使用 Eino Ext 中提供的内置工具,比如 Bing 搜索、Google 搜索、DuckDuckGo 搜索、SearXNG 搜索、维基百科搜索、浏览器使用、命令行工具、HTTP 请求、Sequential Thinking 等;
  • 使用 MCP 工具;

具体内容可参考 Eino 如何创建工具的文档:

创建 ReAct Agent

定义工具之后,我们就可以通过 react.NewAgent() 创建一个 ReAct Agent 并自动调用工具了:

func main() {

  ctx := context.Background()

  // 初始化大模型
  model, _ := openai.NewChatModel(ctx, &openai.ChatModelConfig{
    BaseURL: os.Getenv("OPENAI_BASE_URL"),
    APIKey:  os.Getenv("OPENAI_API_KEY"),
    Model:   "gpt-4o",
  })

  // 初始化工具
  toolGetWeather := &ToolGetWeather{}

  // ReAct 智能体
  agent, _ := react.NewAgent(ctx, &react.AgentConfig{
    ToolCallingModel: model,
    ToolsConfig: compose.ToolsNodeConfig{
      Tools: []tool.BaseTool{toolGetWeather},
    },
  })

  r, _ := agent.Generate(ctx, []*schema.Message{
    {
      Role:    schema.User,
      Content: "北京明天的天气怎么样?",
    },
  })
  println(r.Content)
}

这里的代码非常简单,Agent 的调用和 Graph 的调用基本类似,也提供了 Generate() 非流式 和 Stream() 流式调用两种方式。另外,Agent 还提供了 ExportGraph() 方法可以将 ReAct Agent 内部的图导出来,作为子图嵌在其他图中运行。Coze Studio 就使用了这种方式:

// 如果工具数量大于 0,则使用 ReAct Agent
var isReActAgent bool
if len(agentTools) > 0 {
    isReActAgent = true
}

// 构建 ReAct Agent 子图并导出
if isReActAgent {
  agent, _ := react.NewAgent(ctx, &react.AgentConfig{
    ToolCallingModel: chatModel,
    ToolsConfig: compose.ToolsNodeConfig{
      Tools: agentTools,
    },
    ToolReturnDirectly: toolsReturnDirectly,
    ModelNodeName:      keyOfReActAgentChatModel,
    ToolsNodeName:      keyOfReActAgentToolsNode,
  })
  
  agentGraph, agentNodeOpts = agent.ExportGraph()
  agentNodeName = keyOfReActAgent
} else {
  agentNodeName = keyOfLLM
}

// 添加 ReAct Agent 或 Chat Model 作为智能体节点
if isReActAgent {
  _ = g.AddGraphNode(agentNodeName, agentGraph, agentNodeOpts...)
} else {
  _ = g.AddChatModelNode(agentNodeName, chatModel, agentNodeOpts...)
}

相比于我们上面的示例代码,Coze Studio 创建 ReAct Agent 时多了一个 ToolReturnDirectly 参数,不知道大家还记不记得我们之前在实战工作流时讲过,工作流的结束节点有两种返回方式:

coze-workflow-end-node.png

  • 返回变量:工作流运行结束后会以 JSON 格式输出所有返回参数,智能体在对话中触发工作流后,会自动总结 JSON 格式的内容,并以自然语言回复用户;
  • 返回文本:工作流运行结束后,智能体将直接使用指定的内容回复对话;

ToolReturnDirectly 是一个字典,里面包含的就是 返回文本 类型的工作流名字。

func newWorkflowTools(ctx context.Context, conf *workflowConfig) ([]workflow.ToolFromWorkflow, map[string]struct{}, error) {
  
  // 查询所有工作流,作为工具
  workflowTools, _ := crossworkflow.DefaultSVC().WorkflowAsModelTool(ctx, policies)

  // 找出所有 TerminatePlan == UseAnswerContent 的工作流,也就是 返回文本 类型的
  toolsReturnDirectly := make(map[string]struct{})
  for _, workflowTool := range workflowTools {
    if workflowTool.TerminatePlan() == vo.UseAnswerContent {
      toolInfo, _ := workflowTool.Info(ctx)
      toolsReturnDirectly[toolInfo.Name] = struct{}{}
    }
  }

  return workflowTools, toolsReturnDirectly, err
}

感兴趣的朋友可以看下 Eino 的源码,可以发现 ReAct Agent 其实也是基于 Graph API 实现的,只是这个图稍微复杂点,里面增加了分支、循环以及直接输出等逻辑:

react-flow.png

Coze Studio 的工具列表

最后,我们再来看下 Coze Studio 构建工具列表的逻辑:

agentTools := make([]tool.BaseTool, 0, len(pluginTools)+len(wfTools)+len(dbTools)+len(avTools))
// 插件工具
agentTools = append(agentTools, pluginTools)
// 工作流工具
agentTools = append(agentTools, wfTools)
// 数据库工具
agentTools = append(agentTools, dbTools)
// 变量工具
agentTools = append(agentTools, avTools)

可以发现,除了 插件工具工作流工具,这里还有两个特别的工具:

  • 数据库工具:当智能体里添加了数据库时自动出现的工具,工具名称对应用户创建的数据表名,工具参数为 sql 语句,每个数据表对应一个数据库工具;
  • 变量工具:当智能体里添加了用户变量时自动出现的工具,工具名称固定为 setKeywordMemory,工具参数为 keyvalue,所有变量共用这个工具;

下面是我创建的 todo_list 数据库对应的工具 Schema:

{
  "name": "todo_list",
  "description": "Mysql query tool. Table name is 'todo_list'. This table's desc is 待办清单.\n\nTable structure:\n- item (text): 待办事项\n- status (number): 状态,0 为待完成,1 为已完成\n\nUse SQL to query this table. You can write SQL statements directly to operate.",
  "parameters": {
    "properties": {
      "sql": {
        "description": "SQL query to execute against the database. You can use standard SQL syntax like SELECT",
        "type": "string"
      }
    },
    "required": [
      "sql"
    ],
    "type": "object"
  }
}

下面是变量工具的 Schema:

{
  "name": "setKeywordMemory",
  "description": "\n## Skills Conditions\n1. When the user's intention is to set a variable and the user provides the variable to be set, call the tool.\n2. If the user wants to set a variable but does not provide the variable, do not call the tool.\n3. If the user's intention is not to set a variable, do not call the tool.\n\n## Constraints\n- Only make decisions regarding tool invocation based on the user's intention and input related to variable setting.\n- Do not call the tool in any other situation not meeting the above conditions.\n",
  "parameters": {
    "properties": {
      "data": {
        "items": {
          "properties": {
            "keyword": {
              "description": "the keyword of memory variable",
              "type": "string"
            },
            "value": {
              "description": "the value of memory variable",
              "type": "string"
            }
          },
          "required": [
            "keyword",
            "value"
          ],
          "type": "object"
        },
        "type": "array"
      }
    },
    "required": [
      "data"
    ],
    "type": "object"
  }
}

这两个都属于记忆工具,当用户的问题需要查询或更新记忆时,就会自动调用这些工具:

coze-studio-memory-tools.png

点击右上角的大脑图标,也可以查看记忆中的内容。

小结

今天我们深入探讨了 Coze Studio 实现工具调用的底层逻辑。其核心是利用了 Eino 框架提供的 ReAct Agent,它遵循 ReAct(推理+行动) 的思想,通过大模型的 Function Call 能力,将强大的推理能力与和外部世界交互的行动能力相结合,从而解决复杂的任务。

我们学习了如何在 Eino 中通过实现 BaseToolInvokableTool 等标准接口来定义一个工具,并将其提供给 ReAct Agent 使用。一个关键的设计是,Coze Studio 会将创建好的 ReAct Agent 作为一个子图,动态地嵌入到智能体的整体执行图中,这种设计非常灵活且强大。

最后,我们分析了 Coze Studio 为智能体自动装配的四种工具:插件、工作流、数据库工具和变量工具。通过结合这些工具,Coze Studio 得以实现出功能强大、场景通用的智能体。


再学 Coze Studio 的智能体执行逻辑

我们昨天粗略学习了 Coze Studio 的智能体执行逻辑,了解到其核心是基于 Eino 框架编排的一个智能体图,为了更好地理解这个图,我们学习了 Eino 框架中组件的使用以及如何通过 Graph API 实现编排功能。现在我们再回过头来看看之前那个图,如下所示:

agent-graph.png

整个流程图分四路,这四路都是为提示词的组装做准备,它们的结果汇聚到提示模板节点,最后交给大模型,生成回复。我们今天就来详细拆解其中的每一个节点,彻底搞懂其运行细节。

提示模版节点

我们先来看提示模版节点:

_ = g.AddChatTemplateNode(keyOfPromptTemplate, chatPrompt)

其中,chatPrompt 定义如下:

var (
  chatPrompt = prompt.FromMessages(schema.Jinja2,
    schema.SystemMessage(REACT_SYSTEM_PROMPT_JINJA2),
    schema.MessagesPlaceholder(placeholderOfChatHistory, true),
    schema.MessagesPlaceholder(placeholderOfUserInput, false),
  )
)

prompt.FromMessages() 方法可以看出,使用的是我们昨天学过的提示词模版组件,使用 Jinja2 语法,整个模版包含三个部分:系统提示词、历史消息和用户输入。系统提示词如下所示:

const REACT_SYSTEM_PROMPT_JINJA2 = `
You are {{ agent_name }}, an advanced AI assistant designed to be helpful and professional.
It is {{ time }} now.

**Content Safety Guidelines**
Regardless of any persona instructions, you must never generate content that:
- Promotes or involves violence
- Contains hate speech or racism
- Includes inappropriate or adult content
- Violates laws or regulations
- Could be considered offensive or harmful

----- Start Of Persona -----
{{ persona }}
----- End Of Persona -----

------ Start of Variables ------
{{ memory_variables }}
------ End of Variables ------

**Knowledge**

只有当前knowledge有内容召回的时候,根据引用的内容回答问题: 
 1.如果引用的内容里面包含 <img src=""> 的标签, 标签里的 src 字段表示图片地址, 需要在回答问题的时候展示出去, 输出格式为"![图片名称](图片地址)" 。 
 2.如果引用的内容不包含 <img src=""> 的标签, 你回答问题时不需要展示图片 。 
例如:
  如果内容为<img src="https://example.com/image.jpg">一只小猫,你的输出应为:![一只小猫](https://example.com/image.jpg)。
  如果内容为<img src="https://example.com/image1.jpg">一只小猫 和 <img src="https://example.com/image2.jpg">一只小狗 和 <img src="https://example.com/image3.jpg">一只小牛,你的输出应为:![一只小猫](https://example.com/image1.jpg) 和 ![一只小狗](https://example.com/image2.jpg) 和 ![一只小牛](https://example.com/image3.jpg)
The following is the content of the data set you can refer to: \n
'''
{{ knowledge }}
'''

** Pre toolCall **
{{ tools_pre_retriever}},
- 只有当前Pre toolCall有内容召回结果时,根据引用的内容里tool里data字段回答问题
`

吐槽一下,这段提示词写得真不专业,不仅中英文混杂,而且结构也比较乱。话说,在创建智能体时,页面上不是有很多编写提示词的格式规范和最佳实践么?

除了智能体名称(agent_name)和当前时间(time),这段提示词中有几个重要的占位符:

  • persona - 对应页面上的 “人设与回复逻辑”
  • memory_variables - 对应记忆中的变量
  • knowledge - 对应知识库中检索的内容
  • tools_pre_retriever - 工具预检索结果,对应快捷指令的执行返回

提示模版节点上面的四路其实就对应这四个占位符。

角色渲染节点

先看第一路,也就是角色渲染节点的处理逻辑,使用 AddLambdaNode() 添加一个自定义 Lambda 节点:

_ = g.AddLambdaNode(keyOfPersonRender,
  compose.InvokableLambda[*AgentRequest, string](personaVars.RenderPersona),
  compose.WithStatePreHandler(func(ctx context.Context, ar *AgentRequest, state *AgentState) (*AgentRequest, error) {
    state.UserInput = ar.Input
    return ar, nil
  }),
  compose.WithOutputKey(placeholderOfPersona))

这里通过 compose.InvokableLambda() 将一个普通函数 personaVars.RenderPersona() 转换为 *compose.Lambda,该函数入参为 *AgentRequest 出参为 string。特别注意的是,下一个节点提示词模版的入参是 map[string]any,因此这里通过 compose.WithOutputKey() 设置输出的 key,将 string 转换为 map

函数 RenderPersona() 的逻辑如下:

// 从 “人设与回复逻辑” 中提取所有的变量占位符
persona := conf.Agent.Prompt.GetPrompt()
personaVariableNames: extractJinja2Placeholder(persona)

// 从数据库查询当前智能体定义的记忆变量,对应 `variables_meta` 和 `variable_instance` 表
variables, _ := loadAgentVariables(ctx, avConf)

// 替换 “人设与回复逻辑” 中的所有变量
func (p *personaRender) RenderPersona(ctx context.Context, req *AgentRequest) (persona string, err error) {
  variables := make(map[string]string, len(p.personaVariableNames))
  for _, name := range p.personaVariableNames {
    // 优先从请求中提取变量
    if val, ok := req.Variables[name]; ok {
      variables[name] = val
      continue
    }
    // 兜底使用智能体的记忆变量
    if val, ok := p.variables[name]; ok {
      variables[name] = val
      continue
    }
    // 默认空字符串
    variables[name] = ""
  }
  // 直接使用提示词模版组件格式化
  msgs, _ := prompt.FromMessages(schema.Jinja2, schema.UserMessage(p.persona)).Format(ctx, maps.ToAnyValue(variables))
  return msgs[0].Content, nil
}

主要分为三步:

  1. 从 “人设与回复逻辑” 中提取所有的变量占位符,占位符格式为 Jinja2 格式,比如 {{name}}{{age}} 等;
  2. 从数据库查询当前智能体定义的记忆变量,对应 variables_metavariable_instance 表;variables_meta 表中存的是我们在智能体页面创建的变量,包含变量名、描述和默认值,而 variable_instance 表存的智能体运行过程中动态写入的变量值;
  3. 最后将 “人设与回复逻辑” 中的所有变量占位符替换成变量值;

比如我们在智能体中定义如下记忆变量:

agent-variables.png

然后可以在 “人设与回复逻辑” 中使用这些变量:

agent-variables-2.png

提示变量节点

第二路比较简单,组装所有的变量:

_ = g.AddLambdaNode(keyOfPromptVariables,
  compose.InvokableLambda[*AgentRequest, map[string]any](promptVars.AssemblePromptVariables))

这些变量包括系统提示词模板中的当前时间、智能体名称、用户输入、历史消息,以及上一步查询出来的智能体记忆变量:

func (p *promptVariables) AssemblePromptVariables(ctx context.Context, req *AgentRequest) (variables map[string]any, err error) {
  variables = make(map[string]any)
  // 当前时间
  variables[placeholderOfTime] = time.Now().Format("Monday 2006/01/02 15:04:05 -07")
  // 智能体名称
  variables[placeholderOfAgentName] = p.Agent.Name
  // 用户输入
  variables[placeholderOfUserInput] = []*schema.Message{req.Input}
  // 历史消息
  variables[placeholderOfChatHistory] = req.History

  // 智能体记忆变量列表
  if p.avs != nil {
    var memoryVariablesList []string
    for k, v := range p.avs {
      variables[k] = v
      memoryVariablesList = append(memoryVariablesList, fmt.Sprintf("%s: %s\n", k, v))
    }
    variables[placeholderOfVariables] = memoryVariablesList
  }
  return variables, nil
}

智能体的记忆变量被转为键值对,替换上面系统提示词模版中的 {{ memory_variables }} 占位符。

知识检索节点

第三路负责知识库的检索,使用了两个自定义 Lambda 节点:

// 新建知识库检索器
kr, err := newKnowledgeRetriever(ctx, &retrieverConfig{
  knowledgeConfig: conf.Agent.Knowledge,
})

// 第一个节点检索
_ = g.AddLambdaNode(keyOfKnowledgeRetriever,
  compose.InvokableLambda[*AgentRequest, []*schema.Document](kr.Retrieve),
  compose.WithNodeName(keyOfKnowledgeRetriever))

// 第二个节点组装成字符串
_ = g.AddLambdaNode(keyOfKnowledgeRetrieverPack,
  compose.InvokableLambda[[]*schema.Document, string](kr.PackRetrieveResultInfo),
  compose.WithOutputKey(placeholderOfKnowledge),
)

首先新建一个知识库检索器,将其 Retrieve() 方法作为第一个节点,知识库检索后,该方法返回 []*schema.Document 文档列表,因此还需要第二个节点,调用 PackRetrieveResultInfo 将其组装成字符串,和上面的角色渲染节点一样,这里也通过 compose.WithOutputKey 设置输出 Key,替换系统提示词模版中的 {{ knowledge }} 占位符。

可以看出只要智能体关联了知识库,用户每次对话时总是会触发一次知识库检索,调用跨领域的 knowledge 服务,关于知识库检索的逻辑比较复杂,我们后面再看。

工具预检索节点

接着是最后一路,负责工具预检索,其实就是执行快捷指令:

_ = g.AddLambdaNode(keyOfToolsPreRetriever,
  compose.InvokableLambda[*AgentRequest, []*schema.Message](tr.toolPreRetrieve),
  compose.WithOutputKey(keyOfToolsPreRetriever),
  compose.WithNodeName(keyOfToolsPreRetriever),
)

Coze Studio 支持两种类型的快捷指令,一种是纯 Prompt,比如下面的翻译指令:

agent-shortcut-prompt.png

另一种是调用插件或工作流,比如下面的天气查询指令:

agent-shortcut-workflow.png

配置快捷指令后,智能体的对话框上方会多出快捷指令的按钮:

agent-shortcut-tips.png

我们可以为快捷指令添加组件(也就是变量),当用户点击快捷指令按钮时,会提示用户填写变量的值,变量值会替换指令内容中的占位符,然后作为用户输入调用智能体会话接口。如果用户调用的是纯 Prompt 快捷指令,这和正常会话没有任何区别:

agent-shortcut-prompt-use.png

如果用户调用的是插件或工作流快捷指令,同样也会将指令内容中的占位符替换掉作为用户输入,同时还会传入工具信息,后端会先调用工具,然后根据工具调用结果回答用户问题:

agent-shortcut-workflow-use.png

工具预检索节点就是根据传入的工具信息调用对应的插件或工作流,然后将调用的结果替换系统提示词模版中的 {{ tools_pre_retriever }} 占位符。

智能体节点

当系统提示词组装完毕,流程进入智能体节点,开始调用大模型。这里的智能体节点根据是否存在工具,有两种实现:

if isReActAgent {
  _ = g.AddGraphNode(agentNodeName, agentGraph, agentNodeOpts...)
} else {
  _ = g.AddChatModelNode(agentNodeName, chatModel, agentNodeOpts...)
}

当存在工具时,使用 ReAct Agent 实现,当不存在工具时,直接使用大模型实现。

建议子图

Coze Studio 支持为智能体开启问题建议,如果开启,则在智能体回复后,根据 Prompt 提供最多 3 条用户提问建议:

agent-sugguest.png

这时会走建议分支的逻辑:

suggestGraph, nsg := newSuggestGraph(ctx, conf, chatModel)
if nsg {
  // 建议预输入解析
  _ = g.AddLambdaNode(keyOfSuggestPreInputParse, compose.ToList[*schema.Message](),
    compose.WithStatePostHandler(func(ctx context.Context, out []*schema.Message, state *AgentState) ([]*schema.Message, error) {
      out = append(out, state.UserInput)
      return out, nil
    }),
  )
  // 建议子图
  _ = g.AddGraphNode(keyOfSuggestGraph, suggestGraph)
}

建议分支有两个节点,第一个节点做参数准备,第二个节点生成问题建议。其中有意思的是,问题建议功能也是通过智能体图的方式构建的,如下所示:

sugguest-graph.png

整个流程分为:初始化变量 -> 替换提示词 -> 调用大模型 -> 解析结果,总体比较简单,此处略过,看下它的提示词就明白了:

const SUGGESTION_PROMPT_JINJA2 = `
你是一个推荐系统,请完成下面的推荐任务。
### 对话 
用户: {{_input_}}
AI: {{_answer_}}

personal: {{ suggest_persona }}

围绕兴趣点给出3个用户紧接着最有可能问的几个具有区分度的不同问题,问题需要满足上面的问题要求,推荐的三个问题必须以字符串数组形式返回。

注意:
- 推荐的三个问题必须以字符串数组形式返回
- 推荐的三个问题必须以字符串数组形式返回
- 推荐的三个问题必须以字符串数组形式返回
`

小结

今天我们详细拆解了 Coze Studio 智能体的核心执行逻辑。其本质是一个基于 Eino 框架构建的智能体图,运行流程总结如下:

  1. 四路并行处理:为了动态构建最终的系统提示词,图的执行分为四个并行的分支,分别负责处理:

    • 角色: 渲染“人设与回复逻辑”,并将用户定义的记忆变量填充进去。
    • 变量: 组装包括当前时间、智能体名称、历史消息在内的各类上下文变量。
    • 知识库检索: 对接知识库进行检索,并将召回的内容注入提示词。
    • 工具预检索: 执行用户点击的快捷指令,填充执行结果。
  2. 汇总提示词:上述四路的结果最终汇集到提示词模版节点,填充到一个预设的 Jinja2 模版中,形成完整的上下文。
  3. 调用大模型:组装好的提示词被送入最终的智能体节点。如果智能体配置了工具,则会启用一个 ReAct Agent;否则,直接调用基础的大语言模型生成回复。
  4. 问题建议:此外,如果开启了 “问题建议” 功能,在主流程结束后还会触发一个独立的建议子图,用于生成后续的推荐问题。

通过今天的分析,我们对 Coze Studio 的宏观执行流程有了清晰的认识。然而,其中的一些关键实现细节,例如知识库的向量检索与重排机制、工作流的具体执行过程,以及 ReAct Agent 与工具的交互逻辑,仍有待深入。这些将是我们后续研究的重点。


学习 Coze Studio 的智能体执行逻辑

我们昨天学习了 Coze Studio 智能体会话接口的完整后端处理流程,从接口层、到应用层、到领域层、最后通过跨领域防腐层,将最终的执行任务交给了 single_agent 领域。今天我们将继续这个过程,深入研究下智能体的执行逻辑。

智能体执行

智能体执行的逻辑代码位于 backend/domain/agent/singleagent/service/single_agent_impl.go 文件:

func (s *singleAgentImpl) StreamExecute(ctx context.Context, req *entity.ExecuteRequest) (events *schema.StreamReader[*entity.AgentEvent], err error) {

  // 获取智能体信息
  ae, err := s.ObtainAgentByIdentity(ctx, req.Identity)
  
    // 构建智能体图
  conf := &agentflow.Config{
    Agent:        ae,
    UserID:       req.UserID,
    Identity:     req.Identity,
    ModelMgr:     s.ModelMgr,
    ModelFactory: s.ModelFactory,
    CPStore:      s.CPStore,
  }
  rn, err := agentflow.BuildAgent(ctx, conf)
  
  // 执行智能体图
  exeReq := &agentflow.AgentRequest{
    UserID:   req.UserID,
    Input:    req.Input,
    History:  req.History,
    Identity: req.Identity,
    ResumeInfo:   req.ResumeInfo,
    PreCallTools: req.PreCallTools,
  }
  return rn.StreamExecute(ctx, rn.PreHandlerReq(ctx, exeReq))
}

首先根据配置信息构建智能体图,这里引入了 图(Graph) 这个新概念,Coze Studio 通过 Eino 智能体框架,将智能体编排成一个有向图,构建的大体流程如下:

func BuildAgent(ctx context.Context, conf *Config) (r *AgentRunner, err error) {

  // 初始化节点...

  // 创建一个新图
  g := compose.NewGraph[*AgentRequest, *schema.Message](
    compose.WithGenLocalState(func(ctx context.Context) (state *AgentState) {
      return &AgentState{}
    }))

  // 向图中添加节点...
  // 连接节点之间的边...
  
  // 图编译
  runner, err := g.Compile(ctx, opts...)

  // 返回图
  return &AgentRunner{
        runner: runner,
    }, nil
}

这个智能体图比较复杂,方法中充斥着大量节点初始化以及向图中添加节点和边的代码,此处做了省略,暂时只要关注 compose.NewGraph() 以及 g.Compile() 两行代码,这是使用 Eino 创建并编译智能体的常见方式,构建完成的智能体图如下所示:

agent-graph.png

这个图结构体现了完整的智能体处理流水线,从输入预处理到最终响应生成:

  • 并行初始化:从开始节点同时触发 4 个输入处理节点,并行处理提高效率;
  • 汇聚处理:所有预处理结果汇聚到提示模板节点;
  • 条件分支:根据是否有工具选择不同的 Agent 处理方式,如果有工具,使用 ReAct Agent,否则直接调大模型;
  • 问题建议:根据配置决定是否启用建议功能;

最后,完成图的编译后,在 StreamExecute() 方法中通过 runner.Stream() 流式调用它:

func (r *AgentRunner) StreamExecute(ctx context.Context, req *AgentRequest) (sr *schema.StreamReader[*entity.AgentEvent], err error) {

  // 回调处理器
  hdl, sr, sw := newReplyCallback(ctx, executeID.String())

  go func() {

    // 注册回调
    var composeOpts []compose.Option
    composeOpts = append(composeOpts, compose.WithCallbacks(hdl))
    _ = compose.RegisterSerializableType[*AgentState]("agent_state")

    // 流式运行图
    _, _ = r.runner.Stream(ctx, req, composeOpts...)
  }()

  return sr, nil
}

可以看出,智能体执行的核心就是这个图的构建和执行,在我们深入细节之前,先来快速了解下字节开源的 Eino 智能体框架。

了解 Eino 智能体框架

目前已经有很多智能体框架了,比如 LangGraph、LlamaIndex、Agno 等,但这些基本上都是 Python 语言开发的。Eino 是字节开源的一款使用 Go 语言开发的智能体框架,其主要特点是强类型,易维护,高可靠。

Eino 遵循模块化设计,架构图如下:

eino-framework.png

主要分为六大模块:

  • Schema:提供 Eino 最基础的一些结构与方法定义,比如 MessageDocumentStreamReaderStreamWriter 等;
  • Components:组件是 Eino 应用的基本构成元素,比如 ChatModelChatTemplateToolLambda 等,它们功能各异,但都遵循统一的接口规范,Eino Ext 为每种组件提供默认的实现;
  • Compose:对多个组件进行流程编排,支持复杂的图(Graph)、简单的链(Chain)以及支持字段映射的工作流(Workflow),实现复杂的业务逻辑;
  • Callbacks:为智能体的运行提供切面机制,通过不同的触发时机,可实现日志、跟踪、监控等功能,内置集成了 LangfuseAPMPlusCozeLoop 等平台;
  • Flow:大模型应用是存在通用场景和模式的,Eino 将这些场景进行抽象,提供了一些可以帮助开发者快速构建大模型应用的模版,比如 ReAct 智能体、多智能体、多查询检索等;
  • Devops Tools:Eino 提供了一套开发工具链,通过 Eino Dev 插件进行可视化 Graph 搭建和代码生成,以及对其进行可视化调试;

Eino 不仅仅是一款智能体框架,更准确的说,它应该是一款大模型应用开发框架,通过这些模块,开发者可快速开发出满足自己业务需求的大模型应用。

学习 Eino 组件

大模型应用开发有三种主要的应用模式:

  • 直接对话模式:处理用户输入并生成相应回答;
  • 知识处理模式:对文本文档进行语义化处理、存储和检索;
  • 工具调用模式:基于上下文做出决策并调用相应工具;

Eino 从这些模式中提取出一些常用的能力,并将这些能力抽象为可复用的 组件(Components),这些组件功能各异,大致可以分为下面几大类,Eino 为每种组件都提供了多种不同的实现:

  • 对话处理类组件

    • 提示词模板(ChatTemplate):有 Default 和 MCP 两种实现,默认实现支持 FStringGoTemplateJinja2 等变量语法,MCP 实现 支持加载 MCP Server 中定义的 Prompt 资源;
    • 大模型对话(ChatModel):支持 OpenAI、DeepSeek、Qwen、Claude、Gemini、Ollama、火山引擎、百度千帆平台等;
  • 文本语义处理类组件

    • 加载文档(Document Loader):支持从本地文件、Web URL 和 AWS S3 存储桶中加载文档,一般和 Document Parser 结合使用;
    • 解析文档(Document Parser):支持解析 TXT、HTML、DOCX、XLSX、PDF 等格式的文件;
    • 处理文档(Document Transformer):分为 Splitter 和 Reranker 两种,Splitter 支持 Markdown 分割器、HTML 分割器、递归分割器、语义分割器等,Reranker 支持基于得分的重排序;
    • 文本语义化(Embedding):支持 OpenAI、Ollama、火山引擎、百度千帆平台、阿里云百炼平台 DashScope、腾讯云混元等;
    • 索引存储(Indexer):一般配合 Embedding 一起使用,也可以使用分词索引,支持 Elasticsearch 8.x、Milvus 2.x、Redis Stack 以及火山引擎 VikingDB 等;
    • 文本召回(Retriever):从 Indexer 中检索内容,同样支持 Elasticsearch 8.x、Milvus 2.x、Redis Stack 以及火山引擎 VikingDB 等,同时还支持检索火山知识库和 Dify 知识库;
  • 决策执行类组件

    • 调用工具(ToolsNode):用户可基于接口规范实现自己的工具,Eino 也提供了几个内置的工具,比如 Bing 搜索、Google 搜索、DuckDuckGo 搜索、SearXNG 搜索、维基百科搜索、浏览器使用、命令行工具、HTTP 请求、Sequential Thinking 以及 MCP 工具;
  • 用户自定义组件

    • 自定义代码逻辑(Lambda):允许用户在工作流中嵌入自定义的函数逻辑;

这里以大模型组件为例,带大家了解下 Eino 组件的用法。下面是一个简单的调用示例:

package main

import (
  "context"
  "os"

  "github.com/cloudwego/eino-ext/components/model/openai"
  "github.com/cloudwego/eino/schema"
)

func main() {

  ctx := context.Background()

  // 初始化大模型
  model, _ := openai.NewChatModel(ctx, &openai.ChatModelConfig{
    BaseURL: os.Getenv("OPENAI_BASE_URL"),
    APIKey:  os.Getenv("OPENAI_API_KEY"),
    Model:   "gpt-4o",
  })

  // 准备消息
  messages := []*schema.Message{
    schema.SystemMessage("你是一个翻译专家,擅长中文和英文之间的互译。"),
    schema.UserMessage("你好,世界!"),
  }

  // 生成回复
  response, _ := model.Generate(ctx, messages)
  println(response.Content)
}

我们首先通过 openai.NewChatModel 创建了一个 ChatModel 组件,它的接口定义如下:

type BaseChatModel interface {
  Generate(ctx context.Context, input []*schema.Message, opts ...Option) (*schema.Message, error)
  Stream(ctx context.Context, input []*schema.Message, opts ...Option) (
    *schema.StreamReader[*schema.Message], error)
}

它有 GenerateStream 两个方法,可以看出,组件输入是消息数组([]*schema.Message),输出是消息(*schema.Message),这里使用 Generate 生成一次性回复,也可以使用 Stream 流式输出:

  // 流式读取消息
  reader, _ := model.Stream(ctx, messages)
  defer reader.Close()
  for {
    chunk, err := reader.Recv()
    if err != nil {
      break
    }
    print(chunk.Content)
  }

“组件优先” 是 Eino 的一大设计原则,每个组件都是一个职责划分比较清晰且功能单一的模块,都可以独立使用,其他组件的用法和上面的大模型组件基本类似。我们要特别注意每个组件的输入和输出类型,这在后面编排时非常有用。

使用 Eino 进行 Graph 编排

组件只能提供原子能力,在一个大模型应用中,还需要根据场景化的业务逻辑,对这些原子能力进行组合和串联,这就是 编排(Compose)。Eino 支持三种编排方式:图(Graph)链(Chain)工作流(Workflow),三者之间的关系如下:

compose-api.png

其中,链最简单,可以认为是简单的有向无环图,它是基于图实现的;工作流也是无环图,和图的区别在于,它提供了字段级别映射能力,节点的输入可以由任意前驱节点的任意输出字段组合而成;在 Coze Studio 中,智能体是通过 Graph API 实现的,工作流是通过 Workflow API 实现的。

回顾上面 Coze Studio 构建智能体图时所看到的 compose.NewGraph() 以及 g.Compile() 两行代码,其实就是 Graph API。下面通过一个简单的例子快速掌握如何使用 Eino 进行 Graph 编排:

package main

import (
  "context"
  "os"

  "github.com/cloudwego/eino-ext/components/model/openai"
  "github.com/cloudwego/eino/components/prompt"
  "github.com/cloudwego/eino/compose"
  "github.com/cloudwego/eino/schema"
)

func main() {

  ctx := context.Background()

  // 初始化大模型
  model, _ := openai.NewChatModel(ctx, &openai.ChatModelConfig{
    BaseURL: os.Getenv("OPENAI_BASE_URL"),
    APIKey:  os.Getenv("OPENAI_API_KEY"),
    Model:   "gpt-4o",
  })

  // 创建模板,使用 FString 格式
  template := prompt.FromMessages(schema.FString,
    // 系统消息模板
    schema.SystemMessage("你是一个翻译专家,擅长{from}和{to}之间的互译,如果用户输入的是{from}将其翻译成{to},如果用户输入的是{to}将其翻译成{from}"),
    // 用户消息模板
    schema.UserMessage("用户输入: {question}"),
  )

  // 构造图
  graph := compose.NewGraph[map[string]any, *schema.Message]()

  _ = graph.AddChatTemplateNode("template", template)
  _ = graph.AddChatModelNode("model", model)

  _ = graph.AddEdge(compose.START, "template")
  _ = graph.AddEdge("template", "model")
  _ = graph.AddEdge("model", compose.END)

  result, err := graph.Compile(ctx)
  if err != nil {
    panic(err)
  }

  // 调用图
  output, _ := result.Invoke(ctx, map[string]any{
    "from":     "中文",
    "to":       "法语",
    "question": "你好,世界!",
  })
  println(output.Content)
}

我们首先通过 openai.NewChatModel 创建了一个 ChatModel 组件,再通过 prompt.FromMessages 创建了一个 ChatTemplate 组件,然后使用 Graph API 将这两个组件连起来,构造成一个简单的图,如下:

simple-graph.png

这个图非常简单,实际上是一个链,感兴趣的同学也可以使用 Chain API 来实现它。

在编排图时,我们需要特别注意每个组件的输入和输出类型,比如:

  • ChatTemplate 的入参是 map[string]any,出参是 []*schema.Message
  • ChatModel 的入参是 []*schema.Message,出参是 ChatModel

ChatTemplate 的出参刚好可以对上 ChatModel 的入参,因此两个组件可以连起来,如果组件之间的参数不一致,需要在中间插入 Lambda 组件自定义参数转换,如果不匹配,在 Compile 编译时就会报错;此外,图的入参就是第一个节点的入参,图的出参就是最后一个节点的出参,使用 compose.NewGraph 创建图时需要明确的指定图的入参和出参类型,比如这里的入参为 map[string]any,出参为 *schema.Message,在调用 Invoke 时传入适当的参数类型。

关于编排的更多细节,可以参考官方的文档:

未完待续

本文我们顺着 Coze Studio 的代码,从智能体的执行入口 StreamExecute 开始,了解了其核心是基于 Eino 框架构建的一个智能体图。为了更好地理解这个图,我们快速学习了 Eino 框架的核心概念,包括其模块化设计、丰富的组件生态以及强大的编排能力。最后,通过一个简单的例子,我们掌握了如何使用 Eino 的组件和 Graph API 构建一个基本的大模型应用。

本文对智能体的图构建流程只是粗略带过,在下一篇文章中,我们将回头深入分析 Coze Studio 中那个复杂的智能体图,详细拆解其中的每一个节点和逻辑分支,彻底搞懂其运行细节。


学习 Coze Studio 的智能体会话接口

前面我们已经学习了 Coze Studio 的代码架构,对项目整体有了一个大致的了解。今天,我们将深入智能体执行的核心,研究下用户在和智能体对话时,后端服务是如何处理会话请求的。

接口层实现

我们首先来看下智能体会话的接口层实现。正如上一篇所述,Coze Studio 中的接口均通过 IDL 定义,位于 idl/conversation/agentrun_service.thrift 文件:

service AgentRunService {
  run.AgentRunResponse AgentRun(1: run.AgentRunRequest request) (
    api.post='/api/conversation/chat', 
    api.category="conversation", 
    api.gen_path= "agent_run"
  )
  run.ChatV3Response ChatV3(1: run.ChatV3Request request)(
    api.post = "/v3/chat",
    api.category="chat",
    api.tag="openapi",
    api.gen_path="chat"
  )
}

这里还有一个 /v3/chat 接口,它是正式发布的智能体对外的 API 接口,其实现和 /api/conversation/chat 基本一致,这里不作过多介绍。

然后使用 hz 工具将 IDL 自动生成 API 处理器,位于 backend/api/handler/coze/agent_run_service.go 文件:

// @router /api/conversation/chat [POST]
func AgentRun(ctx context.Context, c *app.RequestContext) {
    
  // 绑定并校验入参
  var req run.AgentRunRequest
  c.BindAndValidate(&req)
  
  // 新建 SSE 发送器
  sseSender := sseImpl.NewSSESender(sse.NewStream(c))
  c.SetStatusCode(http.StatusOK)
  c.Response.Header.Set("X-Accel-Buffering", "no")

  // 调用 conversation 应用服务
  conversation.ConversationSVC.Run(ctx, sseSender, &req)
}

API 接口层没有什么复杂的逻辑,主要是绑定并校验入参,然后调用应用层。

这里值得一提是 X-Accel-Buffering 响应头的使用,这是一个特殊的 HTTP 响应头,主要用于控制反向代理(如 Nginx)的缓冲行为,当它的值被设置为 no 时,表示告诉代理服务器不要对当前响应进行缓冲。如果没有这个设置,Nginx 等代理可能会缓冲响应内容直到缓冲区填满或响应完成,这会导致客户端无法实时获取数据,产生延迟感。这个设置通常用于需要 流式传输 的场景,比如:实时日志输出、大型文件下载、SSE 服务器推送等。

应用层实现

接着再来看下应用层的实现,位于 backend/application/conversation/agent_run.go 文件:

func (c *ConversationApplicationService) Run(ctx context.Context, sseSender *sseImpl.SSenderImpl, ar *run.AgentRunRequest) error {

  // 从当前会话中获取用户 ID
  userID := ctxutil.MustGetUIDFromCtx(ctx)

  // 如果 RegenMessageID > 0 说明是重新生成,将对应的运行记录和消息删除
  if ar.RegenMessageID != nil && ptr.From(ar.RegenMessageID) > 0 {
    msgMeta, err := c.MessageDomainSVC.GetByID(ctx, ptr.From(ar.RegenMessageID))
    if msgMeta != nil {
      err = c.AgentRunDomainSVC.Delete(ctx, []int64{msgMeta.RunID})
      delErr := c.MessageDomainSVC.Delete(ctx, &msgEntity.DeleteMeta{
        RunIDs: []int64{msgMeta.RunID},
      })
    }
  }

  // 查询 Agent 信息
  agentInfo, caErr := c.checkAgent(ctx, ar)

  // 获取当前会话,如果不存在,则创建新会话
  conversationData, ccErr := c.checkConversation(ctx, ar, userID)

  // 获取快捷指令
  var shortcutCmd *cmdEntity.ShortcutCmd
  if ar.GetShortcutCmdID() > 0 {
    cmdID := ar.GetShortcutCmdID()
    cmdMeta, err := c.ShortcutDomainSVC.GetByCmdID(ctx, cmdID, 0)
    shortcutCmd = cmdMeta
  }

  // 构造智能体运行参数
  arr, err := c.buildAgentRunRequest(ctx, ar, userID, agentInfo.SpaceID, conversationData, shortcutCmd)
  
  // 调用 agent_run 领域服务
  streamer, err := c.AgentRunDomainSVC.AgentRun(ctx, arr)
  
  // 从 streamer 拉取消息,根据消息类型构建对应的响应体,通过 sseSender 发送出去,实现流式输出
  c.pullStream(ctx, sseSender, streamer, ar)
  return nil
}

为了方便表述,我对原代码的顺序做了一些调整,主要包括三大块逻辑:

  1. 处理用户的重新生成请求:一般来说只会在最后一轮对话上出现重新生成按钮,实现逻辑就是将对应的运行记录和消息删除,然后继续正常的会话即可;
  2. 构造智能体运行参数:查询必要的信息,包括 Agent 信息、会话信息、快捷指令等,构造智能体运行参数;
  3. 运行智能体:调用 agent_run 领域服务,并通过 SSE 实现流式输出;

从代码可以看出,应用层整合了多个领域层服务,这里的每一个领域基本上都对应一个数据库表:

  • 消息领域服务(MessageDomainSVC):对应数据库中的 message 表,一条消息代表用户的一次提问或助手的一次回答,一次工具调用或一次工具响应;
  • 智能体运行领域服务(AgentRunDomainSVC):对应 run_record 表,代表一次用户和助手之间的所有交互,一条运行记录包含多条消息;
  • 单智能体领域服务(SingleAgentDomainSVC):对应 single_agent_draft 表,代表我们创建的智能体;
  • 会话领域服务(ConversationDomainSVC):对应 conversation 表,当你和智能体第一次交互时会自动创建一个会话,后续的交互都在这个会话下,一个会话包含多条运行记录;
  • 快捷指令领域服务(ShortcutDomainSVC):对应 shortcut_command 表,开启快捷指令后会在对话输入框上方出现快捷输入按钮,方便用户快速发起预设对话;快捷指令和正常的流程有些区别,因此需要特殊处理;

Coze Studio 使用 GORM 访问数据库,这是一个功能强大的 Go 语言 ORM(对象关系映射)库,它简化了 Go 程序与数据库的交互,提供了优雅的 API 和丰富的功能:

gorm.png

感兴趣的同学可以在每个领域的 internal/dal 下找到 modelquery 两个目录,这些都是通过 gorm.io/gen 自动生成的,包含对数据库表的增删改查代码:

backend/domain/agent/singleagent/internal/dal
├── model
│   ├── single_agent_draft.gen.go
│   ├── single_agent_publish.gen.go
│   └── single_agent_version.gen.go
├── query
│   ├── gen.go
│   ├── single_agent_draft.gen.go
│   ├── single_agent_publish.gen.go
│   └── single_agent_version.gen.go

领域层实现

上面的 Run() 方法中,最核心的一句是调用领域层的 AgentRun() 方法,其实现位于 backend/domain/conversation/agentrun/service/agent_run_impl.go 文件中,如下:

func (c *runImpl) AgentRun(ctx context.Context, arm *entity.AgentRunMeta) (*schema.StreamReader[*entity.AgentRunResponse], error) {

  // 新建一个容量 20 的双向管道
  sr, sw := schema.Pipe[*entity.AgentRunResponse](20)

  // 将 StreamWriter 传入 c.run 方法
  safego.Go(ctx, func() {
    defer sw.Close()
    _ = c.run(ctx, sw, rtDependence)
  })
  // 将 StreamReader 返回上层,供应用层读取
  return sr, nil
}

这里的逻辑比较简单,主要是通过 schema.Pipe 创建一个双向管道,用于上层读取消息和下层写入消息,然后使用 safego.Go 调用 c.run() 方法。safego.Go 是对原生的 go 的一层包装,它的主要作用是创建一个 goroutine 来执行传入的函数,并在 goroutine 中添加了错误恢复机制,确保在 goroutine 中发生的 panic 会被捕获和处理,而不会导致整个程序崩溃:

func Go(ctx context.Context, fn func()) {
  go func() {
    defer goutil.Recovery(ctx)

    fn()
  }()
}

调用的 c.run() 方法如下:

func (c *runImpl) run(ctx context.Context, sw *schema.StreamWriter[*entity.AgentRunResponse], rtDependence *runtimeDependence) (err error) {

  // 获取智能体信息
  agentInfo, err := c.handlerAgent(ctx, rtDependence)
  rtDependence.agentInfo = agentInfo
  
  // 获取最近 N 轮历史对话
  history, err := c.handlerHistory(ctx, rtDependence)
  
  // 创建一条新的运行记录 `run_record`
  runRecord, err := c.createRunRecord(ctx, sw, rtDependence)
  rtDependence.runID = runRecord.ID

  // 创建一条新的用户消息 `message`
  input, err := c.handlerInput(ctx, sw, rtDependence)
  rtDependence.questionMsgID = input.ID
  
  // 流式执行智能体
  err = c.handlerStreamExecute(ctx, sw, history, input, rtDependence)
  return
}

这里仍然是一系列数据库的操作,包括查询智能体信息、获取历史对话、创建运行记录、创建用户消息等,不过需要注意的是,这些基本上都是对其他领域的操作,Coze Studio 在这里引入了跨领域防腐层,防止领域之间的直接依赖。至此,整个对话接口的流程图如下所示:

agent-run-flow.png

到这里,终于走到了智能体会话接口的最末端,通过调用 crossagent 跨领域服务,执行逻辑从 agent_run 领域进入 single_agent 领域,正式开始执行智能体。

小结

今天,我们学习了 Coze Studio 智能体会话接口的完整后端处理流程,其代码严格遵循 DDD 的分层原则,从接口层、到应用层、到领域层、最后通过跨领域防腐层,将最终的执行任务交给了 single_agent 领域。至此,准备工作已经就绪,接下来,我们将进入 single_agent 领域,揭开智能体内部执行逻辑的神秘面纱。


学习 Coze Studio 的代码架构

经过几天的实战和学习,我们已经全面体验了 Coze Studio 从智能体、插件、工作流到知识库的各项核心功能。今天,我们开始研究下它的源码,看看这些功能背后的实现原理。

项目架构

Coze Studio 的架构设计严格遵循 领域驱动设计(DDD) 的核心原则进行构建,我们可以看下它的整体项目结构:

├── backend/              # 后端服务
│   ├── api/              # API 处理器和路由
│   ├── application/      # 应用层,组合领域对象和基础设施实现
│   ├── conf/             # 配置文件
│   ├── crossdomain/      # 跨领域防腐层
│   ├── domain/           # 领域层,包含核心业务逻辑
│   ├── infra/            # 基础设施实现层
│   ├── pkg/              # 无外部依赖的工具方法
│   └── types/            # 类型定义
├── common/               # 公共组件
├── docker/               # Docker 配置
├── frontend/             # 前端应用
│   ├── apps/             # 应用程序
│   ├── config/           # 配置文件
│   ├── infra/            # 基础设施
│   └── packages/         # 包
├── idl/                  # 接口定义语言文件

我们主要关注 backend 目录下的内容,从子目录的名称可以很明显看出是 DDD 的分层架构:

  • API 层(api:实现 HTTP 端点,使用 Hertz 服务器处理请求和响应,包含中间件组件;
  • 应用层(application:​组合各种领域对象和基础设施实现,提供 API 服务;
  • 领域层(domain:包含核心业务逻辑,定义领域实体和值对象,实现业务规则和工作流;
  • 跨领域防腐层(crossdomain:​定义跨领域接口,防止领域间直接依赖;
  • 基础设施层(infra:又分为契约层和实现层;契约层(contract ​定义所有外部依赖的接口,作为领域逻辑和基础设施之间的边界,包括存储系统、缓存机制、消息队列、配置管理等接口;实现层(impl 为契约层定义的接口提供具体的实现;
  • 工具包(pkg:无外部依赖的工具方法,可以被任何层直接使用;

领域驱动设计

领域驱动设计(Domain-Driven Design,简称 DDD) 是一种针对复杂业务系统的软件开发方法论,它的核心思想是 以业务领域为中心,软件的设计和实现都围绕业务领域的核心概念、规则和流程展开,而非单纯技术架构。通过抽象业务领域中的实体、关系和规则,构建 领域模型(Domain Model),并将模型映射为代码,使代码既能反映业务逻辑,又能被业务人员理解。

Eric Evans 在 2004 年出版了《领域驱动设计》一书,提出了经典的 DDD 4 层架构:

ddd-4-layers.png

我们可以在 backend/domain 目录下找到 Coze Studio 的所有领域模型:

├── agent           # 智能体,只有单智能体
├── app             # 应用
├── connector       # 连接器,Chat SDK 或 API
├── conversation    # 会话
├── datacopy        # 数据复制任务
├── knowledge       # 知识库
├── memory          # 记忆,包括数据库和变量
├── openauth        # 认证
├── permission      # 权限
├── plugin          # 插件和工具
├── prompt          # 提示词
├── search          # 搜索
├── shortcutcmd     # 快捷指令
├── template        # 模板
├── upload          # 一些默认图标的常量
├── user            # 用户
└── workflow        # 工作流

这里除了领域模型,还定义了对应的实体、值对象、聚合和领域服务等核心领域对象:

  • 实体(Entity):有唯一标识、状态可变的对象,其身份比属性更重要;
  • 值对象(Value Object):无唯一标识、不可变的对象,由属性定义(属性相同则视为相等);
  • 聚合(Aggregate):一组紧密关联的实体和值对象的集合,通过 聚合根(Aggregate Root) 对外暴露接口,保证数据一致性;
  • 领域服务(Domain Service):封装跨实体/聚合的业务逻辑,无法归属到单个实体时使用;
  • 领域事件(Domain Event):领域中发生的重要事件,用于解耦跨上下文的业务流程;
  • 仓储(Repository):封装数据持久化逻辑,为领域模型提供数据访问接口(屏蔽数据库细节);
  • 限界上下文(Bounded Context):领域模型的边界,每个上下文内有独立的模型和通用语言,上下文间通过接口通信;限界上下文可作为微服务拆分的依据,每个微服务对应一个或多个限界上下文,降低服务间耦合;

在 DDD 开发中,往往还会引入了一个跨领域防腐层(Anti-Corruption Layer,简称 ACL),它通过隔离领域间的直接依赖,防止领域间出现耦合,确保各领域的独立性。Coze Studio 也使用了该设计模式,将跨领域模型定义在 backend/crossdomain 目录下:

├── contract
│   ├── crossagent
│   ├── crossagentrun
│   ├── crossconnector
│   ├── crossconversation
│   ├── crossdatabase
│   ├── crossdatacopy
│   ├── crossknowledge
│   ├── crossmessage
│   ├── crossplugin
│   ├── crosssearch
│   ├── crossuser
│   ├── crossvariables
│   └── crossworkflow
├── impl
│   ├── ...
└── workflow
    ├── ...

跨领域其实就是对不同领域之间的调用增加了一层适配层,比如会话领域中的 agentrun 在调用智能体领域时,不是直接调用 agent,而是调用防腐层 crossagent,这样做的好处是当智能体领域发生变化时,会话领域可以不受影响。

基础设施层

当领域模型构建完成后,接着就可以实现基础设施层,包括实现具体的数据库持久化逻辑,集成外部服务 API 和基础设施,实现消息队列的事件发布和订阅机制等;我们可以在 backend/infra 目录下看到这些,一般将基础设施层分为契约层和实现层,因为大多数基础设施都有多种不同的实现:

├── contract
│   ├── cache         # 缓存,默认基于 Redis 实现
│   ├── chatmodel     # 对话模型,比如 OpenAI、ARK、DeepSeek 等
│   ├── coderunner    # 代码执行器,比如 Python、JavaScript 等
│   ├── document      # 文档相关,包括文档解析、文档检索、重排序、图片理解、OCR、NL2SQL 等
│   ├── dynconf       # 动态配置,比如 Zookeeper、Etcd、Nacos 等
│   ├── embedding     # 嵌入模型,包括 OpenAI、ARK 和 HTTP 三种实现
│   ├── es            # Elasticsearch 增删改查,针对不同的 ES 版本有不同的实现
│   ├── eventbus      # 事件总线,包括 Kafka、NSQ、RMQ 等实现
│   ├── idgen         # ID 生成器
│   ├── imagex        # 火山引擎的 veImageX 服务
│   ├── messages2query # 问题改写
│   ├── modelmgr       # 模型管理器
│   ├── orm            # 对象关系映射,默认使用 GORM 框架
│   ├── rdb            # 关系型数据库,默认使用 MySQL 数据库
│   ├── sqlparser      # SQL 解析器
│   ├── sse            # 服务器发送事件,默认使用 Hertz 的 SSE 实现
│   └── storage        # 存储服务,比如 Minio、S3、TOS 等
└── impl
    ├── 同契约层

接口层实现

领域层的上面是应用层和接口层。应用层通过组合领域对象和基础设施,实现具体的业务用例;接口层则将具体的业务功能包装成 HTTP 接口,供前端或 SDK 调用。

Coze Studio 使用字节自家开源的 Hertz 框架来实现接口层。这是一个使用 Golang 编写的 HTTP 框架,具有高易用性、高性能、高扩展性等特点,它的设计参考了 fasthttpginecho 等开源框架,并结合字节内部的需求,目前在字节内部已被广泛使用。

Hertz 包括服务端和客户端,提供了路由、多协议、多网络库的支持,内置常用中间件,并集成了日志、监控、服务注册发现等三方扩展,框架图如下所示:

hertz.png

为了更好地理解 Coze Studio 的代码,我们不妨快速熟悉下 Hertz 的使用。首先,创建 hertz_demo 文件夹,然后进入该目录,创建 main.go 文件,内容如下:

package main

import (
  "context"

  "github.com/cloudwego/hertz/pkg/app"
  "github.com/cloudwego/hertz/pkg/app/server"
  "github.com/cloudwego/hertz/pkg/common/utils"
  "github.com/cloudwego/hertz/pkg/protocol/consts"
)

func main() {
  h := server.Default(server.WithHostPorts(":9999"))

  h.GET("/ping", func(ctx context.Context, c *app.RequestContext) {
    c.JSON(consts.StatusOK, utils.H{"message": "pong"})
  })

  h.Spin()
}

使用 go mod init 命令生成 go.mod 文件:

$ go mod init hertz_demo

再使用 go mod tidy 命令整理并拉取依赖:

$ go mod tidy

最后使用 go run 启动服务:

$ go run hertz_demo

如果看到类似下面这样的日志,则说明服务已启动成功:

2025/08/05 07:12:36.758686 engine.go:681: [Debug] HERTZ: Method=GET    absolutePath=/ping --> handlerName=main.main.func1 (num=2 handlers)
2025/08/05 07:12:36.759819 engine.go:417: [Info] HERTZ: Using network library=netpoll
2025/08/05 07:12:36.760315 transport.go:149: [Info] HERTZ: HTTP server listening on address=[::]:9999

使用 curl 对接口进行测试:

$ curl http://localhost:9999/ping
{"message":"pong"}

这样一个简单的基于 Hertz 的 Web 服务就开发好了,如果想对 Hertz 做深入学习,可参考官方文档:

hz 代码生成

在上面的演示中,我们创建并编写 main.go 文件是从零开始的,其实,Hertz 还提供了一个 hz 命令行工具,可以快速生成 Hertz 项目的脚手架。

在安装 hz 之前,首先确保 GOPATH 环境变量已经被正确的定义,并且将 $GOPATH/bin 添加到 PATH 环境变量之中:

export GOPATH=$HOME/go
export PATH=$GOPATH/bin:$PATH

然后就可以通过下面的命令安装 hz

$ go install github.com/cloudwego/hertz/cmd/hz@latest

运行 hz -v 验证是否安装成功:

$ hz -v
hz version v0.9.7

如果能正常显示版本号,则说明 hz 已成功安装。接下来,我们使用 hz 来生成一个 Hertz 项目。首先,创建 hz_demo 文件夹,然后进入该目录,执行如下命令:

$ hz new -module hz_demo

该命令会生成如下目录结构:

├── biz
│   ├── handler
│   │   └── ping.go
│   └── router
│       └── register.go
├── build.sh
├── go.mod
├── main.go
├── router.go
├── router_gen.go
└── script
    └── bootstrap.sh

仔细对比 Coze Studio 的 backend 目录结构,可以发现两者几无二致,基本上可以确定,Coze Studio 的 backend 模块也是使用 hz 自动生成的。

接着安装依赖:

$ go mod tidy

并启动服务:

$ go run hz_demo

接口定义语言

hz 的另一大特点是,它可以基于 接口定义语言(Interface Definition Language,简称 IDL) 生成 Hertz 项目的脚手架。IDL 是一种中立的、跨语言的规范,用于描述软件组件之间的接口(如数据结构、函数、服务定义等),它不依赖于特定编程语言,而是通过统一的语法定义接口契约,再由工具生成不同语言的代码(如 C++、Java、Python、Golang 等),在分布式系统中,不同服务可能使用不同语言开发,通过 IDL 可确保数据格式和交互方式一致。

Thrift 和 Protobuf 是两种主流的 IDL 实现,均用于跨语言数据序列化和服务通信,广泛应用于分布式系统:

  • Thrift 是由 Facebook 开发的开源 IDL 框架,后捐给 Apache 基金会,支持数据序列化和 RPC 服务开发;
  • Protobuf 是 Google 开发的开源 IDL 框架,专注于高效的数据序列化,常与 gRPC 配合实现 RPC 通信;

hz 对 Thrift 和 Protobuf 两种 IDL 都提供了支持,但是在使用之前,需要安装相应的编译器:thriftgoprotoc,这里以 Thrift 为例,使用下面的命令安装 thriftgo 编译器:

$ GO111MODULE=on go install github.com/cloudwego/thriftgo@latest

然后我们创建一个新目录 idl_demo,并新建一个 idl/hello.thrift 文件:

// idl/hello.thrift
namespace go hello.example

struct HelloReq {
  1: string Name (api.query="name"); // 添加 api 注解为方便进行参数绑定
}

struct HelloResp {
  1: string RespBody;
}

service HelloService {
  HelloResp HelloMethod(1: HelloReq request) (api.get="/hello");
}

这个文件声明了 hello.example 命名空间,并定义了 HelloReqHelloResp 两个结构体,分别对应 HelloService 服务中 HelloMethod 接口的请求和响应,同时还定义了该接口为 GET 请求,地址为 /hello

再通过下面的命令生成项目脚手架:

$ hz new -module idl_demo \
    -idl idl/hello.thrift \
    -handler_dir api/handler \
    -router_dir api/router \
    -model_dir api/model

其中 -handler_dir-router_dir-model_dir 用于将对应的目录生成到 api 目录下,而不是默认的 biz 目录,这和 Coze Studio 的代码做法一致。新生成的目录结构如下:

├── api
│   ├── handler
│   │   ├── hello
│   │   │   └── example
│   │   │       └── hello_service.go
│   │   └── ping.go
│   ├── model
│   │   └── hello
│   │       └── example
│   │           └── hello.go
│   └── router
│       ├── hello
│       │   └── example
│       │       ├── hello.go
│       │       └── middleware.go
│       └── register.go
├── build.sh
├── go.mod
├── idl
│   └── hello.thrift
├── main.go
├── router.go
├── router_gen.go
└── script
    └── bootstrap.sh

HelloService 服务的实现位于 api/handler 目录下,hello/example 对应 idl 文件中的命名空间,我们可以打开 hello_service.go 对其进行编辑:

// HelloMethod .
// @router /hello [GET]
func HelloMethod(ctx context.Context, c *app.RequestContext) {
  var err error
  var req example.HelloReq
  err = c.BindAndValidate(&req)
  if err != nil {
    c.String(consts.StatusBadRequest, err.Error())
    return
  }

  resp := new(example.HelloResp)
  resp.RespBody = "hello, " + req.Name  // <-- 新增代码
  c.JSON(consts.StatusOK, resp)
}

接着和上面一样,安装依赖,启动服务:

$ go mod tidy
$ go run hz_demo

使用 curl 验证通过 IDL 定义的 /hello 接口是否能正常调用:

$ curl "http://localhost:9999/hello?name=zhangsan"
{"RespBody":"hello, zhangsan"}

Coze Studio 的源码中有一个 idl 目录,里面包含大量的 Thrift 文件,定义了平台所有接口和结构体,backend/api 目录下的 handlerroutermodel 就是基于这些 IDL 通过 hz 自动生成的。

小结

今天,我们对 Coze Studio 的代码架构做了一番研究。首先学习了其基于领域驱动设计(DDD)的后端实现,掌握了领域层、跨领域防腐层、基础设施层等概念;然后通过实践 Hertz 框架、hz 命令行工具以及接口定义语言(IDL),理解了其 API 层的构建方式。通过今天的学习,相信大家对 Cozs Studio 的代码全貌有了一个直观的了解,在阅读 Coze Studio 源码时不至于迷路。

接下来,我们就深入到具体的业务实现里,看看它的智能体、插件、工作流、知识库以及记忆等核心功能是如何实现的。


实战 Coze Studio 知识库使用

昨天我们学习了 Coze Studio 的工作流功能,通过在可视化画布上拖拽节点,迅速编排和搭建复杂的工作流。今天,我们将继续探索 Coze 的另一个核心功能 —— 知识库,学习如何为智能体注入私有或专业领域的知识,解决大模型幻觉和专业领域知识不足的问题。

创建知识库

大模型虽然知识渊博,但其知识截止于训练日期,并且对于私有或专业领域的知识一无所知。知识库功能正是为了解决这个问题而生的,它通过 RAG 技术,让智能体能够基于我们提供的专属资料库来回答问题。

我们首先进入 “资源库” 页面,点击 “+ 资源” 并选择 “知识库”,创建知识库:

coze-kb-create.png

Coze Studio 提供了三种不同类型的知识库:

  • 文本格式:文本知识库支持用户上传 PDF、TXT、DOC、DOCX、MD 等格式的文件,解析和分片后存储于向量数据库;它基于内容片段进行检索和召回,大模型结合召回的内容生成最终内容回复,适用于知识问答等场景;
  • 表格格式:表格知识库支持用户上传 CSV 和 XLSX 文件,导入表格数据,按行进行划分;它基于索引列的匹配进行检索,同时也支持基于 NL2SQL 的查询和计算;
  • 照片类型:照片知识库支持用户上传 JPG、JPEG 和 PNG 图片,通过大模型对图片进行标注;它基于标注信息的相似度匹配,找到与用户问题最相关的图片,给大模型用于内容生成;

Coze Studio 的知识库功能依赖于 Embedding 服务和向量化存储组件。

其中 Embedding 服务支持三种接入方式:

  • OpenAI - 兼容 OpenAI 协议的 Embedding 接口
  • ARK - 火山引擎提供的 Embedding 服务
  • HTTP - 调用本地部署的模型服务,需要满足 Coze 自己的一套 接口协议,暂不支持 Ollama 或 Xinference 协议

这里我使用的是 OpenAI 的 text-embedding-3-small 模型,下面是 .env 配置示例,注意它的向量维度为 1536:

export EMBEDDING_TYPE="openai"
export OPENAI_EMBEDDING_BASE_URL=""
export OPENAI_EMBEDDING_MODEL="text-embedding-3-small"
export OPENAI_EMBEDDING_API_KEY="sk-xxx"
export OPENAI_EMBEDDING_BY_AZURE=false
export OPENAI_EMBEDDING_API_VERSION=""
export OPENAI_EMBEDDING_DIMS=1536
export OPENAI_EMBEDDING_REQUEST_DIMS=1536

另外,向量化存储组件支持开源向量数据库 Milvus 和火山向量数据库 VikingDB,默认使用 Milvus,一般不修改:

export VECTOR_STORE_TYPE="milvus"
export MILVUS_ADDR="localhost:19530"

如果你想切换成 VikingDB,可以使用下面的配置:

export VECTOR_STORE_TYPE="vikingdb"
export VIKING_DB_HOST=""
export VIKING_DB_REGION=""
export VIKING_DB_AK=""
export VIKING_DB_SK=""
export VIKING_DB_SCHEME=""
export VIKING_DB_MODEL_NAME=""

由于修改的是环境变量,通过 --force-recreate 重启服务:

$ docker compose --profile '*' up -d --force-recreate --no-deps coze-server

文本知识库

创建知识库时选择 “文本格式”,并填写知识库名称和描述,导入类型选 “本地文档”,然后点击 “创建并导入” 按钮,进入 “新增知识库” 的 “上传” 页面:

coze-kb-create-2.png

如果导入类型选 “自定义”,则进入 “新增知识库” 的 “文本填写” 页面,支持用户手动录入文档名称和文档内容。

我们上传一个简单的 PDF 文件(可以包含文本、表格、图片等元素),进入 “创建设置” 页面:

coze-kb-create-3.png

和其他的 RAG 系统类似,我们需要为知识库配置 文档解析策略分段策略。Coze Studio 支持两种文档解析策略:

  • 精准解析:支持从文档中提取图片、表格等元素,需要耗费更长的时间;支持设置过滤内容,可以将一些特定的页面排除掉;
  • 快速解析:不会对文档提取图像、表格等元素,适用于纯文本;

如果上传的文件是扫描件,往往需要开启 OCR 功能,Coze Studio 目前只支持火山引擎的 通用 OCR 服务,可以免费开通试用:

volcengine-ocr.png

开通后创建密钥,获取 AK 和 SK,然后将其填到 .env 配置文件中:

export OCR_TYPE="ve"
export VE_OCR_AK="AK..."
export VE_OCR_SK="SK..."

同样的,由于修改的是环境变量,通过 --force-recreate 重启服务:

$ docker compose --profile '*' up -d --force-recreate --no-deps coze-server

如果上传的文件是纯文本,选择快速解析即可。

Coze Studio 也支持两种分段策略:

  • 自动分段与清洗:使用内置的分段与预处理规则;
  • 自定义:使用用户自定义的分段规则、分段长度与预处理规则;

配置好文档解析策略和分段策略后,点击 “下一步”,进入分段预览页面:

coze-kb-create-4.png

如果预览没问题的话,再次点击 “下一步”,系统开始对文件进行处理,等处理结束后,我们的文本知识库就构建完成了:

coze-kb-create-done.png

表格知识库

创建知识库时选择 “表格格式”,并填写知识库名称和描述,导入类型选 “本地文档”,然后点击 “创建并导入” 按钮,进入 “新增知识库” 的 “上传” 页面:

coze-kb-create-table-2.png

我们上传一个简单的 Excel 文件(必须有明确的表头行和数据行),进入 “表结构配置” 页面:

coze-kb-create-table-3.png

在这里可以切换数据表,一个表格数据库只能对应一个数据表;然后选择表头所在行以及数据起始行,再为表格的每一列添加描述和数据类型;并选择一个或多个列作为索引,用于和用户的问题进行相似度匹配;然后继续 “下一步”,进入预览页面:

coze-kb-create-table-4.png

预览没问题的话,再次点击 “下一步”,系统开始对表格进行处理,等处理结束后,我们的表格知识库就构建完成了。

照片知识库

创建知识库时选择 “照片类型”,并填写知识库名称和描述,导入类型选 “本地文档”,然后点击 “创建并导入” 按钮,进入 “新增知识库” 的 “上传” 页面:

coze-kb-create-image-2.png

我们上传几张图片,进入 “标注设置” 页面:

coze-kb-create-image-3.png

Coze Studio 支持两种标注图片的方式:

  • 智能标注:通过大模型深度理解图片,自动提供全面详细的内容描述信息;
  • 人工标注:不执行处理,在图片导入完成后,手动添加图片描述;

为了使用智能标注功能,我们还需要配置 AI 生成模型,可以通过下面的参数配置:

export BUILTIN_CM_TYPE="openai"

支持 OpenAI、ARK、DeepSeek、Ollama、Qwen、Gemini 等模型,不同模型的配置有所区别,对于 OpenAI 配置如下:

export BUILTIN_CM_OPENAI_BASE_URL=""
export BUILTIN_CM_OPENAI_API_KEY="sk-xxx"
export BUILTIN_CM_OPENAI_BY_AZURE=false
export BUILTIN_CM_OPENAI_MODEL="gpt-4o"

注意:AI 生成模型不仅用于图像标注(Image Annotation),还用于文本转 SQL(NL2SQL)、一句话生成 Query(Message to Query)等其他场景。如果需要在不同场景中使用不同模型,你可以通过添加前缀来针对特定场景应用特定配置,比如 IM_BUILTIN_CM_TYPENL2SQL_BUILTIN_CM_TYPEM2Q_BUILTIN_CM_TYPE

配置完成后记得通过 --force-recreate 重启服务。

点击 “下一步”,如果是选择了人工标注,系统不会做任何处理直接结束;如果是选择了智能标注,则通过 AI 生成模型对图片进行处理,等处理结束后,我们的照片知识库也就构建完成了:

coze-kb-create-image-done.jpg

大模型为每张图片生成一句话描述,可以点击图片进行查看和编辑:

coze-kb-create-image-done-2.png

使用知识库

知识库解析完成后,不需要发布,我们直接就可以在创建智能体时,将知识库添加到文本、表格或照片等知识中。智能体可以检索文本:

coze-kb-qa.png

可以检索图片:

coze-kb-qa-2.png

也可以检索表格:

coze-kb-qa-3.png

创建数据库

在 Coze Studio 的资源库中,还有一种资源和知识库中的表格非常像,那就是数据库:

coze-db.png

数据库可以作为智能体的记忆使用,实现收藏夹、todo list、书籍管理、财务管理等功能,也可以作为工作流中的节点使用。我们不妨拿高德的城市编码表来测试一下,它这个 Excel 表格包含 citynameadcodecitycode 三列:

coze-db-excel.png

因此我们需要在创建数据库时,定义好对应的表结构:

coze-db-table.png

然后进入数据库的测试数据或线上数据页面导入数据,线上数据是应用程序在实际运行时产生的数据,测试数据主要用于辅助调试,与线上数据是隔离的:

coze-db-table-data.png

不知道大家还记不记得,我们昨天创建了一个 “高德天气查询” 工作流,其中通过代码组件将城市名称转换为高德城市编码,在代码组件中,我们将 Excel 中的所有数据保存到列表中,代码写得既冗余,又不好维护,而现在我们将 Excel 中的数据保存到数据库表中,那么就可以直接查询数据库来简化这个过程了。

对于数据库的查询操作,可以通过 “查询数据” 或 “SQL 自定义” 两个组件来实现,不过我测试下来发现,使用 “查询数据” 组件,城市名称怎么都模糊匹配不了,不清楚具体原因,后面阅读源码的时候再研究一下,有清楚的朋友欢迎评论区留言。这里先用 “SQL 自定义” 组件实现,编写 SQL 语句从数据库表中查询对应的城市记录:

select * from amap_city_code where LOCATE({{input}}, cityname) > 0

注意,我这里使用 LOCATE 函数来实现模糊匹配的功能,这和 LIKE 语句差不多,那么为什么不用 LIKE 呢?这是因为 Coze Studio 的 SQL 中使用 {{input}} 占位符来替换变量,但是替换时会自动用引号引起来,如果我们写 LIKE '%{{input}}%' 会被替换成 LIKE '%'城市名'%',导致 SQL 查询报错,因此不得不找个偏门的法子。

配置好的节点如下图所示:

coze-workflow-sql.png

该节点会返回 outputList 结构化的表格数据,因此代码节点的 Python 代码可以精简:

async def main(args: Args) -> Output:
  outputList = args.params['outputList']
  city_id = outputList[0]['adcode'] if outputList else '110000'
  ret: Output = {
    'city_id': city_id
  }
  return ret

优化后的工作流如下:

coze-workflow.png

在这个工作流中,不仅代码更加简洁,而且城市编码表存放在数据库中,更容易维护。

小结

我们今天主要学习了 Coze Studio 的知识库功能,学习了如何创建和配置文本、表格和照片三种不同类型的知识库,了解了其背后的 Embedding、向量存储、OCR、图片标注等配置。此外,我们还学习了数据库功能,通过一个实际案例,将昨天创建的 “高德天气查询” 工作流进行了优化,利用数据库替代了硬编码的城市列表,使代码更简洁、维护更容易。

至此,我们已经全面体验了 Coze Studio 从智能体、插件、工作流到知识库的各项核心功能。可以看出,相对于官方的扣子平台,开源版阉割了不少功能,比如:

  • 创建智能体里少了 对话流模式多 Agents 模式,少了 提示词对比调试自动优化提示词 功能;编排里技能少了 触发器异步任务,记忆少了 长期记忆文件盒子,对话体验少了 音视频语音输入 等选项;调试智能体时少了 调试详情 功能;
  • 创建应用里少了整个 用户界面 模块,不能搭建低代码应用;
  • 资源库里少了 对话流卡片音色 等类型的资源;工作流中的组件也不完全,少了 图像处理音视频处理会话管理消息管理 等组件;插件工具的创建方式少了 代码插件端侧插件;创建文本知识库时少了 在线数据飞书公众号Notion 等导入类型,表格知识库少了 API飞书 等导入类型;
  • 工作空间里还少了 发布管理模型管理效果评测 等模块,而且也没有 团队空间 的功能;

除了这些,还有一些产品细节,就不一一列举了。尽管如此,Coze Studio 的几个核心功能基本上都开源了,而且随着开源社区的不断贡献,相信这些缺失的模块很快就能补上。在接下来的学习中,我们将深入这几个核心功能的源码,研究下它们背后的实现原理。