Fork me on GitHub

2025年8月

学习 Coze Studio 的工作流执行逻辑

今天,我们来学习下 Coze Studio 中工作流的执行逻辑。我们知道,Eino 框架支持以 编排(Compose) 的方式对各种原子组件进行组合和串联,并且它支持 图(Graph)链(Chain)工作流(Workflow) 三种编排方式。其中,链基于图实现,是最简单的;工作流和图处于同一级别,他们之间的区别在于,工作流提供了字段级别的映射能力以及控制流与数据流分离等特性。在 Coze Studio 中,智能体是通过 Graph API 实现的,工作流是通过 Workflow API 实现的。

工作流 vs. 图

工作流和图的区别在于,它提供了字段级别的映射能力,节点的输入可以由任意前驱节点的任意输出字段组合而成。比如下面这个例子:

workflow-field-mapping.png

假设 f1 和 f2 是两个现有的业务函数,f1 的输出为 F1 和 F2,而 f2 的输入为 F3,很显然两者并不匹配,如果使用 Graph API 编排,由于类型对齐的要求,我们有两种做法:

  1. 定义一个公共结构体,将 f1 的输出类型和 f2 的输入类型改成这个公共结构体。不仅有开发成本,而且对业务逻辑造成了入侵;
  2. 将 f1 的输出类型和 f2 的输入类型都改成 map。丢失了强类型对齐的特性。

这时就可以使用 Workflow API 编排,将 f1 的输出字段 F1 直接映射到 f2 的输入字段 F3,同时保留 f1 和 f2 的原始函数签名。每个节点由业务场景决定输入输出,不需要考虑谁给我输入,以及谁用我的输出。

工作流的另一个特点是控制流与数据流分离,看下面这个场景:

workflow-data-control-separate.png

节点 D 同时引用了 A、B、C 的某些输出字段。其中 A->D 的这条虚线,是单纯的 数据流,不传递 控制 信息,即 A 执行完成与否,不决定 D 是否开始执行。

节点 D 到 E 之间的粗箭头,代表节点 E 不引用节点 D 的任何输出,是单纯的 控制流,不传递 数据,即 D 执行完成与否,决定 E 是否开始执行,但是 D 的输出不影响 E 的输入。

图中其他的线,是控制流与数据流合一的。

需要注意的是,数据流能传递的前提,是一定有一条控制流存在,比如 A->D 的数据流,依赖 A->branch->B->D 或者 A->branch->C->D 的控制流存在。即数据流只能引用前驱节点的输出。

学习 Workflow API

下面我们通过一个简单例子,演示下 Workflow API 的基本用法。假设我们现在有一个现成的业务函数 add(),实现 AB 两个整数的相加:

type AddParam struct {
  A int
  B int
}

func add(ctx context.Context, param AddParam) (int, error) {
  return param.A + param.B, nil
}

我们要使用这个函数,编排出一个任务链,实现 X+Y+Z 三个整数的加法:

type Request struct {
  X int
  Y int
  Z int
}

示例代码如下:

func main() {

  // 创建工作流
  wf := compose.NewWorkflow[Request, int]()

  // 第一个 add 节点
  wf.AddLambdaNode("add1", compose.InvokableLambda(add)).
    AddInput(
      compose.START,
      compose.MapFields("X", "A"),
      compose.MapFields("Y", "B"),
    )

  // 第二个 add 节点
  wf.AddLambdaNode("add2", compose.InvokableLambda(add)).
    AddInput(compose.START, compose.MapFields("Z", "A")).
    AddInput("add1", compose.ToField("B"))

  wf.End().AddInput("add2")

  // 编译工作流
  run, err := wf.Compile(context.Background())
  if err != nil {
    panic(err)
  }

  // 调用工作流
  result, _ := run.Invoke(context.Background(), Request{
    X: 1,
    Y: 2,
    Z: 3,
  })
  println(result)
}

Workflow API 和 Graph API 的语法类似,整个编排流程还是很清晰的:

  1. 使用 compose.NewWorkflow() 创建工作流,和 compose.NewGraph() 基本一致;
  2. 使用 AddLambdaNode() 将用户自定义函数添加为工作流节点,除此之外,之前学习过的 Eino 组件都可以作为节点,比如 AddChatModelNode()AddToolsNode() 等;
  3. 使用 compose.InvokableLambda() 将一个普通函数转为为 Lambda 节点所需的 *compose.Lambda 类型;函数的入参必须是 ctx context.Context, input I,出参必须是 output O, err error
  4. 使用 AddInput() 为节点添加输入字段映射,可以从任意多个前驱节点的输出中引用任意多个字段,可以链式连续调用;字段映射有多种不同的场景:

    • 顶层字段到顶层字段:MapFields(string, string),非常简单的一对一映射,比如上面 add1 节点的 X->AY->B 的映射;
    • 全部输出到顶层字段:ToField(string),比如上面 add2 节点的 B 参数,直接引用 add1 的完整输出;
    • 顶层字段到全部输入:FromField(string)
    • 嵌套字段到嵌套字段:MapFieldPaths(FieldPath, FieldPath),只要上游或下游有一方是嵌套的,就需要用;
    • 全部输出到嵌套字段:ToFieldPath(FieldPath)
    • 嵌套字段到全部输入:FromFieldPath(FieldPath)
    • 全部输出到全部输入:直接使用 AddInput(),不需要传字段映射,比如上面的结束节点;

工作流如下图所示:

workflow-sample.png

关于 Workflow API 还有一些高级技巧,比如实现只有数据流没有控制流或者只有控制流没有数据流的场景、在工作流中使用分支、使用静态值等,感兴趣的同学可以参考官方的文档:

工作流的试运行接口

接下来,我们大致过一遍 Coze Studio 工作流的运行流程,当用户在页面点击 “试运行” 按钮时,触发接口 /api/workflow_api/test_run,其代码入口位于 backend/api/handler/coze/workflow_service.go 文件:

// @router /api/workflow_api/test_run [POST]
func WorkFlowTestRun(ctx context.Context, c *app.RequestContext) {
  
  // 绑定参数
  var req workflow.WorkFlowTestRunRequest
  err = c.BindAndValidate(&req)

  // 调用 workflow 应用层
  resp, err := appworkflow.SVC.TestRun(ctx, &req)
  
  c.JSON(consts.StatusOK, resp)
}

直接调用 workflow 应用层的 TestRun() 方法:

func (w *ApplicationService) TestRun(ctx context.Context, req *workflow.WorkFlowTestRunRequest) (_ *workflow.WorkFlowTestRunResponse, err error) {

  // 构造执行配置,调用 workflow 领域层
  exeCfg := vo.ExecuteConfig{
    ID:           mustParseInt64(req.GetWorkflowID()),
    From:         vo.FromDraft,
    // ...
  }
  exeID, err := GetWorkflowDomainSVC().AsyncExecute(ctx, exeCfg, maps.ToAnyValue(req.Input))
  
  // 返回异步执行的 ID
  return &workflow.WorkFlowTestRunResponse{
    Data: &workflow.WorkFlowTestRunData{
      WorkflowID: req.WorkflowID,
      ExecuteID:  fmt.Sprintf("%d", exeID),
    },
  }, nil
}

接着再调用 workflow 领域层的 AsyncExecute() 方法:

func (i *impl) AsyncExecute(ctx context.Context, config vo.ExecuteConfig, input map[string]any) (int64, error) {
  
  // 查询工作流
  wfEntity, err = i.Get(ctx, &vo.GetPolicy{...})

  // Canvas 前端画布
  c := &vo.Canvas{}
  sonic.UnmarshalString(wfEntity.Canvas, c)

  // 将 Canvas 转换为 Workflow Schema
  workflowSC, err := adaptor.CanvasToWorkflowSchema(ctx, c)

  // 创建工作流
  //  1. 使用 Eino 的 `NewWorkflow()` 创建工作流
  //  2. 获取所有复合节点并添加到工作流
  //  3. 添加普通节点,根据节点类型,封装不同的节点
  wf, err := compose.NewWorkflow(ctx, workflowSC, wfOpts...)

  // 将输入转换为工作流的入参
  convertedInput, ws, err := nodes.ConvertInputs(ctx, input, wf.Inputs(), cOpts...)
  inStr, err := sonic.MarshalString(input)
  
  // 工作流运行准备工作:
  //  1. 生成 executeID 并创建执行记录
  //  2. 处理中断执行、恢复执行等逻辑
  //  3. 使用 Eino 的 Callbacks 机制为工作流和节点注入回调
  //  4. 创建一个 goroutine 处理回调事件,更新工作流或节点执行状态
  cancelCtx, executeID, opts, _, err := compose.NewWorkflowRunner(wfEntity.GetBasic(), workflowSC, config,
    compose.WithInput(inStr)).Prepare(ctx)

  // 异步执行工作流
  wf.AsyncRun(cancelCtx, convertedInput, opts...)

  // 返回 executeID
  return executeID, nil
}

这里省略了不少无关代码,整体逻辑还是比较复杂的,大致可分为四步:

第一步,结构转换:将前端画布的 Canvas 结构转换为后端工作流的 Workflow Schema 结构;处理父节点和子节点的层次关系,支持嵌套的子工作流(如循环、批处理节点);

第二步,创建工作流:使用 Eino 的 NewWorkflow() 创建工作流,然后根据节点类型,通过 NodeSchema.New() 向工作流中添加对应的节点实例,每个节点类型都有专门的实现包,参考 backend/domain/workflow/internal/compose/node_schema.go 文件:

node-schema-new.png

说实话,这里的实现有点 low,让我感到有点意外,所有的节点实现全部都是写死的逻辑,没什么扩展性,如果要新增一个节点类型,就得在这里加一个 case 分支。

第三步,工作流预处理:使用 Eino 的 Callbacks 机制为工作流和节点注入回调,同时创建一个 goroutine 监听回调事件,包括工作流开始、结束、出错、节点开始、结束等等:

event-handler.png

将工作流或节点的执行状态更新到 workflow_executionnode_execution 数据表中。另外,它会通过 Redis 生成一个全局唯一的 executeID 并创建执行记录。

第四步,异步执行:对工作流进行异步执行,并返回上一步生成的 executeID,调用方可以拿着这个 executeID 查询执行进度。

其实,Coze Studio 中的工作流有多种执行方式,除了这里的 AsyncExecute 异步执行之外,还支持 SyncExecute 同步执行,StreamExecute 流式执行,AsyncExecuteNode 单节点执行等。另外,Coze Studio 的工作流还支持中断,比如遇到问答节点、输入节点、或需要鉴权的工具节点时,这时会等待用户反馈,反馈后可以通过 AsyncResumeStreamResume 从断点处恢复执行。

小结

今天,我们学习了 Coze Studio 工作流的执行原理。我们首先研究了 Eino 框架提供的 Workflow API,并将其与 Graph API 进行了对比,重点理解了其强大的字段映射能力以及控制流与数据流分离的特性。

接着,我们通过分析工作流 “试运行” 功能的后端源码,详细拆解了其完整的执行流程。该流程可分为四步:首先将前端画布的 Canvas 结构转换为 Eino 可识别的 Workflow Schema,然后基于 Schema 动态创建工作流实例,接着通过回调机制注入事件监听以追踪执行状态,最后异步触发整个工作流的运行。

前端调用 “试运行” 接口后,紧接着会轮询 /api/workflow_api/get_process 接口,获取工作流的执行进度,包括每个节点的入参和出参,如下所示:

coze-workflow-amap-weather.png

这个接口就比较简单了,就是查询 workflow_executionnode_execution 数据表,获取工作流或节点的执行状态,感兴趣的朋友可以自行研究。


学习 Coze Studio 的工具使用

昨天,我们详细拆解了 Coze Studio 智能体的核心执行逻辑,了解到其本质是一个基于 Eino 框架构建的智能体图。在分析的最后我们提到,其中的一些关键实现细节,例如 ReAct Agent 与工具的交互逻辑,仍有待深入。今天,我们就来填上这个坑,学习下 Coze Studio 是如何实现工具调用的。

我们知道,Eino 提供了很多可复用的原子组件,通过 Graph API 可以对其进行编排,实现出复杂的业务逻辑。不过有些业务逻辑存在一定的通用性,Eino 将这些场景进行抽象,提供了一些构建大模型应用的模式,被称为 Flow。目前 Eino 提供了两种常用的 Agent 模式:

除此之外还提供了一些常用的召回策略,比如 MultiQueryRetrieverParentIndexer 等。

ReAct Agent

我们今天主要关注 ReAct Agent 部分。ReAct 这个词出自 Shunyu Yao 等人的这篇论文 《ReAct: Synergizing Reasoning and Acting in Language Models》,它是由 ReasonAct 两个词组合而成,表示一种将 推理行动 与大模型相结合的通用范式:

react.png

传统的 Reason Only 型应用(如 Chain-of-Thought Prompting)具备很强的语言能力,擅长通用文本的逻辑推断,但由于不会和外部环境交互,因此它的认知非常受限;而传统的 Act Only 型应用(如 WebGPTSayCanACT-1)能和外界进行交互,解决某类特定问题,但它的行为逻辑较简单,不具备通用的推理能力。

ReAct 的思想,旨在将这两种应用的优势结合起来。针对一个复杂问题,首先使用大模型的推理能力制定出解决该问题的行动计划,这好比人的大脑,可以对问题进行分析思考;然后使用行动能力与外部源(例如知识库或环境)进行交互,以获取额外信息,这好比人的五官和手脚,可以感知世界,并执行动作;大模型对行动的结果进行跟踪,并不断地更新行动计划,直到问题被解决。

通过这种模式,我们能基于大模型构建更为强大的 AI 应用,可以阅读原始论文了解更多关于 ReAct 的信息:

回顾 Function Call 原理

原始的 ReAct 论文是通过 提示工程(Prompt Engineering) 实现的。后来,OpenAI 对 GPT 模型进行了一项重大更新,推出了 Function Call 功能,在 Chat Completions API 中添加了函数调用能力,这一功能推出后,迅速风靡世界,开发者可以通过 API 的方式实现类似于 ChatGPT 插件的数据交互能力,将大模型集成到自己的业务和应用中。随后,越来越多的大模型也都开始支持 Function Call 功能,目前已经成为大模型调用工具的事实规范。

简单来说,Function Call 就是在调用大模型时提供几个工具选项,大模型判断用户的问题是否需要调用某个工具。如果不需要则直接回复,这和传统的调用没有区别;如果需要调用则返回合适的工具和对应的参数给用户,用户拿到后调用对应的工具,再将调用结果送给大模型,最后,大模型根据工具的调用结果来回答用户的问题。OpenAI 官方文档 中有一张详细的流程图:

function-calling-diagram.png

其中要注意的是,第二次调用大模型时,可能仍然会返回 tool_calls 响应,这时可以循环处理。

定义 Eino 工具

