一、Project Reactor 简介
// 以下是 import 部分
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
public class ReactorDemo {
public static void main(String[] args) throws InterruptedException {
Flux
flux = Flux.just("Hello", "World", "Reactor");
flux.subscribe(System.out::println);
}
}
Project Reactor 是一个基于 Reactive Stream API 之上的 reactive 库。Reactive Stream 是一个规范,用于通讯无阻塞背压异步流的交互。Project Reactor 实现了 Reactive Stream API 规范,同时针对 Java 8 的特性也进行了进一步的扩展,支持 Reactor 的 Mono 和 Flux API,让使用者能够方便地使用 Java 8 的 Stream 相关 API 进行数据处理。
Reactor 的核心构建模块为 Mono 和 Flux,该模块提供了类似于Java Stream 的编程模型,支持生成、处理序列数据。其中 Flux 表示的是包含 0 到 N 个数据元素的异步序列,而 Mono 则表示包含 0 或者 1 个数据元素的异步序列,也就是可以返回数据也可以返回值为空,同时在底层实现上,这两个类也提供了流式操作 API,例如:map、filter、reduce 等。
以下是 Flux 的示例代码:
Flux.just("Hello", "World", "Reactor")
.map(s -> s.concat(", "))
.zipWith(Flux.range(1, Integer.MAX_VALUE), (s, index) -> index + ". " + s)
.subscribe(System.out::print);
// Output: 1. Hello, 2. World, 3. Reactor,
在上述代码中,我们先通过 Flux.just 初始化了一个有三个元素的 Flux 对象,接着使用 map 进行字符串拼接,通过 zipWith 将两个 Flux 对象先后对应地组合在一起,最后通过 subscribe 方法将结果输出。
二、Project Reactor 编程模型
1. Flux 模型
Flux 表示包含 0 到 N 个数据元素的异步序列,可以用于推送数据到一个订阅者流中。Flux 支持两种方式的数据推送:订阅和异步执行。当数据源被订阅时,Flux 将按照订阅者指定的方式推送数据。当没有订阅者时,数据源不会进行推送。异步执行时,Flux 采用 Reactor 提供的 Schedulers 线程调度进行异步处理。
以下是 Flux 的基本用法:
// 以下是 import 部分
import reactor.core.publisher.Flux;
public class FluxDemo {
public static void main(String[] args) {
Flux.just("Hello", "World", "Reactor")
.subscribe(System.out::println);
}
}
这段代码演示了如何在控制台打印 Flux 数据流中的元素。在 subscribe 方法执行后,Flux 将立即开始推送在 just 方法中指定的数据。
2. Mono 模型
Mono 则表示一个包含 0 或 1 个数据元素的异步序列,同样也可以用于推送数据到一个订阅者流中。它和 Flux 唯一的区别在于数据元素的数量。
以下是 Mono 的基本用法:
// 以下是 import 部分
import reactor.core.publisher.Mono;
public class MonoDemo {
public static void main(String[] args) {
Mono.just("Hello Reactor")
.subscribe(System.out::println);
}
}
这段代码演示了如何在控制台打印 Mono 数据流中的元素。在 subscribe 方法执行后,Mono 将立即开始推送在 just 方法中指定的数据。
三、Project Reactor 批处理
当需要对数据流进行批处理时,可以使用 buffer 操作符将多个元素合并到一个 List 中,形成一个批次,然后把这个 List 推给下游流。如下所示:
// 以下是 import 部分
import reactor.core.publisher.Flux;
public class BufferDemo {
public static void main(String[] args) {
Flux.range(1, 10)
.buffer(3)
.subscribe(System.out::println);
}
}
这个例子的中 buffer(3) 操作符创建了一个可以包含 3 个元素的 List。Flux 已经发出了 10 个元素,因此在推送完所有元素之前,将生成三个 List。输出如下:
[1, 2, 3]
[4, 5, 6]
[7, 8, 9]
[10]
四、Project Reactor 选择线程
Reactor 提供了众多的线程池支持,以便于让开发者可以自定义线程池,通过配置不同的线程池来实现多线程编程。以下是官网提供的线程选择策略,针对不同的场景可以选择不同的策略:
- boundedElastic:一个支持生命周期的线程池(回收线程),默认可以使用 Integer.MAX_VALUE 个线程,其中每个线程都是预定义的,以保证线程可以在几纳秒内就可以被重用。此线程池适用于执行计算密集型任务,如单独线程的映射,同步 IO 或 CPU 绑定的阻塞。此线程池不支持任务队列,每次提交任务都会立即执行。
- elastic:使用无界队列的线程池,每次提交任务都会创建一个新的线程。此线程池适用于执行 CPU 密集型任务,因为这样可以保证线程不会被阻塞。
- single:使用一个线程处理任务,适用于执行多个任务中的串行计算,并使得任务在相同的线程上运行。此线程池适用于执行串行计算,如 Stream -> Stream。
- immediate:直接在订阅线程上执行任务。
以下是使用 elastic 线程池的代码示例:
// 以下是 import 部分
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
public class SchedulerDemo {
public static void main(String[] args) {
Flux.just("Hello", "World", "Reactor")
.publishOn(Schedulers.elastic())
.map(String::toUpperCase)
.subscribe(System.out::println);
}
}
在上述示例中,通过 publishOn 方法选择 Schedulers.elastic() 线程池,使用其线程池进行数据处理,然后通过 map 方法将数据进行处理、转化为大写,最后输出结果。
五、Project Reactor 反应式API
Project Reactor 提供了 Flux 和 Mono 类,这些类提供了一组反应式的 API,用于处理异步数据流,并提供了相关的方法进行数据处理。
以下代码示例使用反应式 API 计算 Stream 数据的平均值:
// 以下是 import 部分
import reactor.core.publisher.Flux;
import java.time.Duration;
import java.util.stream.Stream;
public class ReactiveAPIDemo {
public static void main(String[] args) {
Flux
intervalFlux = Flux.interval(Duration.ofSeconds(1));
Stream
inputStream = Stream.of(5.0f, 6.0f, 7.0f);
Flux.fromStream(inputStream)
.zipWith(intervalFlux, (i, t) -> i)
.buffer(3)
.map(buf -> {
float sum = buf.stream().reduce(Float::sum).orElse(0f);
return sum / buf.size();
})
.subscribe(System.out::println);
}
}
在上面的代码示例中我们首先创建了自动发出递增长整数序列的 Flux,接着使用 zipWith 操作符将其和 Stream 数据进行组合,每 3 个组成一组。最后,我们使用 map 函数计算组的平均值,并将组对象返回。最终输出结果如下:
6.0
六、结语
通过本文介绍,我们已经学习了 Project Reactor 的核心构建模块 Flux 和 Mono 的使用方法,以及 Project Reactor 的编程模型和批处理。通过调整选择线程池的位置可以控制代码的并发程度,提高代码的处理效率。同时通过之前的示例我们也学习了反应式 API 的使用,我们可以在项目中通过这个 API 实现多线程异步计算,优化项目性能,提高代码质量和可读性。