Java ProjectReactor 响应式编程 Mono 简单工作流程解析

Posted 老磨谈技术

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java ProjectReactor 响应式编程 Mono 简单工作流程解析相关的知识,希望对你有一定的参考价值。

前言

我们在查看 Spring Cloud 源码的时候,会发现已经引入了 Mono 或者 Flux 相关的代码,如果对这些代码不熟悉,就会觉得有些 Spring Cloud 源码将会变得晦涩难懂。Mono 和 Flux 为 ProjectReactor 响应式框架中的核心类。其相关概念可以参考 Flux、Mono、Reactor 实战(史上最全)

响应式编程入门之 Project Reactor。我在参考了这些文章后,查看了相应的源码,这里是将自己的理解记录下来,希望可以帮助到初学者理解 ProjectReactor 。本文的目标是可以让大家理解以下者行代码的实现逻辑。

Mono.just("hello").map(e->e+" world").map(e->e+"!").subscribe(System.out::println);

核心接口

本文使用的是 java 8 ,项目中需要引入以下依赖

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.4.24</version>
</dependency>

ProjectReactor 其核心思想观察者模式,定义了发布者和订阅者以及订阅三个接口。其核心接口如下


/**
 * 当将一个 Subscriber 传递给 Publisher#subscribe(Subscriber) 方法的时候,Subscriber#onSubscribe(Subscription) 会被调用
 */
public interface Subscriber<T> 

    /**
     * 在 Publisher#subscribe(Subscriber) 调用时,被调用.
     * 他的职责是去调用 Subscription#request(long) 方法
     */
    public void onSubscribe(Subscription s);

    /**
     * 当 Subscription#request(long)调用的时候被调用,数据T来自 Publisher
     */
    public void onNext(T t);

    /**
     * 发生异常时被触发
     */
    public void onError(Throwable t);

    /**
     * 正常完成时被触发
     */
    public void onComplete();

/**
 * Subscription 代表 Subscriber 订阅一个 Publisher 的生命周期,只能被一个 Subscriber 使用一次。
 */
public interface Subscription 

    /**
     * 通常是 Subscriber#onSubscribe(Subscription)时被触发,然后调用 Subscriber#onNext(T)方法
     */
    public void request(long n);

    /**
     * 停止发送数据和清理相关资源
     */
    public void cancel();


/**
 * 一个发布者可以发布无数个元素给订阅者,被多个 Subscriber 进行订阅。
 */
public interface Publisher<T> 

    /**
     * 这个方法用于开始一个数据流,可以被调用多次,每次都会生成一个新的 Subscription 对象,每个 Subscription 对象只能被一个 Subscriber 使用。每个 Subscriber 最多订阅一次一个 Publisher。但是 Publisher 可以被多个 Subscriber 订阅。
     */
    public void subscribe(Subscriber<? super T> s);

仔细看官方接口的文档,可以看到核心调用逻辑如下:

  1. 首先调用 Publisher#subscribe(Subscriber)方法,传入了一个 Subscriber。

  1. 然后 Subscriber#onSubscribe(Subscription),传入了一个 Subscription。

  1. 然后 Subscription#request(long) 会被触发。

  1. 然后 Subscriber#onNext(T) 会被触发。

每个方法,都规定了具体的职责。

看到这里我就有一个疑问了:

为什么需要这样弯弯绕绕,而且需要引入一个中间的对象 Subscription 传递数据?
Publisher 提供数据,Subscriber 直接去获取不就好了吗?

这里先给出我的理解:

形成数据流,中间可以引入多个处理过程。
可以在 Subscriber 订阅的时候,整个处理流程才动起来。

这里先有一个印象,后续可以再回头看就理解了。

按照核心逻辑,我们对以上三个接口进行简单实现。