了解了 ReAct Agent 的原理后,接下来,我们再来看下 Eino 中 ReAct Agent 的用法。为了让大模型能调用我们的工具,我们需要使用 Eino 提供的 ToolsNode 组件,该组件提供了三个层次的接口:

// 基础工具接口,提供工具信息,包括工具名称、描述和参数等
type BaseTool interface {
  Info(ctx context.Context) (*schema.ToolInfo, error)
}

// 可调用的工具接口,支持同步调用
type InvokableTool interface {
  BaseTool
  InvokableRun(ctx context.Context, argumentsInJSON string, opts ...Option) (string, error)
}

// 支持流式输出的工具接口
type StreamableTool interface {
  BaseTool
  StreamableRun(ctx context.Context, argumentsInJSON string, opts ...Option) (*schema.StreamReader[string], error)
}

在 Eino 中,BaseTool 接口要求任何一个工具都要实现 Info() 接口返回工具信息,包括工具名称、描述和参数等。而根据一个工具被调用后的返回结构是否是流式的,可以分为 InvokableToolStreamableTool,两个接口实现其一即可。

下面是我写的一个简单示例,通过 Info()InvokableRun() 方法实现了天气查询工具:

type ToolGetWeather struct {
}

type ToolGetWeatherParam struct {
  City string `json:"city"`
  Date string `json:"date"`
}

func (t *ToolGetWeather) Info(ctx context.Context) (*schema.ToolInfo, error) {
  return &schema.ToolInfo{
    Name: "get_weather",
    Desc: "查询天气",
    ParamsOneOf: schema.NewParamsOneOfByParams(map[string]*schema.ParameterInfo{
      "city": {
        Type:     "string",
        Desc:     "城市名称",
        Required: true,
      },
      "date": {
        Type:     "string",
        Desc:     "日期",
        Required: true,
      },
    }),
  }, nil
}

func (t *ToolGetWeather) InvokableRun(ctx context.Context, argumentsInJSON string, opts ...tool.Option) (string, error) {

  // 解析参数
  p := &ToolGetWeatherParam{}
  err := json.Unmarshal([]byte(argumentsInJSON), p)
  if err != nil {
    return "", err
  }

  res := p.City + p.Date + "天气晴,气温30摄氏度"
  return res, nil
}

这里有两点值得注意:

  1. Info() 函数中,我们使用了工具方法 schema.NewParamsOneOfByParams() 返回工具参数信息,这个方法生成 params map[string]*ParameterInfo 类型的参数约束;其实,Eino 还支持 OpenAPI 定义的 JSON Schema 方式,返回 *openapi3.Schema 类型的参数约束;
  2. InvokableRun() 的入参和出参都是字符串,需要开发者自行处理参数的反序列化和序列化。

除此之外,Eino 还提供了多种定义工具的方式:

  • 使用 NewTool()InferTool() 方法将本地函数转为工具;
  • 使用 Eino Ext 中提供的内置工具,比如 Bing 搜索、Google 搜索、DuckDuckGo 搜索、SearXNG 搜索、维基百科搜索、浏览器使用、命令行工具、HTTP 请求、Sequential Thinking 等;
  • 使用 MCP 工具;

具体内容可参考 Eino 如何创建工具的文档:

创建 ReAct Agent

定义工具之后,我们就可以通过 react.NewAgent() 创建一个 ReAct Agent 并自动调用工具了:

func main() {

  ctx := context.Background()

  // 初始化大模型
  model, _ := openai.NewChatModel(ctx, &openai.ChatModelConfig{
    BaseURL: os.Getenv("OPENAI_BASE_URL"),
    APIKey:  os.Getenv("OPENAI_API_KEY"),
    Model:   "gpt-4o",
  })

  // 初始化工具
  toolGetWeather := &ToolGetWeather{}

  // ReAct 智能体
  agent, _ := react.NewAgent(ctx, &react.AgentConfig{
    ToolCallingModel: model,
    ToolsConfig: compose.ToolsNodeConfig{
      Tools: []tool.BaseTool{toolGetWeather},
    },
  })

  r, _ := agent.Generate(ctx, []*schema.Message{
    {
      Role:    schema.User,
      Content: "北京明天的天气怎么样?",
    },
  })
  println(r.Content)
}

这里的代码非常简单,Agent 的调用和 Graph 的调用基本类似,也提供了 Generate() 非流式 和 Stream() 流式调用两种方式。另外,Agent 还提供了 ExportGraph() 方法可以将 ReAct Agent 内部的图导出来,作为子图嵌在其他图中运行。Coze Studio 就使用了这种方式:

// 如果工具数量大于 0,则使用 ReAct Agent
var isReActAgent bool
if len(agentTools) > 0 {
    isReActAgent = true
}

// 构建 ReAct Agent 子图并导出
if isReActAgent {
  agent, _ := react.NewAgent(ctx, &react.AgentConfig{
    ToolCallingModel: chatModel,
    ToolsConfig: compose.ToolsNodeConfig{
      Tools: agentTools,
    },
    ToolReturnDirectly: toolsReturnDirectly,
    ModelNodeName:      keyOfReActAgentChatModel,
    ToolsNodeName:      keyOfReActAgentToolsNode,
  })
  
  agentGraph, agentNodeOpts = agent.ExportGraph()
  agentNodeName = keyOfReActAgent
} else {
  agentNodeName = keyOfLLM
}

// 添加 ReAct Agent 或 Chat Model 作为智能体节点
if isReActAgent {
  _ = g.AddGraphNode(agentNodeName, agentGraph, agentNodeOpts...)
} else {
  _ = g.AddChatModelNode(agentNodeName, chatModel, agentNodeOpts...)
}

相比于我们上面的示例代码,Coze Studio 创建 ReAct Agent 时多了一个 ToolReturnDirectly 参数,不知道大家还记不记得我们之前在实战工作流时讲过,工作流的结束节点有两种返回方式:

coze-workflow-end-node.png

  • 返回变量:工作流运行结束后会以 JSON 格式输出所有返回参数,智能体在对话中触发工作流后,会自动总结 JSON 格式的内容,并以自然语言回复用户;
  • 返回文本:工作流运行结束后,智能体将直接使用指定的内容回复对话;

ToolReturnDirectly 是一个字典,里面包含的就是 返回文本 类型的工作流名字。

func newWorkflowTools(ctx context.Context, conf *workflowConfig) ([]workflow.ToolFromWorkflow, map[string]struct{}, error) {
  
  // 查询所有工作流,作为工具
  workflowTools, _ := crossworkflow.DefaultSVC().WorkflowAsModelTool(ctx, policies)

  // 找出所有 TerminatePlan == UseAnswerContent 的工作流,也就是 返回文本 类型的
  toolsReturnDirectly := make(map[string]struct{})
  for _, workflowTool := range workflowTools {
    if workflowTool.TerminatePlan() == vo.UseAnswerContent {
      toolInfo, _ := workflowTool.Info(ctx)
      toolsReturnDirectly[toolInfo.Name] = struct{}{}
    }
  }

  return workflowTools, toolsReturnDirectly, err
}

感兴趣的朋友可以看下 Eino 的源码,可以发现 ReAct Agent 其实也是基于 Graph API 实现的,只是这个图稍微复杂点,里面增加了分支、循环以及直接输出等逻辑:

react-flow.png

Coze Studio 的工具列表

最后,我们再来看下 Coze Studio 构建工具列表的逻辑:

agentTools := make([]tool.BaseTool, 0, len(pluginTools)+len(wfTools)+len(dbTools)+len(avTools))
// 插件工具
agentTools = append(agentTools, pluginTools)
// 工作流工具
agentTools = append(agentTools, wfTools)
// 数据库工具
agentTools = append(agentTools, dbTools)
// 变量工具
agentTools = append(agentTools, avTools)

可以发现,除了 插件工具工作流工具,这里还有两个特别的工具:

  • 数据库工具:当智能体里添加了数据库时自动出现的工具,工具名称对应用户创建的数据表名,工具参数为 sql 语句,每个数据表对应一个数据库工具;
  • 变量工具:当智能体里添加了用户变量时自动出现的工具,工具名称固定为 setKeywordMemory,工具参数为 keyvalue,所有变量共用这个工具;

下面是我创建的 todo_list 数据库对应的工具 Schema:

{
  "name": "todo_list",
  "description": "Mysql query tool. Table name is 'todo_list'. This table's desc is 待办清单.\n\nTable structure:\n- item (text): 待办事项\n- status (number): 状态,0 为待完成,1 为已完成\n\nUse SQL to query this table. You can write SQL statements directly to operate.",
  "parameters": {
    "properties": {
      "sql": {
        "description": "SQL query to execute against the database. You can use standard SQL syntax like SELECT",
        "type": "string"
      }
    },
    "required": [
      "sql"
    ],
    "type": "object"
  }
}

下面是变量工具的 Schema:

{
  "name": "setKeywordMemory",
  "description": "\n## Skills Conditions\n1. When the user's intention is to set a variable and the user provides the variable to be set, call the tool.\n2. If the user wants to set a variable but does not provide the variable, do not call the tool.\n3. If the user's intention is not to set a variable, do not call the tool.\n\n## Constraints\n- Only make decisions regarding tool invocation based on the user's intention and input related to variable setting.\n- Do not call the tool in any other situation not meeting the above conditions.\n",
  "parameters": {
    "properties": {
      "data": {
        "items": {
          "properties": {
            "keyword": {
              "description": "the keyword of memory variable",
              "type": "string"
            },
            "value": {
              "description": "the value of memory variable",
              "type": "string"
            }
          },
          "required": [
            "keyword",
            "value"
          ],
          "type": "object"
        },
        "type": "array"
      }
    },
    "required": [
      "data"
    ],
    "type": "object"
  }
}

这两个都属于记忆工具,当用户的问题需要查询或更新记忆时,就会自动调用这些工具:

coze-studio-memory-tools.png

点击右上角的大脑图标,也可以查看记忆中的内容。

小结

今天我们深入探讨了 Coze Studio 实现工具调用的底层逻辑。其核心是利用了 Eino 框架提供的 ReAct Agent,它遵循 ReAct(推理+行动) 的思想,通过大模型的 Function Call 能力,将强大的推理能力与和外部世界交互的行动能力相结合,从而解决复杂的任务。

我们学习了如何在 Eino 中通过实现 BaseToolInvokableTool 等标准接口来定义一个工具,并将其提供给 ReAct Agent 使用。一个关键的设计是,Coze Studio 会将创建好的 ReAct Agent 作为一个子图,动态地嵌入到智能体的整体执行图中,这种设计非常灵活且强大。

最后,我们分析了 Coze Studio 为智能体自动装配的四种工具:插件、工作流、数据库工具和变量工具。通过结合这些工具,Coze Studio 得以实现出功能强大、场景通用的智能体。


再学 Coze Studio 的智能体执行逻辑

我们昨天粗略学习了 Coze Studio 的智能体执行逻辑,了解到其核心是基于 Eino 框架编排的一个智能体图,为了更好地理解这个图,我们学习了 Eino 框架中组件的使用以及如何通过 Graph API 实现编排功能。现在我们再回过头来看看之前那个图,如下所示:

agent-graph.png

整个流程图分四路,这四路都是为提示词的组装做准备,它们的结果汇聚到提示模板节点,最后交给大模型,生成回复。我们今天就来详细拆解其中的每一个节点,彻底搞懂其运行细节。

提示模版节点

我们先来看提示模版节点:

_ = g.AddChatTemplateNode(keyOfPromptTemplate, chatPrompt)

其中,chatPrompt 定义如下:

var (
  chatPrompt = prompt.FromMessages(schema.Jinja2,
    schema.SystemMessage(REACT_SYSTEM_PROMPT_JINJA2),
    schema.MessagesPlaceholder(placeholderOfChatHistory, true),
    schema.MessagesPlaceholder(placeholderOfUserInput, false),
  )
)

prompt.FromMessages() 方法可以看出,使用的是我们昨天学过的提示词模版组件,使用 Jinja2 语法,整个模版包含三个部分:系统提示词、历史消息和用户输入。系统提示词如下所示:

const REACT_SYSTEM_PROMPT_JINJA2 = `
You are {{ agent_name }}, an advanced AI assistant designed to be helpful and professional.
It is {{ time }} now.

**Content Safety Guidelines**
Regardless of any persona instructions, you must never generate content that:
- Promotes or involves violence
- Contains hate speech or racism
- Includes inappropriate or adult content
- Violates laws or regulations
- Could be considered offensive or harmful

----- Start Of Persona -----
{{ persona }}
----- End Of Persona -----

------ Start of Variables ------
{{ memory_variables }}
------ End of Variables ------

**Knowledge**

只有当前knowledge有内容召回的时候,根据引用的内容回答问题: 
 1.如果引用的内容里面包含 <img src=""> 的标签, 标签里的 src 字段表示图片地址, 需要在回答问题的时候展示出去, 输出格式为"![图片名称](图片地址)" 。 
 2.如果引用的内容不包含 <img src=""> 的标签, 你回答问题时不需要展示图片 。 
例如:
  如果内容为<img src="https://example.com/image.jpg">一只小猫,你的输出应为:![一只小猫](https://example.com/image.jpg)。
  如果内容为<img src="https://example.com/image1.jpg">一只小猫 和 <img src="https://example.com/image2.jpg">一只小狗 和 <img src="https://example.com/image3.jpg">一只小牛,你的输出应为:![一只小猫](https://example.com/image1.jpg) 和 ![一只小狗](https://example.com/image2.jpg) 和 ![一只小牛](https://example.com/image3.jpg)
The following is the content of the data set you can refer to: \n
'''
{{ knowledge }}
'''

** Pre toolCall **
{{ tools_pre_retriever}},
- 只有当前Pre toolCall有内容召回结果时,根据引用的内容里tool里data字段回答问题
`

吐槽一下,这段提示词写得真不专业,不仅中英文混杂,而且结构也比较乱。话说,在创建智能体时,页面上不是有很多编写提示词的格式规范和最佳实践么?

除了智能体名称(agent_name)和当前时间(time),这段提示词中有几个重要的占位符:

  • persona - 对应页面上的 “人设与回复逻辑”
  • memory_variables - 对应记忆中的变量
  • knowledge - 对应知识库中检索的内容
  • tools_pre_retriever - 工具预检索结果,对应快捷指令的执行返回

提示模版节点上面的四路其实就对应这四个占位符。

角色渲染节点

先看第一路,也就是角色渲染节点的处理逻辑,使用 AddLambdaNode() 添加一个自定义 Lambda 节点:

_ = g.AddLambdaNode(keyOfPersonRender,
  compose.InvokableLambda[*AgentRequest, string](personaVars.RenderPersona),
  compose.WithStatePreHandler(func(ctx context.Context, ar *AgentRequest, state *AgentState) (*AgentRequest, error) {
    state.UserInput = ar.Input
    return ar, nil
  }),
  compose.WithOutputKey(placeholderOfPersona))

