Fork me on GitHub

2025年10月

重温 Java 21 之记录模式

记录模式(Record Patterns) 是对 记录类(Records) 这个特性的延伸,所以,我们先大致了解下什么是记录类,然后再来看看什么是记录模式。

什么是记录类(Records)?

记录类早在 Java 14 就已经引入了,它类似于 Tuple,提供了一种更简洁、更紧凑的方式来表示不可变数据,记录类经过三个版本的迭代(JEP 359JEP 384JEP 395),最终在 Java 16 中发布了正式版本。

记录类的概念在其他编程语言中其实早已有之,比如 Kotlin 的 Data class 或者 Scala 的 Case class。它本质上依然是一个类,只不过使用关键字 record 来定义:

record Point(int x, int y) { }

记录类的定义非常灵活,我们可以在单独文件中定义,也可以在类内部定义,甚至在函数内部定义。记录类的使用和普通类无异,使用 new 创建即可:

Point p1 = new Point(10, 20);
System.out.println("x = " + p1.x());
System.out.println("y = " + p1.y());
System.out.println("p1 is " + p1.toString());

记录类具备如下特点:

  • 它是一个 final 类;
  • 它不能继承其他类,也不能继承其他记录类;
  • 它的所有字段也是 final 的,所以一旦创建就不能修改;
  • 它内置实现了构造函数,函数参数就是所有的字段;
  • 它内置实现了所有字段的 getter 方法,没有 setter 方法;
  • 它内置实现了 equals()hashCode()toString() 函数;

所以上面的示例和下面的 Point 类是等价的:

public final class Point {
  final int x;
  final int y;

  public Point(int x, int y) {
    this.x = x;
    this.y = y;
  }

  public int x() {
    return x;
  }

  public int y() {
    return y;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (o == null || getClass() != o.getClass()) return false;
    Point point = (Point) o;
    return x == point.x && y == point.y;
  }

  @Override
  public int hashCode() {
    return Objects.hash(x, y);
  }

  @Override
  public String toString() {
    return "Point{" +
        "x=" + x +
        ", y=" + y +
        '}';
  }
}

我们也可以在记录类中声明新的方法:

record Point(int x, int y) {
    boolean isOrigin() {
        return x == 0 && y == 0;
    }
}

记录类的很多特性和 Lombok 非常类似,比如下面通过 Lombok 的 @Value 注解创建一个不可变对象:

@Value
public class Point {
  int x;
  int y;
}

不过记录类和 Lombok 还是有一些区别的:

  • 根据 JEP 395 的描述,记录类是作为不可变数据的透明载体,也就是说记录类无法隐藏字段;然而,Lombok 允许我们修改字段名称和访问级别;
  • 记录类适合创建小型对象,当类中存在很多字段时,记录类会变得非常臃肿;使用 Lombok 的 @Builder 构建器模式可以写出更干净的代码;
  • 记录类只能创建不可变对象,而 Lombok 的 @Data 可以创建可变对象;
  • 记录类不支持继承,但是 Lombok 创建的类可以继承其他类或被其他类继承;

什么是记录模式(Record Patterns)?

相信很多人都写过类似下面这样的代码:

if (obj instanceof Integer) {
  int intValue = ((Integer) obj).intValue();
  System.out.println(intValue);
}

这段代码实际上做了三件事:

  • Test:测试 obj 的类型是否为 Integer
  • Conversion:将 obj 的类型转换为 Integer
  • Destructuring:从 Integer 类中提取出 int 值;

这三个步骤构成了一种通用的模式:测试并进行强制类型转换,这种模式被称为 模式匹配(Pattern Matching)。虽然简单,但是却很繁琐。Java 16 在 JEP 394 中正式发布了 instanceof 模式匹配 的特性,帮我们减少这种繁琐的条件状态提取:

if (obj instanceof Integer intValue) {
  System.out.println(intValue);
}

这里的 Integer intValue 被称为 类型模式(Type Patterns),其中 Integer 是匹配的断言,intValue 是匹配成功后的变量,这个变量可以直接使用,不需要再进行类型转换了。

匹配的断言也支持记录类:

if (obj instanceof Point p) {
  int x = p.x();
  int y = p.y();
  System.out.println(x + y);
}

不过,这里虽然测试和转换代码得到了简化,但是从记录类中提取值仍然不是很方便,我们还可以进一步简化这段代码:

if (obj instanceof Point(int x, int y)) {
  System.out.println(x + y);
}

这里的 Point(int x, int y) 就是 Java 21 中的 记录模式(Record Patterns),可以说它是 instanceof 模式匹配的一个特例,专门用于从记录类中提取数据;记录模式也经过了三个版本的迭代:JEP 405JEP 432JEP 440,现在终于在 Java 21 中发布了正式版本。

此外,记录模式还支持嵌套,我们可以在记录模式中嵌套另一个模式,假设有下面两个记录类:

record Address(String province, String city) {}
record Person(String name, Integer age, Address address) {}

我们可以一次性提取出外部记录和内部记录的值:

if (obj instanceof Person(String name, Integer age, Address(String province, String city))) {
  System.out.println("Name: " + name);
  System.out.println("Age: " + age);
  System.out.println("Address: " + province + " " + city);
}

仔细体会上面的代码,是不是非常优雅?

switch 模式匹配

学习了记录模式,我们再来看看 Java 21 中的另一个特性,它和上面学习的 instanceof 模式匹配 息息相关。

除了 instanceof 模式匹配,其实还有另一种模式匹配叫做 switch 模式匹配,这个特性经历了 JEP 406JEP 420JEP 427JEP 433JEP 441 五个版本的迭代,从 Java 17 开始首个预览版本到 Java 21 正式发布足足开发了 2 年时间。

在介绍这个功能之前,有一个前置知识点需要复习一下:在 Java 14 中发布了一个特性叫做 Switch Expressions,这个特性允许我们在 case 中使用 Lambda 表达式来简化 switch 语句的写法:

int result = switch (type) {
  case "child" -> 0;
  case "adult" -> 1;
  default -> -1;
};
System.out.println(result);

这种写法不仅省去了繁琐的 break 关键词,而且 switch 作为表达式可以直接赋值给一个变量。switch 模式匹配 则更进一步,允许我们在 case 语句中进行类型的测试和转换,下面是 switch 模式匹配的一个示例:

String formatted = switch (obj) {
  case Integer i -> String.format("int %d", i);
  case Long l    -> String.format("long %d", l);
  case Double d  -> String.format("double %f", d);
  case String s  -> String.format("string %s", s);
  default        -> "unknown";
};
System.out.println(formatted);

作为对比,如果不使用 switch 模式匹配,我们只能写出下面这样的面条式代码:

String formatted;
if (obj instanceof Integer i) {
  formatted = String.format("int %d", i);
} else if (obj instanceof Long l) {
  formatted = String.format("long %d", l);
} else if (obj instanceof Double d) {
  formatted = String.format("double %f", d);
} else if (obj instanceof String s) {
  formatted = String.format("string %s", s);
} else {
  formatted = "unknown";
}
System.out.println(formatted);

更多关于 switch 模式匹配的用法可以参考这篇文章:

小结

今天我们学习了 Java 21 中的 记录模式switch 模式匹配 两个特性。

记录模式(Record Patterns) 建立在记录类和模式匹配的基础之上,进一步简化了代码的书写:

  1. 记录类(Records) 是一种轻量级的、不可变的数据载体,从 Java 16 开始正式发布,相比于 Lombok 更加原生,避免了注解编译时的额外处理,代码更加透明;
  2. 模式匹配 将类型测试、转换和值提取这三个繁琐的步骤统一起来,使得代码更加简洁。从 Java 16 开始支持 instanceof 模式匹配,允许在条件语句中直接进行类型的测试和转换;
  3. 记录模式instanceof 模式匹配的特例,专门用于从记录类中提取数据,通过记录模式,我们可以一次性从记录对象中提取多个字段,使得代码更加直观易读;

switch 模式匹配 进一步扩展了模式匹配的应用场景,允许在 switch 表达式中进行类型的测试、转换和值的提取,相比于传统的 if-else 链式判断,代码结构更加清晰。


重温 Java 21 之分代式 ZGC

想要搞清楚 Java 21 中的 分代式 ZGC(Generational ZGC) 这个特性,我们需要先搞清楚什么是 ZGC。

ZGC 简介

ZGC(The Z Garbage Collector) 是由 Oracle 开发的一款垃圾回收器,最初在 Java 11 中以实验性功能推出,并经过几个版本的迭代,最终在 Java 15 中被宣布为 Production Ready,相比于其他的垃圾回收器,ZGC 更适用于大内存、低延迟服务的内存管理和回收。下图展示的是不同的垃圾回收器所专注的目标也各不相同:

gc-landscape.png

低延迟服务的最大敌人是 GC 停顿,所谓 GC 停顿指的是垃圾回收期间的 STW(Stop The World),当 STW 时,所有的应用线程全部暂停,等待 GC 结束后才能继续运行。要想实现低延迟,就要想办法减少 GC 的停顿时间,根据 JEP 333 的介绍,最初 ZGC 的目标是:

  • GC 停顿时间不超过 10ms;
  • 支持处理小到几百 MB,大到 TB 量级的堆;
  • 相对于使用 G1,应用吞吐量的降低不超过 15%;

经过几年的发展,目前 ZGC 的最大停顿时间已经优化到了不超过 1 毫秒(Sub-millisecond,亚毫秒级),且停顿时间不会随着堆的增大而增加,甚至不会随着 root-set 或 live-set 的增大而增加(通过 JEP 376 Concurrent Thread-Stack Processing 实现),支持处理最小 8MB,最大 16TB 的堆:

zgc-goals.png

ZGC 之所以能实现这么快的速度,不仅是因为它在算法上做了大量的优化和改进,而且还革命性的使用了大量的创新技术,包括:

  • Concurrent:全链路并发,ZGC 在整个垃圾回收阶段几乎全部实现了并发;
  • Region-based:和 G1 类似,ZGC 是一种基于区域的垃圾回收器;
  • Compacting:垃圾回收的过程中,ZGC 会产生内存碎片,所以要进行内存整理;
  • NUMA-aware:NUMA 全称 Non-Uniform Memory Access(非一致内存访问),是一种多内存访问技术,使用 NUMA,CPU 会访问离它最近的内存,提升读写效率;
  • Using colored pointers:染色指针是一种将数据存放在指针里的技术,ZGC 通过染色指针来标记对象,以及实现对象的多重视图;
  • Using load barriers:当应用程序从堆中读取对象引用时,JIT 会向应用代码中注入一小段代码,这就是读屏障;通过读屏障操作,不仅可以让应用线程帮助完成对象的标记(mark),而且当对象地址发生变化时,还能自动实现对象转移(relocate)和重映射(remap);

关于这些技术点,网上的参考资料有很多,有兴趣的同学可以通过本文的更多部分进一步学习,其中最有意思的莫过于 染色指针读屏障,下面重点介绍这两项。

染色指针

在 64 位的操作系统中,一个指针有 64 位,但是由于内存大小限制,其实有很多高阶位是用不上的,所以我们可以在指针的高阶位中嵌入一些元数据,这种在指针中存储元数据的技术就叫做 染色指针(Colored Pointers)。染色指针是 ZGC 的核心设计之一,以前的垃圾回收器都是使用对象头来标记对象,而 ZGC 则通过染色指针来标记对象。ZGC 将一个 64 位的指针划分成三个部分:

colored-pointers.png

其中,前面的 16 位暂时没用,预留给以后使用;后面的 44 位表示对象的地址,所以 ZGC 最大可以支持 2^44=16T 内存;中间的 4 位即染色位,分别是:

  • Finalizable:标识这个对象只能通过 Finalizer 才能访问;
  • Remapped:标识这个对象是否在转移集(Relocation Set)中;
  • Marked1:用于标记可到达的对象(活跃对象);
  • Marked0:用于标记可到达的对象(活跃对象);

此外,染色指针不仅用来标记对象,还可以实现对象地址的多重视图,上述 Marked0、Marked1、Remapped 三个染色位其实代表了三种地址视图,分别对应三个虚拟地址,这三个虚拟地址指向同一个物理地址,并且在同一时间,三个虚拟地址有且只有一个有效,整个视图映射关系如下:

zgc-mmap.png

这三个地址视图的切换是由垃圾回收的不同阶段触发的:

  • 初始化阶段:程序启动时,ZGC 完成初始化,整个堆内存空间的地址视图被设置为 Remapped;
  • 标记阶段:当进入标记阶段时,视图转变为 Marked0 或者 Marked1;
  • 转移阶段:从标记阶段结束进入转移阶段时,视图再次被设置为 Remapped;

读屏障

读屏障(Load Barriers) 是 ZGC 的另一项核心技术,当应用程序从堆中读取对象引用时,JIT 会向应用代码中注入一小段代码:

load-barriers.png

在上面的代码示例中,只有第一行是从堆中读取对象引用,所以只会在第一行后面注入代码,注入的代码类似于这样:

String n = person.name; // Loading an object reference from heap
if (n & bad_bit_mask) {
  slow_path(register_for(n), address_of(person.name));
}

这行代码虽然简单,但是用途却很大,在垃圾回收的不同阶段,触发的逻辑也有所不同:在标记阶段,通过读屏障操作,可以让应用线程帮助 GC 线程一起完成对象的标记或重映射;在转移阶段,如果对象地址发生变化,还能自动实现对象转移。

ZGC 工作流程

整个 ZGC 可以划分成下面六个阶段:

zgc-phases.png

其中有三个是 STW 阶段,尽管如此,但是 ZGC 对 STW 的停顿时间有着严格的要求,一般不会超过 1 毫秒。这六个阶段的前三个可以统称为 标记(Mark)阶段

  • Pause Mark Start - 标记开始阶段,将地址视图被设置成 Marked0 或 Marked1(交替设置);这个阶段会 STW,它只标记 GC Roots 直接可达的对象,GC Roots 类似于局部变量,通过它可以访问堆上其他对象,这样的对象不会太多,所以 STW 时间很短;
  • Concurrent Mark/Remap - 并发标记阶段,GC 线程和应用线程是并发执行的,在第一步的基础上,继续往下标记存活的对象;另外,这个阶段还会对上一个 GC 周期留下来的失效指针进行重映射修复;
  • Pause Mark End - 标记结束阶段,由于并发标记阶段应用线程还在运行,所以可能会修改对象的引用,导致漏标,这个阶段会标记这些漏标的对象;

ZGC 的后三个阶段统称为 转移(Relocation)阶段(也叫重定位阶段):

  • Concurrent Prepare for Relocate - 为转移阶段做准备,比如筛选所有可以被回收的页面,将垃圾比较多的页面作为接下来转移候选集(EC);
  • Pause Relocate Start - 转移开始阶段,将地址视图从 Marked0 或者 Marked1 调整为 Remapped,从 GC Roots 出发,遍历根对象的直接引用的对象,对这些对象进行转移;
  • Concurrent Relocate - 并发转移阶段,将之前选中的转移集中存活的对象移到新的页面,转移完成的页面即可被回收掉,并发转移完成之后整个 ZGC 周期完成。注意这里只转移了对象,并没有对失效指针进行重映射,ZGC 通过转发表存储旧地址到新地址的映射,如果这个阶段应用线程访问这些失效指针,会触发读屏障机制自动修复,对于没有访问到的失效指针,要到下一个 GC 周期的并发标记阶段才会被修复。

为什么要分代?

在 ZGC 面世之前,Java 内置的所有垃圾回收器都实现了分代回收(G1 是逻辑分代):

垃圾回收器(别名)用法说明
Serial GC、Serial Copying-XX:+UseSerialGC串行,用于年轻代,使用复制算法
Serial Old、MSC-XX:+UseSerialOldGC串行,用于老年代,使用标记-整理算法
ParNew GC-XX:+UseParNewGCSerial GC 的并行版本,用于年轻代,使用复制算法
Parallel GC、Parallel Scavenge-XX:+UseParallelGC并行,用于年轻代,使用复制算法
Parallel Old、Parallel Compacting-XX:+UseParallelOldGC并行,用于老年代,使用标记-整理算法
CMS、Concurrent Mark Sweep-XX:+UseConcMarkSweepGC并发,用于老年代,使用标记-清除算法
G1、Garbage First-XX:+UseG1GC并发,既可以用于年轻代,也可以用于老年代,使用复制 + 标记-整理算法,用来取代 CMS

这些分代回收器之间可以搭配使用,周志明老师在《深入理解 Java 虚拟机》这本书中总结了各种回收器之间的关系:

gc-pairs.png

其中,Serial + CMS 和 ParNew + Serial Old 这两个组件在 Java 9 之后已经被取消,而 CMS 与 Serial Old 之间的连线表示 CMS 在并发失败的时候(Concurrent Mode Failure)会切换成 Serial Old 备用方案。

分代的基本思想源自于 弱分代假说(Weak Generational Hypothesis),这个假说认为绝大部分对象都是朝生夕死的,也就是说年轻对象往往很快死去,而老对象往往会保留下来。根据这个假说,JVM 将内存区域划分为 年轻代(Young Generation)老年代(Old Generation),新生代又进一步划分为 伊甸园区(Eden)第一幸存区(S0)第二幸存区(S1)

伊甸园区用来分配新创建的对象,如果没有足够的空间,就会触发一次 年轻代 GC(Young GC,Minor GC) 来释放内存空间,这里一般使用 标记-复制(Mark-Copy) 算法,将存活的对象标记下来,然后复制到一个幸存区中;年轻代的内存空间一般较小,所以可以更频繁地触发 GC,清理掉那些朝生夕死的对象,从而提高应用程序的性能;如果 GC 后伊甸园区还没有足够的空间存放新创建的对象,或者幸存区中某个对象的存活时间超过一定的阈值,这时就会将对象分配到老年代,如果老年代的空间也满了,就会触发一次 老年代 GC(Old GC,Full GC);老年代的内存空间要大的多,而且其中的对象大部分是存活的,GC 发生的频率要小很多,所以不再使用标记-复制算法,而是采用移动对象的方式来实现内存碎片的整理。