简单实现

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
public class Demo 
    static class MySubscription implements Subscription
        private String data;
        private Subscriber subscriber;
        public MySubscription(String data, Subscriber subscriber) 
            this.data = data;
            this.subscriber = subscriber;
        
        private boolean isCanceled;
        @Override
        public void request(long l) 
            if (!isCanceled)
                try 
                    subscriber.onNext(data);
                    subscriber.onComplete();
                 catch (Exception e) 
                    subscriber.onError(e);
                
            
        

        @Override
        public void cancel() 
            isCanceled = true;
        
    
    static  class MyPublisher implements Publisher
        private String data;
        @Override
        public void subscribe(Subscriber subscriber) 
            subscriber.onSubscribe(new MySubscription(data, subscriber));
        
        public static Publisher just(String a)
            MyPublisher myPublisher = new MyPublisher();
            myPublisher.data = a;
            return myPublisher;
        
    

    static class MySubscriber implements Subscriber
        @Override
        public void onSubscribe(Subscription subscription) 
            subscription.request(1L);
        
        @Override
        public void onNext(Object o) 
            System.out.println(o);
        
        @Override
        public void onError(Throwable throwable) 
            System.out.println("error");
        
        @Override
        public void onComplete() 
            System.out.println("completed");
        
    

    public static void main(String[] args) 
        MyPublisher.just("MyPublisher1").subscribe(new MySubscriber());
        // 打印如下:
        // MyPublisher1
        //completed
    

是不是特别简单,一个简单的发布订阅流程就完成了。

但很明显扩展性不强,比如我想在 MyPublisher 和 MySubscriber 之间,做一些不限次数的处理逻辑,而且写法跟Stream类似,怎么办呢?

为了完成这个逻辑,需要引入一些操作类(operator)的对象,包括 Publisher,Subscriber 以及 Subscription 的实现类。

引入中间操作类的实现

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import java.util.function.Function;

public class OperatorPublisherTest 
    static class MySubscription implements Subscription
        private String data;
        private Subscriber subscriber;
        public MySubscription(String data, Subscriber subscriber) 
            this.data = data;
            this.subscriber = subscriber;
        
        private boolean isCanceled;
        @Override
        public void request(long l) 
            System.out.println("MySubscription:  调用 MySubscription#request(long)");
            if (!isCanceled)
                try 
                    subscriber.onNext(data);
                    subscriber.onComplete();
                 catch (Exception e) 
                    subscriber.onError(e);
                
            
        

        @Override
        public void cancel() 
            isCanceled = true;
        
    

    /**
     * 抽象类,用于引入操作类的 Publisher
     */
    static abstract class AbstractPublisher implements Publisher

        public OperatorPublisher operator(Function function,String name )
            return new OperatorPublisher(this,function,name);
        
    

    /**
     * SourcePublisher 用于获取最初始的数据
     */
    static  class SourcePublisher extends AbstractPublisher
        private String name;
        private String data;
        public SourcePublisher(String name) 
            this.name = name;
            println("生成 SourcePublisher ");
        
        @Override
        public void subscribe(Subscriber subscriber) 
            println("SourcePublisher#subscribe(Subscriber), 生成 MySubscription 对象");
            subscriber.onSubscribe(new MySubscription(data, subscriber));
        
        public static AbstractPublisher just(String a)
            SourcePublisher myPublisher = new SourcePublisher("数据源Publisher");
            myPublisher.data = a;
            return myPublisher;
        
        void println(String context)
            System.out.println(name+": "+context);
        
    
    static class OperatorPublisher extends AbstractPublisher
        private String name;
        private Publisher source;
        private OperatorPublisher prePublisher;
        Function mapper;
        public OperatorPublisher(Publisher prePublisher,Function function,String name) 
            this.name = name;
            this.source = prePublisher;
            if (prePublisher instanceof OperatorPublisher)
                this.prePublisher = (OperatorPublisher) prePublisher;
            
            mapper = function;
            print("生成 OperatorPublisher 对象");
        
        @Override
        public void subscribe(Subscriber s) 
            print("订阅:OperatorPublisher#subscribe(Subscriber)");
            OperatorPublisher currentPublisher = this;
            Subscriber currentSubscriber = s;
            while (true)
                currentSubscriber = currentPublisher.subscribeOrReturn(currentSubscriber);
                OperatorPublisher nextPublisher = currentPublisher.prePublisher;
                if (nextPublisher == null)
                    currentPublisher.source.subscribe(currentSubscriber);
                    return;
                
                currentPublisher = nextPublisher;
            
        
        public OperatorSubscriptionSubscriber subscribeOrReturn(Subscriber s)
            print("OperatorPublisher#subscribeOrReturn(Subscriber), 生成 OperatorSubscriptionSubscriber 对象");
            return new OperatorSubscriptionSubscriber(mapper,s,name);
        
        public void print(String text)
            System.out.println(name+": "+text);
        
    
    static class OperatorSubscriptionSubscriber implements Subscriber,Subscription
        private String name;
        private Subscriber preSubscriber;
        private Function function;
        private Subscription preSubscription;
        public OperatorSubscriptionSubscriber(Function f, Subscriber preSubscriber, String name) 
            this.function = f;
            this.preSubscriber = preSubscriber;
            this.name = name+"_OperatorSubscriptionSubscriber";
        
        private boolean isCanceled;
  
        @Override
        public void request(long l) 
            println("OperatorSubscriptionSubscriber#request(long)");
            preSubscription.request(l);
        

        @Override
        public void onSubscribe(Subscription preSubscription) 
            println("OperatorSubscriptionSubscriber#onSubscribe(Subscription)");
            this.preSubscription = preSubscription;
            preSubscriber.onSubscribe(this);
        

        @Override
        public void onNext(Object o) 
            println("OperatorSubscriptionSubscriber#onNext(Object)");
            Object apply = function.apply(o);
            println("处理后的值为:"+apply);
            preSubscriber.onNext(apply);
        

        @Override
        public void onError(Throwable t) 
            preSubscriber.onError(t);

        @Override
        public void onComplete() 
            preSubscriber.onComplete();

        @Override
        public void cancel() isCanceled = true;

        void println(String context)
            System.out.println(name+": "+context);
        
    

    static class MySubscriber implements Subscriber
        private String name = "最终订阅者MySubscriber:";
        @Override
        public void onSubscribe(Subscription subscription) 
            println("MySubscriber#onSubscribe(Subscription)");
            subscription.request(1L);
        
        @Override
        public void onNext(Object o) 
            println("最终订阅者MySubscriber: MySubscriber onNext");
            println("最终订阅者MySubscriber:结果输出 "+o);

        
        @Override
        public void onError(Throwable throwable) 
            println("error");
        
        @Override
        public void onComplete() 
            println("MySubscriber#onComplete()");
        
        void println(String context)
            System.out.println(name+context);
        
    

    public static void main(String[] args) 
        SourcePublisher.just("MyPublisher1")// 生成 SourcePublisher
                .operator(e->e+" operator1 ","操作对象1")// 生成 OperatorPublisher
                .operator(e->e+" operator2 ","操作对象2") // 生成 OperatorPublisher
                .subscribe(new MySubscriber()); // 生成了多个嵌套的  Subscriber 回到 SourcePublisher 进行 onSubscribe 调用链
    