这里通过 compose.InvokableLambda() 将一个普通函数 personaVars.RenderPersona() 转换为 *compose.Lambda,该函数入参为 *AgentRequest 出参为 string。特别注意的是,下一个节点提示词模版的入参是 map[string]any,因此这里通过 compose.WithOutputKey() 设置输出的 key,将 string 转换为 map

函数 RenderPersona() 的逻辑如下:

// 从 “人设与回复逻辑” 中提取所有的变量占位符
persona := conf.Agent.Prompt.GetPrompt()
personaVariableNames: extractJinja2Placeholder(persona)

// 从数据库查询当前智能体定义的记忆变量,对应 `variables_meta` 和 `variable_instance` 表
variables, _ := loadAgentVariables(ctx, avConf)

// 替换 “人设与回复逻辑” 中的所有变量
func (p *personaRender) RenderPersona(ctx context.Context, req *AgentRequest) (persona string, err error) {
  variables := make(map[string]string, len(p.personaVariableNames))
  for _, name := range p.personaVariableNames {
    // 优先从请求中提取变量
    if val, ok := req.Variables[name]; ok {
      variables[name] = val
      continue
    }
    // 兜底使用智能体的记忆变量
    if val, ok := p.variables[name]; ok {
      variables[name] = val
      continue
    }
    // 默认空字符串
    variables[name] = ""
  }
  // 直接使用提示词模版组件格式化
  msgs, _ := prompt.FromMessages(schema.Jinja2, schema.UserMessage(p.persona)).Format(ctx, maps.ToAnyValue(variables))
  return msgs[0].Content, nil
}

主要分为三步:

  1. 从 “人设与回复逻辑” 中提取所有的变量占位符,占位符格式为 Jinja2 格式,比如 {{name}}{{age}} 等;
  2. 从数据库查询当前智能体定义的记忆变量,对应 variables_metavariable_instance 表;variables_meta 表中存的是我们在智能体页面创建的变量,包含变量名、描述和默认值,而 variable_instance 表存的智能体运行过程中动态写入的变量值;
  3. 最后将 “人设与回复逻辑” 中的所有变量占位符替换成变量值;

比如我们在智能体中定义如下记忆变量:

agent-variables.png

然后可以在 “人设与回复逻辑” 中使用这些变量:

agent-variables-2.png

提示变量节点

第二路比较简单,组装所有的变量:

_ = g.AddLambdaNode(keyOfPromptVariables,
  compose.InvokableLambda[*AgentRequest, map[string]any](promptVars.AssemblePromptVariables))

这些变量包括系统提示词模板中的当前时间、智能体名称、用户输入、历史消息,以及上一步查询出来的智能体记忆变量:

func (p *promptVariables) AssemblePromptVariables(ctx context.Context, req *AgentRequest) (variables map[string]any, err error) {
  variables = make(map[string]any)
  // 当前时间
  variables[placeholderOfTime] = time.Now().Format("Monday 2006/01/02 15:04:05 -07")
  // 智能体名称
  variables[placeholderOfAgentName] = p.Agent.Name
  // 用户输入
  variables[placeholderOfUserInput] = []*schema.Message{req.Input}
  // 历史消息
  variables[placeholderOfChatHistory] = req.History

  // 智能体记忆变量列表
  if p.avs != nil {
    var memoryVariablesList []string
    for k, v := range p.avs {
      variables[k] = v
      memoryVariablesList = append(memoryVariablesList, fmt.Sprintf("%s: %s\n", k, v))
    }
    variables[placeholderOfVariables] = memoryVariablesList
  }
  return variables, nil
}

智能体的记忆变量被转为键值对,替换上面系统提示词模版中的 {{ memory_variables }} 占位符。

知识检索节点

第三路负责知识库的检索,使用了两个自定义 Lambda 节点:

// 新建知识库检索器
kr, err := newKnowledgeRetriever(ctx, &retrieverConfig{
  knowledgeConfig: conf.Agent.Knowledge,
})

// 第一个节点检索
_ = g.AddLambdaNode(keyOfKnowledgeRetriever,
  compose.InvokableLambda[*AgentRequest, []*schema.Document](kr.Retrieve),
  compose.WithNodeName(keyOfKnowledgeRetriever))

// 第二个节点组装成字符串
_ = g.AddLambdaNode(keyOfKnowledgeRetrieverPack,
  compose.InvokableLambda[[]*schema.Document, string](kr.PackRetrieveResultInfo),
  compose.WithOutputKey(placeholderOfKnowledge),
)

首先新建一个知识库检索器,将其 Retrieve() 方法作为第一个节点,知识库检索后,该方法返回 []*schema.Document 文档列表,因此还需要第二个节点,调用 PackRetrieveResultInfo 将其组装成字符串,和上面的角色渲染节点一样,这里也通过 compose.WithOutputKey 设置输出 Key,替换系统提示词模版中的 {{ knowledge }} 占位符。

可以看出只要智能体关联了知识库,用户每次对话时总是会触发一次知识库检索,调用跨领域的 knowledge 服务,关于知识库检索的逻辑比较复杂,我们后面再看。

工具预检索节点

接着是最后一路,负责工具预检索,其实就是执行快捷指令:

_ = g.AddLambdaNode(keyOfToolsPreRetriever,
  compose.InvokableLambda[*AgentRequest, []*schema.Message](tr.toolPreRetrieve),
  compose.WithOutputKey(keyOfToolsPreRetriever),
  compose.WithNodeName(keyOfToolsPreRetriever),
)

Coze Studio 支持两种类型的快捷指令,一种是纯 Prompt,比如下面的翻译指令:

agent-shortcut-prompt.png

另一种是调用插件或工作流,比如下面的天气查询指令:

agent-shortcut-workflow.png

配置快捷指令后,智能体的对话框上方会多出快捷指令的按钮:

agent-shortcut-tips.png

我们可以为快捷指令添加组件(也就是变量),当用户点击快捷指令按钮时,会提示用户填写变量的值,变量值会替换指令内容中的占位符,然后作为用户输入调用智能体会话接口。如果用户调用的是纯 Prompt 快捷指令,这和正常会话没有任何区别:

agent-shortcut-prompt-use.png

如果用户调用的是插件或工作流快捷指令,同样也会将指令内容中的占位符替换掉作为用户输入,同时还会传入工具信息,后端会先调用工具,然后根据工具调用结果回答用户问题:

agent-shortcut-workflow-use.png

工具预检索节点就是根据传入的工具信息调用对应的插件或工作流,然后将调用的结果替换系统提示词模版中的 {{ tools_pre_retriever }} 占位符。

智能体节点

当系统提示词组装完毕,流程进入智能体节点,开始调用大模型。这里的智能体节点根据是否存在工具,有两种实现:

if isReActAgent {
  _ = g.AddGraphNode(agentNodeName, agentGraph, agentNodeOpts...)
} else {
  _ = g.AddChatModelNode(agentNodeName, chatModel, agentNodeOpts...)
}

当存在工具时,使用 ReAct Agent 实现,当不存在工具时,直接使用大模型实现。

建议子图

Coze Studio 支持为智能体开启问题建议,如果开启,则在智能体回复后,根据 Prompt 提供最多 3 条用户提问建议:

agent-sugguest.png

这时会走建议分支的逻辑:

suggestGraph, nsg := newSuggestGraph(ctx, conf, chatModel)
if nsg {
  // 建议预输入解析
  _ = g.AddLambdaNode(keyOfSuggestPreInputParse, compose.ToList[*schema.Message](),
    compose.WithStatePostHandler(func(ctx context.Context, out []*schema.Message, state *AgentState) ([]*schema.Message, error) {
      out = append(out, state.UserInput)
      return out, nil
    }),
  )
  // 建议子图
  _ = g.AddGraphNode(keyOfSuggestGraph, suggestGraph)
}

建议分支有两个节点,第一个节点做参数准备,第二个节点生成问题建议。其中有意思的是,问题建议功能也是通过智能体图的方式构建的,如下所示:

sugguest-graph.png

整个流程分为:初始化变量 -> 替换提示词 -> 调用大模型 -> 解析结果,总体比较简单,此处略过,看下它的提示词就明白了:

const SUGGESTION_PROMPT_JINJA2 = `
你是一个推荐系统,请完成下面的推荐任务。
### 对话 
用户: {{_input_}}
AI: {{_answer_}}

personal: {{ suggest_persona }}

围绕兴趣点给出3个用户紧接着最有可能问的几个具有区分度的不同问题,问题需要满足上面的问题要求,推荐的三个问题必须以字符串数组形式返回。

注意:
- 推荐的三个问题必须以字符串数组形式返回
- 推荐的三个问题必须以字符串数组形式返回
- 推荐的三个问题必须以字符串数组形式返回
`

小结

今天我们详细拆解了 Coze Studio 智能体的核心执行逻辑。其本质是一个基于 Eino 框架构建的智能体图,运行流程总结如下:

  1. 四路并行处理:为了动态构建最终的系统提示词,图的执行分为四个并行的分支,分别负责处理:

    • 角色: 渲染“人设与回复逻辑”,并将用户定义的记忆变量填充进去。
    • 变量: 组装包括当前时间、智能体名称、历史消息在内的各类上下文变量。
    • 知识库检索: 对接知识库进行检索,并将召回的内容注入提示词。
    • 工具预检索: 执行用户点击的快捷指令,填充执行结果。
  2. 汇总提示词:上述四路的结果最终汇集到提示词模版节点,填充到一个预设的 Jinja2 模版中,形成完整的上下文。
  3. 调用大模型:组装好的提示词被送入最终的智能体节点。如果智能体配置了工具,则会启用一个 ReAct Agent;否则,直接调用基础的大语言模型生成回复。
  4. 问题建议:此外,如果开启了 “问题建议” 功能,在主流程结束后还会触发一个独立的建议子图,用于生成后续的推荐问题。

通过今天的分析,我们对 Coze Studio 的宏观执行流程有了清晰的认识。然而,其中的一些关键实现细节,例如知识库的向量检索与重排机制、工作流的具体执行过程,以及 ReAct Agent 与工具的交互逻辑,仍有待深入。这些将是我们后续研究的重点。


学习 Coze Studio 的智能体执行逻辑

我们昨天学习了 Coze Studio 智能体会话接口的完整后端处理流程,从接口层、到应用层、到领域层、最后通过跨领域防腐层,将最终的执行任务交给了 single_agent 领域。今天我们将继续这个过程,深入研究下智能体的执行逻辑。

智能体执行

智能体执行的逻辑代码位于 backend/domain/agent/singleagent/service/single_agent_impl.go 文件:

func (s *singleAgentImpl) StreamExecute(ctx context.Context, req *entity.ExecuteRequest) (events *schema.StreamReader[*entity.AgentEvent], err error) {

  // 获取智能体信息
  ae, err := s.ObtainAgentByIdentity(ctx, req.Identity)
  
    // 构建智能体图
  conf := &agentflow.Config{
    Agent:        ae,
    UserID:       req.UserID,
    Identity:     req.Identity,
    ModelMgr:     s.ModelMgr,
    ModelFactory: s.ModelFactory,
    CPStore:      s.CPStore,
  }
  rn, err := agentflow.BuildAgent(ctx, conf)
  
  // 执行智能体图
  exeReq := &agentflow.AgentRequest{
    UserID:   req.UserID,
    Input:    req.Input,
    History:  req.History,
    Identity: req.Identity,
    ResumeInfo:   req.ResumeInfo,
    PreCallTools: req.PreCallTools,
  }
  return rn.StreamExecute(ctx, rn.PreHandlerReq(ctx, exeReq))
}

首先根据配置信息构建智能体图,这里引入了 图(Graph) 这个新概念,Coze Studio 通过 Eino 智能体框架,将智能体编排成一个有向图,构建的大体流程如下:

func BuildAgent(ctx context.Context, conf *Config) (r *AgentRunner, err error) {

  // 初始化节点...

  // 创建一个新图
  g := compose.NewGraph[*AgentRequest, *schema.Message](
    compose.WithGenLocalState(func(ctx context.Context) (state *AgentState) {
      return &AgentState{}
    }))

  // 向图中添加节点...
  // 连接节点之间的边...
  
  // 图编译
  runner, err := g.Compile(ctx, opts...)

  // 返回图
  return &AgentRunner{
        runner: runner,
    }, nil
}

这个智能体图比较复杂,方法中充斥着大量节点初始化以及向图中添加节点和边的代码,此处做了省略,暂时只要关注 compose.NewGraph() 以及 g.Compile() 两行代码,这是使用 Eino 创建并编译智能体的常见方式,构建完成的智能体图如下所示:

agent-graph.png

这个图结构体现了完整的智能体处理流水线,从输入预处理到最终响应生成:

  • 并行初始化:从开始节点同时触发 4 个输入处理节点,并行处理提高效率;
  • 汇聚处理:所有预处理结果汇聚到提示模板节点;
  • 条件分支:根据是否有工具选择不同的 Agent 处理方式,如果有工具,使用 ReAct Agent,否则直接调大模型;
  • 问题建议:根据配置决定是否启用建议功能;

最后,完成图的编译后,在 StreamExecute() 方法中通过 runner.Stream() 流式调用它:

func (r *AgentRunner) StreamExecute(ctx context.Context, req *AgentRequest) (sr *schema.StreamReader[*entity.AgentEvent], err error) {

  // 回调处理器
  hdl, sr, sw := newReplyCallback(ctx, executeID.String())

  go func() {

    // 注册回调
    var composeOpts []compose.Option
    composeOpts = append(composeOpts, compose.WithCallbacks(hdl))
    _ = compose.RegisterSerializableType[*AgentState]("agent_state")

    // 流式运行图
    _, _ = r.runner.Stream(ctx, req, composeOpts...)
  }()

  return sr, nil
}

可以看出,智能体执行的核心就是这个图的构建和执行,在我们深入细节之前,先来快速了解下字节开源的 Eino 智能体框架。

了解 Eino 智能体框架

目前已经有很多智能体框架了,比如 LangGraph、LlamaIndex、Agno 等,但这些基本上都是 Python 语言开发的。Eino 是字节开源的一款使用 Go 语言开发的智能体框架,其主要特点是强类型,易维护,高可靠。

Eino 遵循模块化设计,架构图如下:

eino-framework.png

主要分为六大模块:

  • Schema:提供 Eino 最基础的一些结构与方法定义,比如 MessageDocumentStreamReaderStreamWriter 等;
  • Components:组件是 Eino 应用的基本构成元素,比如 ChatModelChatTemplateToolLambda 等,它们功能各异,但都遵循统一的接口规范,Eino Ext 为每种组件提供默认的实现;
  • Compose:对多个组件进行流程编排,支持复杂的图(Graph)、简单的链(Chain)以及支持字段映射的工作流(Workflow),实现复杂的业务逻辑;
  • Callbacks:为智能体的运行提供切面机制,通过不同的触发时机,可实现日志、跟踪、监控等功能,内置集成了 LangfuseAPMPlusCozeLoop 等平台;
  • Flow:大模型应用是存在通用场景和模式的,Eino 将这些场景进行抽象,提供了一些可以帮助开发者快速构建大模型应用的模版,比如 ReAct 智能体、多智能体、多查询检索等;
  • Devops Tools:Eino 提供了一套开发工具链,通过 Eino Dev 插件进行可视化 Graph 搭建和代码生成,以及对其进行可视化调试;