但是在上面的 ZGC 的工作流程中,我们却没有看到分代的影子,这也就意味着每次 ZGC 都是对整个堆空间进行扫描,尽管 ZGC 的 STW 时间已经被优化到不到 1ms,但是其他几个阶段是和应用线程一起执行的,这势必会影响到应用程序的吞吐量。让 ZGC 支持分代是一项巨大的工程,开发团队足足花了三年时间才让我们有幸在 Java 21 中体验到这一令人激动的特性。

除了 ZGC,Java 11 之后还引入了一些新的垃圾回收器:

垃圾回收器用法说明
ZGC-XX:+UseZGC低延迟 GC,from JDK 11
Epsilon GC-XX:+UseEpsilonGCNo-op GC,什么都不做,用于测试,from JDK 11
Shenandoah-XX:+UseShenandoahGCCPU 密集型 GC,from JDK 12

ZGC 实践

使用 -XX:+PrintCommandLineFlags,可以打印出 Java 的默认命令行参数:

$ java -XX:+PrintCommandLineFlags -version
-XX:ConcGCThreads=1 -XX:G1ConcRefinementThreads=4 -XX:GCDrainStackTargetSize=64 -XX:InitialHeapSize=128639872 -XX:MarkStackSize=4194304 -XX:MaxHeapSize=2058237952 -XX:MinHeapSize=6815736 -XX:+PrintCommandLineFlags -XX:ReservedCodeCacheSize=251658240 -XX:+SegmentedCodeCache -XX:+UseCompressedOops -XX:+UseG1GC 
openjdk version "21" 2023-09-19
OpenJDK Runtime Environment (build 21+35-2513)
OpenJDK 64-Bit Server VM (build 21+35-2513, mixed mode, sharing)

从上面的结果可以看出,Java 21 默认使用的仍然是 G1 垃圾回收器,它从 Java 9 就开始做为默认垃圾回收器了。

注意:Java 8 中默认的垃圾回收器是 Parallel GC。

如果想开启 ZGC,我们需要加上 -XX:+UseZGC 参数:

$ java -XX:+UseZGC -Xmx100M -Xlog:gc ZgcTest.java

其中 -Xlog:gc 参数表示打印出 GC 过程中的日志(就是 Java 8 的 -XX:+PrintGC 参数),输出结果如下:

[0.157s][info][gc] Using The Z Garbage Collector
[0.420s][info][gc] GC(0) Garbage Collection (Warmup) 14M(14%)->12M(12%)
[0.472s][info][gc] GC(1) Garbage Collection (System.gc()) 18M(18%)->8M(8%)

也可以使用 -Xlog:gc* 参数打印出 GC 过程中的详细日志(就是 Java 8 的 -XX+PrintGCDetails 参数),输出结果如下:

$ java -XX:+UseZGC -Xmx100M -Xlog:gc* ZgcTest.java
[0.010s][info][gc,init] Initializing The Z Garbage Collector
[0.011s][info][gc,init] Version: 21+35-2513 (release)
[0.011s][info][gc,init] Using legacy single-generation mode
[0.011s][info][gc,init] Probing address space for the highest valid bit: 47
[0.011s][info][gc,init] NUMA Support: Disabled
[0.011s][info][gc,init] CPUs: 4 total, 4 available
[0.011s][info][gc,init] Memory: 7851M
[0.011s][info][gc,init] Large Page Support: Disabled
[0.011s][info][gc,init] GC Workers: 1 (dynamic)
[0.011s][info][gc,init] Address Space Type: Contiguous/Unrestricted/Complete
[0.011s][info][gc,init] Address Space Size: 1600M x 3 = 4800M
[0.011s][info][gc,init] Heap Backing File: /memfd:java_heap
[0.011s][info][gc,init] Heap Backing Filesystem: tmpfs (0x1021994)
[0.012s][info][gc,init] Min Capacity: 8M
[0.012s][info][gc,init] Initial Capacity: 100M
[0.012s][info][gc,init] Max Capacity: 100M
[0.012s][info][gc,init] Medium Page Size: N/A
[0.012s][info][gc,init] Pre-touch: Disabled
[0.012s][info][gc,init] Available space on backing filesystem: N/A
[0.014s][info][gc,init] Uncommit: Enabled
[0.014s][info][gc,init] Uncommit Delay: 300s
[0.134s][info][gc,init] Runtime Workers: 1
[0.134s][info][gc     ] Using The Z Garbage Collector
[0.149s][info][gc,metaspace] CDS archive(s) mapped at: [0x0000006800000000-0x0000006800cb0000-0x0000006800cb0000), size 13303808, SharedBaseAddress: 0x0000006800000000, ArchiveRelocationMode: 1.
[0.149s][info][gc,metaspace] Compressed class space mapped at: 0x0000006801000000-0x0000006841000000, reserved size: 1073741824
[0.149s][info][gc,metaspace] Narrow klass base: 0x0000006800000000, Narrow klass shift: 0, Narrow klass range: 0x100000000
[0.357s][info][gc,start    ] GC(0) Garbage Collection (Warmup)
[0.357s][info][gc,task     ] GC(0) Using 1 workers
[0.357s][info][gc,phases   ] GC(0) Pause Mark Start 0.007ms
[0.366s][info][gc,phases   ] GC(0) Concurrent Mark 8.442ms
[0.366s][info][gc,phases   ] GC(0) Pause Mark End 0.005ms
[0.366s][info][gc,phases   ] GC(0) Concurrent Mark Free 0.000ms
[0.367s][info][gc,phases   ] GC(0) Concurrent Process Non-Strong References 1.092ms
[0.367s][info][gc,phases   ] GC(0) Concurrent Reset Relocation Set 0.000ms
[0.373s][info][gc,phases   ] GC(0) Concurrent Select Relocation Set 5.587ms
[0.373s][info][gc,phases   ] GC(0) Pause Relocate Start 0.003ms
[0.375s][info][gc,phases   ] GC(0) Concurrent Relocate 2.239ms
[0.375s][info][gc,load     ] GC(0) Load: 0.65/0.79/0.63
[0.375s][info][gc,mmu      ] GC(0) MMU: 2ms/99.7%, 5ms/99.9%, 10ms/99.9%, 20ms/99.9%, 50ms/100.0%, 100ms/100.0%
[0.375s][info][gc,marking  ] GC(0) Mark: 1 stripe(s), 2 proactive flush(es), 1 terminate flush(es), 0 completion(s), 0 continuation(s) 
[0.375s][info][gc,marking  ] GC(0) Mark Stack Usage: 32M
[0.375s][info][gc,nmethod  ] GC(0) NMethods: 889 registered, 90 unregistered
[0.375s][info][gc,metaspace] GC(0) Metaspace: 8M used, 8M committed, 1088M reserved
[0.375s][info][gc,ref      ] GC(0) Soft: 142 encountered, 0 discovered, 0 enqueued
[0.375s][info][gc,ref      ] GC(0) Weak: 747 encountered, 602 discovered, 224 enqueued
[0.375s][info][gc,ref      ] GC(0) Final: 0 encountered, 0 discovered, 0 enqueued
[0.375s][info][gc,ref      ] GC(0) Phantom: 146 encountered, 144 discovered, 143 enqueued
[0.375s][info][gc,reloc    ] GC(0) Small Pages: 7 / 14M, Empty: 0M, Relocated: 3M, In-Place: 0
[0.375s][info][gc,reloc    ] GC(0) Large Pages: 1 / 2M, Empty: 0M, Relocated: 0M, In-Place: 0
[0.375s][info][gc,reloc    ] GC(0) Forwarding Usage: 1M
[0.375s][info][gc,heap     ] GC(0) Min Capacity: 8M(8%)
[0.375s][info][gc,heap     ] GC(0) Max Capacity: 100M(100%)
[0.375s][info][gc,heap     ] GC(0) Soft Max Capacity: 100M(100%)
[0.375s][info][gc,heap     ] GC(0)                Mark Start          Mark End        Relocate Start      Relocate End           High               Low         
[0.375s][info][gc,heap     ] GC(0)  Capacity:      100M (100%)        100M (100%)        100M (100%)        100M (100%)        100M (100%)        100M (100%)   
[0.375s][info][gc,heap     ] GC(0)      Free:       84M (84%)          82M (82%)          82M (82%)          88M (88%)          88M (88%)          78M (78%)    
[0.375s][info][gc,heap     ] GC(0)      Used:       16M (16%)          18M (18%)          18M (18%)          12M (12%)          22M (22%)          12M (12%)    
[0.375s][info][gc,heap     ] GC(0)      Live:         -                 6M (6%)            6M (6%)            6M (6%)             -                  -          
[0.375s][info][gc,heap     ] GC(0) Allocated:         -                 2M (2%)            2M (2%)            3M (4%)             -                  -          
[0.375s][info][gc,heap     ] GC(0)   Garbage:         -                 9M (10%)           9M (10%)           1M (2%)             -                  -          
[0.375s][info][gc,heap     ] GC(0) Reclaimed:         -                  -                 0M (0%)            7M (8%)             -                  -          
[0.375s][info][gc          ] GC(0) Garbage Collection (Warmup) 16M(16%)->12M(12%)
[0.403s][info][gc,start    ] GC(1) Garbage Collection (System.gc())
[0.403s][info][gc,task     ] GC(1) Using 1 workers
[0.403s][info][gc,phases   ] GC(1) Pause Mark Start 0.006ms
[0.410s][info][gc,phases   ] GC(1) Concurrent Mark 7.316ms
[0.410s][info][gc,phases   ] GC(1) Pause Mark End 0.006ms
[0.410s][info][gc,phases   ] GC(1) Concurrent Mark Free 0.001ms
[0.412s][info][gc,phases   ] GC(1) Concurrent Process Non-Strong References 1.621ms
[0.412s][info][gc,phases   ] GC(1) Concurrent Reset Relocation Set 0.001ms
[0.414s][info][gc,phases   ] GC(1) Concurrent Select Relocation Set 2.436ms
[0.414s][info][gc,phases   ] GC(1) Pause Relocate Start 0.003ms
[0.415s][info][gc,phases   ] GC(1) Concurrent Relocate 0.865ms
[0.415s][info][gc,load     ] GC(1) Load: 0.65/0.79/0.63
[0.415s][info][gc,mmu      ] GC(1) MMU: 2ms/99.7%, 5ms/99.8%, 10ms/99.9%, 20ms/99.9%, 50ms/100.0%, 100ms/100.0%
[0.415s][info][gc,marking  ] GC(1) Mark: 1 stripe(s), 2 proactive flush(es), 1 terminate flush(es), 0 completion(s), 0 continuation(s) 
[0.415s][info][gc,marking  ] GC(1) Mark Stack Usage: 32M
[0.415s][info][gc,nmethod  ] GC(1) NMethods: 983 registered, 129 unregistered
[0.415s][info][gc,metaspace] GC(1) Metaspace: 9M used, 9M committed, 1088M reserved
[0.415s][info][gc,ref      ] GC(1) Soft: 155 encountered, 0 discovered, 0 enqueued
[0.415s][info][gc,ref      ] GC(1) Weak: 729 encountered, 580 discovered, 58 enqueued
[0.415s][info][gc,ref      ] GC(1) Final: 0 encountered, 0 discovered, 0 enqueued
[0.415s][info][gc,ref      ] GC(1) Phantom: 49 encountered, 47 discovered, 46 enqueued
[0.415s][info][gc,reloc    ] GC(1) Small Pages: 6 / 12M, Empty: 0M, Relocated: 1M, In-Place: 0
[0.415s][info][gc,reloc    ] GC(1) Large Pages: 2 / 4M, Empty: 2M, Relocated: 0M, In-Place: 0
[0.415s][info][gc,reloc    ] GC(1) Forwarding Usage: 0M
[0.415s][info][gc,heap     ] GC(1) Min Capacity: 8M(8%)
[0.415s][info][gc,heap     ] GC(1) Max Capacity: 100M(100%)
[0.415s][info][gc,heap     ] GC(1) Soft Max Capacity: 100M(100%)
[0.415s][info][gc,heap     ] GC(1)                Mark Start          Mark End        Relocate Start      Relocate End           High               Low         
[0.415s][info][gc,heap     ] GC(1)  Capacity:      100M (100%)        100M (100%)        100M (100%)        100M (100%)        100M (100%)        100M (100%)   
[0.415s][info][gc,heap     ] GC(1)      Free:       84M (84%)          84M (84%)          84M (84%)          92M (92%)          92M (92%)          82M (82%)    
[0.415s][info][gc,heap     ] GC(1)      Used:       16M (16%)          16M (16%)          16M (16%)           8M (8%)           18M (18%)           8M (8%)     
[0.415s][info][gc,heap     ] GC(1)      Live:         -                 4M (5%)            4M (5%)            4M (5%)             -                  -          
[0.415s][info][gc,heap     ] GC(1) Allocated:         -                 0M (0%)            2M (2%)            2M (2%)             -                  -          
[0.415s][info][gc,heap     ] GC(1)   Garbage:         -                11M (11%)           9M (9%)            1M (1%)             -                  -          
[0.415s][info][gc,heap     ] GC(1) Reclaimed:         -                  -                 2M (2%)           10M (10%)            -                  -          
[0.415s][info][gc          ] GC(1) Garbage Collection (System.gc()) 16M(16%)->8M(8%)
[0.416s][info][gc,heap,exit] Heap
[0.416s][info][gc,heap,exit]  ZHeap           used 8M, capacity 100M, max capacity 100M
[0.416s][info][gc,heap,exit]  Metaspace       used 9379K, committed 9600K, reserved 1114112K
[0.416s][info][gc,heap,exit]   class space    used 1083K, committed 1216K, reserved 1048576K

从日志中可以看到 ZGC 的整个过程。默认情况下并没有开启分代式 ZGC,如果想开启分代式 ZGC,我们还需要加上 -XX:+ZGenerational 参数:

$ java -XX:+UseZGC -XX:+ZGenerational -Xmx100M -Xlog:gc* ZgcTest.java

这个输出比较多,此处就省略了,从输出中可以看到不同分代的回收情况。关于 ZGC,还有很多微调参数,详细内容可参考 ZGC 的官方文档

注意,在 Java 23 中分代式 ZGC 已经是默认选项,不需要再用 -XX:+ZGenerational 参数开启,另外,在 Java 24 中非分代模式已被正式移除。

小结

今天我们学习了 Java 21 引入的 分代式 ZGC(Generational ZGC) 这一新的垃圾回收特性,它在原有 ZGC 的基础上融入了分代回收的思想,进一步优化了应用的吞吐量和延迟性能。

单代 ZGC 汇聚了多个技术创新,包括 染色指针读屏障 等技术,具有亚毫秒级的 STW 延迟,但由于每次 GC 都要扫描整个堆,这会对应用吞吐量造成影响;分代式 ZGC 继承了 ZGC 的低延迟特性,同时通过分代设计,使得频繁发生的年轻代 GC 更加高效,而老年代 GC 的频率大幅降低,从而实现了更好的整体性能。


重温 Java 21 之字符串模板

字符串模板是很多语言都具备的特性,它允许在字符串中使用占位符来动态替换变量的值,这种构建字符串的方式比传统的字符串拼接或格式化更为简洁和直观。相信学过 JavaScript 的同学对下面这个 Template literals 的语法不陌生:

const name = 'zhangsan'
const age = 18
const message = `My name is ${name}, I'm ${age} years old.`
console.log(message)