涉及的接口总结如下

接口

普通实现

操作类实现

Publisher

SourcePublisher

OperatorPublisher

Subscriber

MySubscriber

OperatorSubscriptionSubscriber

Subscription

MySubscription

OperatorSubscriptionSubscriber

接口对比如下

逻辑如下:

引入了操作者的概念(OperatorPublisher),本身是一个 Publisher ,可以通过 source 和 parentPublisher 变量将所有的 Publisher 组织成为一个 Publisher 单向链表,注意的是OperatorPublisher还包含一个函数式接口Function,这个是真正处理数据的地方。
引入了新的操作类型的 Subscriber(同时也是一个 Subscription),当 Publisher 链表被订阅的时候启动遍历 Publisher ,在遍历的过程中也形成了一个 Subscriber 的链表,链表最后一个 Subscriber 为真正的订阅者。
当遍历至 Publisher 链表最后一个元素的时候, Subscriber 的队列也已经构建完成。
调用最后一个 Publisher 的 subscribe 方法,执行 onSubscribe 方法,这个方法又会将 Subscriber 的链表遍历一遍,形成一个 Subscription 链表。
当 Subscriber 的链表遍历至最后一个的时候,通过调用 Subscriber 的 onSubscribe 方法,将会执行 subscription#request(long),这里也是一个遍历的方法,遍历 Subscription 到最后一个元素。
执行最后一个 Subscription 的 request 方法,依次执行 Subscriber 链表中的 Subscriber 的 onNext方法。
在操作类型的 Subscriber 中,含有对数据的处理逻辑,数据依次被处理后,最终的 Subscriber,完成最后的输出。

流程图如下:

打印效果如下(为了方便描述加上了行号):

  1. 数据源 Publisher: 生成 SourcePublisher

  1. 操作对象1: 生成 OperatorPublisher 对象

  1. 操作对象2: 生成 OperatorPublisher 对象

  1. 操作对象2: 订阅:OperatorPublisher#subscribe(Subscriber)

  1. 操作对象2: OperatorPublisher#subscribeOrReturn(Subscriber), 生成 OperatorSubscriptionSubscriber 对象

  1. 操作对象1: OperatorPublisher#subscribeOrReturn(Subscriber), 生成 OperatorSubscriptionSubscriber 对象

  1. 数据源Publisher: SourcePublisher#subscribe(Subscriber), 生成 MySubscription 对象

  1. 操作对象1_OperatorSubscriptionSubscriber: OperatorSubscriptionSubscriber#onSubscribe(Subscription)

  1. 操作对象2_OperatorSubscriptionSubscriber: OperatorSubscriptionSubscriber#onSubscribe(Subscription)

  1. 最终订阅者MySubscriber:MySubscriber#onSubscribe(Subscription)

  1. 操作对象2_OperatorSubscriptionSubscriber: OperatorSubscriptionSubscriber#request(long)

  1. 操作对象1_OperatorSubscriptionSubscriber: OperatorSubscriptionSubscriber#request(long)

  1. MySubscription: 调用 MySubscription#request(long)

  1. 操作对象1_OperatorSubscriptionSubscriber: OperatorSubscriptionSubscriber#onNext(Object)

  1. 操作对象1_OperatorSubscriptionSubscriber: 处理后的值为:MyPublisher1 operator1

  1. 操作对象2_OperatorSubscriptionSubscriber: OperatorSubscriptionSubscriber#onNext(Object)

  1. 操作对象2_OperatorSubscriptionSubscriber: 处理后的值为:MyPublisher1 operator1 operator2

  1. 最终订阅者MySubscriber:最终订阅者MySubscriber: MySubscriber onNext

  1. 最终订阅者MySubscriber:最终订阅者MySubscriber:结果输出 MyPublisher1 operator1 operator2

  1. 最终订阅者MySubscriber:MySubscriber#onComplete()

以上,一个完整的发布订阅逻辑已经处理完成,通过查看流程,可以看到

Publisher 在调用 subscribe 方法之前,只是构建了一个 Publisher 链表,没有发生任何数据处理逻辑。
查看 Publisher 是不存在任何状态的,也就是说,这个 Publisher 链表是可以重复使用的。
当调用了 subscribe 方法之后,数据处理逻辑开始开动,而且查看 Subscriber,

Mono 的工作流程

当你看完前面的逻辑并已经理解了之后,查看 Mono 的工作流程就会变得比较简单

首先一行最简单的代码如下:

import reactor.core.publisher.Mono;
public class MonoTest 
    public static void main(String[] args) 
        Mono.just("hello").map(e->e+" world").map(e->e+"!").subscribe(System.out::println);
    

首先,查看Mono类,发现他是一个抽象类

public abstract class Mono implements CorePublisher

是一个 Publisher 的实现类,当他调用 just(T) 方法时,会返回一个 MonoJust,MonoJust 继承了 Mono。

	public static <T> Mono<T> just(T data) 
		return onAssembly(new MonoJust<>(data));
	

然后调用 map(Function) 时,会返回一个 MonoMapFuseable ,MonoMapFuseable 也是继承了 Mono。继续调用 map(Function),也是如此。

    public final <R> Mono<R> map(Function<? super T, ? extends R> mapper) 
		if (this instanceof Fuseable) 
			return onAssembly(new MonoMapFuseable<>(this, mapper));
		
		return onAssembly(new MonoMap<>(this, mapper));
	

至此,Publisher 的两个实现类已经出现 MonoJust 和 MonoMapFuseable。MonoJust 负责存储原始数据,MonoMapFuseable 用于处理数据,并负责构建 Publisher 链表。看下两个类的实现

final class MonoJust<T> 
extends Mono<T>
		implements Fuseable.ScalarCallable<T>, Fuseable, SourceProducer<T>  

	final T value;
    // 将数据存储在 value 变量中。
	MonoJust(T value) 
		this.value = Objects.requireNonNull(value, "value");
	

	@Override
	public void subscribe(CoreSubscriber<? super T> actual) 
		actual.onSubscribe(Operators.scalarSubscription(actual, value));
	


