响应式编程详解,带你熟悉Reactor响应式编程
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了响应式编程详解,带你熟悉Reactor响应式编程相关的知识,希望对你有一定的参考价值。
文章目录
- 一、什么是响应式编程
- 1、Java的流和响应式流
- 2、Java中响应式的使用
- 3、Reactor中响应式流的基本接口
- 4、Reactor中响应式接口的基本使用
- 二、初始Reactor
- 1、Flux和Mono的基本介绍
- 2、引入Reactor依赖
- 3、响应式类型的创建
- 4、响应式类型的组合
- (1)使用mergeWith合并响应式流
- (2)使用zip压缩合并响应式流
- (3)使用zip压缩合并为自定义对象的响应式流
- (4)选择第⼀个反应式类型进⾏发布
- 5、转换和过滤反应式流
- (1)skip操作跳过指定数⽬的消息
- (2)skip()操作的另⼀种形式
- (3)take操作只发布第⼀批指定数量的数据项
- (4)take操作的另一种形式
- (5)filter操作自定义过滤条件
- (6)distinct操作去重
- (7)map操作映射新元素
- (8)flatMap将流转成新的流
- (9)buffer操作现将数据流拆分为小块
- (10)collectList操作也可以将所有数据收集到一个List
- (11)collectMap 操作产生⼀个发布Map的Mono
- 6、在反应式类型上执行逻辑操作
- (1)⽤all()⽅法来确保Flux中的所有消息都满⾜某些条件
- (2)⽤any()⽅法来确保Flux中⾄少有⼀个消息满⾜某些条件
- 7、在反应式类型上使用Subscriber订阅
- (1)使用Subscriber消费消息
- (2)使用Flux的doOnNext处理数据
- 8、使用then来处理完成数据返回
- 写在后面
一、什么是响应式编程
响应式编程是一种面向数据流和变化传播的编程范式。这意味着可以在编程语言中很方便地表达静态或动态的数据流,而相关的计算模型会自动将变化的值通过数据流进行传播。
在开发应⽤程序代码时,我们可以编写两种⻛格的代码,即命令式和响应式。
命令式(Imperative)的代码:它由⼀组任务组成,每次只运⾏⼀项任务,每项任务⼜都依赖于前⾯的任务。数据会按批次进⾏处理,在前⼀项任务还没有完成对当前数据批次的处理时,不能将这些数据递交给下⼀项处理任务。
响应式(Reactive)的代码:它定义了⼀组⽤来处理数据的任务,但是这些任务可以并⾏地执⾏。每项任务处理数据的⼀部分⼦集,并将结果交给处理流程中的下⼀项任务,同时继续处理数据的另⼀部分⼦集。
Reactor 是⼀个响应式编程库,同时也是Spring家族的⼀部分。它是Spring 5反应式编程功能的基础。
1、Java的流和响应式流
Java的Stream流通常都是同步的,并且只能处理有限的数据集。从本质上来说,它们只是使⽤函数来对集合进⾏迭代的⼀种⽅式。
响应式流⽀持异步处理任意⼤⼩的数据集,同样也包括⽆限数据集。只要数据就绪,它们就能实时地处理数据,并且能够通过回压来避免压垮数据的消费者。
2、Java中响应式的使用
JDK1.8时,是基于Observer/Observable接口而实现的观察者模式:
ObserverDemo observer = new ObserverDemo();
// 添加观察者
observer.addObserver(new Observer()
@Override
public void update(Observable o, Object arg)
System.out.println("发生了变化");
);
observer.addObserver(new Observer()
@Override
public void update(Observable o, Object arg)
System.out.println("收到了通知");
);
observer.setChanged(); // 数据变化
observer.notifyObservers(); // 通知
JDK9及以后,Observer/Observable接口就被弃用了,取而代之的是Flow类:
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
public class FlowDemo
public static void main(String[] args) throws Exception
// 1. 定义发布者, 发布的数据类型是 Integer
// 直接使用jdk自带的SubmissionPublisher, 它实现了 Publisher 接口
SubmissionPublisher<Integer> publiser = new SubmissionPublisher<Integer>();
// 2. 定义订阅者
Subscriber<Integer> subscriber = new Subscriber<Integer>()
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription)
// 保存订阅关系, 需要用它来给发布者响应
this.subscription = subscription;
// 请求一个数据
this.subscription.request(1);
@Override
public void onNext(Integer item)
// 接受到一个数据, 处理
System.out.println("接受到数据: " + item);
try
TimeUnit.SECONDS.sleep(3);
catch (InterruptedException e)
e.printStackTrace();
// 处理完调用request再请求一个数据
this.subscription.request(1);
// 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了
// this.subscription.cancel();
@Override
public void onError(Throwable throwable)
// 出现了异常(例如处理数据的时候产生了异常)
throwable.printStackTrace();
// 我们可以告诉发布者, 后面不接受数据了
this.subscription.cancel();
@Override
public void onComplete()
// 全部数据处理完了(发布者关闭了)
System.out.println("处理完了!");
;
// 3. 发布者和订阅者 建立订阅关系
publiser.subscribe(subscriber);
// 4. 生产数据, 并发布
// 这里忽略数据生产过程
for (int i = 0; i < 1000; i++)
System.out.println("生成数据:" + i);
// submit是个block方法
publiser.submit(i);
// 5. 结束后 关闭发布者
// 正式环境 应该放 finally 或者使用 try-resouce 确保关闭
publiser.close();
// 主线程延迟停止, 否则数据没有消费就退出
Thread.currentThread().join(1000);
// debug的时候, 下面这行需要有断点
// 否则主线程结束无法debug
System.out.println();
import java.util.concurrent.Flow.Processor;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;
/**
* 带 process 的 flow demo
*/
/**
* Processor, 需要继承SubmissionPublisher并实现Processor接口
*
* 输入源数据 integer, 过滤掉小于0的, 然后转换成字符串发布出去
*/
class MyProcessor extends SubmissionPublisher<String>
implements Processor<Integer, String>
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription)
// 保存订阅关系, 需要用它来给发布者响应
this.subscription = subscription;
// 请求一个数据
this.subscription.request(1);
@Override
public void onNext(Integer item)
// 接受到一个数据, 处理
System.out.println("处理器接受到数据: " + item);
// 过滤掉小于0的, 然后发布出去
if (item > 0)
this.submit("转换后的数据:" + item);
// 处理完调用request再请求一个数据
this.subscription.request(1);
// 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了
// this.subscription.cancel();
@Override
public void onError(Throwable throwable)
// 出现了异常(例如处理数据的时候产生了异常)
throwable.printStackTrace();
// 我们可以告诉发布者, 后面不接受数据了
this.subscription.cancel();
@Override
public void onComplete()
// 全部数据处理完了(发布者关闭了)
System.out.println("处理器处理完了!");
// 关闭发布者
this.close();
public class FlowDemo2
public static void main(String[] args) throws Exception
// 1. 定义发布者, 发布的数据类型是 Integer
// 直接使用jdk自带的SubmissionPublisher
SubmissionPublisher<Integer> publiser = new SubmissionPublisher<Integer>();
// 2. 定义处理器, 对数据进行过滤, 并转换为String类型
MyProcessor processor = new MyProcessor();
// 3. 发布者 和 处理器 建立订阅关系
publiser.subscribe(processor);
// 4. 定义最终订阅者, 消费 String 类型数据
Subscriber<String> subscriber = new Subscriber<String>()
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription)
// 保存订阅关系, 需要用它来给发布者响应
this.subscription = subscription;
// 请求一个数据
this.subscription.request(1);
@Override
public void onNext(String item)
// 接受到一个数据, 处理
System.out.println("接受到数据: " + item);
// 处理完调用request再请求一个数据
this.subscription.request(1);
// 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了
// this.subscription.cancel();
@Override
public void onError(Throwable throwable)
// 出现了异常(例如处理数据的时候产生了异常)
throwable.printStackTrace();
// 我们可以告诉发布者, 后面不接受数据了
this.subscription.cancel();
@Override
public void onComplete()
// 全部数据处理完了(发布者关闭了)
System.out.println("处理完了!");
;
// 5. 处理器 和 最终订阅者 建立订阅关系
processor.subscribe(subscriber);
// 6. 生产数据, 并发布
// 这里忽略数据生产过程
publiser.submit(-111);
publiser.submit(111);
// 7. 结束后 关闭发布者
// 正式环境 应该放 finally 或者使用 try-resouce 确保关闭
publiser.close();
// 主线程延迟停止, 否则数据没有消费就退出
Thread.currentThread().join(1000);
3、Reactor中响应式流的基本接口
响应式流规范可以总结为4个接⼝:Publisher、Subscriber、Subscription和Processor。
Publisher负责⽣成数据,并将数据发送给 Subscription(每个Subscriber对应⼀个Subscription)。
public interface Publisher<T>
// Publisher接⼝声明了⼀个⽅法 subscribe(),Subscriber可以通过该⽅法向 Publisher发起订阅。
public void subscribe(Subscriber<? super T> s);
⼀旦Subscriber订阅成功,就可以接收来⾃Publisher的事件。
public interface Subscriber<T>
// Subscriber的第⼀个事件是通过对 onSubscribe()⽅法的调⽤接收的。
public void onSubscribe(Subscription s);
// 每个数据项都会通过该方法处理
public void onNext(T t);
// 异常处理
public void onError(Throwable t);
// 结束
public void onComplete();
Publisher调⽤ onSubscribe() ⽅法时,会将Subscription对象传递给 Subscriber。
通过Subscription,Subscriber可以管理其订阅情况:
public interface Subscription
// Subscriber可以通过调⽤ request()⽅法来请求 Publisher 发送数据,可以传⼊⼀个long类型的数值以表明它愿意接受多少数据
// 这也是回压能够发挥作⽤的地⽅,以避免Publisher 发送多于 Subscriber能够处理的数据量
public void request(long n);
// 调⽤ cancel()⽅法表明它不再对数据感兴趣并且取消订阅
public void cancel();
Subscriber 请求数据之后,数据就会开始流经响应式流,调用onNext方法。
Processor接⼝,它是Subscriber和Publisher的组合:
public interface Processor<T, R> extends Subscriber<T>, Publisher<R>
4、Reactor中响应式接口的基本使用
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;
public class ReactorDemo
public static void main(String[] args)
// reactor = jdk8 stream + jdk9 reactive stream
// Mono 0-1个元素
// Flux 0-N个元素
String[] strs = "1", "2", "3" ;
// 2. 定义订阅者
Subscriber<Integer> subscriber = new Subscriber<Integer>()
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription)
// 保存订阅关系, 需要用它来给发布者响应
this.subscription = subscription;
// 请求一个数据
this.subscription.request(1);
@Override
public void onNext(Integer item)
// 接受到一个数据, 处理
System.out.println("接受到数据: " + item);
try
TimeUnit.SECONDS.sleep(3);
catch (InterruptedException e)
e.printStackTrace();
// 处理完调用request再请求一个数据
this.subscription.request(1);
// 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了
// this.subscription.cancel();
@Override
public void onError(Throwable throwable)
// 出现了异常(例如处理数据的时候产生了异常)
throwable.printStackTrace();
// 我们可以告诉发布者, 后面不接受数据了
this.subscription.cancel();
@Override
public void onComplete()
// 全部数据处理完了(发布者关闭了)
System.out.println("处理完了!");
;
// 这里就是jdk8的stream
Flux.fromArray(strs).map(s -> Integer.parseInt(s))
// 最终操作
// 这里就是jdk9的reactive stream
.subscribe(subscriber);
二、初始Reactor
1、Flux和Mono的基本介绍
Reactor中有两个核心类,Mono和Flux。Flux和Mono是Reactor提供的最基础的构建块,⽽这两种响应式类型所提供的操作符则是组合使⽤它们以构建数据流动管线的黏合剂。
这两个类实现接口Publisher,提供丰富操作符。Flux对象实现发布者,返回N个元素Mono实现发布者,返回0或者1个元素。
Flux和Mono都是数据流的发布者,使用Flux和Mono都可以发出三种数据信号:元素值、错误信号、完成信号,错误信号和完成信号都代表终止信号,终止信号用于告诉订阅者数据流结束了,错误信号终止数据流同时把错误信息传递给订阅者。
Flux和Mono共有500多个操作,这些操作都可以⼤致归类为:创建操作;组合操作;转换操作;逻辑操作。
注意!Mono和Flux的很多操作是相同的,只不过对应的数据数量不同,所以本文更多的操作都是基于Flux的,Mono也同理。
2、引入Reactor依赖
需要引入reactor-core核心包和测试包。
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.x.x</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<version>3.x.x</version>
<scope>test</scope>
</dependency>
3、响应式类型的创建
Reactor提供了多种创建Flux和Mono的操作。
// 使⽤Flux或Mono上的静态 just()⽅法来创建⼀个响应式类型
Mono.just(1);
Flux<String> fruitFlux = Flux
.just("Apple", "Orange", "Grape", "Banana", "Strawberry");
// 调用just或其他方法只是声明数据流,数据流并没有发出,只有进行订阅之后才会触发数据流,不订阅什么都不会发生的。
// 添加一个订阅者,subscribe的方法参数相当于是一个Consumer
fruitFlux.subscribe(
f -> System.out.println("Heres some fruit: " + f)
);
// 根据集合创建
String[] fruits = new String[]
"Apple", "Orange", "Grape", "Banana", "Strawberry" ;
Flux<String> fruitFlux2 = Flux.fromArray(fruits);
List<String> list = Arrays.asList(fruits);
Flux.fromIterable(list); // 集合
Stream<String> stream = list.stream();
Flux.fromStream(stream); // stream流
// 根据区间创建1-5
Flux<Integer> intervalFlux =
Flux.range(1, 5);
intervalFlux.subscribe(
f -> System.out.println("data is :" + f)
);
// 每秒发布⼀个值的Flux,通过interval()⽅法创建的Flux会从0开始发布值,并且后续的条⽬依次递增。
// 因为interval()⽅法没有指定最⼤值,所以它可能会永远运⾏。我们也可以使⽤take()⽅法将结果限制为前5个条⽬。
Flux<Long> intervalFlux2 =
Flux.interval(Duration.ofSeconds(1))
.take(5);
intervalFlux2.subscribe(
f -> System.out.println("data2 is :" + f)
);
// 阻塞,等待结果
Thread.sleep(100000);
4、响应式类型的组合
(1)使用mergeWith合并响应式流
Flux<String> characterFlux = Flux
.just("Garfield", "Kojak", "Barbossa")
.delayElements(Duration.ofMillis(500)); // 每500毫秒发布⼀个数据
Flux<String> foodFlux = Flux
.just("Lasagna", "Lollipops", "Apples")
.delaySubscription(Duration.ofMillis(250)) // 订阅后250毫秒后开始发布数据
.delayElements(Duration.ofMillis(500)); // 每500毫秒发布⼀个数据
// 使⽤mergeWith()⽅法,将两个Flux合并,合并过后的Flux数据项发布顺序与源Flux的发布时间⼀致
// Garfield Lasagna Kojak Lollipops Barbossa Apples
Flux<String> mergedFlux = characterFlux.mergeWith(foodFlux);
mergedFlux.subscribe(System.out::println);
// 阻塞,等待结果
Thread.sleep(100000);
我们发现,使用mergeWith合并过的两个FLux,并没有严格意义上的先后之分,谁产生了数据就接着消费,与同一个无异。
(2)使用zip压缩合并响应式流
Flux<String> characterFlux = Flux
.just("Garfield", "Kojak", "Barbossa");
Flux<String> foodFlux = Flux
.just("Lasagna", "Lollipops", "Apples");
// 当两个Flux对象压缩在⼀起的时候,它将会产⽣⼀个新的发布元组的Flux,其中每个元组中都包含了来⾃每个源Flux的数据项
// 这个合并后的Flux发出的每个条⽬都是⼀个Tuple2(⼀个容纳两个其他对象的容器对象)的实例,其中包含了来⾃每个源Flux的数据项,并保持着它们发布的顺序。
Flux<Tuple2<String, String>> zippedFlux =
Flux.zip(characterFlux, foodFlux);
zippedFlux.subscribe(t ->
System.out.println(t.getT1() + "|" + t.getT2());
);
/**
* 执行结果:
* Garfield|Lasagna
* Kojak|Lollipops
* Barbossa|Apples
*/
(3)使用zip压缩合并为自定义对象的响应式流
如果你不想使⽤Tuple2,⽽想要使⽤其他类型,就可以为zip()⽅法提供⼀个合并函数来⽣成你想要的任何对象,合并函数会传⼊这两个数据项。
zip操作的另⼀种形式(从每个传⼊Flux中各取⼀个元素,然后创建消息对象,并产⽣这些消息组成的Flux)
Flux<String> characterFlux = Flux
.just("Garfield", "Kojak", "Barbossa");
Flux<String> foodFlux = Flux
.just("Lasagna", "Lollipops", "Apples");
// 压缩成自定义对象
Flux<String> zippedFlux =
Flux.zip(characterFlux, foodFlux, (c, f) -> c + " eats " + f);
zippedFlux.subscribe(System.out:: println);
/**
* 执行结果:
* Garfield eats Lasagna
* Kojak eats Lollipops
* Barbossa eats Apples
*/
(4)选择第⼀个反应式类型进⾏发布
假设我们有两个Flux对象,此时我们不想将它们合并在⼀起,⽽是想要创建⼀个新的Flux,让这个新的Flux从第⼀个产⽣值的Flux中发布值。first()操作会在两个Flux对象中选择第⼀个发布值的Flux,并再次发布它的值。
Flux<String> slowFlux = Flux.just("tortoise", "snail", "sloth")
.delaySubscription(Duration.ofMillis(100)); // 延迟100ms
Flux<String> fastFlux = Flux.just("hare", "cheetah", "squirrel");
// 选择第⼀个反应式类型进⾏发布
Flux<String> firstFlux = Flux.first(slowFlux, fastFlux);
firstFlux.subscribe(System.out::println);
// 阻塞,等待结果
Thread.sleep(100000);
/**
* 执行结果:
* hare
* cheetah
* squirrel
*/
5、转换和过滤反应式流
在数据流经⼀个流时,我们通常需要过滤掉某些值并对其他的值进⾏处理。
(1)skip操作跳过指定数⽬的消息
skip操作跳过指定数⽬的消息并将剩下的消息继续在结果Flux上进⾏传递
// 跳过3个,并创建一个新的Flux
Flux<String> skipFlux = Flux.just(
"one", "two", "skip a few", "ninety nine", "one hundred")
.skip(3);
skipFlux.subscribe(System.out::println);
/**
* 执行结果
* ninety nine
* one hundred
*/
(2)skip()操作的另⼀种形式
在⼀段时间之内跳过所有的第⼀批数据。
// 这是skip()操作的另⼀种形式,将会产⽣⼀个新Flux,在发布来⾃源Flux的数据项之前等待指定的⼀段时间
Flux<String> skipFlux = Flux.just(
"one", "two", "skip a few", "ninety nine", "one hundred")
.delayElements(Duration.ofSeconds(1)) // 每1秒一个
.skip(Duration.ofSeconds(4)); // 4秒前的都跳过
skipFlux.subscribe(System.out::println);
// 阻塞,等待结果
Thread.sleep(100000);
/**
* 执行结果:
* ninety nine
* one hundred
*/
(3)take操作只发布第⼀批指定数量的数据项
根据对skip操作的描述来看,take可以认为是与skip相反的操作。skip操作会跳过前⾯⼏个数据项,⽽take操作只发布第⼀批指定数量的数据项,然后将取消订阅。
// take操作只发布传⼊Flux中前⾯指定数⽬的数据项,然后将取消订阅
Flux<String> nationalParkFlux = Flux.just(
"Yellowstone", "Yosemite", "Grand Canyon",
"Zion", "Grand Teton")
.take(3);
nationalParkFlux.subscribe(System.out::println);
/**
* 执行结果:
* Yellowstone
* Yosemite
* Grand Canyon
*/
(4)take操作的另一种形式
take()⽅法也有另⼀种替代形式,基于间隔时间⽽不是数据项个数(在指定的时间过期之前,⼀直将消息传递给结果Flux)。它将接受并发布与源Flux⼀样多的数据项,直到某段时间结束,之后Flux将会完成。
// 在订阅之后的前3.5秒发布数据条⽬。
Flux<String> nationalParkFlux = Flux.just(
"Yellowstone", "Yosemite", "Grand Canyon",
"Zion", "Grand Teton")
.delayElements(Duration.ofSeconds(1))
.take(Duration.ofMillis(3500));
nationalParkFlux.subscribe(System.out::println);
// 阻塞,等待结果
Thread.sleep(100000);
/**
* 执行结果:
* Yellowstone
* Yosemite
* Grand Canyon
*/
(5)filter操作自定义过滤条件
filter操作允许我们根据任何条件进⾏选择性地发布。
Flux<String> nationalParkFlux = Flux.just(
"Yellowstone", "Yosemite", "Grand Canyon",
"Zion", "Grand Teton")
.filter(np -> !np.contains(" ")); // 过滤携带空格的
nationalParkFlux.subscribe(System.out::println);
/**
* 执行结果
* Yellowstone
* Yosemite
* Zion
*/
(6)distinct操作去重
Flux<String> animalFlux = Flux.just(
"dog", "cat", "bird", "dog", "bird", "anteater")
.distinct();
// 去重
animalFlux.subscribe(System.out::println);
/**
* 执行结果:
* dog
* cat
* bird
* anteater
*/
(7)map操作映射新元素
map将元素映射为新的元素,并创建一个新的Flux。
// map将元素映射为新的元素,并创建一个新的Flux
Flux<Integer> integerFlux = Flux
.just("Michael Jordan", "Scottie Pippen", "Steve Kerr")
.map(n ->
String[] split = n.split("\\\\s");
return split.length; // 将String转为Integer
);
integerFlux.subscribe(System.out::println);
/**
* 执行结果:
* 2
* 2
* 2
*/
其中重要的⼀点是:在每个数据项被源Flux发布时,map操作是同步执⾏的,如果你想要异步地转换过程,那么你应该考虑使⽤flatMap操作。
(8)flatMap将流转成新的流
flatMap并不像map操作那样简单地将⼀个对象转换到另⼀个对象,⽽是将对象转换为新的Mono或Flux。结果形成的Mono或Flux会扁平化为新的Flux。当与subscribeOn()⽅法结合使⽤时,flatMap操作可以释放Reactor反应式的异步能⼒。
// 使⽤flatMap()⽅法和subscribeOn()⽅法
Flux<Integer> integerFlux = Flux
.just("Michael", "Scottie Pippen", "Steve Kerr Ob")
.flatMap(n -> Mono.just(n)
.map(p ->
String[] split = p.split("\\\\s");
return split.length; // 将String转为Integer
)
.subscribeOn(Schedulers.parallel()) // 定义异步
);
integerFlux.subscribe(System.out::println);
// 阻塞,等待结果
Thread.sleep(100000);
(9)buffer操作现将数据流拆分为小块
buffer操作会产⽣⼀个新的包含列表Flux(具备最⼤⻓度限制的列表,包含从传⼊的Flux中收集来的数据)
// buffer操作会产⽣⼀个新的包含列表Flux(具备最⼤⻓度限制的列表,包含从传⼊的Flux中收集来的数据)
Flux<String> fruitFlux = Flux.just(
"apple", "orange", "banana", "kiwi", "strawberry");
// 创建⼀个新的包含List 集合的Flux,其中每个List只有不超过指定数量的元素
Flux<List<String>> bufferedFlux = fruitFlux.buffer(3); // 数据切分为小块,每3个一块
bufferedFlux.subscribe(System.out::println);
/**
* 执行结果:
* [apple, orange, banana]
* [kiwi, strawberry]
*/
// 可以分片后并行执行
bufferedFlux.flatMap(x ->
Flux.fromIterable(x)
.map(y -> y.toUpperCase())
.subscribeOn(Schedulers.parallel())
).subscribe(l ->
System.out.println(Thread.currentThread().getName() + "线程执行:" + l);
);
/**
* 执行结果(因为并行执行,结果可能不一致):
* parallel-1线程执行:APPLE
* parallel-1线程执行:ORANGE
* parallel-1线程执行:BANANA
* parallel-2线程执行:KIWI
* parallel-2线程执行:STRAWBERRY
*/
// 阻塞,等待结果
Thread.sleep(100000);
使⽤不带参数的buffer()⽅法可以将Flux发布的所有数据项都收集到⼀个List中:
Flux<List<String>> bufferedFlux = fruitFlux.buffer();
(10)collectList操作也可以将所有数据收集到一个List
collectList操作将产⽣⼀个包含传⼊Flux发布的所有消息的Mono。
Flux<String> fruitFlux = Flux.just(
"apple", "orange", "banana", "kiwi", "strawberry");
// 生成一个Mono,里面包含一个List
Mono<List<String>> fruitListMono = fruitFlux.collectList();
(11)collectMap 操作产生⼀个发布Map的Mono
collectMap操作将会产⽣⼀个Mono(包含了由传⼊Flux所发出的消息产⽣的Map,这个Map的key是从传⼊消息的某些特征衍⽣⽽来的)
Flux<String> animalFlux = Flux.just(
"aardvark", "elephant", "koala", "eagle", "kangaroo");
Mono<Map<Character, String>> animalMapMono =
animalFlux.collectMap(a -> a.charAt(0)); // 将第一个字符作为Map的key
animalMapMono.subscribe(System.out::println);
/**
* 执行结果:
* a=aardvark, e=eagle, k=kangaroo
*/
// 阻塞,等待结果
Thread.sleep(100000);
key相同的,会被覆盖。
6、在反应式类型上执行逻辑操作
(1)⽤all()⽅法来确保Flux中的所有消息都满⾜某些条件
Flux<String> animalFlux = Flux.just(
"aardvark", "elephant", "koala", "eagle", "kangaroo");
Mono<Boolean> hasAMono = animalFlux.all(a -> a.contains("a"));
都满足条件会返回true,否则返回false。
(2)⽤any()⽅法来确保Flux中⾄少有⼀个消息满⾜某些条件
Flux<String> animalFlux = Flux.just(
"aardvark", "elephant", "koala", "eagle", "kangaroo");
Mono<Boolean> hasAMono = animalFlux.any(a -> a.contains("t"));
至少有一个满足条件,就为true,都不满足就为false。
7、在反应式类型上使用Subscriber订阅
(1)使用Subscriber消费消息
Flux<String> stringFlux = Flux.just("Apple", "Orange", "Grape", "Banana", "Strawberry");
stringFlux.subscribe(new Subscriber<String>()
// 保存订阅关系, 需要用它来给发布者响应
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription)
System.out.println("订阅者开始订阅");
this.subscription = subscription;
// 请求一个数据
this.subscription.request(1);
@Override
public void onNext(String item)
System.out.println("订阅者开始处理数据" + item);
try
Thread.sleep(1000);
catch (InterruptedException e)
e.printStackTrace();
// 处理完调用request再请求一个数据
this.subscription.request(1);
// 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了
// this.subscription.cancel();
@Override
public void onError(Throwable t)
// 出现了异常(例如处理数据的时候产生了异常)
t.printStackTrace();
// 我们可以告诉发布者, 后面不接受数据了
this.subscription.cancel();
@Override
public void onComplete()
// 全部数据处理完了(发布者关闭了)
System.out.println("订阅者处理完了!");
);
/**
* 执行结果:
* 订阅者开始订阅
* 订阅者开始处理数据Apple
* 订阅者开始处理数据Orange
* 订阅者开始处理数据Grape
* 订阅者开始处理数据Banana
* 订阅者开始处理数据Strawberry
* 订阅者处理完了!
*/
// 阻塞
Thread.sleep(10000);
(2)使用Flux的doOnNext处理数据
Flux的doOnNext,会添加当Flux发出一个项目时触发的行为(副作用)。
Flux<String> stringFlux = Flux.just("Apple", "Orange", "Grape", "Banana", "Strawberry");
stringFlux.doOnNext(t -> System.out.println("发布者处理数据:" + t))
.subscribe(t -> System.out.println("订阅者处理数据:" + t));
/**
* 执行结果:
* 发布者处理数据:Apple
* 订阅者处理数据:Apple
* 发布者处理数据:Orange
* 订阅者处理数据:Orange
* 发布者处理数据:Grape
* 订阅者处理数据:Grape
* 发布者处理数据:Banana
* 订阅者处理数据:Banana
* 发布者处理数据:Strawberry
* 订阅者处理数据:Strawberry
*/
// 阻塞
Thread.sleep(10000);
但是!以下写法是不会触发发布者的doOnNext事件的:
Flux<String> stringFlux = Flux.just("Apple", "Orange", "Grape", "Banana", "Strawberry");
stringFlux.doOnNext(t -> System.out.println("发布者处理数据:" + t));
stringFlux.subscribe(t -> System.out.println("订阅者处理数据:" + t));
只有链式调用,才会触发发布者的doOnNext事件。
doOnNext可以写多个,顺序执行:
Flux<String> stringFlux = Flux.just("Apple", "Orange", "Grape", "Banana", "Strawberry");
stringFlux.doOnNext(t -> System.out.println("发布者1处理数据:" + t))
.doOnNext(t -> System.out.println("发布者2处理数据:" + t))
.subscribe(t -> System.out.println("订阅者处理数据:" + t));
/**
* 执行结果:
* 发布者1处理数据:Apple
* 发布者2处理数据:Apple
* 订阅者处理数据:Apple
* 发布者1处理数据:Orange
* 发布者2处理数据:Orange
* 订阅者处理数据:Orange
* 发布者1处理数据:Grape
* 发布者2处理数据:Grape
* 订阅者处理数据:Grape
* 发布者1处理数据:Banana
* 发布者2处理数据:Banana
* 订阅者处理数据:Banana
* 发布者1处理数据:Strawberry
* 发布者2处理数据:Strawberry
* 订阅者处理数据:Strawberry
*/
8、使用then来处理完成数据返回
Flux<String> just = Flux.just("Apple", "Orange", "Grape", "Banana", "Strawberry");
// 返回一个Mono ,在此Flux完成时完成。这将主动忽略序列,只重放完成或错误信号。
just.doOnNext(t -> System.out.println("发布者处理数据:" + t))
.then(Mono.defer(() ->
return Mono.just("我完成了");
))
.subscribe(t -> System.out.println("订阅者处理数据:" + t));
/**
* 执行结果:
* 发布者处理数据:Apple
* 发布者处理数据:Orange
* 发布者处理数据:Grape
* 发布者处理数据:Banana
* 发布者处理数据:Strawberry
* 订阅者处理数据:我完成了
*/
通常来说,发布者发布完之后,都需要调用then来处理数据,或调用thenEmpty返回一个空的Mono(Mono.empty())。
写在后面
如果本文对你有帮助,请点赞收藏关注一下吧 ~
学习响应式编程 Reactor - reactor 基础
Reactor
Reactor 项目的主要 artifact 是 reactor-core,这是一个基于 Java 8 的实现了响应式流规范的响应式库。
Reactor 提供了实现 Publisher 的响应式类 Flux 和 Mono,以及丰富的操作符。一个 Flux 代表 0...N 个元素的响应式流;一个 Mono 代表 0|1 个元素的响应式流。
Flux 和 Mono 之间可以转换,比如 Flux 的 count 操作(计算流中元素个数)返回 Mono,Mono 的 concatWith 操作(连接另一个响应式流)返回 Flux。
Flux
Flux<T> 是一个能够发出 0 到 N 个元素的标准 Publisher<T>,它会被一个完成(completion)或错误(error)信号终止。因此,一个 Flux 的可能结果是 value、completion
或 value,这三个分别会传递给订阅者中的 onNext、onComplete、onError 方法。
注意:所有的信号事件,包括代表终止的信号事件都是可选的。如果没有 onNext 事件,但是有 onComplete 事件,那么发出的就是空的有限流;如果去掉 onComplete
就得到一个 无限的空数据流。无限的数据流可以不是空的,比如 Flux.interval(Duration) 生成的是一个 Flux,这是一个无限周期性发出规律整数的时钟数据流。
下图展示的是 Flux 基于时间线的弹珠交互图,通过操作符转换 Flux 中元素:
- 上面那条线表示的是 Flux 数据流时间线,时间从左至右
- 上面那条线中的弹珠代表示的是 Flux 发出的 数据元素
- 上面那条线最后的垂直线表示的是 Flux 已经完成成功事件
- 中间的箭头虚线和框表示的是 Flux 中的元素正在被转换,框内的文字表示的是转换的方式(包含操作符)
- 下面那条线表示的是 FLux 经过转换后的新数据流
- 如果由于某种原因导致 Flux 的转换终止,将使用 X 来代替 垂直线
后续在学习操作符的过程中,我们将见到很多类似的弹珠图,请大家详细了解清楚该图各部分的含义。
Mono
Mono<T> 是一种特殊的Publisher<T>,它最多只能发出一个元素,然后(可选的)终止于 onComplete 或 onError 信号。
Mono 中的操作符是 Flux 中操作符的子集,即 Flux 中只有部分操作符适用于 Mono,有些操作符是将 Mono 和另一个 Publisher 连接转换为 Flux。例如,Mono#concatWith(Publisher
) 转换为 Flux,Mono#then(Mono) 返回另一个 Mono。
注意:可以使用 Mono
来创建一个只有完成概念的空值异步处理过程(类似于 Runnable)。
下图展示的是 Mono 基于时间线的弹珠交互图:
创建 Flux 和 Mono
如同创建 Java Stream 一样,Reactor 也为我们提供了 多个工厂方法用来创建 Flux 和 Mono,有了 Stream 的基础,创建的基本方法我们来快速过一下。
下面的创建方法,如果是 Flux 或 Mono 独有的,会在方法名前增加类名前缀。
下面的示例代码中都有用到 subscribe 方法,下面会讲到,大家先了解它是响应式流的订阅方法,用于触发流,类似于 Java Stream 中的终端操作。
just
使用提供的元素发出数据然后结束的流。
Mono.just("hello, world").subscribe(System.out::println);
Mono.justOrEmpty(str).subscribe(System.out::println);
Mono.justOrEmpty(optional).subscribe(System.out::println);
Flux.just("hello", "world").subscribe(System.out::println);
Flux.just("hello").subscribe(System.out::println);
Flux#fromXxx
Flux 提供了 fromArray(从数组)、fromIterable(从迭代器)、fromStream(从 Java Stream 流) 的方式来创建 Flux。
String[] array = new String[]{"hello", "reactor", "flux"};
List<String> iterable = Arrays.asList("foo", "bar", "foobar");
Flux.fromArray(array).subscribe(System.out::println);
Flux.fromIterable(iterable).subscribe(System.out::println);
Flux.fromStream(Arrays.stream(array)).subscribe(System.out::println);
Flux#range
从 start 开始构建一个 Flux,该 Flux 仅发出一系列递增计数的整数。 也就是说,在 start(包括)和 start + count(排除)之间发出整数,然后完成。见图识意:
Flux.range(3, 5).subscribe(System.out::println);
Flux#interval
在全局计时器上创建一个 Flux,该 Flux 在初始延迟后,发出从0开始并以指定的时间间隔递增的长整数。 如果未及时产生,则会通过溢出 IllegalStateException 发出 onError
信号,详细说明无法发出的原因。 在正常情况下,Flux 将永远不会完成。interval 提供了 3 个重载方法,三者的区别主要在于是否延迟发出、以及使用的调度器。
interval 生成的是一个无限数据流。
Flux<Long> interval(Duration period)
Flux<Long> interval(Duration delay, Duration period)
Flux<Long> interval(Duration delay, Duration period, Scheduler timer)
- 第 1 个方法,没有延迟,按照 period 的周期立即发出,默认使用 Schedulers.parallel() 调度器
- 第 2 个方法,以 delay 延迟,按照 period 的周期发出,默认使用 Schedulers.parallel() 调度器
- 第 3 个方法,以 delay 延迟,按照 period 的周期发出,使用指定的调度器
见图识意:
Flux.interval(Duration.ofMillis(30), Duration.ofMillis(500)).subscribe(System.out::println);
empty
生成一个空的有限流。见图识意:
Flux.empty().subscribe(System.out::println, System.out::println, () -> System.out.println("结束"));
never
生成一个空的无限流。见图识意:
Flux.never().subscribe(System.out::println, System.out::println, () -> System.out.println("结束"));
其它
Flux 和 Mono 还提供了编程式的创建数据流的方法,诸如 create、generate、push、handle 等的方式,这些内容暂时不是我们的重点,这里我们不细展开,感兴趣的可看 Api 进行研究下。
订阅 Flux 和 Mono
在上面创建 Flux 和 Mono 笔记的示例代码中,我们已经提到了 subscribe 订阅,在 subscribe 订阅中,Flux 和 Mono 支持 Java 8 Lambda 表达式。下面我们来看看 Reactor
为我们提供了哪些订阅方法。
subscribe(); // ①
subscribe(Consumer<? super T> consumer); // ②
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer); // ③
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer); // ④
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer,
Consumer<? super Subscription> subscriptionConsumer); // ⑤
subscribe(Subscriber<? super T> actual); // ⑥
- 序号① 订阅并触发响应式流。
- 序号② 对每个生成的元素进行消费。
- 序号③ 对正常元素进行消费,对错误进行响应处理。
- 序号④ 对正常元素和错误均有响应,还定义了响应流正常完成后的回调。
- 序号⑤ 对正常元素、错误信号和完成信号均有响应,同时也定义了 对该 subscribe 返回的 Subscription 的回调处理。
- 序号⑥ 通过自定义实现 Subscriber 接口来订阅。
注意:序号⑤ 变量传递一个 Subscription 的引用,如果不再需要更多元素时,可以通过它来取消订阅。取消订阅时,源头会停止生成数据,并清理相关的资源。取消和清理的操作是在 Disposable 接口中定义的。
来看下序号 ⑤ 的 subscribe 的弹珠图:
Flux.range(1, 4)
.subscribe(System.out::println,
error -> System.err.println("发生错误:" + error),
() -> System.out.println("完成"),
sub -> {
System.out.println("已订阅");
// 理解背压
// 尝试修改下 request 中的值,看看有啥变化
sub.request(10);
});
注意:序号⑥ 的方式支持背压等操作,不在我们本次笔记的范畴,我们还是先略过,后期在学习。
补充
在上节我们讲解 Reactor 调试部分时,遗漏了记录数据流的日志方法,再此做下补充:除了基于 stack trace 的方式调试分析,我们还可以使用 log
操作符,来跟踪响应式流并记录日志。将它添加到操作链上之后,它会读取每一个再其上游的 Flux 和 Mono 事件(包括 onNext、onError、onComplete、Subscribe、Cancel 和 Request)。
// 尝试交换下 take 和 log 的顺序,看看有啥变化
Flux.range(1, 10)
// .log()
.take(3)
.log()
.subscribe();
总结
本篇我们介绍了 Reactor 的基础知识:先是了解了 Reactor 为我们提供的响应式流类 Flux 和 Mono,之后学习了如何创建他们和订阅他们,因为有之前 Stream
的基础,想来大家对这些知识点都好理解和接受。
今天的内容就学到这里,我们下篇开始学习 Reactor 的操作符。
源码详见:https://github.com/crystalxmumu/spring-web-flux-study-note 下 02-reactor-core-learning
模块下 ReactorBasicLearningTest 测试类。
参考
以上是关于响应式编程详解,带你熟悉Reactor响应式编程的主要内容,如果未能解决你的问题,请参考以下文章
(17)Reactor的调试——响应式Spring的道法术器