Eino 不仅仅是一款智能体框架,更准确的说,它应该是一款大模型应用开发框架,通过这些模块,开发者可快速开发出满足自己业务需求的大模型应用。

学习 Eino 组件

大模型应用开发有三种主要的应用模式:

  • 直接对话模式:处理用户输入并生成相应回答;
  • 知识处理模式:对文本文档进行语义化处理、存储和检索;
  • 工具调用模式:基于上下文做出决策并调用相应工具;

Eino 从这些模式中提取出一些常用的能力,并将这些能力抽象为可复用的 组件(Components),这些组件功能各异,大致可以分为下面几大类,Eino 为每种组件都提供了多种不同的实现:

  • 对话处理类组件

    • 提示词模板(ChatTemplate):有 Default 和 MCP 两种实现,默认实现支持 FStringGoTemplateJinja2 等变量语法,MCP 实现 支持加载 MCP Server 中定义的 Prompt 资源;
    • 大模型对话(ChatModel):支持 OpenAI、DeepSeek、Qwen、Claude、Gemini、Ollama、火山引擎、百度千帆平台等;
  • 文本语义处理类组件

    • 加载文档(Document Loader):支持从本地文件、Web URL 和 AWS S3 存储桶中加载文档,一般和 Document Parser 结合使用;
    • 解析文档(Document Parser):支持解析 TXT、HTML、DOCX、XLSX、PDF 等格式的文件;
    • 处理文档(Document Transformer):分为 Splitter 和 Reranker 两种,Splitter 支持 Markdown 分割器、HTML 分割器、递归分割器、语义分割器等,Reranker 支持基于得分的重排序;
    • 文本语义化(Embedding):支持 OpenAI、Ollama、火山引擎、百度千帆平台、阿里云百炼平台 DashScope、腾讯云混元等;
    • 索引存储(Indexer):一般配合 Embedding 一起使用,也可以使用分词索引,支持 Elasticsearch 8.x、Milvus 2.x、Redis Stack 以及火山引擎 VikingDB 等;
    • 文本召回(Retriever):从 Indexer 中检索内容,同样支持 Elasticsearch 8.x、Milvus 2.x、Redis Stack 以及火山引擎 VikingDB 等,同时还支持检索火山知识库和 Dify 知识库;
  • 决策执行类组件

    • 调用工具(ToolsNode):用户可基于接口规范实现自己的工具,Eino 也提供了几个内置的工具,比如 Bing 搜索、Google 搜索、DuckDuckGo 搜索、SearXNG 搜索、维基百科搜索、浏览器使用、命令行工具、HTTP 请求、Sequential Thinking 以及 MCP 工具;
  • 用户自定义组件

    • 自定义代码逻辑(Lambda):允许用户在工作流中嵌入自定义的函数逻辑;

这里以大模型组件为例,带大家了解下 Eino 组件的用法。下面是一个简单的调用示例:

package main

import (
  "context"
  "os"

  "github.com/cloudwego/eino-ext/components/model/openai"
  "github.com/cloudwego/eino/schema"
)

func main() {

  ctx := context.Background()

  // 初始化大模型
  model, _ := openai.NewChatModel(ctx, &openai.ChatModelConfig{
    BaseURL: os.Getenv("OPENAI_BASE_URL"),
    APIKey:  os.Getenv("OPENAI_API_KEY"),
    Model:   "gpt-4o",
  })

  // 准备消息
  messages := []*schema.Message{
    schema.SystemMessage("你是一个翻译专家,擅长中文和英文之间的互译。"),
    schema.UserMessage("你好,世界!"),
  }

  // 生成回复
  response, _ := model.Generate(ctx, messages)
  println(response.Content)
}

我们首先通过 openai.NewChatModel 创建了一个 ChatModel 组件,它的接口定义如下:

type BaseChatModel interface {
  Generate(ctx context.Context, input []*schema.Message, opts ...Option) (*schema.Message, error)
  Stream(ctx context.Context, input []*schema.Message, opts ...Option) (
    *schema.StreamReader[*schema.Message], error)
}

它有 GenerateStream 两个方法,可以看出,组件输入是消息数组([]*schema.Message),输出是消息(*schema.Message),这里使用 Generate 生成一次性回复,也可以使用 Stream 流式输出:

  // 流式读取消息
  reader, _ := model.Stream(ctx, messages)
  defer reader.Close()
  for {
    chunk, err := reader.Recv()
    if err != nil {
      break
    }
    print(chunk.Content)
  }

“组件优先” 是 Eino 的一大设计原则,每个组件都是一个职责划分比较清晰且功能单一的模块,都可以独立使用,其他组件的用法和上面的大模型组件基本类似。我们要特别注意每个组件的输入和输出类型,这在后面编排时非常有用。

使用 Eino 进行 Graph 编排

组件只能提供原子能力,在一个大模型应用中,还需要根据场景化的业务逻辑,对这些原子能力进行组合和串联,这就是 编排(Compose)。Eino 支持三种编排方式:图(Graph)链(Chain)工作流(Workflow),三者之间的关系如下:

compose-api.png

其中,链最简单,可以认为是简单的有向无环图,它是基于图实现的;工作流也是无环图,和图的区别在于,它提供了字段级别映射能力,节点的输入可以由任意前驱节点的任意输出字段组合而成;在 Coze Studio 中,智能体是通过 Graph API 实现的,工作流是通过 Workflow API 实现的。

回顾上面 Coze Studio 构建智能体图时所看到的 compose.NewGraph() 以及 g.Compile() 两行代码,其实就是 Graph API。下面通过一个简单的例子快速掌握如何使用 Eino 进行 Graph 编排:

package main

import (
  "context"
  "os"

  "github.com/cloudwego/eino-ext/components/model/openai"
  "github.com/cloudwego/eino/components/prompt"
  "github.com/cloudwego/eino/compose"
  "github.com/cloudwego/eino/schema"
)

func main() {

  ctx := context.Background()

  // 初始化大模型
  model, _ := openai.NewChatModel(ctx, &openai.ChatModelConfig{
    BaseURL: os.Getenv("OPENAI_BASE_URL"),
    APIKey:  os.Getenv("OPENAI_API_KEY"),
    Model:   "gpt-4o",
  })

  // 创建模板,使用 FString 格式
  template := prompt.FromMessages(schema.FString,
    // 系统消息模板
    schema.SystemMessage("你是一个翻译专家,擅长{from}和{to}之间的互译,如果用户输入的是{from}将其翻译成{to},如果用户输入的是{to}将其翻译成{from}"),
    // 用户消息模板
    schema.UserMessage("用户输入: {question}"),
  )

  // 构造图
  graph := compose.NewGraph[map[string]any, *schema.Message]()

  _ = graph.AddChatTemplateNode("template", template)
  _ = graph.AddChatModelNode("model", model)

  _ = graph.AddEdge(compose.START, "template")
  _ = graph.AddEdge("template", "model")
  _ = graph.AddEdge("model", compose.END)

  result, err := graph.Compile(ctx)
  if err != nil {
    panic(err)
  }

  // 调用图
  output, _ := result.Invoke(ctx, map[string]any{
    "from":     "中文",
    "to":       "法语",
    "question": "你好,世界!",
  })
  println(output.Content)
}

我们首先通过 openai.NewChatModel 创建了一个 ChatModel 组件,再通过 prompt.FromMessages 创建了一个 ChatTemplate 组件,然后使用 Graph API 将这两个组件连起来,构造成一个简单的图,如下:

simple-graph.png

这个图非常简单,实际上是一个链,感兴趣的同学也可以使用 Chain API 来实现它。

在编排图时,我们需要特别注意每个组件的输入和输出类型,比如:

  • ChatTemplate 的入参是 map[string]any,出参是 []*schema.Message
  • ChatModel 的入参是 []*schema.Message,出参是 ChatModel

ChatTemplate 的出参刚好可以对上 ChatModel 的入参,因此两个组件可以连起来,如果组件之间的参数不一致,需要在中间插入 Lambda 组件自定义参数转换,如果不匹配,在 Compile 编译时就会报错;此外,图的入参就是第一个节点的入参,图的出参就是最后一个节点的出参,使用 compose.NewGraph 创建图时需要明确的指定图的入参和出参类型,比如这里的入参为 map[string]any,出参为 *schema.Message,在调用 Invoke 时传入适当的参数类型。

关于编排的更多细节,可以参考官方的文档:

未完待续

本文我们顺着 Coze Studio 的代码,从智能体的执行入口 StreamExecute 开始,了解了其核心是基于 Eino 框架构建的一个智能体图。为了更好地理解这个图,我们快速学习了 Eino 框架的核心概念,包括其模块化设计、丰富的组件生态以及强大的编排能力。最后,通过一个简单的例子,我们掌握了如何使用 Eino 的组件和 Graph API 构建一个基本的大模型应用。

本文对智能体的图构建流程只是粗略带过,在下一篇文章中,我们将回头深入分析 Coze Studio 中那个复杂的智能体图,详细拆解其中的每一个节点和逻辑分支,彻底搞懂其运行细节。


学习 Coze Studio 的智能体会话接口

前面我们已经学习了 Coze Studio 的代码架构,对项目整体有了一个大致的了解。今天,我们将深入智能体执行的核心,研究下用户在和智能体对话时,后端服务是如何处理会话请求的。

接口层实现

我们首先来看下智能体会话的接口层实现。正如上一篇所述,Coze Studio 中的接口均通过 IDL 定义,位于 idl/conversation/agentrun_service.thrift 文件:

service AgentRunService {
  run.AgentRunResponse AgentRun(1: run.AgentRunRequest request) (
    api.post='/api/conversation/chat', 
    api.category="conversation", 
    api.gen_path= "agent_run"
  )
  run.ChatV3Response ChatV3(1: run.ChatV3Request request)(
    api.post = "/v3/chat",
    api.category="chat",
    api.tag="openapi",
    api.gen_path="chat"
  )
}

这里还有一个 /v3/chat 接口,它是正式发布的智能体对外的 API 接口,其实现和 /api/conversation/chat 基本一致,这里不作过多介绍。

然后使用 hz 工具将 IDL 自动生成 API 处理器,位于 backend/api/handler/coze/agent_run_service.go 文件:

// @router /api/conversation/chat [POST]
func AgentRun(ctx context.Context, c *app.RequestContext) {
    
  // 绑定并校验入参
  var req run.AgentRunRequest
  c.BindAndValidate(&req)
  
  // 新建 SSE 发送器
  sseSender := sseImpl.NewSSESender(sse.NewStream(c))
  c.SetStatusCode(http.StatusOK)
  c.Response.Header.Set("X-Accel-Buffering", "no")

  // 调用 conversation 应用服务
  conversation.ConversationSVC.Run(ctx, sseSender, &req)
}

API 接口层没有什么复杂的逻辑,主要是绑定并校验入参,然后调用应用层。

这里值得一提是 X-Accel-Buffering 响应头的使用,这是一个特殊的 HTTP 响应头,主要用于控制反向代理(如 Nginx)的缓冲行为,当它的值被设置为 no 时,表示告诉代理服务器不要对当前响应进行缓冲。如果没有这个设置,Nginx 等代理可能会缓冲响应内容直到缓冲区填满或响应完成,这会导致客户端无法实时获取数据,产生延迟感。这个设置通常用于需要 流式传输 的场景,比如:实时日志输出、大型文件下载、SSE 服务器推送等。

应用层实现

接着再来看下应用层的实现,位于 backend/application/conversation/agent_run.go 文件:

func (c *ConversationApplicationService) Run(ctx context.Context, sseSender *sseImpl.SSenderImpl, ar *run.AgentRunRequest) error {

  // 从当前会话中获取用户 ID
  userID := ctxutil.MustGetUIDFromCtx(ctx)

  // 如果 RegenMessageID > 0 说明是重新生成,将对应的运行记录和消息删除
  if ar.RegenMessageID != nil && ptr.From(ar.RegenMessageID) > 0 {
    msgMeta, err := c.MessageDomainSVC.GetByID(ctx, ptr.From(ar.RegenMessageID))
    if msgMeta != nil {
      err = c.AgentRunDomainSVC.Delete(ctx, []int64{msgMeta.RunID})
      delErr := c.MessageDomainSVC.Delete(ctx, &msgEntity.DeleteMeta{
        RunIDs: []int64{msgMeta.RunID},
      })
    }
  }

  // 查询 Agent 信息
  agentInfo, caErr := c.checkAgent(ctx, ar)

  // 获取当前会话,如果不存在,则创建新会话
  conversationData, ccErr := c.checkConversation(ctx, ar, userID)

  // 获取快捷指令
  var shortcutCmd *cmdEntity.ShortcutCmd
  if ar.GetShortcutCmdID() > 0 {
    cmdID := ar.GetShortcutCmdID()
    cmdMeta, err := c.ShortcutDomainSVC.GetByCmdID(ctx, cmdID, 0)
    shortcutCmd = cmdMeta
  }

  // 构造智能体运行参数
  arr, err := c.buildAgentRunRequest(ctx, ar, userID, agentInfo.SpaceID, conversationData, shortcutCmd)
  
  // 调用 agent_run 领域服务
  streamer, err := c.AgentRunDomainSVC.AgentRun(ctx, arr)
  
  // 从 streamer 拉取消息,根据消息类型构建对应的响应体,通过 sseSender 发送出去,实现流式输出
  c.pullStream(ctx, sseSender, streamer, ar)
  return nil
}

为了方便表述,我对原代码的顺序做了一些调整,主要包括三大块逻辑:

  1. 处理用户的重新生成请求:一般来说只会在最后一轮对话上出现重新生成按钮,实现逻辑就是将对应的运行记录和消息删除,然后继续正常的会话即可;
  2. 构造智能体运行参数:查询必要的信息,包括 Agent 信息、会话信息、快捷指令等,构造智能体运行参数;
  3. 运行智能体:调用 agent_run 领域服务,并通过 SSE 实现流式输出;

从代码可以看出,应用层整合了多个领域层服务,这里的每一个领域基本上都对应一个数据库表:

  • 消息领域服务(MessageDomainSVC):对应数据库中的 message 表,一条消息代表用户的一次提问或助手的一次回答,一次工具调用或一次工具响应;
  • 智能体运行领域服务(AgentRunDomainSVC):对应 run_record 表,代表一次用户和助手之间的所有交互,一条运行记录包含多条消息;
  • 单智能体领域服务(SingleAgentDomainSVC):对应 single_agent_draft 表,代表我们创建的智能体;
  • 会话领域服务(ConversationDomainSVC):对应 conversation 表,当你和智能体第一次交互时会自动创建一个会话,后续的交互都在这个会话下,一个会话包含多条运行记录;
  • 快捷指令领域服务(ShortcutDomainSVC):对应 shortcut_command 表,开启快捷指令后会在对话输入框上方出现快捷输入按钮,方便用户快速发起预设对话;快捷指令和正常的流程有些区别,因此需要特殊处理;

