Java ProjectReactor 响应式编程 Mono 简单工作流程解析
Posted 老磨谈技术
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java ProjectReactor 响应式编程 Mono 简单工作流程解析相关的知识,希望对你有一定的参考价值。
前言
我们在查看 Spring Cloud 源码的时候,会发现已经引入了 Mono 或者 Flux 相关的代码,如果对这些代码不熟悉,就会觉得有些 Spring Cloud 源码将会变得晦涩难懂。Mono 和 Flux 为 ProjectReactor 响应式框架中的核心类。其相关概念可以参考 Flux、Mono、Reactor 实战(史上最全)和
响应式编程入门之 Project Reactor。我在参考了这些文章后,查看了相应的源码,这里是将自己的理解记录下来,希望可以帮助到初学者理解 ProjectReactor 。本文的目标是可以让大家理解以下者行代码的实现逻辑。
Mono.just("hello").map(e->e+" world").map(e->e+"!").subscribe(System.out::println);
核心接口
本文使用的是 java 8 ,项目中需要引入以下依赖
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.4.24</version>
</dependency>
ProjectReactor 其核心思想观察者模式,定义了发布者和订阅者以及订阅三个接口。其核心接口如下
/**
* 当将一个 Subscriber 传递给 Publisher#subscribe(Subscriber) 方法的时候,Subscriber#onSubscribe(Subscription) 会被调用
*/
public interface Subscriber<T>
/**
* 在 Publisher#subscribe(Subscriber) 调用时,被调用.
* 他的职责是去调用 Subscription#request(long) 方法
*/
public void onSubscribe(Subscription s);
/**
* 当 Subscription#request(long)调用的时候被调用,数据T来自 Publisher
*/
public void onNext(T t);
/**
* 发生异常时被触发
*/
public void onError(Throwable t);
/**
* 正常完成时被触发
*/
public void onComplete();
/**
* Subscription 代表 Subscriber 订阅一个 Publisher 的生命周期,只能被一个 Subscriber 使用一次。
*/
public interface Subscription
/**
* 通常是 Subscriber#onSubscribe(Subscription)时被触发,然后调用 Subscriber#onNext(T)方法
*/
public void request(long n);
/**
* 停止发送数据和清理相关资源
*/
public void cancel();
/**
* 一个发布者可以发布无数个元素给订阅者,被多个 Subscriber 进行订阅。
*/
public interface Publisher<T>
/**
* 这个方法用于开始一个数据流,可以被调用多次,每次都会生成一个新的 Subscription 对象,每个 Subscription 对象只能被一个 Subscriber 使用。每个 Subscriber 最多订阅一次一个 Publisher。但是 Publisher 可以被多个 Subscriber 订阅。
*/
public void subscribe(Subscriber<? super T> s);
仔细看官方接口的文档,可以看到核心调用逻辑如下:
首先调用 Publisher#subscribe(Subscriber)方法,传入了一个 Subscriber。
然后 Subscriber#onSubscribe(Subscription),传入了一个 Subscription。
然后 Subscription#request(long) 会被触发。
然后 Subscriber#onNext(T) 会被触发。
每个方法,都规定了具体的职责。
看到这里我就有一个疑问了:
为什么需要这样弯弯绕绕,而且需要引入一个中间的对象 Subscription 传递数据?
Publisher 提供数据,Subscriber 直接去获取不就好了吗?
这里先给出我的理解:
形成数据流,中间可以引入多个处理过程。
可以在 Subscriber 订阅的时候,整个处理流程才动起来。
这里先有一个印象,后续可以再回头看就理解了。
按照核心逻辑,我们对以上三个接口进行简单实现。
简单实现
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
public class Demo
static class MySubscription implements Subscription
private String data;
private Subscriber subscriber;
public MySubscription(String data, Subscriber subscriber)
this.data = data;
this.subscriber = subscriber;
private boolean isCanceled;
@Override
public void request(long l)
if (!isCanceled)
try
subscriber.onNext(data);
subscriber.onComplete();
catch (Exception e)
subscriber.onError(e);
@Override
public void cancel()
isCanceled = true;
static class MyPublisher implements Publisher
private String data;
@Override
public void subscribe(Subscriber subscriber)
subscriber.onSubscribe(new MySubscription(data, subscriber));
public static Publisher just(String a)
MyPublisher myPublisher = new MyPublisher();
myPublisher.data = a;
return myPublisher;
static class MySubscriber implements Subscriber
@Override
public void onSubscribe(Subscription subscription)
subscription.request(1L);
@Override
public void onNext(Object o)
System.out.println(o);
@Override
public void onError(Throwable throwable)
System.out.println("error");
@Override
public void onComplete()
System.out.println("completed");
public static void main(String[] args)
MyPublisher.just("MyPublisher1").subscribe(new MySubscriber());
// 打印如下:
// MyPublisher1
//completed
是不是特别简单,一个简单的发布订阅流程就完成了。
但很明显扩展性不强,比如我想在 MyPublisher 和 MySubscriber 之间,做一些不限次数的处理逻辑,而且写法跟Stream类似,怎么办呢?
为了完成这个逻辑,需要引入一些操作类(operator)的对象,包括 Publisher,Subscriber 以及 Subscription 的实现类。
引入中间操作类的实现
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import java.util.function.Function;
public class OperatorPublisherTest
static class MySubscription implements Subscription
private String data;
private Subscriber subscriber;
public MySubscription(String data, Subscriber subscriber)
this.data = data;
this.subscriber = subscriber;
private boolean isCanceled;
@Override
public void request(long l)
System.out.println("MySubscription: 调用 MySubscription#request(long)");
if (!isCanceled)
try
subscriber.onNext(data);
subscriber.onComplete();
catch (Exception e)
subscriber.onError(e);
@Override
public void cancel()
isCanceled = true;
/**
* 抽象类,用于引入操作类的 Publisher
*/
static abstract class AbstractPublisher implements Publisher
public OperatorPublisher operator(Function function,String name )
return new OperatorPublisher(this,function,name);
/**
* SourcePublisher 用于获取最初始的数据
*/
static class SourcePublisher extends AbstractPublisher
private String name;
private String data;
public SourcePublisher(String name)
this.name = name;
println("生成 SourcePublisher ");
@Override
public void subscribe(Subscriber subscriber)
println("SourcePublisher#subscribe(Subscriber), 生成 MySubscription 对象");
subscriber.onSubscribe(new MySubscription(data, subscriber));
public static AbstractPublisher just(String a)
SourcePublisher myPublisher = new SourcePublisher("数据源Publisher");
myPublisher.data = a;
return myPublisher;
void println(String context)
System.out.println(name+": "+context);
static class OperatorPublisher extends AbstractPublisher
private String name;
private Publisher source;
private OperatorPublisher prePublisher;
Function mapper;
public OperatorPublisher(Publisher prePublisher,Function function,String name)
this.name = name;
this.source = prePublisher;
if (prePublisher instanceof OperatorPublisher)
this.prePublisher = (OperatorPublisher) prePublisher;
mapper = function;
print("生成 OperatorPublisher 对象");
@Override
public void subscribe(Subscriber s)
print("订阅:OperatorPublisher#subscribe(Subscriber)");
OperatorPublisher currentPublisher = this;
Subscriber currentSubscriber = s;
while (true)
currentSubscriber = currentPublisher.subscribeOrReturn(currentSubscriber);
OperatorPublisher nextPublisher = currentPublisher.prePublisher;
if (nextPublisher == null)
currentPublisher.source.subscribe(currentSubscriber);
return;
currentPublisher = nextPublisher;
public OperatorSubscriptionSubscriber subscribeOrReturn(Subscriber s)
print("OperatorPublisher#subscribeOrReturn(Subscriber), 生成 OperatorSubscriptionSubscriber 对象");
return new OperatorSubscriptionSubscriber(mapper,s,name);
public void print(String text)
System.out.println(name+": "+text);
static class OperatorSubscriptionSubscriber implements Subscriber,Subscription
private String name;
private Subscriber preSubscriber;
private Function function;
private Subscription preSubscription;
public OperatorSubscriptionSubscriber(Function f, Subscriber preSubscriber, String name)
this.function = f;
this.preSubscriber = preSubscriber;
this.name = name+"_OperatorSubscriptionSubscriber";
private boolean isCanceled;
@Override
public void request(long l)
println("OperatorSubscriptionSubscriber#request(long)");
preSubscription.request(l);
@Override
public void onSubscribe(Subscription preSubscription)
println("OperatorSubscriptionSubscriber#onSubscribe(Subscription)");
this.preSubscription = preSubscription;
preSubscriber.onSubscribe(this);
@Override
public void onNext(Object o)
println("OperatorSubscriptionSubscriber#onNext(Object)");
Object apply = function.apply(o);
println("处理后的值为:"+apply);
preSubscriber.onNext(apply);
@Override
public void onError(Throwable t)
preSubscriber.onError(t);
@Override
public void onComplete()
preSubscriber.onComplete();
@Override
public void cancel() isCanceled = true;
void println(String context)
System.out.println(name+": "+context);
static class MySubscriber implements Subscriber
private String name = "最终订阅者MySubscriber:";
@Override
public void onSubscribe(Subscription subscription)
println("MySubscriber#onSubscribe(Subscription)");
subscription.request(1L);
@Override
public void onNext(Object o)
println("最终订阅者MySubscriber: MySubscriber onNext");
println("最终订阅者MySubscriber:结果输出 "+o);
@Override
public void onError(Throwable throwable)
println("error");
@Override
public void onComplete()
println("MySubscriber#onComplete()");
void println(String context)
System.out.println(name+context);
public static void main(String[] args)
SourcePublisher.just("MyPublisher1")// 生成 SourcePublisher
.operator(e->e+" operator1 ","操作对象1")// 生成 OperatorPublisher
.operator(e->e+" operator2 ","操作对象2") // 生成 OperatorPublisher
.subscribe(new MySubscriber()); // 生成了多个嵌套的 Subscriber 回到 SourcePublisher 进行 onSubscribe 调用链
涉及的接口总结如下
接口 | 普通实现 | 操作类实现 |
Publisher | SourcePublisher | OperatorPublisher |
Subscriber | MySubscriber | OperatorSubscriptionSubscriber |
Subscription | MySubscription | OperatorSubscriptionSubscriber |
接口对比如下
逻辑如下:
引入了操作者的概念(OperatorPublisher),本身是一个 Publisher ,可以通过 source 和 parentPublisher 变量将所有的 Publisher 组织成为一个 Publisher 单向链表,注意的是OperatorPublisher还包含一个函数式接口Function,这个是真正处理数据的地方。
引入了新的操作类型的 Subscriber(同时也是一个 Subscription),当 Publisher 链表被订阅的时候启动遍历 Publisher ,在遍历的过程中也形成了一个 Subscriber 的链表,链表最后一个 Subscriber 为真正的订阅者。
当遍历至 Publisher 链表最后一个元素的时候, Subscriber 的队列也已经构建完成。
调用最后一个 Publisher 的 subscribe 方法,执行 onSubscribe 方法,这个方法又会将 Subscriber 的链表遍历一遍,形成一个 Subscription 链表。
当 Subscriber 的链表遍历至最后一个的时候,通过调用 Subscriber 的 onSubscribe 方法,将会执行 subscription#request(long),这里也是一个遍历的方法,遍历 Subscription 到最后一个元素。
执行最后一个 Subscription 的 request 方法,依次执行 Subscriber 链表中的 Subscriber 的 onNext方法。
在操作类型的 Subscriber 中,含有对数据的处理逻辑,数据依次被处理后,最终的 Subscriber,完成最后的输出。
流程图如下:
打印效果如下(为了方便描述加上了行号):
数据源 Publisher: 生成 SourcePublisher
操作对象1: 生成 OperatorPublisher 对象
操作对象2: 生成 OperatorPublisher 对象
操作对象2: 订阅:OperatorPublisher#subscribe(Subscriber)
操作对象2: OperatorPublisher#subscribeOrReturn(Subscriber), 生成 OperatorSubscriptionSubscriber 对象
操作对象1: OperatorPublisher#subscribeOrReturn(Subscriber), 生成 OperatorSubscriptionSubscriber 对象
数据源Publisher: SourcePublisher#subscribe(Subscriber), 生成 MySubscription 对象
操作对象1_OperatorSubscriptionSubscriber: OperatorSubscriptionSubscriber#onSubscribe(Subscription)
操作对象2_OperatorSubscriptionSubscriber: OperatorSubscriptionSubscriber#onSubscribe(Subscription)
最终订阅者MySubscriber:MySubscriber#onSubscribe(Subscription)
操作对象2_OperatorSubscriptionSubscriber: OperatorSubscriptionSubscriber#request(long)
操作对象1_OperatorSubscriptionSubscriber: OperatorSubscriptionSubscriber#request(long)
MySubscription: 调用 MySubscription#request(long)
操作对象1_OperatorSubscriptionSubscriber: OperatorSubscriptionSubscriber#onNext(Object)
操作对象1_OperatorSubscriptionSubscriber: 处理后的值为:MyPublisher1 operator1
操作对象2_OperatorSubscriptionSubscriber: OperatorSubscriptionSubscriber#onNext(Object)
操作对象2_OperatorSubscriptionSubscriber: 处理后的值为:MyPublisher1 operator1 operator2
最终订阅者MySubscriber:最终订阅者MySubscriber: MySubscriber onNext
最终订阅者MySubscriber:最终订阅者MySubscriber:结果输出 MyPublisher1 operator1 operator2
最终订阅者MySubscriber:MySubscriber#onComplete()
以上,一个完整的发布订阅逻辑已经处理完成,通过查看流程,可以看到
Publisher 在调用 subscribe 方法之前,只是构建了一个 Publisher 链表,没有发生任何数据处理逻辑。
查看 Publisher 是不存在任何状态的,也就是说,这个 Publisher 链表是可以重复使用的。
当调用了 subscribe 方法之后,数据处理逻辑开始开动,而且查看 Subscriber,
Mono 的工作流程
当你看完前面的逻辑并已经理解了之后,查看 Mono 的工作流程就会变得比较简单
首先一行最简单的代码如下:
import reactor.core.publisher.Mono;
public class MonoTest
public static void main(String[] args)
Mono.just("hello").map(e->e+" world").map(e->e+"!").subscribe(System.out::println);
首先,查看Mono类,发现他是一个抽象类
public abstract class Mono implements CorePublisher
是一个 Publisher 的实现类,当他调用 just(T) 方法时,会返回一个 MonoJust,MonoJust 继承了 Mono。
public static <T> Mono<T> just(T data)
return onAssembly(new MonoJust<>(data));
然后调用 map(Function) 时,会返回一个 MonoMapFuseable ,MonoMapFuseable 也是继承了 Mono。继续调用 map(Function),也是如此。
public final <R> Mono<R> map(Function<? super T, ? extends R> mapper)
if (this instanceof Fuseable)
return onAssembly(new MonoMapFuseable<>(this, mapper));
return onAssembly(new MonoMap<>(this, mapper));
至此,Publisher 的两个实现类已经出现 MonoJust 和 MonoMapFuseable。MonoJust 负责存储原始数据,MonoMapFuseable 用于处理数据,并负责构建 Publisher 链表。看下两个类的实现
final class MonoJust<T>
extends Mono<T>
implements Fuseable.ScalarCallable<T>, Fuseable, SourceProducer<T>
final T value;
// 将数据存储在 value 变量中。
MonoJust(T value)
this.value = Objects.requireNonNull(value, "value");
@Override
public void subscribe(CoreSubscriber<? super T> actual)
actual.onSubscribe(Operators.scalarSubscription(actual, value));
final class MonoMapFuseable<T, R> extends InternalMonoOperator<T, R>
implements Fuseable
final Function<? super T, ? extends R> mapper;
MonoMapFuseable(Mono<? extends T> source, Function<? super T, ? extends R> mapper)
super(source);
this.mapper = Objects.requireNonNull(mapper, "mapper");
// 为了方便理解,将 subscribe(CoreSubscriber)方法从 父类 InternalMonoOperator 搬到了这里。
@Override
@SuppressWarnings("unchecked")
public final void subscribe(CoreSubscriber<? super O> subscriber)
OptimizableOperator operator = this;
try
while (true)
subscriber = operator.subscribeOrReturn(subscriber);
if (subscriber == null)
// null means "I will subscribe myself", returning...
return;
OptimizableOperator newSource = operator.nextOptimizableSource();
if (newSource == null)
operator.source().subscribe(subscriber);
return;
operator = newSource;
catch (Throwable e)
Operators.reportThrowInSubscribe(subscriber, e);
return;
@Override
@SuppressWarnings("unchecked")
public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super R> actual)
if (actual instanceof ConditionalSubscriber)
ConditionalSubscriber<? super R> cs = (ConditionalSubscriber<? super R>) actual;
return new FluxMapFuseable.MapFuseableConditionalSubscriber<>(cs, mapper);
return new FluxMapFuseable.MapFuseableSubscriber<>(actual, mapper);
注意在 subscribeOrReturn 方法中返回了一个 MapFuseableConditionalSubscriber
可以看到 MapFuseableConditionalSubscriber 既是一个 Subscrition 也是一个 Subscriber。用来构建 Subscriber 链表和 Subscrition 链表,而它存有 MonoMapFuseable 传进来的 Function<? super T, ? extends R> mapper ,用于数据处理。其核心逻辑如下,
static final class MapFuseableConditionalSubscriber<T, R>
implements ConditionalSubscriber<T>, InnerOperator<T, R>,
QueueSubscription<R>
final ConditionalSubscriber<? super R> actual;
final Function<? super T, ? extends R> mapper;
boolean done;
QueueSubscription<T> s;
MapFuseableConditionalSubscriber(ConditionalSubscriber<? super R> actual,
Function<? super T, ? extends R> mapper)
this.actual = actual;
this.mapper = mapper;
@SuppressWarnings("unchecked")
@Override
public void onSubscribe(Subscription s)
if (Operators.validate(this.s, s))
this.s = (QueueSubscription<T>) s;
actual.onSubscribe(this);
@Override
public void onNext(T t)
if (sourceMode == ASYNC)
actual.onNext(null);
else
R v;
try
v = mapper.apply(t);
if (v == null)
throw new NullPointerException("The mapper [" + mapper.getClass().getName() + "] returned a null value.");
catch (Throwable e)
return;
actual.onNext(v);
@Override
public void request(long n)
s.request(n);
当调用 .subscribe(System.out::println) 的时候,这里传入的是一个方法引用。调用的是父类Mono的多个重载方法
public final Disposable subscribe(Consumer<? super T> consumer)
Objects.requireNonNull(consumer, "consumer");
return subscribe(consumer, null, null);
public final Disposable subscribe(
@Nullable Consumer<? super T> consumer,
@Nullable Consumer<? super Throwable> errorConsumer,
@Nullable Runnable completeConsumer)
return subscribe(consumer, errorConsumer, completeConsumer, (Context) null);
public final Disposable subscribe(
@Nullable Consumer<? super T> consumer,
@Nullable Consumer<? super Throwable> errorConsumer,
@Nullable Runnable completeConsumer,
@Nullable Context initialContext)
return subscribeWith(new LambdaMonoSubscriber<>(consumer, errorConsumer,
completeConsumer, null, initialContext));
public final <E extends Subscriber<? super T>> E subscribeWith(E subscriber)
// 这里将会调用子类的重写 subscribe 方法。
subscribe(subscriber);
return subscriber;
传入的 Consumer 被封装成为了 LambdaMonoSubscriber,最终在 onNext 方法中调用 Consumer 的逻辑。
final class LambdaMonoSubscriber<T> implements InnerConsumer<T>, Disposable
// 两个核心方法
@Override
public final void onSubscribe(Subscription s)
if (Operators.validate(subscription, s))
this.subscription = s;
if (subscriptionConsumer != null)
//。。。
else
s.request(Long.MAX_VALUE);
@Override
public final void onNext(T x)
if (consumer != null)
try
consumer.accept(x);
catch (Throwable t)
Exceptions.throwIfFatal(t);
s.cancel();
doError(t);
// 省略其他。。。
以上就是所有的关键类,跟我们前面的实现逻辑是一致的,流程也是一致,因此不再做过多的描述。
总结
本文通过自己实现接口的方式,揭示了响应式编程核心类 Mono 的基本原理。其核心接口为三个 Publisher,Subscriber,Subsription。核心方法为 Publisher#onSubscribe(Subscription),Subscriber#onSubscribe(Subscription),Subsription#request(long), Subscriber#onNext(T),围绕Publisher的构造函数以及这四个方法,依次构建了Publisher 链表, Subscriber 链表和 Subsription 链表。通过这三个链表构建和遍历,完成了数据的发布,订阅和流式处理逻辑。
但是本文未涉及如何完成"响应式",这个在后续的文章中会涉及。
参考
reactor简单入门
reactor是一个基于reactive streams的响应式编程框架。 在了解project reactor 项目之前, 你需要熟悉观察者模式(订阅发布模式)和reactive streams。
只有了解了这些内容,才能更好的开始project reactor的学习。你可以通过看
观察者模式之我见 和
一篇入门reactive streams背压响应式编程
了简单学习这两个知识点。
建议的学习方法
学习reactor的总步骤和前置条件
- 首先理解同步也异步的概念,理解为什么需要异步
- 理解观察者模式,理解为什么需要观察者模式
- 理解reactive streams,至少知道观察者模式(订阅发布模式)在reactive streams中是怎么要求的。
- 要学习和使用jdk 8 中stream的操作方法和风格
- 开始学习reactor。
学习reactor的时候建议:
- 先理解reactor的基本流程再去学习
- 首先理解了flux和mono在订阅模式中的作用和地位,不要被flux和mono产生数据的方法和操作符的众多知识所迷惑。
- 理解了subscribe的方法和调用。
- 大概知道调度器scheduler的作用和使用,大概知道有一个hooks和作用即可。
- 大概翻阅flux和mono创建数据的方法,用的时候可以查阅。
- 大概翻阅flux和mono的操作符,用的时候可以查阅。
- 上手真实的项目,开始使用reactor,需要的时候查阅文档。项目中如果有必要再去学习context。
- 等reactor能上手搞定项目,再试着去通过scheduler和hooks来优化项目。
如果你是为了面试,当我没说。
数据源Flux 和 Mono
reactor正如所有的发布订阅模式一样,符合reactive streams规范。 所以reactor也包含有publisher, subscriber, subscription, processor, operator等概念。
Flux和Mono就是reactor实现的publisher,他们可以接受被其他的订阅器所订阅,产生数据并且把数据推送给订阅器。 同时他们还集成了一些对数据流的操作,比如map, filter等。
区别
Flux是一个包含0到N个元素的数据流,Mono是一个包含0或者1个元素的数据流。
基本流程
总体上理解了reactor的流程,才能不被琐事的概念迷失了方向。其实整个reactor就一个订阅发布模式。
Flux和Mono是整个系统默认的publisher,目的是为了简化publisher自定义的工作。
Flux和Mono集成了很多的操作符,用来减少我们自定义subscriber和processor的工作量。
因为操作符的存在,我们对数据源和元素的操作就不需要自己定义自己的processor和subscriber了,直接使用操作符的组合即可完成工作.
除非不得已,否则不要试图自定义subscriber和processor。
创建Flux/Mono数据源
理解了发布订阅模式和publisher的作用,就理解了flux和mono。Flux和mono为了满足需求,有大量的产生数据的方法,
因为篇幅问题,我把这部分内容单独进行了整理,详见reactor之数据源的产生
操作符
在基本流程中,已经提到了reactor为了减少自定义subscriber和processor的工作量,集成了很多的操作符。
首先应该大概理解操作符的作用和应用场景,大概知道有哪些种类的操作符即可。
用到的时候不妨翻阅官方文档,常用的不用记,因为经常会用到。不常用的更不用记忆,因为记了也用不到。
因为篇幅问题,我把这部分内容单独进行了整理,详见reactor之操作符
subscribe
subscribe 操作符用来订阅流中的元素。
当流中的元素没有被订阅的时候,所有的操作都不会触发,只有当流中的元素被订阅的时候,所有的操作才会触发。
常用的subscribe接口如下
Flux.subscribe();
/**
* @param consumer 消费者接口,用来消费流中的元素
*
*/
Flux.subscribe(Consumer<? super T> consumer);
/**
* @param consumer 消费者接口,用来消费流中的元素
* @param errorConsumer 错误消费者接口,用来消费流中的错误
*/
Flux.subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer);
/**
* @param consumer 消费者接口,用来消费流中的元素
* @param errorConsumer 错误消费者接口,用来消费流中的错误
* @param completeConsumer 完成消费者接口,用来消费流中的完成
*/
Flux.subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer);
/**
* @param consumer 消费者接口,用来消费流中的元素
* @param errorConsumer 错误消费者接口,用来消费流中的错误
* @param completeConsumer 完成消费者接口,用来消费流中的完成
* @param subscriptionConsumer 订阅消费者接口,用来消费流中的订阅
*/
Flux.subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer, Consumer<? super Subscription> subscriptionConsumer)
Scheduler
Reactor也可以被认为是 并发无关(concurrency agnostic)的。意思就是, 它并不强制要求任何并发模型。
更进一步,它将选择权交给开发者。不过,它还是提供了一些方便 进行并发执行的库。
Reactor 提供了两种在响应式链中调整调度器 Scheduler 的方法:publishOn 和 subscribeOn。
它们都接受一个 Scheduler 作为参数,从而可以改变调度器。
但是 publishOn 在链中出现的位置 是有讲究的,而 subscribeOn 则无所谓。
publishOn 它会 改变后续的操作符的执行所在线程 。而 subscribeOn 则会改变下游操作符的调度器。
在 Reactor 中,执行模式以及执行过程取决于所使用的 Scheduler。
- 当前线程(Schedulers.immediate())
- 单线程(Schedulers.single())
- 固定大小线程池(Schedulers.parallel())
- 弹性线程池(Schedulers.elastic())
Flux.just(1, 2, 3)
.publishOn(Schedulers.parallel()) //指定在parallel线程池中执行
.map(i ->
System.out.println("map1: " + Thread.currentThread().getName());
return i;
)
.publishOn(Schedulers.elastic()) // 指定下游的执行线程
.map(i ->
System.out.println("map2: " + Thread.currentThread().getName());
return i;
)
.subscribeOn(Schedulers.single())
.subscribe(i -> System.out.println("subscribe: " + Thread.currentThread().getName()));
此外一些操作符会使用指定的调度器。
Flux.interval(Duration.ofSeconds(1), Schedulers.single())
.subscribe(System.out::println);
processor
Processor 是一个实现了 Publisher 和 Subscriber 接口的对象,它可以用来连接 Publisher 和 Subscriber。
多数情况下,你应该进行避免使用 Processor,它们较难正确使用,主要用于一些特殊场景下。
比起直接使用 Reactor 的 Processors,更好的方式是通过调用一次 sink() 来得到 Processor 的 Sink。
FluxProcessor<String, String> processor = DirectProcessor.create();
processor.subscribe(System.out::println);
processor.onNext("foo");
processor.onNext("bar");
processor.onNext("baz");
processor.onComplete();
sink
Sink 是一个接口,它定义了一些方法,用来向 Processor 发送数据。
UnicastProcessor<Integer> processor = UnicastProcessor.create();
FluxSink<String> sink = processor.sink();
sink.next("foo");
sink.next("bar");
sink.next("baz");
sink.complete();
现有的 Processors 总览
- DirectProcessor:直接处理器,它是一个同步的处理器,它会将所有的数据发送给所有的订阅者。
- UnicastProcessor:单播处理器,它是一个同步的处理器,它只会将数据发送给第一个订阅者。
- ReplayProcessor:重放处理器,它是一个异步的处理器,它会将所有的数据发送给所有的订阅者,包括那些在订阅之后才订阅的订阅者。
- WorkQueueProcessor:工作队列处理器,它是一个异步的处理器,它会将所有的数据发送给所有的订阅者,包括那些在订阅之后才订阅的订阅者。
- TopicProcessor:主题处理器,它是一个异步的处理器,它会将所有的数据发送给所有的订阅者,包括那些在订阅之后才订阅的订阅者。
- EmitterProcessor:发射处理器,它是一个异步的处理器,它会将所有的数据发送给所有的订阅者,包括那些在订阅之后才订阅的订阅者。
Hooks
Hooks算是一个工具类,设定好以后,对后面的Flux和Mono都会回调Hooks设置的方法,类似操作系统的钩子。
本部分算是reactor中比较高级的部分,建议在开始上手用reactor做项目前,大概知道有这么一个概念即可。
做了一两个项目以后,再回头来看看hooks是做什么的即可
我把这部分的内容进行了拆分,详见:reactor之Hooks
Context
当从命令式编程风格切换到响应式编程风格的时候,一个技术上最大的挑战就是线程处理。
在命令式编程风格中,我们可以通过 ThreadLocal 来传递数据,
但是在响应式编程风格中,我们无法通过 ThreadLocal 来传递数据。
因为线程是由 Reactor 来管理的,我们无法控制线程的创建和销毁。
Context 就是用来解决这个问题的。Context 是一个接口,它定义了一些方法,用来获取和设置数据。
这部分内容相对也比较难以理解,建议把学习和理解放在后面,总之你需要用到类似多线程环境中的ThreadLocal类的时候,再来学习这部分不迟。
String key = "key";
Mono<String> r = Mono.just("hello")
.flatMap(s -> Mono.subscriberContext()
.map(ctx -> s + " " + ctx.get(key)))
.subscriberContext(ctx -> ctx.put(key, "world"));
r.subscribe(System.out::println);
// 输出:hello world
context api
Context 是一个类似于 Map(这种数据结构)的接口:它存储键值(key-value)对,你需要通过 key 来获取值:
- put 方法:将一个键值对放入 Context 中。
- get 方法:通过 key 来获取值。
- delete 方法:通过 key 来删除键值对。
- hasKey 方法:通过 key 来判断是否存在键值对。
- stream 方法:返回一个流,用来遍历 Context 中的所有键值对。
- isEmpty 方法:判断 Context 是否为空。
- size 方法:返回 Context 中键值对的个数。
- putAll 方法:将一个 Context 中的所有键值对放入另一个 Context 中。
- currentContext 方法:返回当前线程的 Context。
- empty 方法:返回一个空的 Context。
- root 方法:返回一个空的 Context。
把context 绑定到Flux and writing
String key = "key";
Flux<String> r = Flux.just("hello")
.flatMap(s -> Mono.subscriberContext()
.subscriberContext(ctx -> ctx.put(key, "world"));
.map(ctx -> s + " " + ctx.get(key)))
从context中读取数据
String key = "key"
Flux<String> r = Flux.just("hello")
.flatMap(s -> Mono.subscriberContext()
.map(ctx -> s + " " + ctx.get(key)))
.subscriberContext(ctx -> ctx.put(key, "world"));
r.subscribe(System.out::println);
// 输出:hello world
以上是关于Java ProjectReactor 响应式编程 Mono 简单工作流程解析的主要内容,如果未能解决你的问题,请参考以下文章
响应式编程库Reactor 3 Reference Guide参考文档中文版(v3.2.0)