final class MonoMapFuseable<T, R> extends InternalMonoOperator<T, R>
        implements Fuseable 

    final Function<? super T, ? extends R> mapper;
  
    MonoMapFuseable(Mono<? extends T> source, Function<? super T, ? extends R> mapper) 
        super(source);
        this.mapper = Objects.requireNonNull(mapper, "mapper");
    
    // 为了方便理解,将 subscribe(CoreSubscriber)方法从 父类 InternalMonoOperator 搬到了这里。
    @Override
    @SuppressWarnings("unchecked")
    public final void subscribe(CoreSubscriber<? super O> subscriber) 
        OptimizableOperator operator = this;
        try 
            while (true) 
                subscriber = operator.subscribeOrReturn(subscriber);
                if (subscriber == null) 
                    // null means "I will subscribe myself", returning...
                    return;
                
                OptimizableOperator newSource = operator.nextOptimizableSource();
                if (newSource == null) 
                    operator.source().subscribe(subscriber);
                    return;
                
                operator = newSource;
            
        
        catch (Throwable e) 
            Operators.reportThrowInSubscribe(subscriber, e);
            return;
        
    

    @Override
    @SuppressWarnings("unchecked")
    public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super R> actual) 
        if (actual instanceof ConditionalSubscriber) 
            ConditionalSubscriber<? super R> cs = (ConditionalSubscriber<? super R>) actual;
            return new FluxMapFuseable.MapFuseableConditionalSubscriber<>(cs, mapper);
        
        return new FluxMapFuseable.MapFuseableSubscriber<>(actual, mapper);
    

注意在 subscribeOrReturn 方法中返回了一个 MapFuseableConditionalSubscriber

可以看到 MapFuseableConditionalSubscriber 既是一个 Subscrition 也是一个 Subscriber。用来构建 Subscriber 链表和 Subscrition 链表,而它存有 MonoMapFuseable 传进来的 Function<? super T, ? extends R> mapper ,用于数据处理。其核心逻辑如下,

static final class MapFuseableConditionalSubscriber<T, R>
			implements ConditionalSubscriber<T>, InnerOperator<T, R>,
			           QueueSubscription<R> 

		final ConditionalSubscriber<? super R> actual;
		final Function<? super T, ? extends R> mapper;

		boolean done;

		QueueSubscription<T> s;


		MapFuseableConditionalSubscriber(ConditionalSubscriber<? super R> actual,
				Function<? super T, ? extends R> mapper) 
			this.actual = actual;
			this.mapper = mapper;
		

		@SuppressWarnings("unchecked")
		@Override
		public void onSubscribe(Subscription s) 
			if (Operators.validate(this.s, s)) 
				this.s = (QueueSubscription<T>) s;
				actual.onSubscribe(this);
			
		

		@Override
		public void onNext(T t) 
			if (sourceMode == ASYNC) 
				actual.onNext(null);
			
			else 
				R v;
				try 
					v = mapper.apply(t);
					if (v == null) 
						throw new NullPointerException("The mapper [" + mapper.getClass().getName() + "] returned a null value.");
					
				
				catch (Throwable e) 
					return;
				
				actual.onNext(v);
			
		

		@Override
		public void request(long n) 
			s.request(n);
		
	

当调用 .subscribe(System.out::println) 的时候,这里传入的是一个方法引用。调用的是父类Mono的多个重载方法

	public final Disposable subscribe(Consumer<? super T> consumer) 
		Objects.requireNonNull(consumer, "consumer");
		return subscribe(consumer, null, null);
	

	public final Disposable subscribe(
			@Nullable Consumer<? super T> consumer,
			@Nullable Consumer<? super Throwable> errorConsumer,
			@Nullable Runnable completeConsumer) 
		return subscribe(consumer, errorConsumer, completeConsumer, (Context) null);
	

	public final Disposable subscribe(
			@Nullable Consumer<? super T> consumer,
			@Nullable Consumer<? super Throwable> errorConsumer,
			@Nullable Runnable completeConsumer,
			@Nullable Context initialContext) 
		return subscribeWith(new LambdaMonoSubscriber<>(consumer, errorConsumer,
				completeConsumer, null, initialContext));
	

    public final <E extends Subscriber<? super T>> E subscribeWith(E subscriber) 
        // 这里将会调用子类的重写 subscribe 方法。
		subscribe(subscriber);
		return subscriber;
	