Coze Studio 使用 GORM 访问数据库,这是一个功能强大的 Go 语言 ORM(对象关系映射)库,它简化了 Go 程序与数据库的交互,提供了优雅的 API 和丰富的功能:

gorm.png

感兴趣的同学可以在每个领域的 internal/dal 下找到 modelquery 两个目录,这些都是通过 gorm.io/gen 自动生成的,包含对数据库表的增删改查代码:

backend/domain/agent/singleagent/internal/dal
├── model
│   ├── single_agent_draft.gen.go
│   ├── single_agent_publish.gen.go
│   └── single_agent_version.gen.go
├── query
│   ├── gen.go
│   ├── single_agent_draft.gen.go
│   ├── single_agent_publish.gen.go
│   └── single_agent_version.gen.go

领域层实现

上面的 Run() 方法中,最核心的一句是调用领域层的 AgentRun() 方法,其实现位于 backend/domain/conversation/agentrun/service/agent_run_impl.go 文件中,如下:

func (c *runImpl) AgentRun(ctx context.Context, arm *entity.AgentRunMeta) (*schema.StreamReader[*entity.AgentRunResponse], error) {

  // 新建一个容量 20 的双向管道
  sr, sw := schema.Pipe[*entity.AgentRunResponse](20)

  // 将 StreamWriter 传入 c.run 方法
  safego.Go(ctx, func() {
    defer sw.Close()
    _ = c.run(ctx, sw, rtDependence)
  })
  // 将 StreamReader 返回上层,供应用层读取
  return sr, nil
}

这里的逻辑比较简单,主要是通过 schema.Pipe 创建一个双向管道,用于上层读取消息和下层写入消息,然后使用 safego.Go 调用 c.run() 方法。safego.Go 是对原生的 go 的一层包装,它的主要作用是创建一个 goroutine 来执行传入的函数,并在 goroutine 中添加了错误恢复机制,确保在 goroutine 中发生的 panic 会被捕获和处理,而不会导致整个程序崩溃:

func Go(ctx context.Context, fn func()) {
  go func() {
    defer goutil.Recovery(ctx)

    fn()
  }()
}

调用的 c.run() 方法如下:

func (c *runImpl) run(ctx context.Context, sw *schema.StreamWriter[*entity.AgentRunResponse], rtDependence *runtimeDependence) (err error) {

  // 获取智能体信息
  agentInfo, err := c.handlerAgent(ctx, rtDependence)
  rtDependence.agentInfo = agentInfo
  
  // 获取最近 N 轮历史对话
  history, err := c.handlerHistory(ctx, rtDependence)
  
  // 创建一条新的运行记录 `run_record`
  runRecord, err := c.createRunRecord(ctx, sw, rtDependence)
  rtDependence.runID = runRecord.ID

  // 创建一条新的用户消息 `message`
  input, err := c.handlerInput(ctx, sw, rtDependence)
  rtDependence.questionMsgID = input.ID
  
  // 流式执行智能体
  err = c.handlerStreamExecute(ctx, sw, history, input, rtDependence)
  return
}

这里仍然是一系列数据库的操作,包括查询智能体信息、获取历史对话、创建运行记录、创建用户消息等,不过需要注意的是,这些基本上都是对其他领域的操作,Coze Studio 在这里引入了跨领域防腐层,防止领域之间的直接依赖。至此,整个对话接口的流程图如下所示:

agent-run-flow.png

到这里,终于走到了智能体会话接口的最末端,通过调用 crossagent 跨领域服务,执行逻辑从 agent_run 领域进入 single_agent 领域,正式开始执行智能体。

小结

今天,我们学习了 Coze Studio 智能体会话接口的完整后端处理流程,其代码严格遵循 DDD 的分层原则,从接口层、到应用层、到领域层、最后通过跨领域防腐层,将最终的执行任务交给了 single_agent 领域。至此,准备工作已经就绪,接下来,我们将进入 single_agent 领域,揭开智能体内部执行逻辑的神秘面纱。


学习 Coze Studio 的代码架构

经过几天的实战和学习,我们已经全面体验了 Coze Studio 从智能体、插件、工作流到知识库的各项核心功能。今天,我们开始研究下它的源码,看看这些功能背后的实现原理。

项目架构

Coze Studio 的架构设计严格遵循 领域驱动设计(DDD) 的核心原则进行构建,我们可以看下它的整体项目结构:

├── backend/              # 后端服务
│   ├── api/              # API 处理器和路由
│   ├── application/      # 应用层,组合领域对象和基础设施实现
│   ├── conf/             # 配置文件
│   ├── crossdomain/      # 跨领域防腐层
│   ├── domain/           # 领域层,包含核心业务逻辑
│   ├── infra/            # 基础设施实现层
│   ├── pkg/              # 无外部依赖的工具方法
│   └── types/            # 类型定义
├── common/               # 公共组件
├── docker/               # Docker 配置
├── frontend/             # 前端应用
│   ├── apps/             # 应用程序
│   ├── config/           # 配置文件
│   ├── infra/            # 基础设施
│   └── packages/         # 包
├── idl/                  # 接口定义语言文件