如上所示,JavaScript 通过反引号 ` 来定义字符串模板,而 Java 21 则引入了一个叫做 模版表达式(Template expressions) 的概念来定义字符串模板。下面是一个简单示例:

String name = "zhangsan";
int age = 18;
String message = STR."My name is \{name}, I'm \{age} years old.";
System.out.println(message);

看上去和 JavaScript 的 Template literals 非常相似,但还是有一些区别的,模版表达式包含三个部分:

  • 首先是一个 模版处理器(template processor):这里使用的是 STR 模板处理器,也可以是 RAWFMT 等,甚至可以自定义;
  • 中间是一个点号(.);
  • 最后跟着一个字符串模板,模板中使用 \{name}\{age} 这样的占位符语法,这被称为 内嵌表达式(embedded expression)

当模版表达式运行的时候,模版处理器会将模版内容与内嵌表达式的值组合起来,生成结果。

不过,当我们执行上述代码时,很可能会报 Invalid escape sequence (valid ones are \b \t \n \f \r \" \' \\ ) 这样的错:

preview-feature-error.png

这是因为字符串模板还只是一个预览特性,根据 JEP 12: Preview Features,我们需要添加 --enable-preview 参数开启预览特性,使用 javac 编译时,还需要添加 --release 参数。使用下面的命令将 .java 文件编译成 .class 文件:

$ javac --enable-preview --release 21 StringTemplates.java 
Note: StringTemplates.java uses preview features of Java SE 21.
Note: Recompile with -Xlint:preview for details.

再使用下面的命令运行 .class 文件:

$ java --enable-preview StringTemplates
My name is zhangsan, I'm 18 years old.

从 Java 11 开始,我们可以直接运行 .java 文件了,参见 JEP 330,所以上面的两个命令也可以省略成一个命令:

$ java --enable-preview --source 21 StringTemplates.java

STR 模版处理器

STR 模板处理器中的内嵌表达式还有很多其他写法,比如执行数学运算:

int x = 1, y = 2;
String s1 = STR."\{x} + \{y} = \{x + y}";

调用方法:

String s2 = STR."Java version is \{getVersion()}";

访问字段:

Person p = new Person(name, age);
String s3 = STR."My name is \{p.name}, I'm \{p.age} years old.";

内嵌表达式中可以直接使用双引号,不用 \" 转义:

String s4 = STR."I'm \{age >= 18 ? "an adult" : "a child"}.";

内嵌表达式中可以编写注释和换行:

String s5 = STR."I'm \{
    // check the age
    age >= 18 ? "an adult" : "a child"
}.";

多行模板表达式

在 Java 13 的 JEP 355 中首次引入了 文本块(Text Blocks) 特性,并经过 Java 14 的 JEP 368 和 Java 15 的 JEP 378 两个版本的迭代,使得该特性正式可用,这个特性可以让我们在 Java 代码中愉快地使用多行字符串。在使用文本块之前,定义一个 JSON 格式的字符串可能会写出像下面这样无法直视的代码来:

String json1 = "{\n" +
               "  \"name\": \"zhangsan\",\n" +
               "  \"age\": 18\n" +
               "}\n";

但是在使用文本块之后,这样的代码就变得非常清爽:

String json2 = """
               {
                 "name": "zhangsan",
                 "age": 18
               }
               """;

文本块以三个双引号 """ 开始,同样以三个双引号结束,看上去和 Python 的多行字符串类似,不过 Java 的文本块会自动处理换行和缩进,使用起来更方便。上面的文本块在 Java 中输出如下:

{
  "name": "zhangsan",
  "age": 18
}

注意开头没有换行,结尾有一个换行。而在 Python 中输出如下:


               {
                 "name": "zhangsan",
                 "age": 18
               }

不仅开头和结尾都有换行,而且每一行有很多缩进,这里可以看出 Python 的处理很简单,它直接把 """ 之间的内容原样输出了,而 Java 是根据最后一个 """ 和内容之间的相对缩进来决定输出。很显然,我们更喜欢 Java 这样的输出结果,如果希望 Python 有同样的输出结果,就得这样写:

json = """{
  "name": "zhangsan",
  "age": 18
}
"""

这在代码的可读性上就比不上 Java 了,这里不得不感叹 Java 的设计,在细节的处理上做的确实不错。

言归正传,说回字符串模板这个特性,我们也可以在文本块中使用,如下:

String json3 = STR."""
               {
                 "name": "\{name}",
                 "age": \{age}
               }
               """;

FMT 模板处理器

FMT 是 Java 21 内置的另一个模版处理器,它不仅有 STR 模版处理器的插值功能,还可以对输出进行格式化操作。格式说明符(format specifiers) 放在嵌入表达式的左侧,如下所示:

%7.2f\{price}

支持的格式说明符参见 java.util.Formatter 文档。

不过在我的环境里编译时,会报错 cannot find symbol: variable FMT,就算是把镜像更换成 openjdk:22-jdk 也是一样的错,不清楚是为什么。

小结

我们今天学习了 Java 21 引入的 字符串模板(String Templates) 特性,它通过 模板表达式内嵌表达式 的设计,使得字符串的构造更加简洁、直观和类型安全。字符串模板的核心优势在于:

  • 简洁性:相比传统的字符串拼接或 String.format() 等方法,模板表达式的写法更加简洁易读,特别是在处理复杂的多行字符串时,结合文本块特性,代码的可读性得到了显著提升;
  • 灵活的模板处理器:Java 21 内置了 STRFMT 两个模板处理器,满足不同的需求,同时开发者也可以实现自定义的模板处理器来扩展功能;
  • 与文本块的完美配合:字符串模板与文本块特性搭配使用,在处理 JSON、XML、HTML 等多行格式化文本时,表现出了显著的优势,使代码变得更加优雅;

总的来说,字符串模板是 Java 语言在现代化演进中的又一个重要步伐,通过借鉴其他编程语言(如 JavaScript、Python、Kotlin 等)的成功经验,为 Java 开发者提供了更加便利和舒适的开发体验。


重温 Java 21 学习笔记

2025 年 9 月 16 日,Oracle 正式发布了 Java 25 版本,这是 Java 时隔两年发布的又一个 LTS 版本,上一个 LTS 版本是 2023 年 9 月 19 日发布的 Java 21

java-25.png

还记得当年发布 Java 21 的时候,市场反响很大,它被认为是最近几年内最为重要的版本,带来了一系列重要的功能和特性,包括:记录模式switch 模式匹配字符串模板分代式 ZGC不需要定义类的 Main 方法,等等等等,不过其中最为重要的一项,当属由 Loom 项目 发展而来的 虚拟线程。Java 程序一直以文件体积大、启动速度慢、内存占用多被人诟病,但是有了虚拟线程,再结合 GraalVM 的原生镜像,我们就可以写出媲美 C、Rust 或 Go 一样小巧灵活、高性能、可伸缩的应用程序。

转眼间,距离 Java 25 的发布已经 1 个多月了,网上相关的文章也已经铺天盖地,为了不使自己落伍,于是便打算花点时间学习一下,顺便看看我之前写的 Java 21 的学习笔记,重温下 Java 21 的相关知识。

尽管在坊间一直流传着 版本任你发,我用 Java 8 这样的说法,但是作为一线 Java 开发人员,最好还是紧跟大势,未雨绸缪,有备无患。而且最重要的是,随着 Spring Boot 2.7.18 的发布,2.x 版本将不再提供开源支持,而 3.x 不支持 Java 8,最低也得 Java 17,所以仍然相信这种说法的人除非不使用 Spring Boot,要么不升级 Spring Boot,否则学习 Java 新版本都是势在必行。

特性一览

接下来,我们先来看下 Java 21 的全部特性,包括下面 15 个 JEP:

由于内容较多,我将分成几篇来介绍,这是第一篇,先学习下 431 和 449 两个简单的特性。

有序集合

Java 集合框架(Java Collections Framework,JCF) 为集合的表示和操作提供了一套统一的体系架构,让开发人员可以使用标准的接口来组织和操作集合,而不必关心底层的数据结构或实现方式。JCF 的接口大致可以分为 CollectionMap 两组,一共 15 个:

jcf-interfaces.png

在过去的 20 个版本里,这些接口已经被证明非常有用,在日常开发中发挥了重要的作用。那么 Java 21 为什么又要增加一个新的 有序集合(Sequenced Collections) 接口呢?

不一致的顺序操作

这是因为这些接口在处理集合顺序问题时很不一致,导致了无谓的复杂性,比如要获取集合的第一个元素:

获取第一个元素
Listlist.get(0)
Dequedeque.getFirst()
SortedSetsortedSet.first()
LinkedHashSetlinkedHashSet.iterator().next()

可以看到,不同的集合有着不同的实现。再比如获取集合的最后一个元素:

获取最后一个元素
Listlist.get(list.size() - 1)
Dequedeque.getLast()
SortedSetsortedSet.last()
LinkedHashSet-

List 的实现显得非常笨重,而 LinkedHashSet 根本没有提供直接的方法,只能将整个集合遍历一遍才能获取最后一个元素。

除了获取集合的第一个元素和最后一个元素,对集合进行逆序遍历也是各不相同,比如 NavigableSet 提供了 descendingSet() 方法来逆序遍历:

for (var e : navSet.descendingSet()) {
  process(e);
}

Deque 通过 descendingIterator() 来逆序遍历:

for (var it = deque.descendingIterator(); it.hasNext();) {
  var e = it.next();
  process(e);
}

List 则是通过 listIterator() 来逆序遍历:

for (var it = list.listIterator(list.size()); it.hasPrevious();) {
  var e = it.previous();
  process(e);
}

由此可见,与顺序相关的处理方法散落在 JCF 的不同地方,使用起来极为不便。于是,Java 21 为我们提供了一个描述和操作有序集合的新接口,这个接口定义了一些与顺序相关的方法,将这些散落在各个地方的逻辑集中起来,让我们更方便地处理有序集合。

统一的有序集合接口

与顺序相关的操作主要包括三个方面:

  • 获取集合的第一个或最后一个元素
  • 向集合的最前面或最后面插入或删除元素
  • 按照逆序遍历集合

为此,Java 21 新增了三个有序接口:SequencedCollectionSequencedSetSequencedMap,他们的定义如下:

interface SequencedCollection<E> extends Collection<E> {
  SequencedCollection<E> reversed();
  void addFirst(E);
  void addLast(E);
  E getFirst();
  E getLast();
  E removeFirst();
  E removeLast();
}

interface SequencedSet<E> extends Set<E>, SequencedCollection<E> {
  SequencedSet<E> reversed();
}

interface SequencedMap<K,V> extends Map<K,V> {
  SequencedMap<K,V> reversed();
  SequencedSet<K> sequencedKeySet();
  SequencedCollection<V> sequencedValues();
  SequencedSet<Entry<K,V>> sequencedEntrySet();
  V putFirst(K, V);
  V putLast(K, V);
  Entry<K, V> firstEntry();
  Entry<K, V> lastEntry();
  Entry<K, V> pollFirstEntry();
  Entry<K, V> pollLastEntry();
}

他们在 JCF 大家庭中的位置如下图所示:

sequenced-collection.png

有了这些接口,对于所有的有序集合,我们都可以通过下面的方法来获取第一个和最后一个元素:

System.out.println("The first element is: " + list.getFirst());
System.out.println("The last element is: " + list.getLast());

逆序遍历也变得格外简单:

list.reversed().forEach(it -> System.out.println(it));

弃用 Windows 32-bit x86 移植,为删除做准备

这个特性比较简单。随着 64 位架构的普及,32 位操作系统逐渐被淘汰,比如微软从 Windows 10 开始就只提供 64 位版本了,Windows 10 将是最后一个支持 32 位的 Windows 操作系统,而且 2025 年 10 月后将不再支持

64 位架构相比于 32 位,在性能和安全方面都有巨大的提升。比如 64 位架构可以提供更大的内存地址空间,从而提高应用程序的性能和扩展性,同时它也引入了更多的保护机制,提高了应用程序的安全性。

但由于架构的差异,同时兼容 32 位和 64 位需要不少的维护成本,很多 Java 的新特性已经不支持 32 位系统了,比如虚拟线程,所以弃用 32 位势在必行。

在 Windows 32-bit x86 系统下构建 Java 21 的源码将报如下错误:

$ bash ./configure
...
checking compilation type... native
configure: error: The Windows 32-bit x86 port is deprecated and may be removed in a future release. \
Use --enable-deprecated-ports=yes to suppress this error.
configure exiting with result code 1
$

暂时可以通过 --enable-deprecated-ports=yes 参数来解决:

$ bash ./configure --enable-deprecated-ports=yes

小结

从今天开始,我将和大家一起踏上 Java 新特性的学习之旅,我们先从 Java 21 的 15 个 JEP 开始,这些特性涵盖了从简单的语言特性到复杂的运行时优化等多个方面,值得每一个 Java 开发人员学习和了解。

本篇作为系列的第一篇,学习了其中的两个简单特性:有序集合(Sequenced Collections)弃用 Windows 32-bit x86 移植

  • 有序集合的引入解决了 Java 集合框架长期存在的不一致问题,通过提供统一的顺序操作接口,使得处理集合的第一个和最后一个元素、逆序遍历等操作变得简洁统一;
  • 而 Windows 32-bit 支持的移植弃用则进一步精简了 Java 的维护成本,让团队能够更专注于高价值的功能开发;

在后续的篇章中,我们将深入探讨其他更加强大和复杂的特性,包括字符串模板、记录模式、虚拟线程等,敬请期待。


学习 Dify 的代码沙箱

在上一篇文章中,我们讲到了 Dify 的工具系统,其中有一个代码执行的内置工具非常重要,无论是在工作流的代码节点中执行用户代码,还是在智能体中作为 Code Interpreter 调用,都离不开这个工具。为了执行用户代码,Dify 需要一个安全的、隔离的、可控的代码执行环境,这也就是本文的主角 —— Dify 代码沙箱

如果没有完善的沙箱隔离机制,恶意用户可以利用代码执行漏洞访问系统文件、盗取数据、甚至获得整个服务器的控制权。因此,Dify 在代码沙箱的设计和实现上下了不少功夫,其中有不少值得学习的点。本文将详细介绍 Dify 代码沙箱的工作原理,以及它所采用的各种安全隔离技术。

代码沙箱概述

代码沙箱(Code Sandbox) 是一个隔离的代码执行环境,允许在受控的环境中安全地运行不信任的代码。它的核心目标是在提供代码执行功能的同时,防止恶意或有缺陷的代码对系统造成危害。

在 Dify 中,代码沙箱主要用于以下场景:

  1. 工作流代码节点:用户可以在工作流中添加代码节点,用 Python 或 JavaScript 处理工作流中的数据
  2. 代码执行工具:作为智能体的工具,大模型可以自主调用代码执行器完成计算任务
  3. 模板转换:在数据处理过程中,使用代码对数据进行转换和清洗

这些场景的共同点是,代码来自于用户或 AI 生成,具有不可预测性,因此必须在沙箱中执行。

沙箱的安全需求

设计一个安全的代码沙箱需要满足以下需求:

需求说明
进程隔离用户代码运行在独立的进程中,不影响主程序
文件系统隔离代码无法访问主机系统的文件和目录
网络隔离代码可以根据配置选择性地访问网络
系统调用限制代码不能调用危险的系统调用(如 fork、exec 等)
权限限制代码以低权限用户身份运行
资源限制限制代码的 CPU、内存和执行时间
代码隐私用户的代码在传输和执行过程中受到保护

Dify 通过一个独立的沙箱服务 dify-sandbox 来实现这些需求,它采用了 Linux 提供的多种安全机制,构建了一套分层的防御体系。

代码执行的实现

在深入 Dify 的代码沙箱之前,让我们来看下代码执行相关的逻辑,其实现位于 CodeExecutor 类:

class CodeExecutor:

  @classmethod
  def execute_code(cls, language: CodeLanguage, preload: str, code: str) -> str:
    """
    调用代码沙箱,执行代码
    """

    # 接口地址
    url = code_execution_endpoint_url / "v1" / "sandbox" / "run"

    # 简单鉴权
    headers = {"X-Api-Key": dify_config.CODE_EXECUTION_API_KEY}

    # 接口入参
    data = {
      "language": cls.code_language_to_running_language.get(language),
      "code": code,
      "preload": preload,
      "enable_network": True,
    }

    # 发送请求
    response = post(
      str(url),
      json=data,
      headers=headers,
      timeout=Timeout(
        connect=dify_config.CODE_EXECUTION_CONNECT_TIMEOUT,
        read=dify_config.CODE_EXECUTION_READ_TIMEOUT,
        write=dify_config.CODE_EXECUTION_WRITE_TIMEOUT,
        pool=None,
      ),
    )

    # 获取 stdout 输出
    response_data = response.json()
    response_code = CodeExecutionResponse(**response_data)
    return response_code.data.stdout or ""

  @classmethod
  def execute_workflow_code_template(cls, language: CodeLanguage, code: str, inputs: Mapping[str, Any]):
    """
    执行工作流代码节点
    """

    # 将用户代码和输入参数嵌入预置的代码模板
    template_transformer = cls.code_template_transformers.get(language)
    runner, preload = template_transformer.transform_caller(code, inputs)

    try:
      # 调用代码执行
      response = cls.execute_code(language, preload, runner)
    except CodeExecutionError as e:
      raise e

    # 将执行结果转换为工作流的节点出参
    return template_transformer.transform_response(response)

其中 execute_workflow_code_template() 函数负责工作流中的代码节点的执行,注意它并没有直接执行用户的代码,而是做了一层模板转换。因为代码执行服务是通过标准输出获取执行结果的,而工作流的代码节点中用户输入的代码必须包含一个 main() 入口:

def main(x: int, y: int) -> dict:
  return {
    'sum' : x + y
  }

如果直接执行不会有任何输出,所以 Dify 在调用代码执行服务之前,先使用一段预置的代码模板将用户代码和输入参数包起来。模板如下:

# 用户代码,申明 main 函数
{cls._code_placeholder}

import json
from base64 import b64decode

# 输入参数
inputs_obj = json.loads(b64decode('{cls._inputs_placeholder}').decode('utf-8'))

# 执行 main 函数
output_obj = main(**inputs_obj)

# 将输出转换为 JSON 格式并打印输出
output_json = json.dumps(output_obj, indent=4)
result = f'''<<RESULT>>{{output_json}}<<RESULT>>'''
print(result)

最终的输出结果会被转换为 JSON 格式并通过 print 打印出来,打印的时候加上 <<RESULT>> 这个特殊标签,防止 main 函数里其他的 print 对结果造成干扰,方便解析。

另外,CodeExecutor 中还有一个 execute_code() 函数,这才是真正的代码执行入口,支持 Python 或 JavaScript 两种编程语言,它负责将代码发送到代码沙箱服务并处理返回结果:

$ curl -X POST http://127.0.0.1:8194/v1/sandbox/run \
  -H "X-Api-Key: dify-sandbox" \
  -H "Content-Type: application/json" \
  -d '{
    "language": "python3",
    "code": "print('"'"'hello'"'"')",
    "preload": "",
    "enable_network": true
  }'

该接口通过 X-Api-Key 头支持简单的鉴权,该值可以在 .env 文件中修改:

CODE_EXECUTION_ENDPOINT=http://127.0.0.1:8194
CODE_EXECUTION_API_KEY=dify-sandbox

Dify 代码沙箱架构

Dify 的代码沙箱服务以 Docker 容器运行,采用了 防御纵深(Defense in Depth) 的设计思想,实现了多层安全防御,即使某一层防御被突破,其他层仍然能够保护系统。其执行流程如下:

sandbox-flow.png

整个过程涉及多个安全层,下面我们逐一探讨。

第一层:代码加密与编码

Dify 的代码沙箱在执行用户代码时,首先会将用户代码写到一个临时文件中,然后再启动 Python 或 Node.js 去运行该脚本文件,通过捕获 Python 或 Node.js 进程的 stdoutstderr 获取代码执行结果。

为了防止用户的代码以明文形式出现在磁盘上,Dify 在写入临时文件时对代码做了一次简单的加密:

// 生成一个 512 字节的随机密钥
key_len := 64
key := make([]byte, key_len)
_, err := rand.Read(key)

// 加密代码:采用简单的 XOR 加密,将代码与密钥进行 XOR 操作
encrypted_code := make([]byte, len(code))
for i := 0; i < len(code); i++ {
    encrypted_code[i] = code[i] ^ key[i%key_len]
}

// 对加密后的代码进行 Base64 编码
code = base64.StdEncoding.EncodeToString(encrypted_code)
// 对密钥进行 Base64 编码
encoded_key := base64.StdEncoding.EncodeToString(key)

用户的代码也不是直接执行的,而是通过另一个模板文件 prescript.py 动态生成的,可以在这个模板文件中看到用户代码的解密过程:

from base64 import b64decode

# 解码密钥
key = b64decode(key)

# 解码用户代码
code = b64decode("{{code}}")

# 定义解密函数
def decrypt(code, key):
  key_len = len(key)
  code_len = len(code)
  code = bytearray(code)
  for i in range(code_len):
    code[i] = code[i] ^ key[i % key_len]  # XOR 操作
  return bytes(code)

# 解密用户代码
code = decrypt(code, key)

# 执行用户代码
exec(code)

虽然 XOR 加密的安全性相对较弱,但由于这个密钥是一次性的,考虑到所有操作都在容器中,以及配合其他安全机制,这种方案在实践中是足够的。

第二层:进程隔离

每次代码执行都会创建一个独立的子进程,用户代码在这个进程中运行,与沙箱服务的主进程完全隔离。这样,即使代码因某种原因崩溃或消耗过多资源,也不会影响沙箱服务本身。另外,参考 docker/volumes/sandbox/conf/config.yaml 文件,沙箱还支持配置以下资源限制:

配置项说明默认值
max_workers最大并发执行进程数4
max_requests请求队列大小50
worker_timeout单个代码执行的超时时间5 秒

超过超时时间的进程会被强制杀死,这是一个重要的资源保护机制。

第三层:文件系统隔离

chroot 是 Linux 提供的一个系统调用,可以改变进程的根目录。当一个进程执行 chroot("/some/path") 后,对于这个进程来说,/some/path 就变成了新的根目录,进程无法访问此目录之外的任何文件。

在 Dify 沙箱中,Python 代码执行时会被 chroot 到一个特殊的沙箱目录,如 /var/sandbox/sandbox-python 目录。这个目录包含了 Python 运行时所需的最小文件集合。这样,即使用户代码尝试执行 open("/etc/passwd"),它实际上会尝试打开 /var/sandbox/sandbox-python/etc/passwd,而这个文件并不存在,因此访问会被拒绝。

这个最小化的文件系统被称为 chroot 监狱,即使用户代码知道绝对路径,也无法访问这个监狱之外的文件。

Dify 沙箱通过 Go 语言的 syscall.Chroot() 系统调用实现该功能:

import "syscall"

func InitSeccomp(uid int, gid int, enable_network bool) error {

  err := syscall.Chroot(".")

  // 其他安全措施...
}

需要注意的是,chroot 本身并不是一个强大的安全机制,如果进程有能力调用 chdir() 和其他系统调用,可能找到逃逸的方式,需要配合其他机制来形成完整的防护。因此,Dify 在应用 chroot 的同时,还使用了 seccomp 来限制进程可以调用的系统调用,后面我们会具体介绍。

第四层:用户权限隔离

沙箱中的代码不应该以 root 身份运行。Dify 在代码执行时,会使用 setuidsetgid 系统调用,将进程的用户身份和组身份切换到一个非特权用户:

import "syscall"

func InitSeccomp(uid int, gid int, enable_network bool) error {

  // 其他安全措施...

  // setuid
  err = syscall.Setuid(uid)

  // setgid
  err = syscall.Setgid(gid)
}

这个非特权用户名为 sandbox,ID 为 65537,是由沙箱服务在启动时自动创建的。

这样做的好处是:

  1. 限制文件和目录的访问权限(基于文件的 Unix 权限位)
  2. 防止进程获得 root 权限进行的操作
  3. 减小代码逃逸后的影响范围

除了权限降级外,Dify 还使用了 prctl(PR_SET_NO_NEW_PRIVS) 系统调用来禁止进程及其所有子进程获得新的权限:

import "github.com/langgenius/dify-sandbox/internal/core/lib"

func InitSeccomp(uid int, gid int, enable_network bool) error {

  // 其他安全措施...

  lib.SetNoNewPrivs()

  // 其他安全措施...
}

其中 SetNoNewPrivs() 函数通过调用 Go 语言的 syscall.Syscall6() 系统调用设置进程的 PR_SET_NO_NEW_PRIVS 标志:

func SetNoNewPrivs() error {
  // syscall.SYS_PRCTL 表示 Linux 的 prctl 系统调用,用于操作进程的各种属性
  // 它的第一个参数 0x26 是 PR_SET_NO_NEW_PRIVS 标志常数的十六进制值
  // 它的第二个参数 1 表示启用该标志
  _, _, e := syscall.Syscall6(syscall.SYS_PRCTL, 0x26, 1, 0, 0, 0, 0)
}

这个标志的作用是,即使进程调用了一个设置了 setuid 位的二进制文件(如 sudo),也无法获得额外的权限。这是一个额外的保护层,防止通过特殊的二进制文件实现权限提升。

第五层:系统调用隔离

Seccomp(Secure Computing Mode) 是 Linux 内核提供的一个强大的安全机制,它允许进程通过 BPF(Berkeley Packet Filter) 程序来过滤系统调用。当进程执行被禁用的系统调用时,内核会立即终止该进程或返回错误。

Seccomp 使用 BPF 字节码来实现系统调用过滤,当进程执行系统调用时,内核会执行这个 BPF 程序来决定是否允许该调用。BPF 程序的结果有几种可能:

返回值说明
SECCOMP_RET_ALLOW允许系统调用执行
SECCOMP_RET_ERRNO返回错误码,进程继续执行
SECCOMP_RET_KILL_PROCESS杀死整个进程(推荐)
SECCOMP_RET_KILL_THREAD杀死当前线程
SECCOMP_RET_TRAP发送信号给进程

Dify 采用 白名单模式,即只有显式允许的系统调用才能执行,其他所有系统调用都会被拒绝。Dify 针对不同的编程语言(Python 和 Node.js)以及不同的系统架构(ARM64 和 AMD64)提供了不同的白名单:

  • internal/static/python_syscall/ - Python 允许的系统调用
  • internal/static/nodejs_syscall/ - Node.js 允许的系统调用

比如针对 ARM64 架构下的 Python 语言,白名单如下:

allow-syscalls.png

这个白名单分三个部分:

  • ALLOW_SYSCALLS 允许的系统调用
  • ALLOW_ERROR_SYSCALLS 允许的系统调用,但是返回报错
  • ALLOW_NETWORK_SYSCALLS 允许的网络系统调用

可以看到,危险的系统调用如 execveforkptrace 等都被禁止了,这确保了用户代码无法执行任意的系统命令或创建新进程。

系统调用隔离的实现同样位于 InitSeccomp 函数中:

import "github.com/langgenius/dify-sandbox/internal/core/lib"

func InitSeccomp(uid int, gid int, enable_network bool) error {

  // 其他安全措施...

  allowed_syscalls = append(allowed_syscalls, python_syscall.ALLOW_SYSCALLS...)
  if enable_network {
    allowed_syscalls = append(allowed_syscalls, python_syscall.ALLOW_NETWORK_SYSCALLS...)
  }
  allowed_not_kill_syscalls = append(allowed_not_kill_syscalls, python_syscall.ALLOW_ERROR_SYSCALLS...)

  err = lib.Seccomp(allowed_syscalls, allowed_not_kill_syscalls)

  // 其他安全措施...
}

其中 lib.Seccomp() 函数的实现如下:

import (
    "syscall"
    sg "github.com/seccomp/libseccomp-golang"
)

func Seccomp(allowed_syscalls []int, allowed_not_kill_syscalls []int) error {
    
  // 初始化 Seccomp 过滤器
  ctx, err := sg.NewFilter(sg.ActKillProcess)

  // 添加规则:允许的系统调用
  for _, syscall := range allowed_syscalls {
    ctx.AddRule(sg.ScmpSyscall(syscall), sg.ActAllow)
  }

  // 添加规则:报错的系统调用
  for _, syscall := range allowed_not_kill_syscalls {
    ctx.AddRule(sg.ScmpSyscall(syscall), sg.ActErrno)
  }

  // 将过滤器规则导出成字节码
  file := os.NewFile(uintptr(writer.Fd()), "pipe")
  ctx.ExportBPF(file)

  // 应用 Seccomp 规则
  _, _, err2 := syscall.Syscall(
    SYS_SECCOMP,
    uintptr(SeccompSetModeFilter),
    uintptr(SeccompFilterFlagTSYNC),
    uintptr(unsafe.Pointer(&bpf)),
  )

它通过 libseccomp-golang 库,创建 Seccomp 过滤器并添加对应的白名单规则。

第六层:网络隔离

除了上述安全机制外,Dify 还允许根据配置选择性地启用或禁用网络访问:

enable_network: True

这是通过上面的 Seccomp 过滤器来实现的,当禁用网络后,所有和网络相关的系统调用(比如 socketconnect 等)都将被拒绝。

另外,细心的朋友可能会注意到,在 Docker Compose 部署中,沙箱服务是运行在一个隔离的网络中,使用 SSRF 代理容器来控制出站连接:

sandbox:
  image: langgenius/dify-sandbox:0.2.12
  environment:
    ENABLE_NETWORK: ${SANDBOX_ENABLE_NETWORK:-true}
    HTTP_PROXY: ${SANDBOX_HTTP_PROXY:-http://ssrf_proxy:3128}
    HTTPS_PROXY: ${SANDBOX_HTTPS_PROXY:-http://ssrf_proxy:3128}
  networks:
    - ssrf_proxy_network

因此即使代码绕过了 Seccomp 限制(理论上不可能),Docker 网络层也会提供额外的防护。

共享库的奥秘

看到这里,大家可能会疑惑,上述大多数的安全措施都位于 InitSeccomp() 函数中,这是一个 Go 函数,它是如何作用在用户编写的 Python 或 Node.js 脚本上的呢?

答案在于 python.so 这个 C 共享库。

Dify 通过 CGO(Go 的 C 互操作机制)将 InitSeccomp() 中的防护逻辑编译成一个 so 库文件,编译命令如下:

CGO_ENABLED=1 GOOS=linux GOARCH=amd64 go build \
  -o internal/core/runner/python/python.so \
  -buildmode=c-shared \
  -ldflags="-s -w" \
  cmd/lib/python/main.go

其中 CGO_ENABLED=1 表示启用 CGO,允许 Go 调用 C 代码;GOOS=linux 表示目标操作系统为 Linux,Dify 的沙箱服务只能跑在 Linux 环境下;-buildmode=c-shared 表示编译为 C 共享库格式;最后的 -ldflags="-s -w" 用于移除符号表和调试信息,减小文件大小。

编译后的 python.so 文件主要导出函数是 DifySeccomp,它在 cmd/lib/python/main.go 中定义:

package main

import (
  "github.com/langgenius/dify-sandbox/internal/core/lib/python"
)
import "C"

// 导出的函数
func DifySeccomp(uid int, gid int, enable_network bool) {
  python.InitSeccomp(uid, gid, enable_network)
}

func main() {}

这个函数被 Python 脚本通过 ctypes 调用,在用户代码执行前完成所有的安全隔离设置:

import ctypes

# 动态加载 python.so,这是 Dify 沙箱的 C 共享库
lib = ctypes.CDLL("./python.so")
lib.DifySeccomp.argtypes = [ctypes.c_uint32, ctypes.c_uint32, ctypes.c_bool]
lib.DifySeccomp.restype = None

# 应用安全隔离:chroot + seccomp + setuid + SetNoNewPrivs
lib.DifySeccomp({{uid}}, {{gid}}, {{enable_network}})

# 执行用户代码 ...
code = decrypt(code, key)
exec(code)

实际应用示例

让我们通过一个例子来理解 Seccomp 的实际效果,假设用户在工作流的代码节点中尝试执行以下恶意操作:

def main() -> dict:    
  return {
    "result": "hello",
  }

# 尝试删除整个系统
import os
os.system("rm -rf /")

当 Python 执行 os.system() 时,实际上会调用 execve 系统调用来启动新的进程。但由于 execve 被 Seccomp 过滤器拒绝了,内核会立即终止这个进程并返回错误:

code-execute-error.png

这种保护是在内核级别进行的,任何绕过尝试都会失败。

小结

代码沙箱作为 AI 应用安全的基石,其重要性不言而喻。我们今天详细学习了 Dify 代码沙箱的工作原理,它巧妙地结合了 Linux 提供的多种安全机制:

  • Chroot 提供文件系统隔离
  • Setuid/Setgid 实现权限降级
  • SetNoNewPrivs 防止权限提升
  • Seccomp 在系统调用级别进行细粒度的控制

通过这些机制的组合,Dify 实现了进程隔离、文件系统隔离、用户权限隔离、系统调用隔离及网络隔离等多层防御,打造了一个既安全又实用的代码执行环境,允许 AI 应用能够动态执行代码,同时避免恶意代码对系统造成伤害。


学习 Dify 的工具系统

在这一个月的时间里,我们通过源码深入学习了 Dify 的整个会话流程,了解了从应用生成器到运行器的核心机制,从限流控制、文件处理、跟踪调试,到提示词组装、内容审核、外部数据扩展,再到最后的知识库检索和模型调用。通过这个完整的会话过程,我们几乎把 Dify 的各个方面都摸了个遍。

不过,关于 Dify 还有很多未展开的话题值得深入研究,比如智能体、工作流、对话流这些应用的实现,不过这里面的细节也很多,为了防止这个系列过于冗长,我决定不再继续讲解应用的源码,而是挑一些比较重要且有意思的点来写,比如今天将要学习的工具系统。

工具使得 AI 应用能够跳出纯文本对话的局限,连接外部世界的各种服务和功能。无论是在智能体应用中自主决策调用工具,还是在工作流中精确编排工具的执行,工具系统都是 Dify 应用能力的重要体现。本文将详细介绍 Dify 工具的使用和开发,包括内置工具、自定义工具、MCP 工具、工作流工具、工具插件等,带你全面认识 Dify 的工具系统。

内置工具

我们之前在实战智能体的时候,曾经使用过 Dify 内置的几个小工具:

  • 语音转写:支持 TTS 文本转语音和 ASR 语音转文本;
  • 代码执行:运行 Python 代码并返回执行结果;
  • 时间计算:各种时间小工具,比如获取当前时间、时区转换、时间戳转换、计算星期几等;
  • 网页抓取:获取指定 URL 的网页内容;

我们打开 “工具” 页面,可以看到所有内置的工具以及来自 Dify 市场的工具,用户可以择需安装:

builtin-tools.png

所有内置工具的实现位于 api/core/tools/builtin_tool/providers 目录,以约定的目录结构进行组织:

.
├── audio              # 按供应商分组
│   ├── _assets        # 图标资源
│   │   └── icon.svg
│   ├── audio.py       # 供应商代码,主要是认证鉴权
│   ├── audio.yaml     # 供应商配置文件,名称、描述、作者、图标、标签等
│   └── tools
│       ├── asr.py     # 工具的具体实现代码
│       ├── asr.yaml   # 工具配置文件,名称、描述以及参数的详细信息
│       ├── tts.py
│       └── tts.yaml
├── code
├── time
└── webscraper

可以看到,内置工具按供应商进行分组,每个供应商可以包含多个工具。供应商和工具都有对应的 YAML 配置文件,用于定义其基本信息和参数,以及对应的 Python 实现代码。

在这里 “供应商” 这个词可能不太合适,使用 “工具箱” 可能更容易理解一点。

Dify 在启动时会自动扫描并加载这个目录下的所有工具,因此对于在本地部署 Dify 的开发人员来说,完全可以在这个目录下添加并实现自己的内置工具。

自定义工具

自定义工具允许用户通过导入 OpenAPI 规范 的 API 文档,快速集成任何 RESTful API 作为工具:

custom-tools.png

其实,考虑到兼容性,Dify 支持好几种不同的 API 规范:

  • OpenAPI 3.0/3.1:业界标准的 API 文档格式
  • Swagger 2.0:OpenAPI 的前身规范
  • OpenAI Plugin:OpenAI 插件规范

对 Schema 的解析可以参考 ApiBasedToolSchemaParser 的源码。下面是一个天气查询接口的 Schema 示例:

{
  "openapi": "3.1.0",
  "info": {
    "title": "天气查询",
    "description": "天气查询",
    "version": "v1.0.0"
  },
  "servers": [
    {
      "url": "https://query.asilu.com"
    }
  ],
  "paths": {
    "/weather/baidu": {
      "get": {
        "description": "查询某个城市的天气信息",
        "operationId": "QueryWeather",
        "parameters": [
          {
            "name": "city",
            "in": "query",
            "description": "城市名称",
            "required": true,
            "schema": {
              "type": "string"
            }
          }
        ],
        "deprecated": false
      }
    }
  },
  "components": {
    "schemas": {}
  }
}

其中 operationId 会作为工具的名称,description 是工具的描述,点击 “测试” 按钮可以对接口进行调试:

custom-tools-debug.png

自定义工具的实现位于 api/core/tools/custom_tool/,系统会自动解析 API 规范并调用相应的工具接口。值得注意的是,对外部接口的调用,统一走 SSRF 代理,防止 SSRF 攻击。

MCP 工具

MCP 工具是一种更灵活的工具集成方式,通过 Model Context Protocol 标准,允许连接任何实现了 MCP 规范的外部服务器。它目前已经是大模型调用工具的事实性规范了,几乎所有的接口提供方都在争相推出他们的 MCP Server。

我们以 高德地图 MCP Server 为例,演示下如何添加 MCP 工具。首先,需要在高德官网上申请 API KEY,然后点击 “添加 MCP 服务”:

mcp-tools.png

依次填写名称、图标、服务端点、服务器标识符等信息,再点击 “添加并授权”,就可以看到 MCP Server 下的 15 个工具都已添加成功:

mcp-tools-list.png

MCP 的完整实现位于 api/core/mcp/,包括客户端、会话管理、认证等完整的生态支持。

工作流工具

当我们开发完一个工作流并点击发布时,可以选择将其发布为一个工具:

workflow-publish.png

我们需要对工具的名称、描述和参数进行配置:

workflow-tool.png

配置完成之后,我们就可以在 “工具管理” 页面看到这个工具,并且可以在智能体和工作流等应用中使用它了。

工具插件

我们昨天曾学习过 Dify 的插件机制,并演示了如何从零开始开发一个模型插件。除了模型插件,Dify 也支持工具插件,我们完全可以按照昨天的步骤再开发一个工具插件,注意在选择插件模板时选择 “tool” 模板:

dify-plugin-init.png

生成的模板目录结构如下:

calculator
├── GUIDE.md
├── PRIVACY.md
├── README.md
├── _assets
│   ├── icon-dark.svg
│   └── icon.svg
├── main.py
├── manifest.yaml
├── provider
│   ├── calculator.py
│   └── calculator.yaml
├── requirements.txt
└── tools
    ├── add.py
    └── add.yaml

同样是先将插件清单、供应商配置、工具配置这几个 YAML 文件调整下,最后编辑 add.py 文件,完成工具的实现:

class AddTool(Tool):
  
  def _invoke(self, tool_parameters: dict[str, Any]) -> Generator[ToolInvokeMessage]:
    """
    加法运算
    """
    x = tool_parameters.get("x", 0)
    y = tool_parameters.get("y", 0)
    result = str(x + y)
    yield self.create_text_message(result)

这里的 create_text_message() 表示工具返回的是文本消息,Dify 定义了多种工具返回消息类型,支持文本、链接、图片、文件等丰富的内容:

# 创建文本消息
self.create_text_message(text="Hello, World!")

# 创建 JSON 消息
self.create_json_message(data={"key": "value"})

# 创建链接消息
self.create_link_message(link="https://example.com")

# 创建图片消息
self.create_image_message(image="https://example.com/image.jpg")

# 创建文件 BLOB 消息
self.create_blob_message(blob=file_bytes, meta={"mime_type": "application/pdf"})

然后将插件打包上传,就可以在智能体或工作流中使用了:

plugin-tool.png

验证工具能否正常工作:

plugin-tool-use.png

插件签名

有一点我们昨天没有提,当使用 dify plugin package 打包插件并上传到 Dify 平台时,可能会遇到下面这个签名错误:

plugin-upload-fail.png

可以在 docker/middleware.env 环境变量文件的末尾添加如下配置参数:

# 关闭插件签名
FORCE_VERIFYING_SIGNATURE=false

如果你不是通过源码部署的,可以修改 /docker/.env 文件。

然后重启 Dify 的插件服务即可:

$ docker compose -f docker-compose.middleware.yaml up -d \
  --force-recreate \
  --no-deps \
  plugin_daemon

不过这种方法将允许安装所有未审核的插件,可能存在安全隐患,最好的做法是对插件进行签名。插件的开发者首先需要创建密钥对:

$ dify signature generate -f demo

这个命令将生成 demo.private.pemdemo.public.pem 两个文件,一个私钥,一个公钥。然后开发者使用私钥对插件进行签名:

$ dify signature sign calculator.difypkg -p demo.private.pem

这个命令将生成一个带签名的插件文件 calculator.signed.difypkg,不过这个时候插件还上传不了。开发者需要将公钥提交给 Dify 平台的管理员,管理员审核通过后,将该开发者的公钥放在 public_keys 目录下,并修改平台配置:

# 开启插件签名
FORCE_VERIFYING_SIGNATURE=true
THIRD_PARTY_SIGNATURE_VERIFICATION_ENABLED=true
THIRD_PARTY_SIGNATURE_VERIFICATION_PUBLIC_KEYS=/app/storage/public_keys/demo.public.pem

重启插件服务后,开发者就可以上传带签名的插件了。

小结

Dify 的工具系统是一个分层次、多维度的生态,从内置工具的开箱即用,到自定义 API 工具的快速集成,再到 MCP 工具的标准化接入,以及最后的插件工具开发,提供了从简单到复杂、从低门槛到高灵活性的完整解决方案。

无论你是想快速为应用增强功能,还是想开发一个功能丰富的工具插件供社区使用,Dify 的工具系统都能满足你的需求。

关于本文中所提到的内置工具的开发,以及工具插件的示例,我已经将完整源码发布到 Github 上了,感兴趣的同学可以参考:


深入 Dify 的应用运行器之模型调用

在前面的几篇文章中,我们学习了 Dify 应用运行器中的外部数据扩展和知识库检索相关的内容,至此,万事俱备,只欠东风,我们已经到达应用运行器的收尾阶段。接下来就是重新组装提示词,调用大模型,完成对用户问题的回答。

我们今天就来深入学习模型调用相关的逻辑,以及 Dify 是如何通过插件化的架构来管理和调用各种大语言模型的,顺便看下 Dify 插件的实现原理,并通过一个自定义模型插件演示如何从零开发 Dify 插件。

模型实例

回顾 CompletionAppRunnerrun() 方法,在处理完外部数据和知识库检索后,核心的模型调用逻辑如下:

# 重新组装提示词,包含外部数据和知识库上下文
prompt_messages, stop = self.organize_prompt_messages(...)

# 创建模型实例
model_instance = ModelInstance(
  provider_model_bundle=application_generate_entity.model_conf.provider_model_bundle,
  model=application_generate_entity.model_conf.model,
)

# 调用大语言模型
invoke_result = model_instance.invoke_llm(
  prompt_messages=prompt_messages, # 处理后的提示消息
  model_parameters=application_generate_entity.model_conf.parameters,  # 模型参数(温度、max_tokens等)
  stop=stop, # 停止词
  stream=application_generate_entity.stream, # 是否流式输出
  user=application_generate_entity.user_id, # 用户ID(用于追踪和限流)
)

在 Dify 中,所有的模型调用都通过 ModelInstance 类来完成,这个类位于 api/core/model_manager.py,它是模型调用的统一入口。ModelInstance 不仅支持大语言模型,还支持多种其他类型的模型:

class ModelInstance:
  
  # 大语言模型调用
  def invoke_llm(self, prompt_messages, model_parameters=None, tools=None, stop=None,
        stream=True, user=None, callbacks=None) -> Union[LLMResult, Generator]:
    # 调用大语言模型,支持文本生成、对话、工具调用等
    pass

  # 文本嵌入模型调用
  def invoke_text_embedding(self, texts: list[str], user: Optional[str] = None) -> TextEmbeddingResult:
    # 将文本转换为向量表示,用于语义搜索和相似度计算
    pass

  # 重排序模型调用
  def invoke_rerank(self, query: str, docs: list[str], score_threshold: Optional[float] = None,
          top_n: Optional[int] = None, user: Optional[str] = None) -> RerankResult:
    # 对文档进行重新排序,提高检索质量
    pass

  # 内容审核模型调用
  def invoke_moderation(self, text: str, user: Optional[str] = None) -> ModerationResult:
    # 检测文本中的不当内容,如暴力、色情、仇恨言论等
    pass

  # 语音转文本模型调用
  def invoke_speech2text(self, file: IO[bytes], user: Optional[str] = None) -> str:
    # 将音频文件转换为文本
    pass

  # 文本转语音模型调用
  def invoke_tts(self, content_text: str, voice: str, user: Optional[str] = None) -> bytes:
    # 将文本转换为音频,支持多种语音选择
    pass

这种统一的接口设计让开发者可以用一致的方式调用不同类型的 AI 模型。此外,ModelInstance 还实现了模型调用的负载均衡和容错机制,当用户配置了多个 API Key 时,系统会自动进行负载均衡;如果某个 API Key 遇到速率限制或认证错误时,系统会自动将其放入冷却期,并切换到其他可用的 API Key,确保服务的连续性。

不同类型的模型实现

所有的模型实现都继承自 AIModel 基类:

class AIModel(BaseModel):
  tenant_id: str          # 租户 ID
  model_type: ModelType   # 模型类型(LLM、Embedding 等)
  plugin_id: str          # 插件 ID
  provider_name: str      # 供应商名称
  plugin_model_provider: PluginModelProviderEntity  # 插件模型供应商实体

具体的模型类型实现包括:

  • LargeLanguageModel - 大语言模型
  • TextEmbeddingModel - 文本嵌入模型
  • RerankModel - 重排序模型
  • ModerationModel - 内容审核模型
  • Speech2TextModel - 语音转文本模型
  • TTSModel - 文本转语音模型

每种模型类型都实现了对应的 invoke() 方法,并且它们的共同点是都会通过 PluginModelClient 与插件守护进程通信。以大语言模型为例,LargeLanguageModel 的实现如下:

class LargeLanguageModel(AIModel):

  def invoke(...) -> Union[LLMResult, Generator[LLMResultChunk, None, None]]:
    # 调用大语言模型
    plugin_model_manager = PluginModelClient()
    result = plugin_model_manager.invoke_llm(
      tenant_id=self.tenant_id,
      user_id=user or "unknown",
      plugin_id=self.plugin_id,
      provider=self.provider_name,
      model=model,
      credentials=credentials,
      model_parameters=model_parameters,
      prompt_messages=prompt_messages,
      tools=tools,
      stop=list(stop) if stop else None,
      stream=stream,
    )
    # 返回流式或非流式结果 ...

其中,PluginModelClient 是 Dify 应用与插件守护进程的通信桥梁,它负责将模型调用请求转发给插件守护进程:

class PluginModelClient(BasePluginClient):
  def invoke_llm(...) -> Generator[LLMResultChunk, None, None]:
    # 调用插件中的大语言模型
    response = self._request_with_plugin_daemon_response_stream(
      method="POST",
      path=f"plugin/{tenant_id}/dispatch/llm/invoke",
      type=LLMResultChunk,
      data=jsonable_encoder(
        # 构造请求数据
        {
          "user_id": user_id,
          "data": {
            "provider": provider,
            "model_type": "llm",
            "model": model,
            "credentials": credentials,
            "prompt_messages": prompt_messages,
            "model_parameters": model_parameters,
            "tools": tools,
            "stop": stop,
            "stream": stream,
          },
        }
      ),
      headers={
        "X-Plugin-ID": plugin_id,
        "Content-Type": "application/json",
      },
    )

    # 流式返回结果
    yield from response

Dify 插件化架构

可以看出,真正的模型调用是由插件守护进程执行的,这也是 Dify 最具创新性的设计之一,将所有模型供应商都以插件的形式封装在独立的 Dify Plugin Daemon 服务中。通过这种插件化的架构设计,Dify 构建了一个开放、可扩展的模型生态系统。

当然,除了模型供应商,Dify 支持多种类型的插件:

  1. 模型插件(Models):集成各种 AI 模型,如 LLM、Embedding、TTS 等
  2. 工具插件(Tools):为 Agent 和工作流提供专业能力,如数据分析、内容处理等
  3. Agent 策略插件(Agent Strategies):创建自定义推理策略(ReAct, CoT, ToT),增强 Agent 能力
  4. 扩展插件(Extensions):通过 HTTP Webhook 集成外部服务
  5. 包插件(Bundles):将多个插件组合打包

Dify 的插件系统的由两部分组成:

  1. Dify Plugin Daemon - 独立的插件守护进程,是一个 Go 开发的 Web 服务,提供了插件管理、插件调用等接口
  2. api/core/plugin/** - Dify 主服务中的客户端集成代码,包括上面我们介绍的 PluginModelClient,通过 HTTP 请求调用插件服务的各项功能

尽管插件守护进程是用 Go 开发的,但是每个插件还是用 Python 开发的。插件服务为每个插件创建一个独立的 Python 虚拟环境,并拥有独立的工作目录,每个插件运行在独立的进程中。我们进到 dify-plugin-daemon 容器中,使用 ps 命令可以看到运行的每个插件进程:

dify-plugin-daemon-ps.png

从零开发一个模型插件

让我们通过一个具体的例子来了解如何开发一个模型插件。假设我们要接入一个新的大语言模型 "MockGPT"。

第一步:准备开发环境

首先安装 Dify 插件开发工具:

$ brew tap langgenius/dify
$ brew install dify

然后运行 dify version 验证安装

$ dify version
v0.3.1

如果能正确输出版本号,则说明安装成功。

第二步:初始化插件项目

运行 dify plugin init 创建新插件项目:

dify-plugin-init.png

根据提示依次填写插件名称、作者、描述等信息,按回车进入下一步:

dify-plugin-init-2.png

看提示 Dify 是计划支持 Python 和 Go 两种编程语言来开发插件的,只不过目前仅支持 Python,继续回车进入下一步:

dify-plugin-init-3.png

这里对 Dify 的插件类型做了一个简单的介绍,并为我们准备了几个常见的插件模版,选择 llm 模版继续:

dify-plugin-init-4.png

这里是对插件权限的配置,比如允许插件反向调用 Dify 中的工具或其他模型等,我们暂时不用管,保持默认即可。再次回车项目初始化完成:

[INFO]plugin demo created successfully, you can refer to `demo/GUIDE.md` for more information about how to develop it

生成的插件目录结构如下:

demo
├── GUIDE.md
├── PRIVACY.md
├── README.md
├── _assets
│   ├── icon-dark.svg
│   └── icon.svg
├── main.py
├── manifest.yaml
├── models
│   └── llm
│       ├── llm.py
│       └── llm.yaml
├── provider
│   ├── demo.py
│   └── demo.yaml
└── requirements.txt

第三步:插件清单

编辑 manifest.yaml 文件,完善插件信息(有一些基础信息上面已经填过),包括标签、描述、图标、资源等:

version: 0.0.1
type: plugin
author: aneasystone
name: demo
label:
  en_US: Demo AI Provider                       # 修改
  zh_Hans: 演示 AI 供应商                         # 修改
description:
  en_US: A demo AI model provider for learning   # 修改
  zh_Hans: 用于学习的演示 AI 模型供应商             # 修改
icon: icon.svg
icon_dark: icon-dark.svg
resource:
  memory: 268435456
  permission: {}
plugins:
  models:
    - provider/demo.yaml
meta:
  version: 0.0.1
  arch:
    - amd64
    - arm64
  runner:
    language: python
    version: "3.12"
    entrypoint: main
  minimum_dify_version: null
created_at: 2025-10-22T07:45:17.322263+08:00
privacy: PRIVACY.md
verified: false

第四步:模型供应商配置

编辑 provider/demo.yaml 文件,修改其中的 iconprovider_source 两个地方(因为自动生成文件名不存在),其他参数保持不变:

provider: demo
label:
  en_US: "Demo"
description:
  en_US: "Models provided by demo."
  zh_Hans: "Demo 提供的模型。"
icon_small:
  en_US: "icon.svg"  # 保证 _assets 目录下存在
icon_large:
  en_US: "icon.svg"  # 保证 _assets 目录下存在
background: "#E5E7EB"
help:
  title:
    en_US: "Get your API Key from demo"
    zh_Hans: "从 Demo 获取 API Key"
  url:
    en_US: "https://__put_your_url_here__/account/api-keys"
supported_model_types:
  - llm
configurate_methods:
  - predefined-model
  - customizable-model
model_credential_schema:
  model:
    label:
      en_US: Model Name
      zh_Hans: 模型名称
    placeholder:
      en_US: Enter your model name
      zh_Hans: 输入模型名称
  credential_form_schemas:
    - variable: openai_api_key
      label:
        en_US: API Key
      type: secret-input
      required: true
      placeholder:
        zh_Hans: 在此输入您的 API Key
        en_US: Enter your API Key
provider_credential_schema:
  credential_form_schemas:
    - variable: openai_api_key
      label:
        en_US: API Key
      type: secret-input
      required: true
      placeholder:
        zh_Hans: 在此输入您的 API Key
        en_US: Enter your API Key
models:
  llm:
    predefined:
      - "models/llm/*.yaml"
extra:
  python:
    provider_source: provider/demo.py # 修改
    model_sources:
      - "models/llm/llm.py"

第五步:模型配置

编辑 models/llm/llm.yaml 文件,修改模型名称,并根据实际情况配置你的模型特性、参数和价格:

model: mock-gpt-v1     # 修改
label:
  zh_Hans: 模拟大模型 v1 # 修改
  en_US: Mock GPT v1   # 修改
model_type: llm
features:
  - multi-tool-call
  - agent-thought
  - stream-tool-call
model_properties:
  mode: chat
  context_size: 16385
parameter_rules:
  - name: temperature
    use_template: temperature
  - name: top_p
    use_template: top_p
  - name: presence_penalty
    use_template: presence_penalty
  - name: frequency_penalty
    use_template: frequency_penalty
  - name: max_tokens
    use_template: max_tokens
    default: 512
    min: 1
    max: 16385
  - name: response_format
    use_template: response_format
pricing:
  input: '0.003'
  output: '0.004'
  unit: '0.001'
  currency: USD

第六步:实现 MockGPT

编辑 models/llm/llm.py 文件,完成 MockGPT 模型的实现,代码逻辑很简单,随机选择一段话,并模拟流式输出:

class MockGptLargeLanguageModel(LargeLanguageModel):
  """
  MockGPT 实现
  """

  def _invoke(...) -> Union[LLMResult, Generator]:
    """
    调用大语言模型
    """

    # 模拟响应内容
    demo_responses = [
      "这是一个演示模型的回复。我可以帮助您了解 Dify 插件的工作原理。",
      "作为演示模型,我会生成模拟的响应内容来展示插件功能。",
      "您好!这是 Demo AI 模型的模拟输出,用于演示插件开发流程。"
    ]

    response_text = random.choice(demo_responses)

    if stream:
      return self._handle_stream_response(model, prompt_messages, response_text)
    else:
      return self._handle_sync_response(model, prompt_messages, response_text)
   
  def _handle_stream_response(self, model: str, prompt_messages: List[PromptMessage], response_text: str) -> Generator:
    """
    处理流式响应
    """

    # 模拟流式输出
    words = response_text.split()
    for i, word in enumerate(words):
      chunk_text = word + (" " if i < len(words) - 1 else "")

      delta = LLMResultChunkDelta(
        index=0,
        message=AssistantPromptMessage(content=chunk_text),
        finish_reason=None if i < len(words) - 1 else "stop",
        usage=self._calc_usage(response_text) if i == len(words) - 1 else None
      )

      yield LLMResultChunk(
        model=model,
        prompt_messages=prompt_messages,
        system_fingerprint=None,
        delta=delta
      )

      # 模拟网络延迟
      time.sleep(0.1)

  def _handle_sync_response(self, model: str, prompt_messages: List[PromptMessage], response_text: str) -> LLMResult:
    """
    处理同步响应
    """
    return LLMResult(
      model=model,
      prompt_messages=prompt_messages,
      message=AssistantPromptMessage(content=response_text),
      usage=self._calc_usage(response_text),
      system_fingerprint=None
    )

  def _calc_usage(self, text: str) -> LLMUsage:
    """
    计算使用量(模拟)
    """
    prompt_tokens = 50  # 模拟
    completion_tokens = len(text.split())

    return LLMUsage(
      prompt_tokens=prompt_tokens,
      prompt_unit_price=0.001,
      prompt_price_unit=1000,
      prompt_price=0.00005,
      completion_tokens=completion_tokens,
      completion_unit_price=0.002,
      completion_price_unit=1000,
      completion_price=completion_tokens * 0.000002,
      total_tokens=prompt_tokens + completion_tokens,
      total_price=0.00005 + completion_tokens * 0.000002,
      currency="USD",
      latency=1.5
    )
   
  def get_num_tokens(...) -> int:
    """
    计算 token 数量(模拟)
    """
    total_text = ""
    for message in prompt_messages:
      if isinstance(message.content, str):
        total_text += message.content

    # 简单估算:中文字符算1个token,英文单词算1个token
    return len(total_text.split()) + len([c for c in total_text if '\u4e00' <= c <= '\u9fff'])

  def validate_credentials(self, model: str, credentials: dict) -> None:
    """
    验证模型凭据
    """
    try:
      pass
    except Exception as ex:
      raise CredentialsValidateFailedError(str(ex))

  def get_customizable_model_schema(self, model: str, credentials: dict) -> AIModelEntity:
    """
    返回模型 Schema
    """
    entity = AIModelEntity(
      model=model,
      label=I18nObject(zh_Hans=model, en_US=model),
      model_type=ModelType.LLM,
      features=[],
      fetch_from=FetchFrom.CUSTOMIZABLE_MODEL,
      model_properties={},
      parameter_rules=[],
    )

    return entity

  @property
  def _invoke_error_mapping(self) -> dict:
    """
    错误映射
    """
    return {
      InvokeError: [Exception]
    }

插件调试

至此,一个简单的模型插件就开发好了,接下来需要测试插件是否可以正常运行。Dify 提供便捷地远程调试方式,帮助你快速在测试环境中验证插件功能。

首先点击右上角的 “插件” 进入 “插件管理” 页面,在这里可以获取插件服务的地址和调试 Key:

plugins-debug.png

回到插件项目代码,拷贝 .env.example 文件并重命名为 .env,将获取的插件服务地址和调试 Key 等信息填入其中:

INSTALL_METHOD=remote
REMOTE_INSTALL_URL=localhost:5003
REMOTE_INSTALL_KEY=0cd54aa2-7731-4368-bac4-6d2ed1299087

然后运行 python -m main 命令启动插件:

$ python -m main
{"event": "log", "data": {"level": "INFO", "message": "Installed model: demo", "timestamp": 1761101795.533712}}
INFO:dify_plugin.plugin:Installed model: demo

刷新插件列表,此时就可以看到该插件了:

plugins-debug-2.png

插件守护进程支持三种运行时:本地运行时(Local Runtime),通过运行 Python 子进程,监听 STDIN/STDOUT 管道进行通信,从而实现插件的调用;调试运行时(Debug Runtime),通过 TCP 服务器模式,监听插件连接,支持全双工通信,从而实现开发时的实时调试;无服务器运行时(Serverless Runtime),支持 AWS Lambda 等云函数平台,通过 HTTP 调用模式,可以实现插件服务的自动部署和扩缩容。

插件验证

我们进入 “模型供应商” 页面,找到 Demo 后,配置 API 密钥:

plugins-test.png

由于我们并没有做这块的校验,因此可以随便填。配置之后,进入应用页面,在模型列表中选择 “模拟大模型 v1”:

plugins-test-2.png

然后发起对话,验证模型是否可以正常输出:

plugins-test-3.png

可以看到,该模型会随机输出我们预先内置的话术,符合预期。

插件打包和发布

如果一切验证 OK,我们就可以将插件打包并发布出去。通过 dify plugin package 可以将插件打包成 difypkg 文件:

$ dify plugin package demo
[INFO]plugin packaged successfully, output path: demo.difypkg

然后在 Dify 的 “插件管理” 页面上传并安装这个插件,安装完成后,使用就和上面调试验证步骤基本一样了。

Dify 支持三种插件安装方式:

  • Marketplace:Dify 官方提供的插件市场,用户可以在此浏览、搜索并一键安装各类插件;
  • GitHub 仓库:将插件开源或托管在 GitHub 上,方便他人查看、下载和安装;
  • 本地安装:将插件打包成本地文件(正如上面的 difypkg 文件),通过文件分享的方式供他人安装;

下面是 Dify 市场的官方地址:

另外,Dify 的官方插件也都开源了,可以作为开发参考:

本文的 MockGPT 模型插件我也发布到 Github 上了,感兴趣的同学也可以参考:

小结

今天我们深入探索了 Dify 应用运行器中模型调用的核心机制。从 CompletionAppRunnerrun() 方法开始,我们了解了 ModelInstance 作为统一的模型调用入口的设计,以及不同类型的模型实现,然后学习了 Dify 的插件化架构,Dify 的插件化架构构建了一个开放的生态系统,各模型供应商可以作为独立的插件运行,这种设计提供了出色的稳定性、可扩展性和维护性。

最后,我们通过一个实际例子学习了如何从零开发一个模型插件,并学习了插件的调试、验证、打包和发布等流程,通过模型插件的实战,相信大家对 Dify 的模型调用流程有了更深入的认识。


深入 Dify 的应用运行器之知识库检索(续)

在上一篇文章中,我们从界面操作的角度了解了 Dify 知识库的功能特性,包括创建知识库、配置分段设置、选择索引方式和检索方法,以及如何在应用中集成知识库。通过这些配置,我们可以让 AI 应用获得外部知识的支持,实现更准确、更专业的回答。

今天我们将继续深入 CompletionAppRunnerrun() 方法源码,详细分析知识库检索的具体实现原理,了解 Dify 如何将用户的查询转化为向量检索、如何处理多知识库场景,以及背后的技术机制。

应用运行器回顾

让我们先回顾一下 CompletionAppRunner 中知识库检索的流程:

def run(...) -> None:

  # 1. 第一次提示词组装
  self.organize_prompt_messages(...)

  # 2. 输入内容审核
  self.moderation_for_inputs(...)

  # 3. 外部数据工具处理
  inputs = self.fill_in_inputs_from_external_data_tools(...)

  # 4. 知识库检索
  context = None
  if app_config.dataset and app_config.dataset.dataset_ids:

    # 创建回调处理器,用于记录检索命中信息
    hit_callback = DatasetIndexToolCallbackHandler(...)

    # 创建知识库检索器并执行检索
    dataset_retrieval = DatasetRetrieval(application_generate_entity)
    context = dataset_retrieval.retrieve(...)

  # 5. 第二次提示词组装,包含知识库上下文
  prompt_messages, stop = self.organize_prompt_messages(
    ..., context=context, ...
  )

  # 6. 后续处理:托管审核、令牌重计算、模型调用等
  ...

从代码中可以看出,知识库检索发生在外部数据工具处理之后,第二次提示词组装之前,确保将检索到的上下文信息整合到最终的提示词中。

知识库检索的核心逻辑

检索的核心逻辑封装在 DatasetRetrieval 类的 retrieve() 方法中:

def retrieve(...) -> Optional[str]:
  
  # 从知识库列表中筛选出所有可用的知识库
  available_datasets = []
  for dataset_id in dataset_ids:
    # 从数据库获取知识库信息
    dataset_stmt = select(Dataset).where(Dataset.tenant_id == tenant_id, Dataset.id == dataset_id)
    dataset = db.session.scalar(dataset_stmt)
    # 跳过无效知识库
    if not dataset:
      continue
    # 跳过没有文档的内部知识库(外部知识库除外)
    if dataset and dataset.available_document_count == 0 and dataset.provider != "external":
      continue
    available_datasets.append(dataset)

  # 元数据过滤
  available_datasets_ids = [dataset.id for dataset in available_datasets]
  metadata_filter_document_ids, metadata_condition = self.get_metadata_filter_condition(
    available_datasets_ids,
    query,
    ...
  )

  # 单库检索策略 vs. 多库检索策略
  all_documents = []
  if retrieve_config.retrieve_strategy == DatasetRetrieveConfigEntity.RetrieveStrategy.SINGLE:
    # 判断模型是否支持工具调用,使用不同的选择策略
    planning_strategy = PlanningStrategy.REACT_ROUTER
    if ModelFeature.TOOL_CALL in features or ModelFeature.MULTI_TOOL_CALL in features:
        planning_strategy = PlanningStrategy.ROUTER
    # 单库检索策略:智能选择一个最相关的知识库
    all_documents = self.single_retrieve(...)
  elif retrieve_config.retrieve_strategy == DatasetRetrieveConfigEntity.RetrieveStrategy.MULTIPLE:
    # 多库检索策略:从所有知识库中检索并合并结果
    all_documents = self.multiple_retrieve(...)

  dify_documents = [item for item in all_documents if item.provider == "dify"]
  external_documents = [item for item in all_documents if item.provider == "external"]

  # 分离内部和外部文档
  retrieval_resource_list: list[RetrievalSourceMetadata] = ...
  document_context_list: list[DocumentContext] = ...
  
  # 引用归属
  if hit_callback and retrieval_resource_list:
    retrieval_resource_list = sorted(retrieval_resource_list, key=lambda x: x.score or 0.0, reverse=True)
    for position, item in enumerate(retrieval_resource_list, start=1):
      item.position = position
    hit_callback.return_retriever_resource_info(retrieval_resource_list)

  # 按分数排序并合并为最终上下文
  if document_context_list:
    document_context_list = sorted(document_context_list, key=lambda x: x.score or 0.0, reverse=True)
    return str("\n".join([document_context.content for document_context in document_context_list]))
  return ""

其主要逻辑包括:

  • 知识库有效性验证:首先,对知识库列表进行筛选,过滤掉无效知识库,以及没有文档的内部知识库;需要注意的是,外部知识库(provider == "external")即使文档数量为 0 也会被保留,因为它们的文档数量统计可能不准确;
  • 元数据过滤处理:根据文档的属性(如标签、类别、时间等)对文档进行精确过滤,支持用户自定义过滤条件和大模型智能生成过滤条件;
  • 执行检索策略:在前文中我们提到,一个应用可以关联多个知识库,当面对多个知识库时,Dify 提供了两种检索策略:

    • 单库检索策略(SINGLE):采用智能路由的方式,从多个候选知识库中自动选择最适合的那一个进行检索;这种策略的优势是减少了检索时间和计算开销,特别适用于知识库间差异较大、各自专注于不同领域的场景;
    • 多库检索策略(MULTIPLE):并行检索所有配置的知识库,然后将结果进行合并、重排序和过滤;这种策略能够获得更全面的检索结果,适用于知识库间存在互补关系的场景;
  • 结果格式化:将检索到的文档转换为大模型可以理解的上下文格式;

元数据过滤功能

在实际检索之前,Dify 会先进行基于元数据的文档过滤,这是一个强大的功能,允许用户根据文档的属性(如标签、类别、时间等)进行过滤,这对于大型知识库的精准检索非常重要。系统支持三种元数据过滤模式:

  1. 禁用模式(disabled):不使用元数据过滤,检索所有文档
  2. 自动模式(automatic):系统根据用户问题自动生成过滤条件
  3. 手动模式(manual):用户手动配置过滤条件
def get_metadata_filter_condition(...) -> tuple[Optional[dict[str, list[str]]], Optional[MetadataCondition]]:
  
  if metadata_filtering_mode == "disabled":
    # 禁用模式
    return None, None
  elif metadata_filtering_mode == "automatic":
    # 自动模式,根据用户问题自动生成过滤条件
    automatic_metadata_filters = self._automatic_metadata_filter_func(
      dataset_ids, query, tenant_id, user_id, metadata_model_config
    )
    for sequence, filter in enumerate(automatic_metadata_filters):
      filters = self._process_metadata_filter_func(...)
  elif metadata_filtering_mode == "manual":
    # 手动模式,用户手动配置过滤条件
    if metadata_filtering_conditions:
      for sequence, condition in enumerate(metadata_filtering_conditions.conditions):
        # 支持变量替换
        expected_value = condition.value
        if expected_value is not None and condition.comparison_operator not in ("empty", "not empty"):
          if isinstance(expected_value, str):
            expected_value = self._replace_metadata_filter_value(expected_value, inputs)
        filters = self._process_metadata_filter_func(...)
  else:
    raise ValueError("Invalid metadata filtering mode")

  # 根据过滤条件筛选文档
  if filters:
    if metadata_filtering_conditions and metadata_filtering_conditions.logical_operator == "and":
      document_query = document_query.where(and_(*filters))
    else:
      document_query = document_query.where(or_(*filters))
  documents = document_query.all()
  # 根据知识库 ID 分组
  metadata_filter_document_ids = defaultdict(list) if documents else None
  for document in documents:
    metadata_filter_document_ids[document.dataset_id].append(document.id)
  return metadata_filter_document_ids, metadata_condition

其中手动模式没什么好讲的,直接根据用户配置组装过滤条件,唯一值得注意的一点是,在手动设置条件时可以使用变量,因此这里先对配置的值进行变量替换;而自动模式则是通过大模型实现,使用了专门的提示词模板,要求大模型分析用户查询并输出 JSON 格式的元数据过滤条件:

### 职位描述
您是一个文本元数据提取引擎,需根据用户输入提取文本的元数据,并设定元数据值。

### 任务
您的任务仅从提供的元数据列表中,提取输入文本中实际存在的元数据;并使用以下运算符来表达逻辑关系:

["contains", "not contains", "start with", "end with", "is", "is not", "empty", "not empty", "=", "≠", ">", "<", "≥", "≤", "before", "after"]

然后以 JSON 格式返回结果,其中键包括:

- metadata_fields(元数据字段)
- metadata_field_value(元数据字段值)
- comparison_operator(比较运算符)

### 格式
输入文本存储在变量 input_text 中,元数据以列表形式在变量 metadata_fields 中指定。

### 约束
您的响应中不得包含 JSON 数组以外的任何内容。

大模型从用户输入中提取出元数据并设置过滤条件,支持各种类型的比较操作,包括:

  1. 文本比较(contains, start with, end with, is, is not)
  2. 数值比较(=, ≠, >, <, ≥, ≤)
  3. 时间比较(before, after)
  4. 存在性检查(empty, not empty)

比如,用户输入这样的查询:

总结2023年的财务报告

大模型输出类似这样的结果:

{
  "metadata_map": [
    {
      "metadata_field_name": "year",
      "metadata_field_value": "2023",
      "comparison_operator": "="
    },
    {
      "metadata_field_name": "category",
      "metadata_field_value": "财务",
      "comparison_operator": "contains"
    }
  ]
}

单库检索策略

单库检索策略适用于知识库主题相对独立的场景,系统会先从多个知识库中智能地选择一个最相关的知识库,然后再在该知识库中进行检索。选择策略根据模型的能力分为两种:

  • ROUTER 策略:适用于支持工具调用的模型(如 GPT-4、Claude-3.5 等),使用函数调用方式让模型自主选择合适的知识库;这种方式下,每个知识库都被包装成一个工具,工具名称是知识库ID,工具描述是知识库的描述信息;模型会根据用户查询的内容,自主选择最合适的工具(即知识库)进行调用;
  • REACT_ROUTER 策略:适用于不支持工具调用的模型,使用 ReAct(Reasoning + Acting)方式,通过推理步骤指导模型选择知识库;这种方式,系统会构造一个包含所有可用知识库工具的提示词,要求模型以特定的 JSON 格式输出决策;
def single_retrieve(...) -> list[Document]:

  # 1. 构建知识库工具描述
  tools = []
  for dataset in available_datasets:
    description = dataset.description or f"useful for when you want to answer queries about the {dataset.name}"
    message_tool = PromptMessageTool(
      name=dataset.id,
      description=description,
      parameters={"type": "object", "properties": {}, "required": []},
    )
    tools.append(message_tool)

  # 2. 根据策略选择知识库
  dataset_id = None
  if planning_strategy == PlanningStrategy.REACT_ROUTER:
    # ReAct 方式
    react_multi_dataset_router = ReactMultiDatasetRouter()
    dataset_id = react_multi_dataset_router.invoke(query, tools, model_config, model_instance, user_id, tenant_id)
  elif planning_strategy == PlanningStrategy.ROUTER:
    # Function Call 方式
    function_call_router = FunctionCallMultiDatasetRouter()
    dataset_id = function_call_router.invoke(query, tools, model_config, model_instance)

  # ...

当选定知识库后,接下来就针对该库执行具体的检索操作:

  # 3. 执行检索
  if dataset_id:
    dataset = db.session.scalar(select(Dataset).where(Dataset.id == dataset_id))
    if dataset:
      if dataset.provider == "external":
        # 外部知识库检索
        external_documents = ExternalDatasetService.fetch_external_knowledge_retrieval(...)
        results = [Document(page_content=doc.get("content"), metadata=doc.get("metadata"), provider="external") for doc in external_documents]
      else:
        # 内部知识库检索,根据索引技术选择检索方法
        retrieval_model_config = dataset.retrieval_model or default_retrieval_model
        if dataset.indexing_technique == "economy":
          retrieval_method = "keyword_search"
        else:
          retrieval_method = retrieval_model_config["search_method"]

        # 调用 RetrievalService 执行检索
        results = RetrievalService.retrieve(
          retrieval_method=retrieval_method,
          dataset_id=dataset.id,
          query=query,
          ...
        )
      return results
  return []

Dify 的知识库分为外部知识库和内部知识库:针对外部知识库,使用统一的 外部知识库 API 进行检索;针对内部知识库,根据知识库的索引配置选择合适的检索方法,最终调用 RetrievalService 进行实际的向量检索或关键词检索。

多库检索策略

多库检索策略适用于知识库内容互补的场景,系统会同时从多个知识库中检索,并通过重排序获得最佳结果。实际上,我们在 Dify 页面上创建的应用,默认都是使用多库检索策略,好像没有看到哪里可以设置单库检索策略。

多库检索策略的核心逻辑位于 multiple_retrieve() 方法:

def multiple_retrieve(...):

  # 1. 一致性检查
  index_type_check = all(
    item.indexing_technique == available_datasets[0].indexing_technique
    for item in available_datasets
  )
  if not index_type_check and (not reranking_enable or reranking_mode != RerankMode.RERANKING_MODEL):
    raise ValueError("不同索引技术的知识库需要设置重排序模型")

  # 2. 为每个知识库创建检索线程
  threads = []
  all_documents: list[Document] = []
  for dataset in available_datasets:
    retrieval_thread = threading.Thread(
      target=self._retriever,
      kwargs={
        "flask_app": current_app._get_current_object(),
        "dataset_id": dataset.id,
        "query": query,
        "top_k": top_k,
        "all_documents": all_documents,  # 共享结果列表
        "document_ids_filter": document_ids_filter,
        "metadata_condition": metadata_condition,
      },
    )
    threads.append(retrieval_thread)
    retrieval_thread.start()

  # 3. 等待所有检索线程完成
  for thread in threads:
    thread.join()

  # 4. 结果融合和重排序
  if reranking_enable:
    # 开启重排序
    data_post_processor = DataPostProcessor(tenant_id, reranking_mode, reranking_model, weights, False)
    all_documents = data_post_processor.invoke(
      query=query, documents=all_documents,
      score_threshold=score_threshold, top_n=top_k
    )
  else:
    # 根据索引类型进行简单排序
    if index_type == "economy":
      all_documents = self.calculate_keyword_score(query, all_documents, top_k)
    elif index_type == "high_quality":
      all_documents = self.calculate_vector_score(all_documents, top_k, score_threshold)
    else:
      all_documents = all_documents[:top_k] if top_k else all_documents
  return all_documents

多库检索策略的流程主要分为三步:

  1. 一致性检查:确保所有知识库使用相同的索引技术,或者配置了重排序模型;
  2. 并发检索:开启多线程并发检索,每个线程检索一个知识库;根据知识库的类型调用不同的检索方法,使用 ExternalDatasetService 检索外部知识库,使用 RetrievalService 检索内部知识库,具体的逻辑和单库检索策略类似;
  3. 结果融合和排序:如果开启了重排序,通过 DataPostProcessor 对来自不同知识库的结果进行融合和排序;否则根据索引类型进行简单排序;

检索服务

RetrievalService 是 Dify 检索功能的核心服务类,支持多种检索方法的并发执行:

class RetrievalService:
  
  # 核心检索方法
  @classmethod
  def retrieve(
    cls,
    retrieval_method: str,
    dataset_id: str,
    query: str,
    top_k: int,
    score_threshold: Optional[float] = 0.0,
    reranking_model: Optional[dict] = None,
    reranking_mode: str = "reranking_model",
    weights: Optional[dict] = None,
    document_ids_filter: Optional[list[str]] = None,
  ):

    # 使用线程池并发执行不同的检索方法
    with ThreadPoolExecutor(max_workers=dify_config.RETRIEVAL_SERVICE_EXECUTORS) as executor:  # type: ignore
      futures = []
      if retrieval_method == "keyword_search":
        # 关键词检索
        futures.append(
          executor.submit(cls.keyword_search, ...)
        )
      if RetrievalMethod.is_support_semantic_search(retrieval_method):
        # 语义检索(向量检索)
        futures.append(
          executor.submit(cls.embedding_search, ...)
        )
      if RetrievalMethod.is_support_fulltext_search(retrieval_method):
        # 全文检索
        futures.append(
          executor.submit(cls.full_text_index_search, ...)
        )
      # 等待所有检索任务完成
      concurrent.futures.wait(futures, timeout=30, return_when=concurrent.futures.ALL_COMPLETED)

    # 混合检索需要后处理
    if retrieval_method == RetrievalMethod.HYBRID_SEARCH.value:
      data_post_processor = DataPostProcessor(
        str(dataset.tenant_id), reranking_mode, reranking_model, weights, False
      )
      # 重排序
      all_documents = data_post_processor.invoke(
        query=query,
        documents=all_documents,
        score_threshold=score_threshold,
        top_n=top_k,
      )

    return all_documents

Dify 支持四种不同的检索方法:关键词检索(Keyword Search)向量检索(Embedding Search)全文检索(Full-Text Search)混合检索(Hybrid Search)。其中混合检索就是同时执行向量检索和全文检索,因此这里使用线程池来并发执行,如果不是混合检索,这里实际上就是单线程。最后,针对混合检索的结果,还需要经过后处理,包括格式化、去重、重排序和阈值过滤等操作。

关键词检索

该检索方法适用于经济型索引,经济型索引使用关键词提取技术,为每个文档片段提取 10 个关键词,然后通过倒排索引进行检索。虽然准确度相对较低,但成本更加经济。其核心代码位于 Keyword.search() 方法:

class Keyword:
  def __init__(self, dataset: Dataset):
    self._dataset = dataset
    self._keyword_processor = self._init_keyword()

  def _init_keyword(self) -> BaseKeyword:
    # 根据配置初始化关键词实现,目前仅支持基于 Jieba 分词的实现
    keyword_type = dify_config.KEYWORD_STORE
    keyword_factory = self.get_keyword_factory(keyword_type)
    return keyword_factory(self._dataset)

  @staticmethod
  def get_keyword_factory(keyword_type: str) -> type[BaseKeyword]:
    match keyword_type:
      case KeyWordType.JIEBA:
        # 使用 Jieba 库提取关键词
        from core.rag.datasource.keyword.jieba.jieba import Jieba
        return Jieba
      case _:
        raise ValueError(f"Keyword store {keyword_type} is not supported.")
  
  # 执行关键词检索
  def search(self, query: str, **kwargs: Any) -> list[Document]:
    return self._keyword_processor.search(query, **kwargs)

它通过 Jieba 库的 jieba.analyse.extract_tags() 方法提取用户问题的关键词,并和文档片段中的关键词进行匹配,根据匹配度计算得分,然后按得分排序得到检索结果。

class JiebaKeywordTableHandler:
  
  def __init__(self):
    import jieba.analyse  # type: ignore
    from core.rag.datasource.keyword.jieba.stopwords import STOPWORDS
    jieba.analyse.default_tfidf.stop_words = STOPWORDS  # type: ignore

  def extract_keywords(self, text: str, max_keywords_per_chunk: Optional[int] = 10) -> set[str]:
    # 使用 Jieba 库提取关键词
    import jieba.analyse  # type: ignore
    keywords = jieba.analyse.extract_tags(
      sentence=text,
      topK=max_keywords_per_chunk,
    )

    return set(self._expand_tokens_with_subtokens(set(keywords)))

向量检索

这是一种基于向量相似度的检索方法,适用于高质量索引。其核心代码位于 embedding_search() 方法:

@classmethod
def embedding_search(...):
  with flask_app.app_context():
    dataset = cls._get_dataset(dataset_id)
    vector = Vector(dataset=dataset)

    # 执行向量相似度搜索
    documents = vector.search_by_vector(
      query,
      search_type="similarity_score_threshold",
      top_k=top_k,
      score_threshold=score_threshold,
      filter={"group_id": [dataset.id]},
      document_ids_filter=document_ids_filter,
    )

    # 应用重排序(如果配置)
    if reranking_model and retrieval_method == RetrievalMethod.SEMANTIC_SEARCH.value:
      data_post_processor = DataPostProcessor(...)
      all_documents.extend(data_post_processor.invoke(...))
    else:
      all_documents.extend(documents)

Dify 支持多种向量数据库,通过工厂模式动态选择合适的向量存储实现,包括:

  • 开源方案:Chroma、Qdrant、Milvus、Weaviate、PGVector
  • 云服务:Pinecone、Elasticsearch、OpenSearch
  • 企业级:AnalyticDB、OceanBase、Oracle、TiDB

每种向量数据库都有对应的实现类,位于 api/core/rag/datasource/vdb/ 目录下。它们都实现了统一的向量搜索接口:

class VectorBase:

  def search_by_vector(
    self,
    query: str,
    search_type: str,
    top_k: int,
    score_threshold: Optional[float] = None,
    filter: Optional[dict] = None,
    document_ids_filter: Optional[list[str]] = None,
  ) -> list[Document]:
    # 向量相似度检索实现
    pass

全文检索

基于倒排索引的全文检索:

@classmethod
def full_text_index_search(...):
  with flask_app.app_context():
    dataset = cls._get_dataset(dataset_id)
    vector_processor = Vector(dataset=dataset)

    # 执行全文搜索
    documents = vector_processor.search_by_full_text(
      cls.escape_query_for_search(query),
      top_k=top_k,
      document_ids_filter=document_ids_filter
    )

    # 同样支持重排序
    if reranking_model and retrieval_method == RetrievalMethod.FULL_TEXT_SEARCH.value:
      data_post_processor = DataPostProcessor(...)
      all_documents.extend(data_post_processor.invoke(...))
    else:
      all_documents.extend(documents)

Dify 的全文检索也是基于向量数据库实现的,它们同样也实现了统一的全文搜索接口:

class VectorBase:

  def search_by_full_text(
    self,
    query: str,
    top_k: int,
    document_ids_filter: Optional[list[str]] = None,
  ) -> list[Document]:
    # 全文检索实现
    pass

混合检索

混合检索同时使用向量检索和全文检索,能够结合语义理解和精确匹配的优势,在大多数场景下提供更好的检索效果。

hybrid-search.png

混合检索完成后,系统需要对结果进行后处理,包括格式化、去重、重排序和阈值过滤等操作。这个过程由 DataPostProcessor 负责执行:

class DataPostProcessor:

  def invoke(...) -> list[Document]:
    if self.rerank_runner:
      documents = self.rerank_runner.run(query, documents, score_threshold, top_n, user)
    if self.reorder_runner:
      documents = self.reorder_runner.run(documents)
    return documents

Dify 将重排序分为两种:

  • 重排序(Reranker) - 这是比较常见的重排序策略,根据用户配置的重排序模式进行处理,又分为两种情况,一种是使用外部重排序模型(如 CohereJina AI 等),一种是使用权重分数融合,最后排序后的结果通过分数阈值进行过滤,并截取 Top-N 结果;
  • 重新排列(Reorder) - 这是一种特殊的文档重排序策略,首先将文档按奇偶索引分离,然后将偶数位置的文档列表进行反转,最后将奇数位置文档和反转后的偶数位置文档合并;譬如原始文档顺序是 [1,2,3,4,5,6],重排序后的结果是 [1,3,5,6,4,2];这种排序策略可以缓解大模型的位置偏见,避免总是优先考虑列表前面的文档,不过这个策略默认并没有启用,可能只是一种实验性功能。

小结

经过上面的步骤,我们最终检索出和用户问题最相关的文档,然后我们还需要将检索到的文档转换为大模型可用的上下文格式。根据不同的分段模式,处理过程略有不同:

  1. 普通分段:直接根据 index_node_id 查找对应的文档分段
  2. 父子分段:先查找子分段,再根据 segment_id 查找父分段

上下文转换完成之后,经过第二次提示词组装,最终调用大模型,完成对用户问题的回答。

下面对今天的学习内容做一个总结:

  • 元数据过滤:支持自动模式和手动模式;自动模式 通过大模型分析查询自动生成过滤条件,智能且动态;手动模式 支持复杂的过滤条件配置和变量替换,灵活性强;
  • 多层次检索策略:支持单库检索策略和多库检索策略;单库检索策略 智能选择最相关的知识库,精准度高;多库检索策略 并发检索多个知识库,覆盖面广;
  • 智能路由机制:针对单库检索策略,系统根据模型特性自动选择函数调用或 ReAct 方式;函数调用方式 利用支持工具调用的模型原生能力,准确性更高;ReAct 方式 通过结构化提示词实现推理,兼容性更好;
  • 多元化检索方法:通过检索服务,Dify 集成了多种检索技术;关键词检索 基于精确匹配,适用于经济型索引;向量检索 基于向量相似度,理解能力强;全文检索 基于倒排索引,检索效率高;混合检索 融合了向量检索和全文检索两种方法的优势,并通过重排序得到更好的检索效果;
  • 重排序优化:支持外部重排序模型和权重分数融合两种重排序策略;
  • 上下文构建:将检索结果转换为大模型友好的格式,注意父子分段的处理过程;

至此,我们已经完成了 Dify 应用会话流程中几乎所有的源码解剖,目前还差最后一步,模型调用,我们明天继续。


深入 Dify 的应用运行器之知识库检索

在构建 AI 应用时,如何让大模型能够准确回答特定领域的问题,一直是开发者面临的挑战。虽然大模型具有丰富的通用知识,但在处理企业内部文档、产品规范或专业领域的问题时,往往存在信息过时、不够准确或无法涵盖最新动态的问题。Dify 的知识库功能就是为了解决这个痛点,它利用 RAG(检索增强生成) 技术,将静态的文档内容转化为可被动态检索和使用的知识源,从而提供给大模型用于回复用户的问题。

Dify 的所有应用类型都支持关联知识库。对于聊天应用和文本生成应用,我们之前在应用运行器的 run() 方法中已经看到,第二次提示词组装时,会将知识库检索内容作为上下文喂给大模型;对于智能体应用,知识库的使用稍有不同,它是作为工具提供给大模型动态调用的;而对于工作流应用,知识库则是一个独立的检索组件,可以灵活地被编排在应用流程中。

创建知识库

无论是哪种情况,我们首先得创建一个知识库。在 Dify 平台顶部导航中点击 “知识库”,然后点击 “创建知识库” 即可开始。Dify 支持三种方式来创建知识库:

  1. 导入已有文本:支持批量上传多种格式的文档,包括 TXT、Markdown、DOCX、HTML、JSON、PDF、CSV、Excel 等;
  2. 同步自 Notion 内容:支持将 Notion 内容导入到知识库,同步 Notion 内容前,须先绑定 Notion 空间;
  3. 同步自 Web 站点:支持 Jina ReaderFirecrawlWaterCrawl 等网页内容提取工具,抓取 Web 站点的内容到知识库;

dataset-create.png

另外,Dify 还支持通过 API 和知识库 ID 连接到外部知识库,比如 AWS Bedrock 知识库LlamaCloud 知识库 等,感兴趣的同学可以参考官方文档自行尝试。

dataset-connect.png

我们这里就以 Dify 文档中的 术语表 页面为例,演示下 Dify 知识库的基本用法。

分段设置

由于大语言模型的上下文窗口有限,无法一次性处理整个知识库的内容,因此需要对文档中的长文本进行分段处理。即使部分大模型已支持上传完整的文档文件,实验表明检索效率依然弱于检索单个内容分段。

合理的分段大小非常关键,它能够帮助模型准确地找到与问题最相关的内容,减少噪音信息。过大或过小的分段都可能影响召回的效果。

Dify 提供了两种分段模式,以适应不同类型的文档结构和应用场景:

通用模式

系统按照用户自定义的规则将内容拆分为独立的分段。当用户输入问题后,系统自动分析关键词,并计算与知识库中各内容分段的相关度,选取最相关的内容分段发送给大模型。

chunk-setting-general.png

在通用模式下,你可以配置以下设置:

  • 分段标识符:用于文本分割的字符,默认值为 \n\n,即按照文章段落进行分段;
  • 分段最大长度:指定分段内的文本字符数最大上限,超出该长度时将强制分段,默认值为 1024 Tokens,最大上限为 4000 Tokens;
  • 分段重叠长度:设置分段之间的重叠长度可以保留分段之间的语义关系,建议设置为最大分段长度的 10%-25%,有助于提高召回效果;

为了保证知识库质量,Dify 还提供了两种文本预处理规则,用于过滤知识库中部分无意义的内容:

  • 替换连续的空格、换行符和制表符
  • 删除所有 URL 和电子邮件地址

父子模式

父子模式采用双层分段结构,在精确度和上下文信息之间取得平衡。它包含两个层次:

  • 父区块:保持较大的文本单位(如段落、章节甚至整个文档),提供丰富的上下文信息
  • 子区块:较小的文本单位(如句子),用于精确检索,能更加精准地匹配用户所输入的问题

父子模式的配置包括父块的配置和子块的配置,如下所示:

chunk-setting-parent-child.png

其工作原理是:系统首先通过子区块进行精确检索以确保相关性,然后获取对应的父区块来补充上下文信息,从而在生成响应时既保证准确性又能提供完整的背景信息。

父子模式的优势在于:

  • 子分段能精准匹配用户问题
  • 父分段提供完整的背景信息
  • 检索效果优于传统的单层检索方式

Q&A 分段

在通用分段模式下,还有一个 Q&A 模式,开启该模式后,系统首先会对已上传的文本进行分段,然后自动为每个分段生成 Q&A 对。与常见的 Q2P(用户问题匹配文本段落) 策略不同,Q&A 模式采用 Q2Q(问题匹配问题) 策略。

q2p-vs-q2q.jpg

当用户提问时,系统会找出与之最相似的问题,然后返回对应的分段作为答案。这种方式更加精确,因为它直接针对用户问题进行匹配,可以更准确地帮助用户检索真正需要的信息。

注意,Q&A 模式要选择语言,生成对应语言的问题,启用该模式后会消耗更多的 Tokens,并且无法使用经济型索引方法。

索引方式

选定内容的分段模式后,接下来需要设置知识库的索引方法与检索设置。Dify 提供了两种索引方法:

高质量索引

使用 Embedding 嵌入模型将分段的文本块转换为数字向量,帮助更有效地压缩与存储大量文本信息,使得用户问题与文本之间的匹配更加精准。

index-setting-high.png

高质量索引支持 向量检索全文检索混合检索 三种检索设置。

经济索引

在经济模式下,每个区块使用 10 个关键词进行检索,降低了准确度但无需产生费用。仅提供 倒排索引 方式选择最相关的区块。

检索设置

不用的索引方式支持的检索设置也不同,高质量索引支持 向量检索全文检索混合检索 三种,经济索引仅支持 倒排索引 一种。

向量检索

将用户问题向量化,查询知识库中向量距离与之最接近的文本分段,也就是最相似的内容。

search-setting-vector.png

其配置参数有:

  • Rerank 模型:开启后将使用第三方 Rerank 模型再一次重排序由向量检索召回的内容分段,以优化排序结果;
  • TopK:用于筛选与用户问题相似度最高的文本片段,系统同时会根据模型的上下文窗口大小动态调整片段数量;
  • Score 阈值:用于设置文本片段筛选的相似度阈值,只召回超过设置分数的文本片段;

全文检索

也被称为 关键词检索,即索引文档中的所有词汇,用户输入问题后,通过关键词匹配知识库内对应的文本片段,返回符合关键词的文本片段。

search-setting-fulltext.png

混合检索

同时执行全文检索和向量检索,从查询结果中选择匹配用户问题的最佳结果。

search-setting-hybrid.png

Dify 支持两种混合模式:

  • 权重设置:允许用户设置语义优先(向量检索)和关键词优先(关键词检索)的权重,可以不断调试二者的权重,找到符合业务场景的最佳比例;
  • Rerank 模型:开启后将使用第三方 Rerank 模型再一次重排序由混合检索召回的内容分段,以优化排序结果;

倒排索引

倒排索引是一种用于快速检索文档中关键词的索引结构,常用于在线搜索引擎。倒排索引仅支持 TopK 设置项。

search-setting-inverted-index.png

在应用中集成知识库

至此,我们的 “术语表” 知识库已经创建完毕,接下来,我们要将该知识库集成到我们的 AI 应用中。我们进入工作台,创建一个聊天助手应用,取名为 “翻译专家”:

apps-chat.png

虽然大模型已经具备不错的翻译能力,但在遇到专业术语或领域知识时,往往词不达意,比如我们问 ToT 是什么意思?

apps-chat-debug-prev.png

很显然它不知道我们要问的是其实是思维树的缩写,为解决这个问题,我们可以在知识库设置部分点击 “添加” 按钮,引用刚刚创建的 “术语表” 知识库,一个应用可以引用多个知识库;再点击 “召回设置” 配置检索方式:

apps-chat-kb-setting.png

这个设置页面和上面的混合检索很像,只不过这里使用多路召回,系统会从多个知识库中检索知识,然后通过 Rerank 策略找到最适合的内容。

我们保持默认参数即可,然后在右侧的调试面板进行验证:

apps-chat-debug.png

这次大模型就能正常回答我们的问题了。

引用和归属

在应用的功能选项中有一个 “引用和归属” 开关,默认是开启的:

apps-chat-ref.png

当助手回答用户问题后,若涉及已关联的知识库文档,将将回复内容下方标注引用来源,用户可查看到具体的引用段落信息,包括原始分段文本、分段序号、匹配度等:

apps-chat-debug-ref.png

点击引用分段上方的 “跳转至知识库”,可以快捷访问该分段所在的知识库分段列表,方便开发者进行调试编辑。

元数据过滤

Dify 的知识库支持元数据功能,我们可以在知识库的管理界面,创建、修改和删除元数据字段(比如标签、类别、作者、时间等),并设置元数据的值:

dify-kb-metadata.png

然后我们就可以在应用中使用这些元数据对文档进行精确过滤。一共有三种模式:

  • 禁用模式:不使用元数据过滤
  • 自动模式:系统根据用户问题自动生成过滤条件
  • 手动模式:用户手动配置过滤条件

apps-chat-kb-metadata.png

通过元数据,我们可以:

  • 提升搜索效率:用户可以根据元数据标签快速筛选和查找相关信息,节省时间并提高工作效率
  • 增强数据安全性:通过元数据设置访问权限,确保只有授权用户能访问敏感信息,保障数据的安全性
  • 优化数据管理能力:元数据帮助企业或组织有效分类和存储数据,提高数据的管理和检索能力,增强数据的可用性和一致性
  • 支持自动化流程:元数据在文档管理、数据分析等场景中可以自动触发任务或操作,简化流程并提高整体效率

未完待续

Dify 的知识库功能为开发者提供了一套完整的 RAG 解决方案,从文档上传、分段处理到检索配置,通过可视化的界面,开发者可以轻松创建和管理知识库,从而让 AI 应用能够访问实时、准确的外部知识,提升回答的质量和可信度。

今天主要是对 Dify 的知识库功能走马观花地过了一遍,对其有一个整体感性的认识,明天我们将继续深入源码,重点看下知识库检索这块的实现。


深入 Dify 的应用运行器之外部数据扩展

在创建 Dify 应用时,我们可以在提示词中嵌入用户自定义变量,提高应用的灵活性和功能性。比如在之前的文本生成应用中,我们使用 {{lang}}{{query}} 两个变量,实现了一个简单的中英互译小助手:

text-generator-config.png

Dify 支持文本、段落、下拉选项、数字和复选框等字段类型:

apps-gen-variables.png

除此之外,它还支持以 API 扩展的方式创建 基于 API 的变量

今天我们就来学习下 Dify 中与扩展相关的内容。

使用 API 扩展

首先我们进入 “设置” 页面,打开 “API 扩展” 菜单:

setting-api-extension.png

点击 “新增” 按钮,添加已开发好的 API 扩展:

setting-api-extension-add.png

然后创建一个 “天气查询” 应用,类型为文本生成:

apps-gen.png

我们为该应用添加两个变量:

  • city - 城市名称,字段类型为文本
  • weather_result - 天气查询结果,字段类型为基于 API 的变量,选择上一步添加的 API 扩展

apps-gen-variable-api.png

接着编写如下提示词:

根据下面的查询结果:
{{weather_result}} 

回答问题:
今天 {{city}} 的天气怎么样?

一个简单的天气查询助手就开发好了:

apps-gen-test.png

可以看出,Dify 会自动调用 API 扩展的接口,并将获取到的外部数据组装至提示词中作为大模型的额外信息。

开发 API 扩展

接下来,让我们来看下这个 API 扩展是如何实现的?

首先我们要明确下扩展的概念,不同于 Dify 的插件机制,Dify 的 API 扩展主要针对这两个模块:

  • external_data_tool 外部数据工具
  • moderation 敏感内容审计

Dify 的 API 扩展遵循一定的规范,它会按照下面的格式调用你填写的接口地址:

POST {Your-API-Endpoint}

Authorization: Bearer {api_key}
Content-Type: application/json

{
  "point":  string, //  扩展点,不同模块可能包含多个扩展点
  "params": {
    ...  // 各模块扩展点传入参数
  }
}

其中 point 为扩展点,params 为不同扩展点对应的传入参数。Dify 支持下面几类不同的扩展点:

  • ping:测试接口,在添加 API 扩展时通过该扩展点验证接口的可用性;当 API 接收到 point=ping 时,接口应返回 result=pong 固定值;
  • app.external_data_tool.query:应用外部数据工具查询扩展点,将用户传入的变量和对话内容作为参数,传给 API;开发者需要实现对应工具的查询逻辑,并返回字符串类型的查询结果;
  • app.moderation.input:输入内容审查扩展点,用于审查用户传入的变量以及对话内容;
  • app.moderation.output:输出内容审查扩展点,用于审查大模型输出的内容,当输出为流式时,输出的内容将以 100 个字符为一个批次进行请求 API;

要实现基于 API 的变量,其实就是实现 app.external_data_tool.query 扩展点。该扩展点的输入参数如下:

{
  "point": "app.external_data_tool.query", // 扩展点类型,此处固定为 app.external_data_tool.query
  "params": {
    "app_id": string,  // 应用 ID
    "tool_variable": string,  // 外部数据工具变量名称,表示对应变量工具调用来源
    "inputs": {  // 用户传入的变量,key 为变量名,value 为变量值
      "var_1": "value_1",
      "var_2": "value_2",
      ...
    },
    "query": string | null  // 用户当前的对话内容
  }
}

该扩展点直接输出一个 result 字符串即可:

{
  "result": string
}

天气查询示例

这里是一个简单的外部数据工具示例,场景为根据城市获取天气信息作为上下文。创建 main.py 文件:

import json
from fastapi import FastAPI, HTTPException, Header

app = FastAPI()

@app.post("/api/weather")
async def query_weather(data: dict, authorization: str = Header(None)):
    
  # 简单鉴权
  expected_api_key = "123456"
  auth_scheme, _, api_key = authorization.partition(' ')
  if auth_scheme.lower() != "bearer" or api_key != expected_api_key:
    raise HTTPException(status_code=401, detail="Unauthorized")

  print("接受到请求:" + json.dumps(data))
  
  # 扩展点
  point = data["point"]
  
  if point == "ping":
    # 测试
    return {
      "result": "pong"
    }

  if point == "app.external_data_tool.query":
    # 外部数据扩展
    return handle_app_external_data_tool_query(params=data["params"])
  
  raise HTTPException(status_code=400, detail="Not implemented")

def handle_app_external_data_tool_query(params: dict):
  # 模拟天气查询接口
  inputs = params.get("inputs")
  if inputs.get("city") == "合肥":
    return {
      "result": "天气晴,西北风,温度10-24摄氏度"
    }
  else:
    return {
      "result": "未知城市"
    }

代码比较简单,按照接口规范编写即可,它实现了 pingapp.external_data_tool.query 两个扩展点,并配置了 API Key 为 123456。这段代码基于 Python 的 FastAPI 框架,因此需要安装对应的依赖:

$ pip install fastapi[all] uvicorn

然后通过 uvicorn 启动 API 服务:

$ uvicorn main:app --reload --host 0.0.0.0

默认端口为 8000,通过 curl 验证之:

$ curl 'http://127.0.0.1:8000/api/weather' \
  -H 'Authorization: Bearer 123456' \
  -H 'Content-Type: application/json' \
  -d '{
    "point": "ping"
  }'

验证 OK 后,就可以在 Dify 配置页面添加该 API 扩展,然后在应用中选择它。调试时,通过日志可以看到,Dify 发送的请求内容如下:

{
  "point": "app.external_data_tool.query",
  "params": {
    "app_id": "6187c87a-9495-4412-8d22-0a11ec409376",
    "tool_variable": "weather_result",
    "inputs": {
      "city": "合肥"
    },
    "query": ""
  }
}

内容审核扩展

我们在上一篇文章中学习过,Dify 支持多种内容审核方式,除了 OpenAI Moderation 和关键词审核,它还支持 API 扩展方式来实现更加定制化的内容审核策略:

moderation-api-extension.png

当开启输入内容审查时,Dify 会给相应的 API 扩展发送 HTTP POST 请求,我们必须实现 app.moderation.input 扩展点:

{
  "point": "app.moderation.input", // 扩展点类型,此处固定为 app.moderation.input
  "params": {
    "app_id": string,  // 应用 ID
    "inputs": {  // 用户传入的变量,key 为变量名,value 为变量值
      "var_1": "value_1",
      "var_2": "value_2",
      ...
    },
    "query": string | null  // 用户当前的对话内容
  }
}

接口返回需满足如下规范:

{
  "flagged": bool,  // 是否违反校验规则
  "action": string, // 动作,direct_output 直接输出预设回答; overridden 覆写传入变量值
  "preset_response": string,  // 预设回答(仅当 action=direct_output 返回)
  "inputs": {  // 覆写用户传入的变量,key 为变量名,value 为变量值(仅当 action=overridden 返回)
    "var_1": "value_1",
    "var_2": "value_2",
    ...
  },
  "query": string | null  // 覆写用户的对话内容。(仅当 action=overridden 返回)
}

当开启输出内容审查时,我们必须实现 app.moderation.output 扩展点:

{
  "point": "app.moderation.output", // 扩展点类型,此处固定为 app.moderation.output
  "params": {
    "app_id": string,  // 应用 ID
    "text": string  // 大模型的回答内容。当输出为流式时,此处为 100 字为一个分段的内容。
  }
}

接口返回需满足如下规范:

{
  "flagged": bool,  // 是否违反校验规则
  "action": string, // 动作,direct_output 直接输出预设回答; overridden 覆写传入变量值
  "preset_response": string,  // 预设回答(仅当 action=direct_output 返回)
  "text": string  // 覆写大模型的回答内容。(仅当 action=overridden 返回)
}

代码扩展

对于在本地部署 Dify 的开发人员来说,使用 API 扩展还是过于麻烦了,针对每个 API 扩展都要独立部署对应的 HTTP 服务。因此 Dify 还提供了另一种扩展方式 —— 代码扩展,通过代码扩展,你可以在不破坏 Dify 原始代码逻辑的情况下,以代码形式扩展或增强程序的功能。

代码扩展也支持外部数据工具和敏感内容审核两个模块,我们只需要在对应模块下添加代码,遵循一定的接口规范,从而实现平台的横向扩展。

还是以天气查询扩展为例,我们可以在 api/core/external_data_tool 目录下新建相关的目录和文件:

api/core/external_data_tool/weather_search/
├── __init__.py
├── weather_search.py
└── schema.json

其中 schema.json 文件用于定义前端组件的样式:

{
  "label": {
    "en-US": "Weather Search",
    "zh-Hans": "天气查询"
  },
  "form_schema": [
    {
      "type": "select",
      "label": {
        "en-US": "Temperature Unit",
        "zh-Hans": "温度单位"
      },
      "variable": "temperature_unit",
      "required": true,
      "options": [
        {
          "label": {"en-US": "Celsius", "zh-Hans": "摄氏度"},
          "value": "celsius"
        },
        {
          "label": {"en-US": "Fahrenheit", "zh-Hans": "华氏度"},
          "value": "fahrenheit"
        }
      ],
      "default": "celsius"
    }
  ]
}

weather_search.py 文件为具体的扩展实现,外部工具必须继承 ExternalDataTool 类:

class WeatherSearch(ExternalDataTool):
  # 天气查询外部数据工具
  # 注意:name 必须与目录名和文件名保持一致
  name: str = "weather_search"

  @classmethod
  def validate_config(cls, tenant_id: str, config: dict) -> None:
    # 验证配置的有效性
    if not config.get('temperature_unit'):
      raise ValueError('temperature unit is required')

  def query(self, inputs: dict, query: Optional[str] = None) -> str:
    # 执行天气查询
    city = inputs.get('city')
    temperature_unit = self.config.get('temperature_unit')

    # 模拟天气 API 调用
    if temperature_unit == 'fahrenheit':
      return f'Weather in {city} is 32°F'
    else:
      return f'Weather in {city} is 0°C'

重启 Dify 应用,在添加基于 API 的变量时,就可以看到我们自定义的 “天气查询” 变量类型:

apps-gen-variable-api-2.png

内容审核扩展和外部数据工具的实现基本类似,我们可以在 api/core/moderation 目录下新建相关的目录和文件,定义前端样式,实现审核接口,具体的内容此处不再展开,感兴趣的朋友可参考官方的文档:

当内容审核扩展开发就绪后,在 “内容审查” 的设置页面会多一个选项:

moderation-code-extension.png

小结

我们今天主要学习了 Dify 的扩展机制在 外部数据工具敏感内容审计 两个模块的应用,通过 API 扩展和代码扩展,开发者可以实现自定义的应用逻辑,创造高定制化的应用解决方案。

回顾 CompletionAppRunnerrun() 方法,在第一次提示词组装和输入内容审核之后,接着就是外部数据的填充,通过扩展机制从外部数据源获取额外信息,动态地补充到应用的输入参数中,从而重新组装出最终的提示词。在第二次提示词的组装中,除了外部数据,还有另一个重要部分,那就是知识库的检索结果,我们明天将继续学习这一部分。