传入的 Consumer 被封装成为了 LambdaMonoSubscriber,最终在 onNext 方法中调用 Consumer 的逻辑。

final class LambdaMonoSubscriber<T> implements InnerConsumer<T>, Disposable 

    // 两个核心方法
	@Override
	public final void onSubscribe(Subscription s) 
		if (Operators.validate(subscription, s)) 
			this.subscription = s;

			if (subscriptionConsumer != null) 
				//。。。
			
			else 
				s.request(Long.MAX_VALUE);
			

		
	
	@Override
	public final void onNext(T x) 
		if (consumer != null) 
			try 
				consumer.accept(x);
			
			catch (Throwable t) 
				Exceptions.throwIfFatal(t);
				s.cancel();
				doError(t);
			
		
		// 省略其他。。。
	

以上就是所有的关键类,跟我们前面的实现逻辑是一致的,流程也是一致,因此不再做过多的描述。

总结

本文通过自己实现接口的方式,揭示了响应式编程核心类 Mono 的基本原理。其核心接口为三个 Publisher,Subscriber,Subsription。核心方法为 Publisher#onSubscribe(Subscription),Subscriber#onSubscribe(Subscription),Subsription#request(long), Subscriber#onNext(T),围绕Publisher的构造函数以及这四个方法,依次构建了Publisher 链表, Subscriber 链表和 Subsription 链表。通过这三个链表构建和遍历,完成了数据的发布,订阅和流式处理逻辑。

但是本文未涉及如何完成"响应式",这个在后续的文章中会涉及。

参考

projectreactor官方网站

官方文档

Flux、Mono、Reactor 实战(史上最全)

二、subscribeOn和publishOn源码解析

响应式编程入门之 Project Reactor

reactor简单入门

reactor是一个基于reactive streams的响应式编程框架。 在了解project reactor 项目之前, 你需要熟悉观察者模式(订阅发布模式)和reactive streams。
只有了解了这些内容,才能更好的开始project reactor的学习。你可以通过看
观察者模式之我见
一篇入门reactive streams背压响应式编程
了简单学习这两个知识点。

建议的学习方法

学习reactor的总步骤和前置条件

  1. 首先理解同步也异步的概念,理解为什么需要异步
  2. 理解观察者模式,理解为什么需要观察者模式
  3. 理解reactive streams,至少知道观察者模式(订阅发布模式)在reactive streams中是怎么要求的。
  4. 要学习和使用jdk 8 中stream的操作方法和风格
  5. 开始学习reactor。

学习reactor的时候建议:

  1. 先理解reactor的基本流程再去学习
  2. 首先理解了flux和mono在订阅模式中的作用和地位,不要被flux和mono产生数据的方法和操作符的众多知识所迷惑。
  3. 理解了subscribe的方法和调用。
  4. 大概知道调度器scheduler的作用和使用,大概知道有一个hooks和作用即可。
  5. 大概翻阅flux和mono创建数据的方法,用的时候可以查阅。
  6. 大概翻阅flux和mono的操作符,用的时候可以查阅。
  7. 上手真实的项目,开始使用reactor,需要的时候查阅文档。项目中如果有必要再去学习context。
  8. 等reactor能上手搞定项目,再试着去通过scheduler和hooks来优化项目。

如果你是为了面试,当我没说。

数据源Flux 和 Mono

reactor正如所有的发布订阅模式一样,符合reactive streams规范。 所以reactor也包含有publisher, subscriber, subscription, processor, operator等概念。
Flux和Mono就是reactor实现的publisher,他们可以接受被其他的订阅器所订阅,产生数据并且把数据推送给订阅器。 同时他们还集成了一些对数据流的操作,比如map, filter等。

区别

Flux是一个包含0到N个元素的数据流,Mono是一个包含0或者1个元素的数据流。

基本流程

总体上理解了reactor的流程,才能不被琐事的概念迷失了方向。其实整个reactor就一个订阅发布模式。
Flux和Mono是整个系统默认的publisher,目的是为了简化publisher自定义的工作。

Flux和Mono集成了很多的操作符,用来减少我们自定义subscriber和processor的工作量。
因为操作符的存在,我们对数据源和元素的操作就不需要自己定义自己的processor和subscriber了,直接使用操作符的组合即可完成工作.
除非不得已,否则不要试图自定义subscriber和processor。

