FluxMonoReactor 实战(史上最全)

Posted 架构师-尼恩

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了FluxMonoReactor 实战(史上最全)相关的知识,希望对你有一定的参考价值。

文章很长,建议收藏起来慢慢读! 疯狂创客圈 总目录 为您奉上珍贵的学习资源 :


前言

响应式编程用的是越来越多,尤其是在移动端 安卓的应用上边。

在Java后台服务开发中, 响应式编程用的不是太广泛,主要原因是, 响应式编程需要一个完整的生态, 包括数据库、缓存、中间件,都需要配套的响应式组件。 但是这点,其实很多并没有。

但是,随着 SpringCloud Gateway 的火爆, 响应式编程又变成了 不可回避, 不得不去学习的技术。

如果要做 SpringCloud Gateway 的开发, 就必须掌握一些响应式编程的知识。 由于最近在做 spring cloud gateway相关的开发工作, 所以:

把响应式编程Flux 和 Mono 的知识梳理一下,形成了此文

并且,此文会不断完善。

姊妹篇:关于 SpringCloud Gateway 简介

SpringCloud Gateway 是 Spring Cloud 的一个全新项目,该项目是基于 Spring 5.0,Spring Boot 2.0 和 Project Reactor 等技术开发的网关,它旨在为微服务架构提供一种简单有效的统一的 API 路由管理方式。

SpringCloud Gateway 作为 Spring Cloud 生态系统中的网关,目标是替代 Zuul,在Spring Cloud 2.0以上版本中,没有对新版本的Zuul 2.0以上最新高性能版本进行集成,仍然还是使用的Zuul 2.0之前的非Reactor模式的老版本。而为了提升网关的性能,SpringCloud Gateway是基于WebFlux框架实现的,而WebFlux框架底层则使用了高性能的Reactor模式通信框架Netty。

Spring Cloud Gateway 的目标,不仅提供统一的路由方式,并且基于 Filter 链的方式提供了网关基本的功能,例如:安全,监控/指标,和限流。

有关 Spring Cloud Gateway 响应式编程的实战, 具体请参考本文姊妹篇:

《SpringCloud gateway (史上最全))》

特别说明

Spring Cloud Gateway 底层使用了高性能的通信框架Netty。

Netty 是高性能中间件的通讯底座, rocketmq 、seata、nacos 、sentinel 、redission 、dubbo 等太多、太多的的大名鼎鼎的中间件,无一例外都是基于netty。

可以毫不夸张的说: netty 是进入大厂、走向高端 的必备技能

要想深入了解springcloud gateway ,最好是掌握netty 编程

有关 netty学习 具体请参见机工社出版 、尼恩的畅销书: Java高并发核心编程卷 1

响应式编程概述

背景知识

为了应对高并发服务器端开发场景,在2009 年,微软提出了一个更优雅地实现异步编程的方式——Reactive Programming,我们称之为响应式编程。随后,Netflix 和LightBend 公司提供了RxJava 和Akka Stream 等技术,使得Java 平台也有了能够实现响应式编程的框架。

在2017 年9 月28 日,Spring 5 正式发布。Spring 5 发布最大的意义在于,它将响应式编程技术的普及向前推进了一大步。而同时,作为在背后支持Spring 5 响应式编程的框架Spring Reactor,也进入了里程碑式的3.1.0 版本。

什么是响应式编程

响应式编程是一种面向数据流和变化传播的编程范式。这意味着可以在编程语言中很方便地表达静态或动态的数据流,而相关的计算模型会自动将变化的值通过数据流进行传播。

响应式编程基于reactor(Reactor 是一个运行在 Java8 之上的响应式框架)的思想,当你做一个带有一定延迟的才能够返回的io操作时,不会阻塞,而是立刻返回一个流,并且订阅这个流,当这个流上产生了返回数据,可以立刻得到通知并调用回调函数处理数据。

电子表格程序就是响应式编程的一个例子。单元格可以包含字面值或类似"=B1+C1"的公式,而包含公式的单元格的值会依据其他单元格的值的变化而变化。

响应式传播核心特点之一:变化传播:一个单元格变化之后,会像多米诺骨牌一样,导致直接和间接引用它的其他单元格均发生相应变化。

基于Java8实现观察者模式

Observable类:此类表示可观察对象,或模型视图范例中的“数据”。

它可以被子类实现以表示应用程序想要观察的对象。