我们主要关注 backend 目录下的内容,从子目录的名称可以很明显看出是 DDD 的分层架构:

  • API 层(api:实现 HTTP 端点,使用 Hertz 服务器处理请求和响应,包含中间件组件;
  • 应用层(application:​组合各种领域对象和基础设施实现,提供 API 服务;
  • 领域层(domain:包含核心业务逻辑,定义领域实体和值对象,实现业务规则和工作流;
  • 跨领域防腐层(crossdomain:​定义跨领域接口,防止领域间直接依赖;
  • 基础设施层(infra:又分为契约层和实现层;契约层(contract ​定义所有外部依赖的接口,作为领域逻辑和基础设施之间的边界,包括存储系统、缓存机制、消息队列、配置管理等接口;实现层(impl 为契约层定义的接口提供具体的实现;
  • 工具包(pkg:无外部依赖的工具方法,可以被任何层直接使用;

领域驱动设计

领域驱动设计(Domain-Driven Design,简称 DDD) 是一种针对复杂业务系统的软件开发方法论,它的核心思想是 以业务领域为中心,软件的设计和实现都围绕业务领域的核心概念、规则和流程展开,而非单纯技术架构。通过抽象业务领域中的实体、关系和规则,构建 领域模型(Domain Model),并将模型映射为代码,使代码既能反映业务逻辑,又能被业务人员理解。

Eric Evans 在 2004 年出版了《领域驱动设计》一书,提出了经典的 DDD 4 层架构:

ddd-4-layers.png

我们可以在 backend/domain 目录下找到 Coze Studio 的所有领域模型:

├── agent           # 智能体,只有单智能体
├── app             # 应用
├── connector       # 连接器,Chat SDK 或 API
├── conversation    # 会话
├── datacopy        # 数据复制任务
├── knowledge       # 知识库
├── memory          # 记忆,包括数据库和变量
├── openauth        # 认证
├── permission      # 权限
├── plugin          # 插件和工具
├── prompt          # 提示词
├── search          # 搜索
├── shortcutcmd     # 快捷指令
├── template        # 模板
├── upload          # 一些默认图标的常量
├── user            # 用户
└── workflow        # 工作流

这里除了领域模型,还定义了对应的实体、值对象、聚合和领域服务等核心领域对象:

  • 实体(Entity):有唯一标识、状态可变的对象,其身份比属性更重要;
  • 值对象(Value Object):无唯一标识、不可变的对象,由属性定义(属性相同则视为相等);
  • 聚合(Aggregate):一组紧密关联的实体和值对象的集合,通过 聚合根(Aggregate Root) 对外暴露接口,保证数据一致性;
  • 领域服务(Domain Service):封装跨实体/聚合的业务逻辑,无法归属到单个实体时使用;
  • 领域事件(Domain Event):领域中发生的重要事件,用于解耦跨上下文的业务流程;
  • 仓储(Repository):封装数据持久化逻辑,为领域模型提供数据访问接口(屏蔽数据库细节);
  • 限界上下文(Bounded Context):领域模型的边界,每个上下文内有独立的模型和通用语言,上下文间通过接口通信;限界上下文可作为微服务拆分的依据,每个微服务对应一个或多个限界上下文,降低服务间耦合;

在 DDD 开发中,往往还会引入了一个跨领域防腐层(Anti-Corruption Layer,简称 ACL),它通过隔离领域间的直接依赖,防止领域间出现耦合,确保各领域的独立性。Coze Studio 也使用了该设计模式,将跨领域模型定义在 backend/crossdomain 目录下:

├── contract
│   ├── crossagent
│   ├── crossagentrun
│   ├── crossconnector
│   ├── crossconversation
│   ├── crossdatabase
│   ├── crossdatacopy
│   ├── crossknowledge
│   ├── crossmessage
│   ├── crossplugin
│   ├── crosssearch
│   ├── crossuser
│   ├── crossvariables
│   └── crossworkflow
├── impl
│   ├── ...
└── workflow
    ├── ...

跨领域其实就是对不同领域之间的调用增加了一层适配层,比如会话领域中的 agentrun 在调用智能体领域时,不是直接调用 agent,而是调用防腐层 crossagent,这样做的好处是当智能体领域发生变化时,会话领域可以不受影响。

基础设施层

当领域模型构建完成后,接着就可以实现基础设施层,包括实现具体的数据库持久化逻辑,集成外部服务 API 和基础设施,实现消息队列的事件发布和订阅机制等;我们可以在 backend/infra 目录下看到这些,一般将基础设施层分为契约层和实现层,因为大多数基础设施都有多种不同的实现:

├── contract
│   ├── cache         # 缓存,默认基于 Redis 实现
│   ├── chatmodel     # 对话模型,比如 OpenAI、ARK、DeepSeek 等
│   ├── coderunner    # 代码执行器,比如 Python、JavaScript 等
│   ├── document      # 文档相关,包括文档解析、文档检索、重排序、图片理解、OCR、NL2SQL 等
│   ├── dynconf       # 动态配置,比如 Zookeeper、Etcd、Nacos 等
│   ├── embedding     # 嵌入模型,包括 OpenAI、ARK 和 HTTP 三种实现
│   ├── es            # Elasticsearch 增删改查,针对不同的 ES 版本有不同的实现
│   ├── eventbus      # 事件总线,包括 Kafka、NSQ、RMQ 等实现
│   ├── idgen         # ID 生成器
│   ├── imagex        # 火山引擎的 veImageX 服务
│   ├── messages2query # 问题改写
│   ├── modelmgr       # 模型管理器
│   ├── orm            # 对象关系映射,默认使用 GORM 框架
│   ├── rdb            # 关系型数据库,默认使用 MySQL 数据库
│   ├── sqlparser      # SQL 解析器
│   ├── sse            # 服务器发送事件,默认使用 Hertz 的 SSE 实现
│   └── storage        # 存储服务,比如 Minio、S3、TOS 等
└── impl
    ├── 同契约层

接口层实现

领域层的上面是应用层和接口层。应用层通过组合领域对象和基础设施,实现具体的业务用例;接口层则将具体的业务功能包装成 HTTP 接口,供前端或 SDK 调用。

Coze Studio 使用字节自家开源的 Hertz 框架来实现接口层。这是一个使用 Golang 编写的 HTTP 框架,具有高易用性、高性能、高扩展性等特点,它的设计参考了 fasthttpginecho 等开源框架,并结合字节内部的需求,目前在字节内部已被广泛使用。

Hertz 包括服务端和客户端,提供了路由、多协议、多网络库的支持,内置常用中间件,并集成了日志、监控、服务注册发现等三方扩展,框架图如下所示:

hertz.png

为了更好地理解 Coze Studio 的代码,我们不妨快速熟悉下 Hertz 的使用。首先,创建 hertz_demo 文件夹,然后进入该目录,创建 main.go 文件,内容如下:

package main

import (
  "context"

  "github.com/cloudwego/hertz/pkg/app"
  "github.com/cloudwego/hertz/pkg/app/server"
  "github.com/cloudwego/hertz/pkg/common/utils"
  "github.com/cloudwego/hertz/pkg/protocol/consts"
)

func main() {
  h := server.Default(server.WithHostPorts(":9999"))

  h.GET("/ping", func(ctx context.Context, c *app.RequestContext) {
    c.JSON(consts.StatusOK, utils.H{"message": "pong"})
  })

  h.Spin()
}

使用 go mod init 命令生成 go.mod 文件:

$ go mod init hertz_demo

再使用 go mod tidy 命令整理并拉取依赖:

$ go mod tidy

最后使用 go run 启动服务:

$ go run hertz_demo

如果看到类似下面这样的日志,则说明服务已启动成功:

2025/08/05 07:12:36.758686 engine.go:681: [Debug] HERTZ: Method=GET    absolutePath=/ping --> handlerName=main.main.func1 (num=2 handlers)
2025/08/05 07:12:36.759819 engine.go:417: [Info] HERTZ: Using network library=netpoll
2025/08/05 07:12:36.760315 transport.go:149: [Info] HERTZ: HTTP server listening on address=[::]:9999

使用 curl 对接口进行测试:

$ curl http://localhost:9999/ping
{"message":"pong"}

这样一个简单的基于 Hertz 的 Web 服务就开发好了,如果想对 Hertz 做深入学习,可参考官方文档:

hz 代码生成

在上面的演示中,我们创建并编写 main.go 文件是从零开始的,其实,Hertz 还提供了一个 hz 命令行工具,可以快速生成 Hertz 项目的脚手架。

在安装 hz 之前,首先确保 GOPATH 环境变量已经被正确的定义,并且将 $GOPATH/bin 添加到 PATH 环境变量之中:

export GOPATH=$HOME/go
export PATH=$GOPATH/bin:$PATH

然后就可以通过下面的命令安装 hz

$ go install github.com/cloudwego/hertz/cmd/hz@latest

运行 hz -v 验证是否安装成功:

$ hz -v
hz version v0.9.7

如果能正常显示版本号,则说明 hz 已成功安装。接下来,我们使用 hz 来生成一个 Hertz 项目。首先,创建 hz_demo 文件夹,然后进入该目录,执行如下命令:

$ hz new -module hz_demo

该命令会生成如下目录结构:

├── biz
│   ├── handler
│   │   └── ping.go
│   └── router
│       └── register.go
├── build.sh
├── go.mod
├── main.go
├── router.go
├── router_gen.go
└── script
    └── bootstrap.sh

仔细对比 Coze Studio 的 backend 目录结构,可以发现两者几无二致,基本上可以确定,Coze Studio 的 backend 模块也是使用 hz 自动生成的。

接着安装依赖:

$ go mod tidy

并启动服务:

$ go run hz_demo

接口定义语言

hz 的另一大特点是,它可以基于 接口定义语言(Interface Definition Language,简称 IDL) 生成 Hertz 项目的脚手架。IDL 是一种中立的、跨语言的规范,用于描述软件组件之间的接口(如数据结构、函数、服务定义等),它不依赖于特定编程语言,而是通过统一的语法定义接口契约,再由工具生成不同语言的代码(如 C++、Java、Python、Golang 等),在分布式系统中,不同服务可能使用不同语言开发,通过 IDL 可确保数据格式和交互方式一致。

Thrift 和 Protobuf 是两种主流的 IDL 实现,均用于跨语言数据序列化和服务通信,广泛应用于分布式系统:

  • Thrift 是由 Facebook 开发的开源 IDL 框架,后捐给 Apache 基金会,支持数据序列化和 RPC 服务开发;
  • Protobuf 是 Google 开发的开源 IDL 框架,专注于高效的数据序列化,常与 gRPC 配合实现 RPC 通信;

hz 对 Thrift 和 Protobuf 两种 IDL 都提供了支持,但是在使用之前,需要安装相应的编译器:thriftgoprotoc,这里以 Thrift 为例,使用下面的命令安装 thriftgo 编译器:

$ GO111MODULE=on go install github.com/cloudwego/thriftgo@latest

然后我们创建一个新目录 idl_demo,并新建一个 idl/hello.thrift 文件:

// idl/hello.thrift
namespace go hello.example

struct HelloReq {
  1: string Name (api.query="name"); // 添加 api 注解为方便进行参数绑定
}

struct HelloResp {
  1: string RespBody;
}

service HelloService {
  HelloResp HelloMethod(1: HelloReq request) (api.get="/hello");
}

这个文件声明了 hello.example 命名空间,并定义了 HelloReqHelloResp 两个结构体,分别对应 HelloService 服务中 HelloMethod 接口的请求和响应,同时还定义了该接口为 GET 请求,地址为 /hello

再通过下面的命令生成项目脚手架:

$ hz new -module idl_demo \
    -idl idl/hello.thrift \
    -handler_dir api/handler \
    -router_dir api/router \
    -model_dir api/model

其中 -handler_dir-router_dir-model_dir 用于将对应的目录生成到 api 目录下,而不是默认的 biz 目录,这和 Coze Studio 的代码做法一致。新生成的目录结构如下:

├── api
│   ├── handler
│   │   ├── hello
│   │   │   └── example
│   │   │       └── hello_service.go
│   │   └── ping.go
│   ├── model
│   │   └── hello
│   │       └── example
│   │           └── hello.go
│   └── router
│       ├── hello
│       │   └── example
│       │       ├── hello.go
│       │       └── middleware.go
│       └── register.go
├── build.sh
├── go.mod
├── idl
│   └── hello.thrift
├── main.go
├── router.go
├── router_gen.go
└── script
    └── bootstrap.sh

HelloService 服务的实现位于 api/handler 目录下,hello/example 对应 idl 文件中的命名空间,我们可以打开 hello_service.go 对其进行编辑:

// HelloMethod .
// @router /hello [GET]
func HelloMethod(ctx context.Context, c *app.RequestContext) {
  var err error
  var req example.HelloReq
  err = c.BindAndValidate(&req)
  if err != nil {
    c.String(consts.StatusBadRequest, err.Error())
    return
  }

  resp := new(example.HelloResp)
  resp.RespBody = "hello, " + req.Name  // <-- 新增代码
  c.JSON(consts.StatusOK, resp)
}

接着和上面一样,安装依赖,启动服务:

$ go mod tidy
$ go run hz_demo

使用 curl 验证通过 IDL 定义的 /hello 接口是否能正常调用:

$ curl "http://localhost:9999/hello?name=zhangsan"
{"RespBody":"hello, zhangsan"}

Coze Studio 的源码中有一个 idl 目录,里面包含大量的 Thrift 文件,定义了平台所有接口和结构体,backend/api 目录下的 handlerroutermodel 就是基于这些 IDL 通过 hz 自动生成的。

小结

今天,我们对 Coze Studio 的代码架构做了一番研究。首先学习了其基于领域驱动设计(DDD)的后端实现,掌握了领域层、跨领域防腐层、基础设施层等概念;然后通过实践 Hertz 框架、hz 命令行工具以及接口定义语言(IDL),理解了其 API 层的构建方式。通过今天的学习,相信大家对 Cozs Studio 的代码全貌有了一个直观的了解,在阅读 Coze Studio 源码时不至于迷路。

接下来,我们就深入到具体的业务实现里,看看它的智能体、插件、工作流、知识库以及记忆等核心功能是如何实现的。


实战 Coze Studio 知识库使用

昨天我们学习了 Coze Studio 的工作流功能,通过在可视化画布上拖拽节点,迅速编排和搭建复杂的工作流。今天,我们将继续探索 Coze 的另一个核心功能 —— 知识库,学习如何为智能体注入私有或专业领域的知识,解决大模型幻觉和专业领域知识不足的问题。

创建知识库

大模型虽然知识渊博,但其知识截止于训练日期,并且对于私有或专业领域的知识一无所知。知识库功能正是为了解决这个问题而生的,它通过 RAG 技术,让智能体能够基于我们提供的专属资料库来回答问题。

我们首先进入 “资源库” 页面,点击 “+ 资源” 并选择 “知识库”,创建知识库:

coze-kb-create.png

Coze Studio 提供了三种不同类型的知识库:

  • 文本格式:文本知识库支持用户上传 PDF、TXT、DOC、DOCX、MD 等格式的文件,解析和分片后存储于向量数据库;它基于内容片段进行检索和召回,大模型结合召回的内容生成最终内容回复,适用于知识问答等场景;
  • 表格格式:表格知识库支持用户上传 CSV 和 XLSX 文件,导入表格数据,按行进行划分;它基于索引列的匹配进行检索,同时也支持基于 NL2SQL 的查询和计算;
  • 照片类型:照片知识库支持用户上传 JPG、JPEG 和 PNG 图片,通过大模型对图片进行标注;它基于标注信息的相似度匹配,找到与用户问题最相关的图片,给大模型用于内容生成;

Coze Studio 的知识库功能依赖于 Embedding 服务和向量化存储组件。

其中 Embedding 服务支持三种接入方式:

  • OpenAI - 兼容 OpenAI 协议的 Embedding 接口
  • ARK - 火山引擎提供的 Embedding 服务
  • HTTP - 调用本地部署的模型服务,需要满足 Coze 自己的一套 接口协议,暂不支持 Ollama 或 Xinference 协议

这里我使用的是 OpenAI 的 text-embedding-3-small 模型,下面是 .env 配置示例,注意它的向量维度为 1536:

export EMBEDDING_TYPE="openai"
export OPENAI_EMBEDDING_BASE_URL=""
export OPENAI_EMBEDDING_MODEL="text-embedding-3-small"
export OPENAI_EMBEDDING_API_KEY="sk-xxx"
export OPENAI_EMBEDDING_BY_AZURE=false
export OPENAI_EMBEDDING_API_VERSION=""
export OPENAI_EMBEDDING_DIMS=1536
export OPENAI_EMBEDDING_REQUEST_DIMS=1536

另外,向量化存储组件支持开源向量数据库 Milvus 和火山向量数据库 VikingDB,默认使用 Milvus,一般不修改:

export VECTOR_STORE_TYPE="milvus"
export MILVUS_ADDR="localhost:19530"

如果你想切换成 VikingDB,可以使用下面的配置:

export VECTOR_STORE_TYPE="vikingdb"
export VIKING_DB_HOST=""
export VIKING_DB_REGION=""
export VIKING_DB_AK=""
export VIKING_DB_SK=""
export VIKING_DB_SCHEME=""
export VIKING_DB_MODEL_NAME=""

由于修改的是环境变量,通过 --force-recreate 重启服务:

$ docker compose --profile '*' up -d --force-recreate --no-deps coze-server

文本知识库

创建知识库时选择 “文本格式”,并填写知识库名称和描述,导入类型选 “本地文档”,然后点击 “创建并导入” 按钮,进入 “新增知识库” 的 “上传” 页面:

coze-kb-create-2.png

如果导入类型选 “自定义”,则进入 “新增知识库” 的 “文本填写” 页面,支持用户手动录入文档名称和文档内容。

我们上传一个简单的 PDF 文件(可以包含文本、表格、图片等元素),进入 “创建设置” 页面:

coze-kb-create-3.png

和其他的 RAG 系统类似,我们需要为知识库配置 文档解析策略分段策略。Coze Studio 支持两种文档解析策略:

  • 精准解析:支持从文档中提取图片、表格等元素,需要耗费更长的时间;支持设置过滤内容,可以将一些特定的页面排除掉;
  • 快速解析:不会对文档提取图像、表格等元素,适用于纯文本;

如果上传的文件是扫描件,往往需要开启 OCR 功能,Coze Studio 目前只支持火山引擎的 通用 OCR 服务,可以免费开通试用:

volcengine-ocr.png

开通后创建密钥,获取 AK 和 SK,然后将其填到 .env 配置文件中:

export OCR_TYPE="ve"
export VE_OCR_AK="AK..."
export VE_OCR_SK="SK..."

同样的,由于修改的是环境变量,通过 --force-recreate 重启服务:

$ docker compose --profile '*' up -d --force-recreate --no-deps coze-server

如果上传的文件是纯文本,选择快速解析即可。

Coze Studio 也支持两种分段策略:

  • 自动分段与清洗:使用内置的分段与预处理规则;
  • 自定义:使用用户自定义的分段规则、分段长度与预处理规则;

配置好文档解析策略和分段策略后,点击 “下一步”,进入分段预览页面:

coze-kb-create-4.png

如果预览没问题的话,再次点击 “下一步”,系统开始对文件进行处理,等处理结束后,我们的文本知识库就构建完成了:

coze-kb-create-done.png

表格知识库

创建知识库时选择 “表格格式”,并填写知识库名称和描述,导入类型选 “本地文档”,然后点击 “创建并导入” 按钮,进入 “新增知识库” 的 “上传” 页面:

coze-kb-create-table-2.png

我们上传一个简单的 Excel 文件(必须有明确的表头行和数据行),进入 “表结构配置” 页面:

coze-kb-create-table-3.png

在这里可以切换数据表,一个表格数据库只能对应一个数据表;然后选择表头所在行以及数据起始行,再为表格的每一列添加描述和数据类型;并选择一个或多个列作为索引,用于和用户的问题进行相似度匹配;然后继续 “下一步”,进入预览页面:

coze-kb-create-table-4.png

预览没问题的话,再次点击 “下一步”,系统开始对表格进行处理,等处理结束后,我们的表格知识库就构建完成了。

照片知识库

创建知识库时选择 “照片类型”,并填写知识库名称和描述,导入类型选 “本地文档”,然后点击 “创建并导入” 按钮,进入 “新增知识库” 的 “上传” 页面:

coze-kb-create-image-2.png

我们上传几张图片,进入 “标注设置” 页面:

coze-kb-create-image-3.png

Coze Studio 支持两种标注图片的方式:

  • 智能标注:通过大模型深度理解图片,自动提供全面详细的内容描述信息;
  • 人工标注:不执行处理,在图片导入完成后,手动添加图片描述;

为了使用智能标注功能,我们还需要配置 AI 生成模型,可以通过下面的参数配置:

export BUILTIN_CM_TYPE="openai"

支持 OpenAI、ARK、DeepSeek、Ollama、Qwen、Gemini 等模型,不同模型的配置有所区别,对于 OpenAI 配置如下:

export BUILTIN_CM_OPENAI_BASE_URL=""
export BUILTIN_CM_OPENAI_API_KEY="sk-xxx"
export BUILTIN_CM_OPENAI_BY_AZURE=false
export BUILTIN_CM_OPENAI_MODEL="gpt-4o"

注意:AI 生成模型不仅用于图像标注(Image Annotation),还用于文本转 SQL(NL2SQL)、一句话生成 Query(Message to Query)等其他场景。如果需要在不同场景中使用不同模型,你可以通过添加前缀来针对特定场景应用特定配置,比如 IM_BUILTIN_CM_TYPENL2SQL_BUILTIN_CM_TYPEM2Q_BUILTIN_CM_TYPE

配置完成后记得通过 --force-recreate 重启服务。

点击 “下一步”,如果是选择了人工标注,系统不会做任何处理直接结束;如果是选择了智能标注,则通过 AI 生成模型对图片进行处理,等处理结束后,我们的照片知识库也就构建完成了:

coze-kb-create-image-done.jpg

大模型为每张图片生成一句话描述,可以点击图片进行查看和编辑:

coze-kb-create-image-done-2.png

使用知识库

知识库解析完成后,不需要发布,我们直接就可以在创建智能体时,将知识库添加到文本、表格或照片等知识中。智能体可以检索文本:

coze-kb-qa.png

可以检索图片:

coze-kb-qa-2.png

也可以检索表格:

coze-kb-qa-3.png

创建数据库

在 Coze Studio 的资源库中,还有一种资源和知识库中的表格非常像,那就是数据库:

coze-db.png

数据库可以作为智能体的记忆使用,实现收藏夹、todo list、书籍管理、财务管理等功能,也可以作为工作流中的节点使用。我们不妨拿高德的城市编码表来测试一下,它这个 Excel 表格包含 citynameadcodecitycode 三列:

coze-db-excel.png

因此我们需要在创建数据库时,定义好对应的表结构:

coze-db-table.png

然后进入数据库的测试数据或线上数据页面导入数据,线上数据是应用程序在实际运行时产生的数据,测试数据主要用于辅助调试,与线上数据是隔离的:

coze-db-table-data.png

不知道大家还记不记得,我们昨天创建了一个 “高德天气查询” 工作流,其中通过代码组件将城市名称转换为高德城市编码,在代码组件中,我们将 Excel 中的所有数据保存到列表中,代码写得既冗余,又不好维护,而现在我们将 Excel 中的数据保存到数据库表中,那么就可以直接查询数据库来简化这个过程了。

对于数据库的查询操作,可以通过 “查询数据” 或 “SQL 自定义” 两个组件来实现,不过我测试下来发现,使用 “查询数据” 组件,城市名称怎么都模糊匹配不了,不清楚具体原因,后面阅读源码的时候再研究一下,有清楚的朋友欢迎评论区留言。这里先用 “SQL 自定义” 组件实现,编写 SQL 语句从数据库表中查询对应的城市记录:

select * from amap_city_code where LOCATE({{input}}, cityname) > 0

注意,我这里使用 LOCATE 函数来实现模糊匹配的功能,这和 LIKE 语句差不多,那么为什么不用 LIKE 呢?这是因为 Coze Studio 的 SQL 中使用 {{input}} 占位符来替换变量,但是替换时会自动用引号引起来,如果我们写 LIKE '%{{input}}%' 会被替换成 LIKE '%'城市名'%',导致 SQL 查询报错,因此不得不找个偏门的法子。

配置好的节点如下图所示:

coze-workflow-sql.png

该节点会返回 outputList 结构化的表格数据,因此代码节点的 Python 代码可以精简:

async def main(args: Args) -> Output:
  outputList = args.params['outputList']
  city_id = outputList[0]['adcode'] if outputList else '110000'
  ret: Output = {
    'city_id': city_id
  }
  return ret

优化后的工作流如下:

coze-workflow.png

在这个工作流中,不仅代码更加简洁,而且城市编码表存放在数据库中,更容易维护。

小结

我们今天主要学习了 Coze Studio 的知识库功能,学习了如何创建和配置文本、表格和照片三种不同类型的知识库,了解了其背后的 Embedding、向量存储、OCR、图片标注等配置。此外,我们还学习了数据库功能,通过一个实际案例,将昨天创建的 “高德天气查询” 工作流进行了优化,利用数据库替代了硬编码的城市列表,使代码更简洁、维护更容易。

至此,我们已经全面体验了 Coze Studio 从智能体、插件、工作流到知识库的各项核心功能。可以看出,相对于官方的扣子平台,开源版阉割了不少功能,比如:

  • 创建智能体里少了 对话流模式多 Agents 模式,少了 提示词对比调试自动优化提示词 功能;编排里技能少了 触发器异步任务,记忆少了 长期记忆文件盒子,对话体验少了 音视频语音输入 等选项;调试智能体时少了 调试详情 功能;
  • 创建应用里少了整个 用户界面 模块,不能搭建低代码应用;
  • 资源库里少了 对话流卡片音色 等类型的资源;工作流中的组件也不完全,少了 图像处理音视频处理会话管理消息管理 等组件;插件工具的创建方式少了 代码插件端侧插件;创建文本知识库时少了 在线数据飞书公众号Notion 等导入类型,表格知识库少了 API飞书 等导入类型;
  • 工作空间里还少了 发布管理模型管理效果评测 等模块,而且也没有 团队空间 的功能;

除了这些,还有一些产品细节,就不一一列举了。尽管如此,Coze Studio 的几个核心功能基本上都开源了,而且随着开源社区的不断贡献,相信这些缺失的模块很快就能补上。在接下来的学习中,我们将深入这几个核心功能的源码,研究下它们背后的实现原理。


实战 Coze Studio 工作流搭建

昨天我们初步探索了 Coze Studio 的智能体和插件功能,体验了如何快速构建并扩展 AI 应用。今天,我们将更进一步,深入学习 Coze Studio 的核心功能之一 —— 工作流(Workflow),它是构建复杂、强大、专业的智能体的关键。

工作流介绍

如果说插件是智能体的 “单兵武器”,那么工作流就是将这些武器组合起来的 “战术编排”。工作流允许我们通过可视化的方式,将大模型、插件、代码、逻辑判断等节点串联起来,形成一个自动化的任务处理流程。Coze Studio 内置了很多处理节点可供选用:

coze-workflow-nodes.png

这些组件大致可分为 6 大类,每个组件的功能汇总如下:

  • 基础节点

    • 大模型:调用大语言模型,使用变量和提示词生成回复;
    • 插件:调用插件运行指定工具;
    • 工作流:调用其他已发布的工作流,实现工作流嵌套工作流的效果;
  • 业务逻辑

    • 代码:支持通过编写代码,处理输入变量来生成返回值;
    • 选择器:连接多个下游分支,若设定的条件成立则仅运行对应的分支,若均不成立则只运行 “否则” 分支;
    • 循环:通过设定循环次数和逻辑,重复执行一系列任务;
    • 意图识别:通过大模型来识别用户输入的意图,实现类似选择器的效果;
    • 批处理:通过设定批量运行次数和逻辑,运行批处理体内的任务,和循环节点的区别在于,循环是按顺序逐个处理,批处理则是按批并行处理,运行效率更高;
    • 变量聚合:对多个分支的输出进行聚合处理;
  • 输入&输出

    • 输入:类似于开始节点,支持中间过程的信息输入;
    • 输出:类似于结束节点,支持中间过程的消息输出,支持流式和非流式两种方式;
  • 数据库

    • SQL 自定义:基于用户自定义的 SQL 完成对数据库的增删改查操作;
    • 更新数据:修改表中已存在的数据记录,用户指定更新条件和内容来更新数据;
    • 查询数据:从表获取数据,用户可定义查询条件、选择列等,输出符合条件的数据;
    • 删除数据:从表中删除数据记录,用户指定删除条件来删除符合条件的记录;
    • 新增数据:向表添加新数据记录,用户输入数据内容后插入数据库;
  • 知识库&数据

    • 知识库检索:在选定的知识中,根据输入变量召回最匹配的信息,并以列表形式返回;
    • 知识库写入:可以向文档类型的知识库写入知识片段,仅可以添加一个知识库;
    • 变量赋值:用于应用变量或用户变量的赋值;
  • 组件

    • 文本处理:用于处理多个字符串类型变量的格式,包括字符串拼接和字符串分割;
    • 问答:支持中间向用户提问问题,支持预置选项提问和开放式问题提问两种方式;
    • HTTP 请求:用于发送 API 请求,从接口返回数据;

创建工作流

下面我们来创建一个简单的工作流,假设我们现在要对接 高德的天气查询接口

amap-weather-api.png

这个接口就没有昨天那么直接了,该接口的 city 参数是城市编码,并不是城市名称,然而大模型是不知道每个城市的具体编码的,如果直接将该接口配置在插件里,调用必然会失败。这个时候我们就可以创建一个工作流,先将用户输入的城市名自动转换为城市编码,然后再调用高德的天气查询接口。

首先进入 “资源库” 页面,点击 “+ 资源” 并选择 “工作流”;然后为工作流命名,例如 “amap_weather”,工作流名称只能是英文;接着进入可视化编排画布:

coze-workflow-canvas.png

所有工作流都由一个开始节点和一个结束节点构成,我们先从开始节点配起,开始节点用于定义工作流的输入,这里我们添加 cityextensions 两个字符串类型的参数,用于接收用户想要查询的城市名称和气象类型;

coze-workflow-start-node.png

接着在开始节点后面加一个代码节点,为其定义输入参数 city_name 和输出参数 city_id,注意 city_name 参数设置成引用开始节点的 city 参数,然后编写一段 Python 代码将城市名称转成城市编码:

coze-workflow-code-node.png

代码内容如下:

city_list = [
  "北京市,110000",
  "天津市,120000",
  "石家庄市,130100",
  # ...
]
async def main(args: Args) -> Output:
  city_name = args.params['city_name']
  city_id = '110000'
  for city_info in city_list:
    city_info = city_info.split(',')
    if city_name in city_info[0]:
      city_id = city_info[1]
      break
  ret: Output = {
    'city_id': city_id
  }
  return ret

这里的 city_list 为城市编码表,可以从高德官网下载。然后点击上面的小三角测试该节点:

coze-workflow-code-node-debug.png

验证没问题后,接着再在代码节点后面加一个 HTTP 请求节点,配置如下参数:

  • 接口方式 - GET
  • API 地址 - https://restapi.amap.com/v3/weather/weatherInfo
  • 请求参数

    • key - 填写高德 API KEY,可以从高德开放平台免费申请
    • city - 引用自代码节点的 city_id 参数
    • extensions - 引用自开始节点的 extensions 参数
    • output - 填写 json 表示希望接口返回 JSON 格式

配置好的 HTTP 请求节点如下:

coze-workflow-http-node.png

仍然还是点击小三角测试:

coze-workflow-http-node-debug.png

测试通过后,将 HTTP 请求节点和结束节点连接起来,并在结束节点上添加 output 参数,引用 HTTP 请求节点的出参:

coze-workflow-end-node.png

结束节点有两种返回方式:

  • 返回变量:工作流运行结束后会以 JSON 格式输出所有返回参数,智能体在对话中触发工作流后,会自动总结 JSON 格式的内容,并以自然语言回复用户;
  • 返回文本:工作流运行结束后,智能体将直接使用指定的内容回复对话;

这里选择 “返回变量” 方式,至此,整个工作流搭建完成,可以点击下方的 “试运行” 按钮,对整个工作流进行测试:

coze-workflow-amap-weather.png

使用工作流

这样一个简单的工作流就创建好了,发布之后,我们就可以在创建智能体时,像添加插件一样,将这个工作流添加到智能体的技能中。当用户提问时,如果命中了这个工作流的意图,智能体就会自动执行整个流程:

coze-amap-weather-agent.png

注意,结束节点不要选 “返回文本”,可能导致工作流正常调用,但智能体却显示 “运行中止” 这样的情况。

代码沙箱配置

在上面的工作流示例中,代码节点默认会在服务所处的 Python 虚拟环境中直接运行,进入 Coze 容器,可以看到默认的 Python 版本和已安装的依赖:

docker-python-env.png

如果你有隔离需求,可以开启沙箱功能,沙箱为我们提供了一个安全隔离的代码执行环境。沙箱配置位于 docker/.env 文件:

# 支持 `local` 和 `sandbox` 两种类型,默认 `local`
export CODE_RUNNER_TYPE="sandbox"

Coze Studio 的沙箱功能通过 Deno + Pyodide 实现的。Pyodide 是一个将 Python 运行时编译为 WebAssembly 的项目,使 Python 能够直接在 JavaScript 环境中运行,比如浏览器,无需后端服务器支持。它由 Mozilla 开发并维护,旨在将 Python 的科学计算生态系统带到 Web 平台。

pyodide.png

Pyodide 非常适合作为代码沙箱环境,因为它在设计上就具备良好的安全性,可以有效限制代码的执行范围和权限,我们可以通过下面这些环境来限制它:

# 允许访问的环境变量,使用逗号分割,比如 `PATH,USERNAME`
export CODE_RUNNER_ALLOW_ENV=""
# 允许读取的路径,使用逗号分割,比如 `/tmp,./data`
export CODE_RUNNER_ALLOW_READ=""
# 允许写入的路径,使用逗号分割,比如 `/tmp,./data`
export CODE_RUNNER_ALLOW_WRITE=""
# 允许执行的子进程,比如 `python,git`
export CODE_RUNNER_ALLOW_RUN=""
# 允许访问的网络地址,域名或 IP
# pyodide 通过下面的 CDN 地址下载必须的依赖包,用于运行 Python 代码,最好不要删
export CODE_RUNNER_ALLOW_NET="cdn.jsdelivr.net"
# 允许访问的外部函数接口(Foreign Function Interface),例如 `/usr/lib/libm.so`
export CODE_RUNNER_ALLOW_FFI=""
# node_modules 目录位置,默认当前目录
export CODE_RUNNER_NODE_MODULES_DIR=""
# 超时时间,默认 60 秒
export CODE_RUNNER_TIMEOUT_SECONDS=""
# 内存限制,默认 100M
export CODE_RUNNER_MEMORY_LIMIT_MB=""

修改环境变量配置之后,通过下面的命令重启 Coze 服务生效:

$ docker compose --profile '*' up -d --force-recreate --no-deps coze-server

注意,由于修改了环境变量,这里用的 --force-recreate 参数强制容器重新创建,而不是重启。

小结

今天,我们深入学习了 Coze Studio 的工作流功能,我们不仅了解了工作流的基本概念和其丰富的节点类型,还通过一个实际的天气查询案例,动手创建了一个包含代码和 HTTP 请求的完整工作流。此外,我们还探讨了如何将工作流集成到智能体中,以及如何配置代码沙箱以确保执行安全。

接下来,我们将继续探索 Coze Studio 的另一个强大功能 —— 知识库,学习如何为我们的智能体注入专属知识,让它变得更加专业和智能。


实战 Coze Studio 智能体开发

昨天,我们成功在本地部署了 Coze Studio,并配置好了大模型。今天,我们将正式开始 Coze 智能体的探索之旅,学习如何利用其强大的可视化编排能力,创建并发布我们自己的 AI 智能体。

创建智能体

首先进入 “项目开发” 页面,点击 “创建” 按钮,Coze 支持创建两种不同的项目:

coze-create-agent.png

我们选择 “创建智能体”,然后为智能体取一个好听的名字,比如 “翻译小能手”:

coze-create-agent-name.png

接着,进入智能体的配置页面:

coze-create-agent-detail.png

整个配置页面可分为两大块:左侧为 编排 区域,右侧为 预览与调试 区域。在编排区域里,可以对智能体进行以下配置:

  • 人设与回复逻辑:指定模型的角色、设计回复的语言风格、限制模型的回答范围,让对话更符合用户预期;
  • 技能:为智能体配置各种扩展能力,让其可以调用外部插件或用户自定义工作流;
  • 知识:为智能体提供特定领域的知识背景,包括文本、表格和图片,让其可以回答领域内的问题;
  • 记忆:变量用来存储动态变化的信息,数据库用来管理和处理结构化数据,这两种方式都为智能体提供了一定的记忆能力;
  • 对话体验:开场白是用户进入智能体后自动展示的引导信息;用户问题建议用于在智能体回复后,根据对话内容推荐 3 条相关问题;快捷指令是对话输入框上方的按钮,方便用户快速发起预设对话;背景图片为智能体的对话窗口增加背景,增加对话沉浸感。

这些高级配置项我们暂时不管,对于 “翻译小能手”,我们只需要配置其人设与回复逻辑即可:

你是一个翻译助手,你的任务是将用户输入翻译成其他的语言,
如果用户输入是中文,翻译成英文,如果用户输入是英文,则翻译成中文。

一个简单的翻译智能体就制作完成了:

coze-translate-agent.png

另外,我们也可以打开 “探索” - “模版” 页面,Coze Studio 内置了两个智能体模版:英语聊天和导购陪练,都是纯提示词实现的,感兴趣的可以复制到工作空间看看:

coze-agent-template.png

自定义插件

单纯靠提示词实现的智能体能力非常有限,它无法连接和访问外部工具,我们可以为智能体添加一些技能,比如联网搜索、科学计算或绘制图片,扩展智能体的能力。上一节提到,在 Coze Studio 中,技能分为插件和工作流两种,下面我们先来学习下如何配置和使用插件。

这一节,我们将创建一个带技能的智能体,比如调用 “天气查询” 接口回答用户关于天气的问题。首先进入 “资源库” 页面,点击 “+ 资源” 按钮:

coze-add-resource.png

Coze 这里提供了五种不同类型的资源:

  • 插件:插件是一个工具集,一个插件内可以包含一个或多个工具,即 API 接口;
  • 工作流:通过在可视化画布上拖拽节点迅速搭建工作流,实现特定的业务逻辑;
  • 知识库:用户在这里上传外部知识内容,包括文本、表格和图片,解决大模型幻觉、专业领域知识不足的问题;
  • 提示词:可以将常用的提示词放在这里统一管理,在创建智能体或配置工作流中的大模型节点时可快速引用;
  • 数据库:管理和处理结构化数据,用户可通过自然语言插入、查询、修改或删除数据库中的数据;

我们这里选择创建插件:

coze-create-plugin.png

填写插件名称和描述,创建方式选 “云侧插件 - 基于已有服务创建”,插件 URL 填 https://query.asilu.com,该站点提供了不少免费而简单的 API 接口,用来测试智能体再合适不过。

注意插件的 URL 是根路径,插件下的所有工具都在这个 URL 之下。

Header 列表不用动,如果你的接口需要一些特别的请求头,可以在这里设置;授权方式选 “不需要授权”,除此之外,Coze Studio 还支持另两种授权方式:

  • Service 认证:通常是指一种简化的认证方式,其中 API 调用需要某种秘钥或令牌来验证其合法性,这种秘钥可能会通过查询参数或请求头传递;
  • OAuth 认证:这是一个开放标准,允许第三方应用在不共享用户密码的情况下访问用户账户的特定资源;

点击确定后进入插件页面,然后创建一个 get_weather 工具:

coze-create-plugin-tool.png

这个天气查询接口非常简单,只有一个 city 参数,如下:

因此我们需要配置三个地方:

  • 工具路径:/weather/baidu
  • 请求方法:Get 方法
  • 输入参数:只有一个 “city”,参数描述 “城市名”,参数类型 “String”,传入方法 “Query”

输出参数可以不用手工配,点击 “自动解析” 弹出一个调试页面,输入参数后只要接口能正常访问,Coze 会自动根据接口的返回值将输出参数配置好:

coze-create-plugin-tool-output.png

配置结束后,点击右上角的 “试运行” 对接口进行测试,测试完成后,这个插件就可以发布了。

然后,我们再创建一个 “天气助手” 智能体,将 get_weather 工具添加到它的技能里,这个智能体就可以为我们查询天气了:

coze-weather-agent.png

配置内置插件

其实,Coze Studio 提供了两种插件类型,即自定义插件和官方内置插件。官方内置插件由后台统一配置,支持所有用户使用;而自定义插件的使用范围为当前工作空间。可以打开 “探索” - “插件” 页面,查看所有官方内置插件:

coze-plugin-list.png

可以看到内置的插件还是蛮丰富的,一共 18 个,但是要注意的是,有些插件上带有 “未授权” 的标签,这表示此插件需要授权,这些插件我们还不能使用,需要配置授权信息。

所有内置的插件位于 backend/conf/plugin/pluginproduct 目录:

$ tree backend/conf/plugin/pluginproduct
backend/conf/plugin/pluginproduct
├── bocha_search.yaml # 博查搜索
├── chestnut_sign.yaml # 板栗看板
├── gaode_map.yaml # 高德地图
├── image_compression.yaml # 图片压缩
├── lark_authentication_authorization.yaml # 飞书认证及授权
├── lark_base.yaml # 飞书多维表格
├── lark_calendar.yaml # 飞书日历
├── lark_docx.yaml # 飞书云文档
├── lark_message.yaml # 飞书消息
├── lark_sheet.yaml # 飞书电子表格
├── lark_task.yaml # 飞书任务
├── lark_wiki.yaml # 飞书知识库
├── library_search.yaml # 文库搜索
├── maker_smart_design.yaml # 创客贴智能设计
├── plugin_meta.yaml # 所有插件的元配置,包括授权信息
├── sky_eye_check.yaml # 天眼查
├── sohu_hot_news.yaml # 搜狐热闻
├── wolfram_alpha.yaml # Wolfram Alpha
└── worth_buying.yaml # 什么值得买

其中 plugin_meta.yaml 为所有插件的元配置,授权信息就是配在这个文件中。我们以 “博查搜索” 为例,在该文件中找到对应的配置块:

coze-plugin-meta.png

根据提示,访问 博查 AI 开放平台

bocha-home.png

博查搜索是国内为数不多的一家搜索服务提供商,虽然不是免费,但价格确实很便宜,一次调用仅 3 分钱,为国外同类产品的 1/3。点击 “控制台” 注册、充值、并创建 API KEY,将其配置在 payload 中的 service_token 字段:

  auth:
    type: service_http
    key: Authorization
    sub_type: token/api_key
    # service token apply to https://open.bochaai.com/
    payload: '{"key": "Authorization", "service_token": "sk-xxx", "location": "Header"}'

这里的 auth.type 就对应上一节我们创建插件时选择的 “授权方式”,支持 service_httpoauth 两种,具体的授权信息位于 payload 里。关于插件配置和授权配置的详细解释,可以参考官方文档:

配置好插件授权后,重启 Coze 服务:

$ docker compose --profile "*" restart coze-server

此时 “博查搜索” 插件就可以正常使用了。我们可以创建一个 “搜索专家” 智能体,让它调用搜索引擎来回答用户的问题:

coze-search-agent.png

小结

今天,我们正式开启了 Coze Studio 的智能体开发之旅。我们从最基础的提示词工程开始,创建了一个简单的 “翻译小能手” 智能体,体验了仅通过人设与回复逻辑就能快速构建应用的过程。

接着,我们深入学习了如何通过插件扩展智能体的能力。我们一步步创建自定义的 “天气查询” 插件,并通过它实现了 “天气助手” 智能体;另外,还学习了如何配置和使用 Coze Studio 内置的插件,然后使用 “博查搜索” 插件,让智能体能够连接互联网获取实时信息。

通过今天的实战,我们掌握了 Coze Studio 中智能体和插件的基本用法,在后面的学习中,我们将继续探索工作流、知识库等更高级的功能,敬请期待。


Coze Studio 快速上手指南

相信大家最近都被 Coze 开源的新闻刷屏了吧,作为国内智能体平台的扛把子,字节的这一波操作确实让人猝不及防。Coze 是国内最早一批做智能体平台的,它的很多功能做得确实很不错,用户体验也很赞;但是,随着今年深度搜索和深度研究等概念的兴起,智能体平台的定位有点尴尬,字节本身也把精力投到 扣子空间 这个产品上了,这估计也是字节选择开源 Coze 的一个原因吧。

本次开源包括 Coze Studio(扣子开发平台)Coze Loop(扣子罗盘) 两个核心项目,并采用了 Apache 2.0 许可协议,这意味着开发者可以自由修改甚至闭源商用,这让那些其他的智能体开发平台,比如 Dify,瞬间就不香了。

我也算是 Coze 的老用户了,一直关注着它的最新动态,只要一有新功能推出,我总是第一时间去体验,然后和小伙伴们讨论,猜猜它后端可能是怎么实现的。现在开源了,这不得好好研究下它的代码嘛。

Coze Studio 介绍

根据官网的介绍,Coze Studio 源自 扣子开发平台,是一个一站式 AI 智能体开发平台,通过 Coze Studio 提供的可视化设计与编排工具,开发者可以通过零代码或低代码的方式,快速打造和调试智能体、应用和工作流,实现强大的 AI 应用开发和更多定制化业务逻辑。

coze-logo.png

它的核心功能包括:

  • 模型服务:管理模型列表,可接入 OpenAI、火山方舟等在线或离线模型服务;
  • 搭建智能体:编排、发布、管理智能体,支持配置工作流、知识库等资源;
  • 搭建应用:创建、发布应用,通过工作流搭建业务逻辑;
  • 搭建工作流:创建、修改、发布、删除工作流;
  • 开发资源:支持创建并管理以下资源:插件、知识库、数据库、提示词;
  • API 与 SDK:创建会话、发起对话等 OpenAPI,通过 Chat SDK 将智能体或应用集成到自己的应用;

本地部署

Coze 提供了项目所需的所有镜像,使用 Docker Compose 可以快速进行部署。我们首先克隆源码:

$ git clone https://github.com/coze-dev/coze-studio.git

进入 docker 目录:

$ cd coze-studio/docker

这个目录下的 docker-compose.yml 文件定义了部署 Coze 包含的各个组件,包括:

  • coze-mysql - MySQL 结构化数据存储
  • coze-redis - Redis 缓存
  • coze-elasticsearch - Elasticsearch 存储
  • coze-minio - Minio 对象存储
  • coze-milvus - Milvus 向量数据库
  • coze-etcd - Milvus 依赖 etcd 管理元数据
  • coze-server - Coze 后端服务

Coze 默认使用 NSQ 作为消息中间件服务,这是一个基于 Go 语言编写的内存分布式消息中间件,包括下面三个组件:

  • coze-nsqd - 负责接收、排队和向客户端投递消息,处理消息收发和队列维护的组件
  • coze-nsqlookupd - 管理拓扑信息的守护进程,相当于中心管理服务和服务发现组件
  • coze-nsqadmin - 一个 Web UI,用于实时查看聚合的集群统计信息,并执行各种管理任务

此外,有些组件在部署时需要初始化一些数据,Coze 通过下面这三个组件来初始化:

  • coze-minio-setup - 导入图标类的资源文件
  • coze-mysql-setup-schema - 初始化 MySQL 表结构,使用 Atlas 工具根据 HCL 文件创建数据库表结构
  • coze-mysql-setup-init-sql - 初始化 MySQL 表结构,导入初始化数据,和 coze-mysql-setup-schema 的区别是,这个服务使用 MySQL 原生客户端执行 SQL 脚本文件

为什么有两个初始化 MySQL 的服务?可能是为了兼容不同的部署方式,或者是逐步从传统 SQL 到 Atlas 迁移升级。

这个目录下还有一个 .env.example 文件,里面包含大量的项目配置,我们需要将其复制一份出来,另存为 .env 文件:

$ cp .env.example .env

如果你希望修改数据库用户名或密码之类的,可以编辑这个文件,默认情况下不用动。直接 docker compose 启动即可:

$ docker compose --profile "*" up -d

等待所有容器启动完毕:

coze-docker-up.png

其中 coze-minio-setupcoze-mysql-setup-schemacoze-mysql-setup-init-sql 这几个容器完成初始化任务后就退出了,因此处于 Exited 状态,是正常现象。

如果一切顺利,通过浏览器访问 http://localhost:8888/ 即可进入 Coze Studio 页面:

coze-login.png

输入邮箱和密码,点击注册,进入工作空间:

coze-space.png

模型配置

不过这个时候我们还无法使用平台功能,比如创建智能体,会报如下错误:

coze-create-agent-fail.png

Coze Studio 是一款基于大模型的 AI 应用开发平台,因此我们还必须配置模型服务。模型配置为 YAML 文件,统一放在 backend/conf/model 目录中,可以存在多个,每个文件对应一个可访问的模型。Coze Studio 支持常见的模型服务,如 OpenAI、DeepSeek、豆包等,为方便开发者快速配置,Coze Studio 在 backend/conf/model/template 目录下提供了常见模型的模板文件:

$ tree backend/conf/model/template 
backend/conf/model/template
├── model_template_ark.yaml
├── model_template_ark_doubao-1.5-lite.yaml
├── model_template_ark_doubao-1.5-pro-256k.yaml
├── model_template_ark_doubao-1.5-pro-32k.yaml
├── model_template_ark_doubao-1.5-thinking-pro.yaml
├── model_template_ark_doubao-1.5-thinking-vision-pro.yaml
├── model_template_ark_doubao-1.5-vision-lite.yaml
├── model_template_ark_doubao-1.5-vision-pro.yaml
├── model_template_ark_doubao-seed-1.6-flash.yaml
├── model_template_ark_doubao-seed-1.6-thinking.yaml
├── model_template_ark_doubao-seed-1.6.yaml
├── model_template_ark_volc_deepseek-r1.yaml
├── model_template_ark_volc_deepseek-v3.yaml
├── model_template_basic.yaml
├── model_template_claude.yaml
├── model_template_deepseek.yaml
├── model_template_gemini.yaml
├── model_template_ollama.yaml
├── model_template_openai.yaml
└── model_template_qwen.yaml

可以看到,除了字节自家的豆包(ARK 表示 火山方舟,豆包系列的大模型都支持),Coze Studio 也内置了 OpenAI、DeepSeek、Claude、Ollama、Qwen、Gemini 等模型的支持。

最近,国产开源模型大爆发,从月之暗面的 Kimi K2、阿里的 Qwen3 到智谱的 GLM 4.5,模型效果一个比一个好。这些模型在魔搭也都上线了:

modelscope.jpg

其中,Qwen3 和 GLM 4.5 都提供了推理 API 可以免费调用,每天 2000 次额度,我们不妨用 Qwen3 来测试一下。魔搭的推理 API 兼容 OpenAI 接口协议,因此我们这里使用 OpenAI 模版,将其复制到 backend/conf/model 目录:

$ cp backend/conf/model/template/model_template_openai.yaml backend/conf/model/model_modelscope_qwen3_coder.yaml

模版文件对各个参数已经有了比较详细的解释,一般来说,大多数参数都不用动,只需要关注其中几个重要参数即可:

  • id - 模型 ID,由开发者自行定义,必须是非 0 的整数,且全局唯一,模型 ID 定下来之后最好就不要改了;
  • name - 模型在平台上展示的名称;
  • description - 模型在平台上展示的简介,分中英文;
  • default_parameters - 模型默认参数,包括 temperaturemax_tokenstop_pfrequency_penaltypresence_penaltyresponse_format 等,基本上不用动;
  • meta.capability - 模型具备的能力,根据实际情况配置,比如是否支持 function call,是否支持 json mode,是否支持 reasoning,是否支持多模态,等;
  • meta.conn_config.base_url - 模型服务的接口地址,如果使用的不是 OpenAI 官方接口,可以在这里配置;比如这里我使用魔搭的推理 API 接口;
  • meta.conn_config.api_key - 模型服务的 API Key;注册魔搭平台后,在 “账号设置” - “访问令牌” 页面创建;
  • meta.conn_config.model - 模型名,不同厂商的命名规则可能不一样;比如这里我使用 Qwen3-Coder 来测试,在魔搭上的模型名为 Qwen/Qwen3-Coder-480B-A35B-Instruct
  • meta.conn_config.openai - OpenAI 专属配置,这里将 by_azure 设置为 false

更多参数介绍,请参考官方的 模型配置文档

修改后的配置文件内容如下:

coze-model-conf.png

然后执行以下命令重启 Coze 服务,使配置生效:

$ docker compose --profile "*" restart coze-server

再次点击创建智能体,此时就可以成功创建了,模型下拉列表可以看到我们配置的模型服务:

coze-create-agent-model.png

在右边的 “预览与调试” 对话框聊上两句,测试下模型服务是否正常:

coze-create-agent-success.png

小结

本文作为 Coze Studio 的快速上手指南,我们首先介绍了 Coze 开源的背景及其核心功能,然后详细讲解了如何使用 Docker Compose 在本地环境中完成部署,并以 OpenAI 兼容模型为例,演示了如何配置和验证模型服务。通过这些步骤,我们成功搭建了一个可用的 Coze Studio 本地开发环境。

至此,一切准备就绪,让我们一起开始 Coze 智能体的探索之旅吧!