创建数据源 调用操作符对数据进行处理 subscibe订阅数据源

创建Flux/Mono数据源

理解了发布订阅模式和publisher的作用,就理解了flux和mono。Flux和mono为了满足需求,有大量的产生数据的方法,
因为篇幅问题,我把这部分内容单独进行了整理,详见reactor之数据源的产生

操作符

在基本流程中,已经提到了reactor为了减少自定义subscriber和processor的工作量,集成了很多的操作符。
首先应该大概理解操作符的作用和应用场景,大概知道有哪些种类的操作符即可。
用到的时候不妨翻阅官方文档,常用的不用记,因为经常会用到。不常用的更不用记忆,因为记了也用不到。
因为篇幅问题,我把这部分内容单独进行了整理,详见reactor之操作符

subscribe

subscribe 操作符用来订阅流中的元素。
当流中的元素没有被订阅的时候,所有的操作都不会触发,只有当流中的元素被订阅的时候,所有的操作才会触发。
常用的subscribe接口如下


Flux.subscribe();
/**
 * @param consumer 消费者接口,用来消费流中的元素
 *                 
 */
Flux.subscribe(Consumer<? super T> consumer);

/**
 * @param consumer 消费者接口,用来消费流中的元素
 * @param errorConsumer 错误消费者接口,用来消费流中的错误
 */
Flux.subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer);

/**
 * @param consumer 消费者接口,用来消费流中的元素
 * @param errorConsumer 错误消费者接口,用来消费流中的错误
 * @param completeConsumer 完成消费者接口,用来消费流中的完成
 */
Flux.subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer);

/**
 * @param consumer 消费者接口,用来消费流中的元素
 * @param errorConsumer 错误消费者接口,用来消费流中的错误
 * @param completeConsumer 完成消费者接口,用来消费流中的完成
 * @param subscriptionConsumer 订阅消费者接口,用来消费流中的订阅
 */              
Flux.subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer, Consumer<? super Subscription> subscriptionConsumer)

Scheduler

Reactor也可以被认为是 并发无关(concurrency agnostic)的。意思就是, 它并不强制要求任何并发模型。
更进一步,它将选择权交给开发者。不过,它还是提供了一些方便 进行并发执行的库。
Reactor 提供了两种在响应式链中调整调度器 Scheduler 的方法:publishOn 和 subscribeOn。
它们都接受一个 Scheduler 作为参数,从而可以改变调度器。
但是 publishOn 在链中出现的位置 是有讲究的,而 subscribeOn 则无所谓。
publishOn 它会 改变后续的操作符的执行所在线程 。而 subscribeOn 则会改变下游操作符的调度器。

在 Reactor 中,执行模式以及执行过程取决于所使用的 Scheduler。

  • 当前线程(Schedulers.immediate())
  • 单线程(Schedulers.single())
  • 固定大小线程池(Schedulers.parallel())
  • 弹性线程池(Schedulers.elastic())
Flux.just(1, 2, 3)
        .publishOn(Schedulers.parallel()) //指定在parallel线程池中执行
        .map(i -> 
            System.out.println("map1: " + Thread.currentThread().getName());
            return i;
        )
        .publishOn(Schedulers.elastic()) // 指定下游的执行线程
        .map(i -> 
            System.out.println("map2: " + Thread.currentThread().getName());
            return i;
        )
        .subscribeOn(Schedulers.single())
        .subscribe(i -> System.out.println("subscribe: " + Thread.currentThread().getName()));

此外一些操作符会使用指定的调度器。

Flux.interval(Duration.ofSeconds(1), Schedulers.single())
        .subscribe(System.out::println);

processor

Processor 是一个实现了 Publisher 和 Subscriber 接口的对象,它可以用来连接 Publisher 和 Subscriber。
多数情况下,你应该进行避免使用 Processor,它们较难正确使用,主要用于一些特殊场景下。
比起直接使用 Reactor 的 Processors,更好的方式是通过调用一次 sink() 来得到 Processor 的 Sink。

FluxProcessor<String, String> processor = DirectProcessor.create();
processor.subscribe(System.out::println);
processor.onNext("foo");
processor.onNext("bar");
processor.onNext("baz");
processor.onComplete();

sink

Sink 是一个接口,它定义了一些方法,用来向 Processor 发送数据。