//想要观察的对象 ObserverDemo
public class ObserverDemo extends Observable 
    public static void main(String[] args) 
        ObserverDemo observerDemo = new ObserverDemo();
        //添加观察者
        observerDemo.addObserver((o,arg)->
            System.out.println("数据发生变化A");
        );
        observerDemo.addObserver((o,arg)->
            System.out.println("数据发生变化B");
        );
        observerDemo.setChanged();//将此Observable对象标记为已更改
        observerDemo.notifyObservers();//如果该对象发生了变化,则通知其所有观察者
    

启动程序测试:

创建一个Observable

rxjava中,可以使用Observable.create() 该方法接收一个Obsubscribe对象

Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() 
            @Override
            public void call(Subscriber<? super Integer> subscriber) 

            
        );

来个例子:

        Observable<Integer> observable=Observable.create(new Observable.OnSubscribe<Integer>() 
            @Override
            public void call(Subscriber<? super Integer> subscriber) 
                for(int i=0;i<5;i++)
                    subscriber.onNext(i);
                
                subscriber.onCompleted();
            
        );
        //Observable.subscribe(Observer),Observer订阅了Observable
        Subscription subscribe = observable.subscribe(new Observer<Integer>() 
            @Override
            public void onCompleted() 
                Log.e(TAG, "完成");
            

            @Override
            public void onError(Throwable e) 
                Log.e(TAG, "异常");
            

            @Override
            public void onNext(Integer integer) 
                Log.e(TAG, "接收Obsverable中发射的值:" + integer);
            
        );

输出:

 接收Obsverable中发射的值:0
接收Obsverable中发射的值:1
接收Obsverable中发射的值:2
接收Obsverable中发射的值:3
接收Obsverable中发射的值:4

从上面的例子可以看出,在Observer订阅了Observable后,

Observer作为OnSubscribe中call方法的参数传入,从而调用了Observer的相关方法

基于 Reactor 实现

Reactor 是一个运行在 Java8 之上满足 Reactice 规范的响应式框架,它提供了一组响应式风格的 API。

Reactor 有两个核心类: Flux<T>Mono<T>,这两个类都实现 Publisher 接口。

  • Flux 类似 RxJava 的 Observable,它可以触发零到多个事件,并根据实际情况结束处理或触发错误。
  • Mono 最多只触发一个事件,所以可以把 Mono 用于在异步任务完成时发出通知。

Flux 和 Mono 都是数据流的发布者,使用 Flux 和 Mono 都可以发出三种数据信号:元素值,错误信号,完成信号;错误信号和完成信号都代表终止信号,终止信号用于告诉订阅者数据流结束了,错误信号终止数据流同时把错误信息传递给订阅者。

三种信号的特点:

  • 错误信号和完成信号都是终止信号,不能共存
  • 如果没有发送任何元素值,而是直接发送错误或者完成信号,表示是空数据流
  • 如果没有错误信号,也没有完成信号,表示是无限数据流

引入依赖

        <dependency>
            <groupId>org.projectreactor</groupId>
            <artifactId>reactor-core</artifactId>
            <version>1.1.6.RELEASE</version>
        </dependency>

just 和 subscribe方法

just():创建Flux序列,并声明指定数据流

subscribe():订阅Flux序列,只有进行订阅后才回触发数据流,不订阅就什么都不会发生

public class TestReactor 
    public static void main(String[] args) 
        //just():创建Flux序列,并声明数据流,
        Flux<Integer> integerFlux = Flux.just(1, 2, 3, 4);//整形
        //subscribe():订阅Flux序列,只有进行订阅后才回触发数据流,不订阅就什么都不会发生
        integerFlux.subscribe(System.out::println);
        
        Flux<String> stringFlux = Flux.just("hello", "world");//字符串
        stringFlux.subscribe(System.out::println);
        
        //fromArray(),fromIterable()和fromStream():可以从一个数组、Iterable 对象或Stream 对象中创建Flux序列
        Integer[] array = 1,2,3,4;
        Flux.fromArray(array).subscribe(System.out::println);
        
        List<Integer> integers = Arrays.asList(array);
        Flux.fromIterable(integers).subscribe(System.out::println);
        
        Stream<Integer> stream = integers.stream();
        Flux.fromStream(stream).subscribe(System.out::println);
    

启动测试:

响应流的特点

要搞清楚这两个概念,必须说一下响应流规范。它是响应式编程的基石。他具有以下特点:

  • 响应流必须是无阻塞的。

  • 响应流必须是一个数据流。

  • 它必须可以异步执行。

  • 并且它也应该能够处理背压。

  • 即时响应性: 只要有可能, 系统就会及时地做出响应。 即时响应是可用性和实用性的基石, 而更加重要的是,即时响应意味着可以快速地检测到问题并且有效地对其进行处理。 即时响应的系统专注于提供快速而一致的响应时间, 确立可靠的反馈上限, 以提供一致的服务质量。 这种一致的行为转而将简化错误处理、 建立最终用户的信任并促使用户与系统作进一步的互动。

  • **回弹性:**系统在出现失败时依然保持即时响应性。 这不仅适用于高可用的、 任务关键型系统——任何不具备回弹性的系统都将会在发生失败之后丢失即时响应性。 回弹性是通过复制、 遏制、 隔离以及委托来实现的。 失败的扩散被遏制在了每个组件内部, 与其他组件相互隔离, 从而确保系统某部分的失败不会危及整个系统,并能独立恢复。 每个组件的恢复都被委托给了另一个(外部的)组件, 此外,在必要时可以通过复制来保证高可用性。 (因此)组件的客户端不再承担组件失败的处理。

  • 弹性: 系统在不断变化的工作负载之下依然保持即时响应性。 反应式系统可以对输入(负载)的速率变化做出反应,比如通过增加或者减少被分配用于服务这些输入(负载)的资源。 这意味着设计上并没有争用点和中央瓶颈, 得以进行组件的分片或者复制, 并在它们之间分布输入(负载)。 通过提供相关的实时性能指标, 反应式系统能支持预测式以及反应式的伸缩算法。 这些系统可以在常规的硬件以及软件平台上实现成本高效的弹性

  • **消息驱动:**反应式系统依赖异步的消息传递,从而确保了松耦合、隔离、位置透明的组件之间有着明确边界。 这一边界还提供了将失败作为消息委托出去的手段。 使用显式的消息传递,可以通过在系统中塑造并监视消息流队列, 并在必要时应用回压, 从而实现负载管理、 弹性以及流量控制。 使用位置透明的消息传递作为通信的手段, 使得跨集群或者在单个主机中使用相同的结构成分和语义来管理失败成为了可能。 非阻塞的通信使得接收者可以只在活动时才消耗资源, 从而减少系统开销。

Publisher/Flux和Mono

由于响应流的特点,我们不能再返回一个简单的POJO对象来表示结果了。必须返回一个类似Java中的Future的概念,在有结果可用时通知消费者进行消费响应。

Reactive Stream规范中这种被定义为Publisher

Publisher是一个可以提供0-N个序列元素的提供者,并根据其订阅者Subscriber<? super T>的需求推送元素。

一个Publisher可以支持多个订阅者,并可以根据订阅者的逻辑进行推送序列元素。

下面这个Excel计算就能说明一些Publisher的特点。

A1-A9就可以看做Publisher及其提供的元素序列。

A10-A13分别是求和函数SUM(A1:A9)、平均函数AVERAGE(A1:A9)、最大值函数MAX(A1:A9)、最小值函数MIN(A1:A9)

A10-A13可以看作订阅者Subscriber

假如说我们没有A10-A13,那么A1-A9就没有实际意义,它们并不产生计算。

这也是响应式的一个重要特点:当没有订阅时发布者什么也不做。而Flux和Mono都是Publisher在Reactor 3实现。

Publisher提供了subscribe方法,允许消费者在有结果可用时进行消费。

如果没有消费者Publisher不会做任何事情,他根据消费情况进行响应。

Publisher可能返回零或者多个,甚至可能是无限的,为了更加清晰表示期待的结果就引入了两个实现模型Mono和Flux。

Flux

Flux 是一个发出(emit)0-N个元素组成的异步序列的Publisher,可以被onComplete信号或者onError信号所终止。

在响应流规范中存在三种给下游消费者调用的方法 onNext, onComplete, 和onError。下面这张图表示了Flux的抽象模型:

以上的的讲解对于初次接触反应式编程的依然是难以理解的,所以这里有一个循序渐进的理解过程。

有些类比并不是很妥当,但是对于你循序渐进的理解这些新概念还是有帮助的。

传统数据处理

我们在平常是这么写的:

public List<ClientUser> allUsers() 
    return Arrays.asList(new ClientUser("felord.cn", "reactive"),
            new ClientUser("Felordcn", "Reactor"));

我们通过迭代返回值Listget这些元素进行再处理(消费),不管有没有消费者, 菜品都会生产出来。

流式数据处理

Java 8中我们可以改写为流的表示:

public Stream<ClientUser> allUsers() 
    return  Stream.of(new ClientUser("felord.cn", "reactive"),
            new ClientUser("Felordcn", "Reactor"));