UnicastProcessor<Integer> processor = UnicastProcessor.create();
FluxSink<String> sink = processor.sink();
sink.next("foo");
sink.next("bar");
sink.next("baz");
sink.complete();

现有的 Processors 总览

  • DirectProcessor:直接处理器,它是一个同步的处理器,它会将所有的数据发送给所有的订阅者。
  • UnicastProcessor:单播处理器,它是一个同步的处理器,它只会将数据发送给第一个订阅者。
  • ReplayProcessor:重放处理器,它是一个异步的处理器,它会将所有的数据发送给所有的订阅者,包括那些在订阅之后才订阅的订阅者。
  • WorkQueueProcessor:工作队列处理器,它是一个异步的处理器,它会将所有的数据发送给所有的订阅者,包括那些在订阅之后才订阅的订阅者。
  • TopicProcessor:主题处理器,它是一个异步的处理器,它会将所有的数据发送给所有的订阅者,包括那些在订阅之后才订阅的订阅者。
  • EmitterProcessor:发射处理器,它是一个异步的处理器,它会将所有的数据发送给所有的订阅者,包括那些在订阅之后才订阅的订阅者。

Hooks

Hooks算是一个工具类,设定好以后,对后面的Flux和Mono都会回调Hooks设置的方法,类似操作系统的钩子。

本部分算是reactor中比较高级的部分,建议在开始上手用reactor做项目前,大概知道有这么一个概念即可。
做了一两个项目以后,再回头来看看hooks是做什么的即可

我把这部分的内容进行了拆分,详见:reactor之Hooks

Context

当从命令式编程风格切换到响应式编程风格的时候,一个技术上最大的挑战就是线程处理。
在命令式编程风格中,我们可以通过 ThreadLocal 来传递数据,
但是在响应式编程风格中,我们无法通过 ThreadLocal 来传递数据。
因为线程是由 Reactor 来管理的,我们无法控制线程的创建和销毁。
Context 就是用来解决这个问题的。Context 是一个接口,它定义了一些方法,用来获取和设置数据。

这部分内容相对也比较难以理解,建议把学习和理解放在后面,总之你需要用到类似多线程环境中的ThreadLocal类的时候,再来学习这部分不迟。

String key = "key";
Mono<String> r = Mono.just("hello")
        .flatMap(s -> Mono.subscriberContext()
                .map(ctx -> s + " " + ctx.get(key)))
        .subscriberContext(ctx -> ctx.put(key, "world"));
r.subscribe(System.out::println);
// 输出:hello world

context api

Context 是一个类似于 Map(这种数据结构)的接口:它存储键值(key-value)对,你需要通过 key 来获取值:

  • put 方法:将一个键值对放入 Context 中。
  • get 方法:通过 key 来获取值。
  • delete 方法:通过 key 来删除键值对。
  • hasKey 方法:通过 key 来判断是否存在键值对。
  • stream 方法:返回一个流,用来遍历 Context 中的所有键值对。
  • isEmpty 方法:判断 Context 是否为空。
  • size 方法:返回 Context 中键值对的个数。
  • putAll 方法:将一个 Context 中的所有键值对放入另一个 Context 中。
  • currentContext 方法:返回当前线程的 Context。
  • empty 方法:返回一个空的 Context。
  • root 方法:返回一个空的 Context。

把context 绑定到Flux and writing

String key = "key";
Flux<String> r = Flux.just("hello")
        .flatMap(s -> Mono.subscriberContext()
        .subscriberContext(ctx -> ctx.put(key, "world"));
                .map(ctx -> s + " " + ctx.get(key)))

从context中读取数据

String key = "key"
Flux<String> r = Flux.just("hello")
        .flatMap(s -> Mono.subscriberContext()
                .map(ctx -> s + " " + ctx.get(key)))
        .subscriberContext(ctx -> ctx.put(key, "world"));
r.subscribe(System.out::println);
// 输出:hello world

以上是关于Java ProjectReactor 响应式编程 Mono 简单工作流程解析的主要内容,如果未能解决你的问题,请参考以下文章

响应式编程库Reactor 3 Reference Guide参考文档中文版(v3.2.0)

响应式编程

reactor简单入门

JVM上的响应式流 — Reactor简介

gateway&reactive(响应式流)函数编程的webflux

Java 响应式编程