反应式数据处理

Reactor中我们又可以改写为Flux表示:

public Flux<ClientUser> allUsers()
    return Flux.just(new ClientUser("felord.cn", "reactive"),
            new ClientUser("Felordcn", "Reactor"));

这时候食客来了,发生了订阅,厨师才开始做。

Flux 的创建Demo

Flux ints = Flux.range(1, 4);
Flux seq1 = Flux.just("bole1", "bole2", "bole3");
List iterable = Arrays.asList("bole_01", "bole_02", "bole_03");
Flux seq2 = Flux.fromIterable(iterable);
seq2.subscribe(i -> System.out.println(i));

Mono

Mono 是一个发出(emit)0-1个元素的Publisher,可以被onComplete信号或者onError信号所终止。

mono 整体和Flux差不多,只不过这里只会发出0-1个元素。也就是说不是有就是没有。

象Flux一样,我们来看看Mono的演化过程以帮助理解。

传统数据处理

public ClientUser currentUser () 
    return isAuthenticated ? new ClientUser("felord.cn", "reactive") : null;

直接返回符合条件的对象或者null`。

Optional的处理方式

public Optional<ClientUser> currentUser () 
    return isAuthenticated ? Optional.of(new ClientUser("felord.cn", "reactive"))
            : Optional.empty();

这个Optional我觉得就有反应式的那种味儿了,当然它并不是反应式。当我们不从返回值Optional取其中具体的对象时,我们不清楚里面到底有没有,但是Optional是一定客观存在的,不会出现NPE问题。

反应式数据处理

public Mono<ClientUser> currentUser () 
    return isAuthenticated ? Mono.just(new ClientUser("felord.cn", "reactive"))
            : Mono.empty();

和Optional有点类似的机制,当然Mono不是为了解决NPE问题的,它是为了处理响应流中单个值(也可能是Void)而存在的。

Mono的创建Demo

Mono data = Mono.just("bole");

Mono noData = Mono.empty();

m.subscribe(i -> System.out.println(i));

Flux和Mono总结

Flux和Mono是Java反应式中的重要概念,但是很多同学包括我在开始都难以理解它们。这其实是规定了两种流式范式,这种范式让数据具有一些新的特性,比如基于发布订阅的事件驱动,异步流、背压等等。另外数据是推送(Push)给消费者的以区别于平时我们的拉(Pull)模式。同时我们可以像Stream Api一样使用类似mapflatmap等操作符(operator)来操作它们。

函数编程

反应式编程,常常和函数式编程结合,这就是让大家困扰的地方

函数编程接口

接口函数名说明
BiConsumer表示接收两个输入参数和不返回结果的操作。
BiFunction表示接受两个参数,并产生一个结果的函数。
BinaryOperator表示在相同类型的两个操作数的操作,生产相同类型的操作数的结果。
BiPredicate代表两个参数谓词(布尔值函数)。
BooleanSupplier代表布尔值结果的提供者。
Consumer表示接受一个输入参数和不返回结果的操作。
DoubleBinaryOperator代表在两个double值操作数的运算,并产生一个double值结果。
DoubleConsumer表示接受一个double值参数,不返回结果的操作。
DoubleFunction表示接受double值参数,并产生一个结果的函数。
DoublePredicate代表一个double值参数谓词(布尔值函数)。
DoubleSupplier表示表示接受double值参数,并产生一个结果的函数。值结果的提供者。
DoubleToIntFunction表示接受一个double值参数,不返回结果的操作。
DoubleFunction表示接受double值参数,并产生一个结果的函数。
DoublePredicate代表一个double值参数谓词(布尔值函数)。
DoubleSupplierDoubleToIntFunction
DoubleToIntFunction表示接受double值参数,并产生一个int值结果的函数。
DoubleToLongFunction表示上产生一个double值结果的单个double值操作数的操作。
Function代表接受一个double值参数,并产生一个long值结果的函数。
DoubleUnaryOperator表示上产生一个double值结果的单个double值操作数的操作。
Function表示接受一个参数,并产生一个结果的函数。
IntConsumer表示接受单个int值的参数并没有返回结果的操作。
IntFunction表示接受一个int值参数,并产生一个结果的函数。
IntPredicate表示一个整数值参数谓词(布尔值函数)。
IntSupplier代表整型值的结果的提供者。
IntToLongFunction表示接受一个int值参数,并产生一个long值结果的函数。
IntUnaryOperator表示产生一个int值结果的单个int值操作数的运算。
LongBinaryOperator表示在两个long值操作数的操作,并产生一个ObjLongConsumer值结果。
LongFunction表示接受long值参数,并产生一个结果的函数。
LongPredicate代表一个long值参数谓词(布尔值函数)。
LongSupplier表示long值结果的提供者。
LongToDoubleFunction表示接受double参数,并产生一个double值结果的函数。
LongToIntFunction表示接受long值参数,并产生一个int值结果的函数。
LongUnaryOperator表示上产生一个long值结果单一的long值操作数的操作。
ObjDoubleConsumer表示接受对象值和double值参数,并且没有返回结果的操作。
ObjIntConsumer表示接受对象值和整型值参数,并返回没有结果的操作。
ObjLongConsumer表示接受对象值和整型值参数,并返回没有结果的操作。
ObjLongConsumer表示接受对象值和double值参数,并且没有返回结果的操作。
ObjIntConsumer表示接受对象值和整型值参数,并返回没有结果的操作。
ObjLongConsumer表示接受对象的值和long值的说法,并没有返回结果的操作。
Predicate代表一个参数谓词(布尔值函数)。
Supplier表示一个提供者的结果。
ToDoubleBiFunction表示接受两个参数,并产生一个double值结果的功能。
ToDoubleFunction代表一个产生一个double值结果的功能。
ToIntBiFunction表示接受两个参数,并产生一个int值结果的函数。
ToIntFunction代表产生一个int值结果的功能。
ToLongBiFunction表示接受两个参数,并产生long值结果的功能。
ToLongFunction代表一个产生long值结果的功能。
UnaryOperator表示上产生相同类型的操作数的结果的单个操作数的操作。

常用函数编程示例

Consumer 一个输入 无输出

Product product=new Product();
//类名+静态方法  一个输入T 没有输出
Consumer consumer1 = Product->Product.nameOf(product);//lambda
consumer1.accept(product);
Consumer consumer = Product::nameOf;//方法引用
consumer.accept(product);

Funtion<T,R> 一个输入 一个输出

//对象+方法  一个输入T 一个输出R
Function<Integer, Integer> function = product::reduceStock;
System.out.println("剩余库存:" + function.apply(10));
//带参数的构造函数
Function<Integer,Product> function1=Product::new;
System.out.println("新对象:" +function1.apply(200));

Predicate 一个输入T, 一个输出 Boolean

//Predicate 一个输入T 一个输出Boolean
Predicate predicate= i -> product.isEnough(i);//lambda
System.out.println("库存是否足够:"+predicate.test(100));
Predicate predicate1= product::isEnough;//方法引用
System.out.println("库存是否足够:"+predicate1.test(100));

UnaryOperator 一元操作符 输入输出都是T

//一元操作符  输入和输出T
UnaryOperator integerUnaryOperator =product::reduceStock;
System.out.println("剩余库存:" + integerUnaryOperator.apply(20));
IntUnaryOperator intUnaryOperator = product::reduceStock;
System.out.println("剩余库存:" + intUnaryOperator.applyAsInt(30));

Supplier 没有输入 只有输出

//无参数构造函数
Supplier supplier = Product::new;
System.out.println("创建新对象:" + supplier.get());
Supplier supplier1=()->product.getStock();
System.out.println("剩余库存:" + supplier1.get());

BiFunction 二元操作符 两个输入<T,U> 一个输出

//类名+方法
BiFunction<Product, Integer, Integer> binaryOperator = Product::reduceStock;
System.out.println(" 剩余库存(BiFunction):" + binaryOperator.apply(product, 10));

BinaryOperator 二元操作符 ,二个输入 一个输出

//BinaryOperator binaryOperator1=(x,y)->product.reduceStock(x,y);
BinaryOperator binaryOperator1=product::reduceStock;
System.out.println(" 剩余库存(BinaryOperator):" +binaryOperator1.apply(product.getStock(),10));

Flux类中的静态方法:

简单的创建方法

just():

可以指定序列中包含的全部元素。创建出来的Flux序列在发布这些元素之后会自动结束

fromArray(),fromIterable(),fromStream():

可以从一个数组,Iterable对象或Stream对象中穿件Flux对象

empty():

创建一个不包含任何元素,只发布结束消息的序列

error(Throwable error):

创建一个只包含错误消息的序列

never():

传建一个不包含任务消息通知的序列

range(int start, int count):

创建包含从start起始的count个数量的Integer对象的序列

interval(Duration period)和interval(Duration delay, Duration period):

创建一个包含了从0开始递增的Long对象的序列。其中包含的元素按照指定的间隔来发布。除了间隔时间之外,还可以指定起始元素发布之前的延迟时间

intervalMillis(long period)和intervalMillis(long delay, long period):

与interval()方法相同,但该方法通过毫秒数来指定时间间隔和延迟时间

例子

Flux.just("Hello", "World").subscribe(System.out::println);
Flux.fromArray(new Integer[]1, 2, 3).subscribe(System.out::println);
Flux.empty().subscribe(System.out::println);
Flux.range(1, 10).subscribe(System.out::println);
Flux.interval(Duration.of(10, ChronoUnit.SECONDS)).subscribe(System.out::println);
Flux.intervalMillis(1000).subscirbe(System.out::println);

复杂的序列创建 generate()

当序列的生成需要复杂的逻辑时,则应该使用generate()或create()方法。

generate()方法通过同步和逐一的方式来产生Flux序列。

序列的产生是通过调用所提供的的SynchronousSink对象的next(),complete()和error(Throwable)方法来完成的。

逐一生成的含义是在具体的生成逻辑中,next()方法只能最多被调用一次。

在某些情况下,序列的生成可能是有状态的,需要用到某些状态对象,此时可以使用

generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator),

其中stateSupplier用来提供初始的状态对象。

在进行序列生成时,状态对象会作为generator使用的第一个参数传入,可以在对应的逻辑中对改状态对象进行修改以供下一次生成时使用。

Flux.generate(sink -> 
  sink.next("Hello");
  sink.complete();  
).subscribe(System.out::println);


final Random random = new Random();
Flux.generate(ArrayList::new, (list, sink) -> 
  int value = random.nextInt(100);
  list.add(value);
  sink.next(value);
  if( list.size() ==10 )
    sink.complete();
  return list;
).subscribe(System.out::println);

复杂的序列创建 create()

create()方法与generate()方法的不同之处在于所使用的是FluxSink对象。

FluxSink支持同步和异步的消息产生,并且可以在一次调用中产生多个元素。

Flux.create(sink -> 
  for(int i = 0; i < 10; i ++)
    sink.next(i);
  sink.complete();
).subscribe(System.out::println);

Mono静态方法

Mono类包含了与Flux类中相同的静态方法:just(),empty()和never()等。

除此之外,Mono还有一些独有的静态方法:

fromCallable(),fromCompletionStage(),fromFuture(),fromRunnable()和fromSupplier():分别从Callable,CompletionStage,CompletableFuture,Runnable和Supplier中创建Mono

delay(Duration duration)和delayMillis(long duration):创建一个Mono序列,在指定的延迟时间之后,产生数字0作为唯一值

ignoreElements(Publisher source):创建一个Mono序列,忽略作为源的Publisher中的所有元素,只产生消息

justOrEmpty(Optional<? extends T> data)和justOrEmpty(T data):从一个Optional对象或可能为null的对象中创建Mono。只有Optional对象中包含之或对象不为null时,Mono序列才产生对应的元素

Mono.fromSupplier(() -> "Hello").subscribe(System.out::println);
Mono.justOrEmpty(Optional.of("Hello")).subscribe(System.out::println);
Mono.create(sink -> sink.success("Hello")).subscribte(System.out::println);

操作符

操作符buffer和bufferTimeout

这两个操作符的作用是把当前流中的元素收集到集合中,并把集合对象作为流中的新元素。

在进行收集时可以指定不同的条件:所包含的元素的最大数量或收集的时间间隔。方法buffer()仅使用一个条件,而bufferTimeout()可以同时指定两个条件。

指定时间间隔时可以使用Duration对象或毫秒数,即使用bufferMillis()或bufferTimeoutMillis()两个方法。

除了元素数量和时间间隔外,还可以通过bufferUntil和bufferWhile操作符来进行收集。这两个操作符的参数时表示每个集合中的元素索要满足的条件的Predicate对象。

bufferUntil会一直收集直到Predicate返回true。

使得Predicate返回true的那个元素可以选择添加到当前集合或下一个集合中;bufferWhile则只有当Predicate返回true时才会收集。一旦为false,会立即开始下一次收集。

Flux.range(1, 100).buffer(20).subscribe(System.out::println);
Flux.intervalMillis(100).bufferMillis(1001).take(2).toStream().forEach(System.out::println);
Flux.range(1, 10).bufferUntil(i -> i%2 == 0).subscribe(System.out::println);
Flux.range(1, 10).bufferWhile(i -> i%2 == 0).subscribe(System.out::println);

操作符Filter

对流中包含的元素进行过滤,只留下满足Predicate指定条件的元素。

Flux.range(1, 10).filter(i -> i%2 == 0).subscribe(System.out::println);

操作符zipWith

zipWith操作符把当前流中的元素与另一个流中的元素按照一对一的方式进行合并。在合并时可以不做任何处理,由此得到的是一个元素类型为Tuple2的流;也可以通过一个BiFunction函数对合并的元素进行处理,所得到的流的元素类型为该函数的返回值。

Flux.just("a", "b").zipWith(Flux.just("c", "d")).subscribe(System.out::println);
Flux.just("a", "b").zipWith(Flux.just("c", "d"), (s1, s2) -> String.format("%s-%s", s1, s2)).subscribe(System.out::println);

操作符take

take系列操作符用来从当前流中提取元素。提取方式如下:

take(long n),take(Duration timespan)和takeMillis(long timespan):按照指定的数量或时间间隔来提取

takeLast(long n):提取流中的最后N个元素

takeUntil(Predicate<? super T> predicate) :提取元素直到Predicate返回true

takeWhile(Predicate<? super T> continuePredicate):当Predicate返回true时才进行提取

takeUntilOther(Publisher<?> other):提取元素知道另外一个流开始产生元素

Flux.range(1, 1000).take(10).subscribe(System.out::println);
Flux.range(1, 1000).takeLast(10).subscribe(System.out::println);
Flux.range(1, 1000).takeWhile(i -> i < 10).subscribe(System.out::println);
Flux.range(1, 1000).takeUntil(i -> i == 10).subscribe(System.out::println);

操作符reduce和reduceWith

reduce和reduceWith操作符对流中包含的所有元素进行累计操作,得到一个包含计算结果的Mono序列。累计操作是通过一个BiFunction来表示的。在操作时可以指定一个初始值。若没有初始值,则序列的第一个元素作为初始值。

Flux.range(1, 100).reduce((x, y) -> x + y).subscribe(System.out::println);
Flux.range(1, 100).reduceWith(() -> 100, (x + y) -> x + y).subscribe(System.out::println);

操作符merge和mergeSequential

merge和mergeSequential操作符用来把多个流合并成一个Flux序列。merge按照所有流中元素的实际产生序列来合并,而mergeSequential按照所有流被订阅的顺序,以流为单位进行合并。

Flux.merge(Flux.intervalMillis(0, 100).take(5), Flux.intervalMillis(50, 100).take(5)).toStream().forEach(System.out::println);
Flux.mergeSequential(Flux.intervalMillis(0, 100).take(5), Flux.intervalMillis(50, 100).take(5)).toStream().forEach(System.out::println);

操作符flatMap和flatMapSequential

flatMap和flatMapSequential操作符把流中的每个元素转换成一个流,再把所有流中的元素进行合并。flatMapSequential和flatMap之间的区别与mergeSequential和merge是一样的。

Flux.just(5, 10).flatMap(x -> Flux.intervalMillis(x * 10, 100).take(x)).toStream().forEach(System.out::println);

操作符concatMap

concatMap操作符的作用也是把流中的每个元素转换成一个流,再把所有流进行合并。concatMap会根据原始流中的元素顺序依次把转换之后的流进行合并,并且concatMap堆转换之后的流的订阅是动态进行的,而flatMapSequential在合并之前就已经订阅了所有的流。

Flux.just(5, 10).concatMap(x -> Flux.intervalMillis(x * 10, 100).take(x)).toStream().forEach(System.out::println);

操作符combineLatest

combineLatest操作符把所有流中的最新产生的元素合并成一个新的元素,作为返回结果流中的元素。只要其中任何一个流中产生了新的元素,合并操作就会被执行一次,结果流中就会产生新的元素。

Flux.combineLatest(Arrays::toString, Flux.intervalMillis(100).take(5), Flux.intervalMillis(50, 100).take(5)).toStream().forEach(System.out::println);

消息处理

当需要处理Flux或Mono中的消息时,可以通过subscribe方法来添加相应的订阅逻辑。

在调用subscribe方法时可以指定需要处理的消息类型。

Flux.just(1, 2).concatWith(Mono.error(new IllegalStateException())).subscribe(System.out::println, System.err::println);

Flux.just(1, 2).concatWith(Mono.error(new IllegalStateException())).onErrorReturn(0).subscribe(System.out::println);

第2种可以通过switchOnError()方法来使用另外的流来产生元素。

Flux.just(1, 2).concatWith(Mono.error(new IllegalStateException())).switchOnError(Mono.just(0)).subscribe(System.out::println);

第三种是通过onErrorResumeWith()方法来根据不同的异常类型来选择要使用的产生元素的流。

Flux.just(1, 2).concatWith(Mono.error(new IllegalArgumentException())).onErrorResumeWith(e -> 
  if(e instanceof IllegalStateException)
    return Mono.just(0);
  else if(e instanceof IllegalArgumentException)
    return Mono.just(-1);
  return Mono.epmty();
).subscribe(System,.out::println);

当出现错误时还可以使用retry操作符来进行重试。重试的动作是通过重新订阅序列来实现的。在使用retry操作时还可以指定重试的次数。

 Flux.just(1, 2).concatWith(Mono.error(new IllegalStateException())).retry(1).subscrible(System.out::println);

调度器Scheduler

通过调度器可以指定操作执行的方式和所在的线程。有以下几种不同的调度器实现

当前线程,通过Schedulers.immediate()方法来创建

单一的可复用的线程,通过Schedulers.single()方法来创建

使用弹性的线程池,通过Schedulers.elastic()方法来创建。线程池中的线程是可以复用的。当所需要时,新的线程会被创建。若一个线程闲置时间太长,则会被销毁。该调度器适用于I/O操作相关的流的处理

使用对并行操作优化的线程池,通过Schedulers.parallel()方法来创建。其中的线程数量取决于CPU的核的数量。该调度器适用于计算密集型的流的处理

使用支持任务调度的调度器,通过Schedulers.timer()方法来创建

从已有的ExecutorService对象中创建调度器,通过Schedulers.fromExecutorService()方法来创建

通过publishOn()和subscribeOn()方法可以切换执行操作调度器。publishOn()方法切换的是操作符的执行方式,而subscribeOn()方法切换的是产生流中元素时的执行方式

Flux.create(sink -> 
  sink.next(Thread.currentThread().getName());
  sink.complete();  
).publishOn(Schedulers.single())
.map(x ->  String.format("[%s] %s", Thread.currentThread().getName(), x))
.publishOn(Schedulers.elastic())
.map(x -> String.format("[%s] %s", Thread.currentThread().getName(), x))
.subscribeOn(Schedulers.parallel())
.toStream()
.forEach(System.out::println);

测试

StepVerifier的作用是可以对序列中包含的元素进行逐一验证。通过StepVerifier.create()方法对一个流进行包装之后再进行验证。expectNext()方法用来声明测试时所期待的流中的下一个元素的值,而verifyComplete()方法则验证流是否正常结束。verifyError()来验证流由于错误而终止。

StepVerifier.create(Flux.just(a, b)).expectNext("a").expectNext("b").verifyComplete();

使用StepVerifier.withVirtualTime()方法可以创建出使用虚拟时钟的SteoVerifier。通过thenAwait(Duration)方法可以让虚拟时钟前进。

StepVerifier.withVirtualTime(() -> Flux.interval(Duration.ofHours(4), Duration.ofDays(1)).take(2))
 .expectSubscription()
 .expectNoEvent(Duration.ofHours(4))
 .expectNext(0L)
 .thenAwait(Duration.ofDays(1))
 .expectNext(1L)
 .verifyComplete();

TestPublisher的作用在于可以控制流中元素的产生,甚至是违反反应流规范的情况。通过create()方法创建一个新的TestPublisher对象,然后使用next()方法来产生元素,使用complete()方法来结束流。

final TestPublisher<String> testPublisher = TestPublisher.creater();
testPublisher.next("a");
testPublisher.next("b");
testPublisher.complete();

StepVerifier.create(testPublisher)
    .expectNext("a")
    .expectNext("b")
    .expectComplete();

调试

在调试模式启用之后,所有的操作符在执行时都会保存额外的与执行链相关的信息。当出现错误时,这些信息会被作为异常堆栈信息的一部分输出。

Hooks.onOperator(providedHook -> providedHook.operatorStacktrace());

也可以通过checkpoint操作符来对特定的流

以上是关于FluxMonoReactor 实战(史上最全)的主要内容,如果未能解决你的问题,请参考以下文章

disruptor (史上最全之1):伪共享原理&性能对比实战

disruptor (史上最全之1):伪共享原理&性能对比实战

Sharding-JDBC 实战(史上最全)

史上最全idea插件开发入门实战(傻瓜式教程)

史上最全接口测试实战案例(绝密首次公开)

Java 粘包/半包 原理与拆包实战(史上最全)