Reactor响应式编程
Posted BasicLab基础架构实验室
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Reactor响应式编程相关的知识,希望对你有一定的参考价值。
响应式编程
(万字长文预警)
学习Spring WebFlux之余,发现好多概念理不清楚,细细一看,明白是自己不懂响应式编程哈哈哈哈,故利用上课时间,根据官网稍作整理,以供更加深入的学习。
为了防止出现争议,我会尽量附属原文词汇,因为这篇文章是我人工翻译的,所以可能会有翻译不到位的地方,所以建议大家还是去看官方文档。所有的冲突均以官方文档为准。
简介
Reactor响应式编程(Reactive Programming),是对响应式编程规范的实现,可被总结为以下几点:
Reactive programming is an asynchronous programming paradigm concerned with data streams and the propagation of change. This means that it becomes possible to express static (e.g. arrays) or dynamic (e.g. event emitters) data streams with ease via the employed programming language(s).
— https://en.wikipedia.org/wiki/Reactive_programming
说人话就是: 响应式编程是一种异步编程范式,它涉及数据流和变化的传播。这意味着可以通过采用的编程语言轻松地表达静态(例如数组)或动态(例如事件发射器)数据流。时代在进步,通过Reactive Streams的努力,使得Java标准化了此规范,该规范定义了JVM上的响应库的一组接口和交互规则。它的接口已在Flow类下集成到Java 9中。响应式编程范例通常以面向对象的语言表示,作为Observer设计模式(观察者模式)的扩展。
Reactor是JVM的一个完全非阻塞的响应式编程实现,具有高效的需求管理(以管理“负压”的形式)。它直接与Java8函数式API集成在一起,特别是CompletableFuture,Stream和Duration。它提供了可组合的异步序列API,例如Flux(用于[0|N]个元素)和Mono(用于[0|1]个元素),并且扩展实现了Reactive Streams规范。Reactor还支持与Reactor-Netty项目的非阻塞进程间通信。适用于微服务架构,Reactor Netty为HTTP(包括Websockets),TCP和UDP提供了支持背压的网络引擎。完全支持响应式编码和解码。
可以将主要的响应流模式与熟悉的Iterator设计模式进行比较,因为所有这些库中的Iterable-Iterator对都有双重性。一个主要的区别是,虽然Iterator是基于pull的(拉取式),但是响应流却是基于push的(推压式)。使用迭代器是命令式编程模式,即访问值的方法仅由Iterable负责(拉取)。确实,开发人员可以选择何时访问序列中的next()项。在响应式流中,上述迭代器对等效于Publisher-Subscriber对。
在响应式编程里,是Publisher在新的可用值出现时通知Subscriber,而此推送模式是做出反应的关键。(通知Subscriber就像推送消息,所以叫push,在迭代器中,是迭代器主动获取下一个迭代值,就像拉取值,所以是pull)。同样,应用于推送值的操作,是以声明方式而不是命令方式,表示:
程序员描述计算的逻辑,而不是描述其确切的控制流程。(更加OOP)
除了推送值之外,还以明确定义的方式覆写了错误处理和完成操作这两个方面。
发布者可以将新值推送到其订阅者(通过调用onNext),但也可以发送错误信号(通过调用onError)或完成操作(通过调用onComplete)。
错误信号和完成操作都会终止序列。
这种方法非常灵活。该模式支持没有值,一个值或n个值(包括无限个值序列,例如时钟的连续滴答声)的用例。
阻塞方式的不足:
现代应用程序会面对大量的并发操作,即使现代硬件的功能不断提高,现代软件的性能仍然是解决问题的关键。
广义上讲,有两种方法可以提高程序的性能:
a) 并行使用更多线程和更多硬件资源。
b) 在使用现有资源的情况下,寻求更高的效率。
通常,Java开发人员通过使用阻塞代码来编写程序。在没有遇到瓶颈之前,这种做法收效甚好。然后是时候引入其他线程,(也就是多线程编程)运行类似的阻塞代码了。 但是这种形式的资源利用会迅速引发竞争和并发问题。更糟糕的是,阻塞会浪费资源。如果仔细观察,程序一旦遇到一些耗时操作(特别是I/O,例如数据库请求或网络调用),就会浪费资源,因为线程(可能有很多线程)现在处于空闲状态,啥也不干,就呆呆地等待数据。因此,并行化方法不是灵丹妙药。我们有必要榨干硬件全部性能。
解决:前面提到的第二种方法,寻求更高的效率,可以解决资源浪费的问题。通过编写异步的非阻塞代码,您可以将执行操作切换到使用相同基础资源的另一个活动任务上去,并在异步处理完成后返回到当前进程。 但是如何在JVM上编写异步代码?Java提供了两种异步编程模型:
a) Callbacks(回调):没有返回值的异步方法,但是带有一个额外的回调参数(lambda或匿名类),该参数在结果可用时被调用。
一个著名的例子是Swing的EventListener层次结构。
b) Future(将来时):异步方法立即返回Future<T>。异步处理计算T值,但是Future会对象包装对其的访问。
该值不是立即可用的,程序可以轮询该对象,直到该值可用为止。例如,运行Callable<T>任务的ExecutorService使用Future对象。
这些技术够好吗?并非每种情况下都是完美的,这两种方法都有局限性。回调很难组合在一起,会导致难以阅读和维护的代码(称为“回调地狱”)。比如:
userService.getFavorites(userId,newCallback<List<String>>()publicvoidonSuccess(List<String>list)if(list.isEmpty())suggestionService.getSuggestions(newCallback<List<Favorite>>()publicvoidonSuccess(List<Favorite>list)UiUtils.submitOnUiThread(()->list.stream().limit(5).forEach(uiList::show););publicvoidonError(Throwableerror)UiUtils.errorPopup(error););elselist.stream().limit(5).forEach(favId->favoriteService.getDetails(favId,newCallback<Favorite>()publicvoidonSuccess(Favoritedetails)UiUtils.submitOnUiThread(()->uiList.show(details));publicvoidonError(Throwableerror)UiUtils.errorPopup(error);));publicvoidonError(Throwableerror)UiUtils.errorPopup(error););
在Reactor中,等效的代码会美观很多:
userService.getFavorites(userId).flatMap(favoriteService::getDetails).switchIfEmpty(suggestionService.getSuggestions()).take(5).publishOn(UiUtils.uiThreadScheduler()).subscribe(uiList::show,UiUtils::errorPopup);
Future对象比回调要好一些,但是尽管Java 8对CompletableFuture进行了改进,但它们在组合方面仍然表现不佳。一起编排多个Future对象是可行的,但并不容易。另外,Future还有其他问题:
a) 通过调用get()方法很容易让Future对象杀死另一个阻塞任务的运行。
b) 它们不支持惰性计算。
c) 他们缺乏对多个值和高级错误处理的支持。
比如:
CompletableFuture<List<String>>ids=ifhIds();CompletableFuture<List<String>>result=ids.thenComposeAsync(l->Stream<CompletableFuture<String>>zip=l.stream().map(i->CompletableFuture<String>nameTask=ifhName(i);CompletableFuture<Integer>statTask=ifhStat(i);returnnameTask.thenCombineAsync(statTask,(name,stat)->"Name "+name+" has stats "+stat););List<CompletableFuture<String>>combinationList=zip.collect(Collectors.toList());CompletableFuture<String>[]combinationArray=combinationList.toArray(newCompletableFuture[combinationList.size()]);CompletableFuture<Void>allDone=CompletableFuture.allOf(combinationArray);returnallDone.thenApply(v->combinationList.stream().map(CompletableFuture::join).collect(Collectors.toList())););List<String>results=result.join();assertThat(results).contains("Name NameJoe has stats 103","Name NameBart has stats 104","Name NameHenry has stats 105","Name NameNicole has stats 106","Name NameABSLAJNFOAJNFOANFANSF has stats 121");
从命令式编程到响应式编程:响应式库(例如Reactor)旨在解决JVM上“经典”异步方法的这些缺点,同时还着重于其他一些方面:
a) 可组合性和可读性。
b) 以丰富的操作工集合(vocabulary of operators)操纵数据流。
c) 订阅之前不会产生任何动作。
d) 负压或消费者向生产者发出推送速率过高的信号的能力。
e) 并发不可知的高级但高价值的抽象。
可组合性和可读性:
所谓“可组合性”,是指协调多个异步任务的能力,在这种模式下,我们使用先前任务的结果作为输入反馈给后续任务。
另外,我们可以以fork-join形式运行多个任务。
此外,我们可以将异步任务重用为更高级别系统中的离散组件。
编排可执行任务的能力与代码的可读性和可维护性紧密相关。
随着异步处理层的数量和复杂性的增加,能够组合和阅读代码变得越来越困难。
如我们所见,回调模型很简单,但是它的主要缺点之一是,对于复杂的流程,您需要从一个回调中执行一个回调,该回调本身嵌套在另一个回调中,依此类推。
这种混乱被称为“回调地狱”。您可以猜测(或从经验中学到),很难对这样的代码进行处理
Reactor提供了丰富的组合选项,其中代码反映了抽象过程的组织,并且通常将所有内容保持在同一层面(将嵌套最小化)。
像流水线那样:
您可以将响应式应用程序的数据处理视为流水线。Reactor既是传送带又是工作站。
原材料从来源(原始的Publisher)运送出来,经过处理,最终成为准备好的以推向消费者(Subscriber)的成品。
原材料可以经过各种转换和其他中间步骤,也可以成为将中间件聚集在一起的较大装配线的一部分。
如果某一点出现故障或堵塞(也许装箱产品花费的时间过长),那么辛苦的工作站可以向上游发出信号,以减少原材料的推送。
操作工(Operators):
在Reactor中,操作工是我们生产线中的工作站。
每个操作工都会向Publisher添加行为,并将上一步的Publisher包装到新实例中。
因此,整个链被链接在一起,这样数据就从第一个发布者发出并在链中向下移动,并由每个链接进行处理。
最终,订阅者完成了该过程。请记住,在订阅者订阅发布者之前,什么也不会发生,正如我们不久将看到的那样。
虽然响应式流规范根本没有指定操作工,但是响应式库的最佳附加值之一(例如Reactor)是它们提供了丰富的操作工供我们使用。
从简单的转换和过滤到复杂的编排和错误处理,这些内容涉及很多领域。
直到你订阅之前,什么都不会发生:
在Reactor中,当编写Publisher链时,默认情况下不会将数据泵入其中。
相反,可以创建异步过程的抽象描述(这有助于重用和组合)。
通过订阅,您将发布者与订阅者绑定在一起,从而触发了整个链中的数据流动。
这是在内部通过来自Subscriber的单个请求信号实现的,该请求信号向上游传播,一直追溯到源发布者。
负压(Backpressure):
向上游传播的信号也用于实现负压,我们在组装流水线中将其描述为当工作站的处理速度比上游工作站慢时,沿生产线向上发送的反馈信号。
Reactive Streams规范定义的实际机制非常接近这个比喻:订阅者可以以无界模式工作,并让源以最快可达到的速率推送所有数据,或者可以使用请求机制向源发出信号,告知它准备处理最多n个元素。
中间操作工还可以在途中更改请求。想象一下一个缓冲区操作工,它将元素以十个为一组进行分组。如果订阅者请求一个缓冲区,则源产生十个元素是理所当然的。
一些操作工还实现了预取策略,避免了request(1)往返,如果在请求之前生成元素的成本不太高的话,这将是有益的。
冷热响应:
冷响应:对于每个订阅者,包括在数据源处,数据推送都会从头开始。例如,如果源包装了HTTP调用,则会为每个订阅发出一个新的HTTP请求。
热响应:并不是每个订阅者都可以从头开始接收数据。后面的订阅者会在订阅后接收已推送的信号。但是请注意,某些热反应流可以全部或部分缓存或重新上演发送的历史信息。
从一般的角度来看,即使没有订阅者在监听,热序列也甚至会发送数据(违背了:“订阅之前什么也不发送”的规则)。
起步需要
JDK8.0+
io.projectreactor包
核心特性
Reactor引入了可组合的响应式类型,这些类型实现了Publisher,但也提供了丰富的操作工集合:Flux和Mono。
Flux对象表示0..N个元素的响应序列,而Mono对象表示单值或空(0..1)序列。 这种区别在类型中包含了一些语义信息,表明了异步处理的粗略基数(基本个数)。 例如,一个HTTP请求仅产生一个响应,因此进行计数操作没有多大意义。 因此,将HTTP调用的结果表示为Mono\\<HttpResponse>比将其表示为Flux\\<HttpResponse>更有意义,因为它仅提供与零项目或一个项目的上下文相关的操作工。 更改处理最大基数的操作工也将切换到相关类型。例如,count操作工存在于Flux中,但它返回Mono\\<Long>。
Flux
Flux\\<T>是标准的Publisher\\<T>(因为它最多只有一个数据嘛~),表示它是可以发送0到N个元素的异步序列,可选的终止操作有操作完成或发生错误。 与Reactive Streams规范一样,这三种信号转换为对下游订户的onNext,onComplete和onError方法的调用。 在可能的信号范围如此之大的情况下,Flux是通用的响应式类型。 请注意,所有事件,甚至是终止事件,都是可选的,意思是:可能没有onNext事件,但只有onComplete事件,这就表示此Flux是一个空的有限序列。删除onComplete则可以得到一个无限的空序列(除了取消测试外,它没有什么用处)。 同样,无限序列不一定为空。例如,Flux.interval(Duration)产生Flux\\<Long>,它是无限的并且是从时钟发出的规则的滴答声。
Mono
Mono\\<T>是特制的Publisher\\<T>,它最多发出一项,然后(可选)以onComplete信号或onError信号终止。 它仅提供可用于Flux的操作工的子集,并且某些操作工(尤其是那些将Mono与其他Publisher结合在一起的操作工)可以把Mono切换到Flux。 例如,Mono.concatWith(Publisher)返回Flux,而Mono.then(Mono)返回另一个Mono。 请注意,您可以使用Mono来表示仅具有完成概念(类似于Runnable)的无值异步过程。要创建一个这样的Mono,可以使用一个空的Mono\\<Void>。
简单地创建Flux和Mono并订阅
Flux和Mono的创建基本是使用工厂方法:
a) 对于创建String类型的Flux:
Flux<String>flux1=Flux.just("qwer","asdf","zxcv");List<String>list=Arrays.asList("rewq","fdsa","vcxz");Flux<String>flux2=Flux.fromIterable(list);
还有一种创建方法:
Mono<String>empty=Mono.empty();// 创建一个空的Mono,即使是泛型,也会创建
Mono<String>mono=Mono.just("qwer");// 创建一个String类型的Mono
Flux<Integer>flux=Flux.range(0,5);//创建一个自增的整型序列
进行订阅
b) Flux和Mono有5种订阅方法签名的变种:
// 此方法完成订阅并触发序列
subscribe();// 对于传过来的每个值进行处理
subscribe(Consumer<?superT>consumer);// 处理传过来的每个值并处理过程中的异常
subscribe(Consumer<?superT>consumer,Consumer<?superThrowable>errorConsumer);// 处理传过来的值和异常并在序列完成时进行一些动作
subscribe(Consumer<?superT>consumer,Consumer<?superThrowable>errorConsumer,RunnablecompleteConsumer);// 处理传过来的值,异常,完成事件,以及此订阅调用产生的订阅(说白了就是,当订阅发生会调用subscriptionConsumer来执行一些任务(比如初始化操作))
subscribe(Consumer<?superT>consumer,Consumer<?superThrowable>errorConsumer,RunnablecompleteConsumer,Consumer<?superSubscription>subscriptionConsumer);
c) 以上方法的返回值均为Disposable,此类拥有取消订阅的能力。虽然调用Disposable的dispose()方法可以实现对订阅的取消,但是如果发布者推送过快,可能会在取消之前就推送了全部的数据。(只要我推送地够快,取消就追不上我哈哈哈哈) Disposables.swap()可以创建一个拥有自动取消功能的Disposable,或者替换一个Disposable。Disposables.composite(...)允许集成多个Disposable
自定义订阅器
lambda表达式的替代品:BaseSubscriber:
此类更多的是帮助完成用户自定义的订阅器。(通过继承它)。 用户自定义时的最低实现要求是实现hookOnSubscribe(Subscription subscription)和hookOnNext(T value)方法。
前一个方法指出了订阅被调用时应该干嘛,后一个进行下一个数据请求调用,别忘了调用request(int)方法,来进行数据请求。 requestUnbounded()方法等同于request(Long.MAX_VALUE))以切换到无界模式;还提供了cancel()方法。 还有诸如hookOnComplete(), hookOnError(), hookOnCancel(), 和hookFinally()等方法,后一个提供一个描述终止类型的参数。
消费原始请求的最简单方法是使用BaseSubscriber进行订阅,并覆盖hookOnSubscribe方法,如以下示例所示:
Flux.range(1,10).doOnRequest(r->System.out.println("request of "+r)).subscribe(newBaseSubscriber<Integer>()@OverridepublicvoidhookOnSubscribe(Subscriptionsubscription)request(1);@OverridepublicvoidhookOnNext(Integerinteger)System.out.println("Cancelling after having received "+integer);cancel(););
输出如下所示:
request of 1
Cancelling after having received 1
注意:在处理请求时,您必须小心以产生足够的需求来推进序列,否则您的Flux可能会“卡住”。 这就是为什么BaseSubscriber在hookOnSubscribe中默认为无界请求。覆盖此方法时,通常应至少调用一次请求(request(1))。
关于负压和重塑请求的方法:
在Reactor中实施负压时,通过向上游操作工发送请求,将消费者压力传播回源生产者。 当前请求的总和有时被称为当前“需求”或“待处理请求”。 需求的上限为Long.MAX_VALUE,代表一个无限制的请求(意思是“尽可能快地生成”-意味着会禁用负压)。 第一个请求在订阅时来自最终订阅者,但是最直接的订阅全部的方式会立刻触发Long.MAX_VALUE的无限制请求:(既以下方法会触发无边界请求)
subscription()及其大多数基于lambda的变种(拥有Consumer<Subscription>参数的除外)。
block(),blockFirst()和blockLast()。
通过toIterable()或toStream()进行迭代。
操作工会修改来自下游的需求:
要记住的一件事是,上游链中的每个操作工都可以调整在订阅级别表达的需求。
典型的例子是buffer(N)操作工:如果它收到一个request(2),则解释为对两个完整缓冲区的需求。结果,由于缓冲区需要将N个元素视为已满,因此缓冲区操作工会将请求重塑为2xN。
您可能还已经注意到,某些操作工的变体采用了称为预取的int输入参数。这是修改下游请求的另一类操作工。这些通常是处理内部序列的操作工,是从每个传入元素(例如flatMap)派生出Publisher的。
预取是一种调整对这些内部序列发出的初始请求的方法。如果未指定,则大多数这些操作工默认为32。这些操作工通常还会实施补充优化:一旦操作工看到预取请求的75%得到满足,它就会从上游重新请求75%。进行启发式优化,以便这些操作工主动预测即将到来的请求。
最后,几个操作工使您可以直接调整请求:limitRate和limitRequest。
a) limitRate(N)拆分下游请求,以便将它们以较小的批次传播到上游。(N的意思是,对于请求,每次划分成M/N进行向上游请求,最多请求次数<=N)。例如,对limitRate(10)发出的100个请求最多将导致10个10的请求传播到上游。注意,在这种形式下,limitRate实际上实现了前面讨论的补充优化。
b) 操作工具有一个变体,还可以让您调整补给量(在变体中称为lowTide):limitRate(highTide, lowTide)。(P.s.个人理解,请求次数>=lowTide且<=highTide)。选择lowTide为0会导致严格的lowTide请求批次,而不是用补充策略重新划出分支批次(请求批次必须严格的为highTide次)。
c) 另一方面,limitRequest(N)将下游请求限制为最大总请求数。它会将请求总计为N。如果单个请求未使总需求超过N,则该特定请求将完全传播到上游。在源发出该数量的回复后,limitRequest认为序列已完成,向下游发送onComplete信号,然后取消源。
程序化地创建一个序列
我们通过编程定义其关联事件(onNext, onError和onComplete)来介绍Flux或Mono的创建。所有这些方法都基于一个事实,它们公开一个API来触发我们称为接收器的事件。实际上有一些接收器变体,稍后我们将介绍。
同步:generate:
以编程方式创建Flux的最简单形式是通过generate方法,该方法带有一个generator函数。 这用于同步推送和逐个推送,这意味着接收器是SynchronousSink(翻译:同步连接槽),并且其next()方法在每次回调调用中最多只能调用一次。 然后,您可以另外调用error(Throwable)或complete(),但这是可选的。 最有用的变体可能是可以让您保持在接收器使用中可以参考的状态,以确定接下来要推送什么。 然后,生成器函数变为BiFunction\\, S>,而状态对象的类型为\\<S>。 您必须为初始状态提供Supplier\\<S>,并且您的生成器函数现在在每个回合中都返回新状态。
例如,您可以将int用作状态:
Flux<String>flux=Flux.generate(()->0,// 初始值为0
(state,sink)->// Supplier<S>函数
sink.next("3 x "+state+" = "+3*state);// 在连接槽上调用next()即可实现向Flux添加元素
if(state==10)sink.complete();returnstate+1;);
您也可以使用可变的\\<S>。例如,可以使用单个AtomicLong作为状态来重写上面的示例,并在每个回合中对其进行更改:
Flux<String>flux=Flux.generate(AtomicLong::new,(state,sink)->longi=state.getAndIncrement();sink.next("3 x "+i+" = "+3*i);// next()方法会添加元素在Flux里面
if(i==10)sink.complete();returnstate;);
如果您的状态对象需要清理一些资源,请使用generate(Supplier\\, BiFunction\\, S>, Consumer\\<S>)变体清理最后一个状态实例。
下面的示例使用包含Consumer\\<S>的generate方法:
Flux<String>flux=Flux.generate(AtomicLong::new,(state,sink)->longi=state.getAndIncrement();sink.next("3 x "+i+" = "+3*i);if(i==10)sink.complete();returnstate;,(state)->System.out.println("state: "+state));
我们将最后一个状态值(11)视为此Consumer lambda的输出。 在数据库连接或资源需要在最后的场景下,消费者lambda可以关闭该连接或以其他方式处理应在过程结束时完成的任何任务。
异步和多线程:create:
create是Flux的程序化创建的一种更高级的形式,它适用于每轮多次推送数据,甚至来自多个线程创建。
它暴露一个FluxSink及其next(),error()和complete()方法。与generate相反,它没有基于状态的变体。另一方面,它可以触发回调中的多线程事件。
注意:create不会并行化您的代码,也不会使其异步,即使它可以与异步API一起使用。 如果在create lambda中发生了阻塞,则会使自己陷入死锁和类似的副作用。
即使使用了SubscribeOn,也需要注意的是,长时间阻塞的create lambda(例如,调用sink.next(t)的无限循环)可以锁定管道:
由于循环使同一线程处于饥饿状态,因此请求将一直不会执行下去(请求也在此线程执行)。 使用subscriptionOn(Scheduler, false)变体:requestOnSeparateThread = false将使用Scheduler线程进行create,并且仍然可以通过在原始线程中执行请求来让数据流动。
此外,由于create可以桥接异步API并管理负压,因此您可以通过指示OverflowStrategy来优化如何进行负压行为:
a) IGNORE完全忽略下游负压请求。当队列在下游充满时,这可能会产生IllegalStateException异常。
b) ERROR, 当下游无法跟上时,将发出ERROR信号来表示一个IllegalStateException异常。
c) DROP, 如果下游还没有准备好接收数据,则将上游发来的信号丢弃。
d) LATEST, 让下游只从上游获取最新信号。
e) BUFFER, 如果下游无法跟上,则缓冲所有信号。(这会无限缓冲,并可能导致OutOfMemoryError异常)。
Mono也具有create生成器。Mono创建的MonoSink不允许多次发出数据。它将在第一个数据之后丢弃所有数据。
异步单线程:push:
推送是generate和create之间的妥协,适用于处理单个生产者的事件。 从某种意义上说,它与create类似,因为它也可以是异步的,并且可以使用create支持的任何溢出策略来管理负压。 但是,一次只有一个生产线程可以调用next(),complete()或error()。
Flux<String>bridge=Flux.push(sink->myEventProcessor.register(newSingleThreadEventListener<String>()publicvoidonDataChunk(List<String>chunk)for(Strings:chunk)sink.next(s);publicvoidprocessComplete()sink.complete();publicvoidprocessError(Throwablee)sink.error(e);););
一种混合式的pull/push模式:
大多数Reactor操作工(例如create)都遵循混合推/拉模型。
我们的意思是,尽管大多数处理都是异步的(建议采用推送方法),但其中有一个很小的拉组件:请求。
消费者从数据源中拉取数据的意思是:直到首次请求之前发布者都不会发出任何东西。只要有可用数据,源就会将数据推送到消费者,但要在其请求数量的范围内。
请注意,push()和create()都允许设置onRequest消费者以管理请求量,并确保仅在有待处理的请求时才通过连接槽推送数据。 两个回调onDispose和onCancel在取消或终止时执行清理操作。
当Flux完成,出现错误或被取消时,可以使用onDispose执行清理。onCancel可用于执行特定于取消的任何操作,它会先于onDispose清理。
Flux<String>bridge=Flux.create(sink->sink.onRequest(n->channel.poll(n)).onCancel(()->channel.cancel()).onDispose(()->channel.close()));
handle(处理):
handle方法有点不同:它是一个实例方法,这意味着它被链接到现有的源上(常见的操作工也是如此)。 它存在于Mono和Flux中。从某种意义上说,它更像使用SynchronousSink生成序列,并且仅允许逐个的推送数据。 但是,可以使用handle从每个源元素中生成任意值,可能会跳过某些元素。这样,它可以用作map和filter的组合。
handle的签名如下:
Flux<R>handle(BiConsumer<T,SynchronousSink<R>>);
让我们考虑一个例子。响应式流规范不允许序列中出现空值。如果要执行map但想使用预先存在的方法作为map函数,该方法有时返回null怎么办?例如,以下方法可以安全地应用于整数源:
publicStringalphabet(intletterNumber)if(letterNumber<1||letterNumber>26)returnnull;intletterIndexAscii='A'+letterNumber-1;return""+(char)letterIndexAscii;
然后,我们可以使用handle删除任何空值:
Flux<String>alphabet=Flux.just(-1,30,13,9,20).handle((i,sink)->Stringletter=alphabet(i);if(letter!=null)sink.next(letter););alphabet.subscribe(System.out::println);
线程和程序调度器
像RxJava一样,Reactor可以被视为与并发无关的。也就是说,它不强制执行并发模型。 相反,它使您(开发人员)处于命令状态。没有库帮助您进行并发。获得Flux或Mono并不一定意味着它在专用线程中运行。 取而代之的是,大多数操作工会继续在执行前一个操作工的线程中工作。除非指定,否则最顶层的操作工(源)本身运行在进行subscribe()调用的线程上。
以下示例在新线程中运行Mono:
publicstaticvoidmain(String[]args)throwsInterruptedExceptionfinalMono<String>mono=Mono.just("hello ");// Mono<String>在主线程中完成装配
Threadt=newThread(()->mono.map(msg->msg+"thread ").subscribe(v->// 在新线程中完成订阅
System.out.println(v+Thread.currentThread().getName())// 结果,map和filter都在新的线程执行
))t.start();t.join();
在Reactor中,执行模型以及执行的位置由所使用的调度程序确定。 调度程序具有类似于ExecutorService的调度职责,但是具有专用的抽象使其可以做更多的事情,尤其是充当时钟并启用更广泛的实现方式(测试的虚拟时间,蹦床或即时调度等)。 Schedulers类具有以下可访问执行上下文的静态方法:
a) 没有执行上下文(Schedulers.immediate()): 在处理时,将直接执行提交的Runnable,从而在当前线程上有效地运行它们(可以视为“空对象”或无操作调度程序)。
b) 单个可重用线程(Schedulers.single())。请注意,此方法对所有调用方都使用相同的线程,直到调度程序被释放为止。如果您需要每次调用一个专用线程,请对每个调用使用Schedulers.newSingle()。
c) 无限制的弹性线程池(Schedulers.elastic())。随着Schedulers.boundedElastic()的引入,Schedulers.boundedElastic()不再是首选方法,因为它倾向于隐藏背压问题并导致线程过多(请参见下文)。
d) 有界弹性线程池(Schedulers.boundedElastic())。像其前身elastic()一样,它根据需要创建新的工作池并重用空闲的工作池。闲置时间过长(默认值为60s)的工作池也将被丢弃。与其前身的elastic()有所不同,它对可以创建的线程数进行限制(默认为CPU核心数x 10)。达到上限后,最多可再提交10万个任务,并在有线程可用时重新调度(当任务被设置延迟执行时,延迟计时是在线程可用时开始)。这是I/O阻塞任务的更好选择。Schedulers.boundedElastic()是一种为阻塞处理提供自己的线程的简便方法,这样它就不会占用其他资源。
e) 为并行工作而调整的固定工作线程池(Schedulers.parallel())。它创建的工作线程数量与CPU内核数量一样多。
此外,您可以使用Schedulers.fromExecutorService(ExecutorService)从任何预先存在的ExecutorService中创建一个Scheduler。(尽管不鼓励这样做,但您也可以从执行器创建一个。) 您还可以使用newXXX方法创建各种调度程序类型的新实例。例如,Schedulers.newParallel(yourScheduleName)创建一个名为yourScheduleName的新并行调度程序。 虽然boundedElastic可以帮助处理无法避免的传统阻塞代码,但single和parallel则不是。 在single和parallel中使用Reactor阻塞API(block(), blockFirst(), blockLast()(以及在默认的single和parallel调度程序中迭代toIterable()或toStream())会导致引发IllegalStateException)。 通过创建实现NonBlocking标记接口的Thread实例调度程序,只能为“仅非阻塞”。(实现了NonBlocking接口的自定义调度程序不能阻塞执行,否则会报异常)。 默认情况下,某些操作工使用Schedulers中的特定调度程序(通常会为您提供替换为其他调度程序的选项)。 例如,调用Flux.interval(Duration.ofMillis(300))工厂方法将生成一个Flux,每300ms滴答一次。 默认情况下,此功能由Schedulers.parallel()执行。
以下将Scheduler更改为能起到类似功能的Schedulers.single()的新实例:
Flux.interval(Duration.ofMillis(300),Schedulers.newSingle("test"));
Reactor提供了两种在响应式链中切换执行上下文(或Scheduler)的方式:
publishOn和subscribeOn
两者都使用调度程序,并允许您将执行上下文切换到该调度程序。
但是publishOn在链中的位置很重要,而subscribeOn的位置并不重要。要了解这种差异,您首先必须记住,在您订阅之前什么都不会发生。 在Reactor中,当您链接操作工时,可以根据需要将尽可能多的Flux和Mono实现包装在一起。 订阅后,将创建一个订阅者对象链,向后(向上)到第一个发布者。这实际上对您是隐藏的。您只能看到Flux(或Mono)和Subscription的外层,但是这些中间操作工特定的订阅者才是真正执行任务的组件。
publishOn方法:
publishOn会把它后面的操作工转换到新的可执行上下文中。 publishOn在订阅者链的中间以与其他任何操作工相同的方式运行。它从上游获取信号并在下游释放它们,同时在关联的Scheduler的worker上执行回调。 因此,它将影响后续操作工的执行位置(直到链接了另一个publishOn),如下所示:
a) 将执行上下文更改为调度程序选择的一个线程。
b) 根据规范,onNext调用是按顺序发生的,因此会用完一个线程。
c) 除非他们在特定的Scheduler上工作,否则publishOn之后的运算符将继续在同一线程上执行。
以下示例使用publishOn方法:
Schedulers=Schedulers.newParallel("parallel-scheduler",4);// 创建一个由四个线程实例支持的新调度程序。(1)
finalFlux<String>flux=Flux.range(1,2).map(i->10+i)// 第一个映射在(5)中的匿名线程上运行。
.publishOn(s)// publishOn将整个序列切换到(1)Thread上。
.map(i->"value "+i);// 第二个映射在(1)的线程上运行。
newThread(()->flux.subscribe(System.out::println));//这个匿名线程是进行订阅的线程。打印发生在最新的执行上下文中,这是publishOn中的内容。(5)
subscribeOn方法:
当构造后面的链时,subscribeOn适用于订阅处理。 因此,无论您将subscribeOn放置在链中的什么位置,它始终会影响源发射的上下文。 但是,这不会影响随后对publishOn的调用的行为-它们仍会在publishOn之后切换链中该部分的执行上下文。
subscribeOn作用:
a) 更改整个操作工链所订阅的源发布者所处的线程
b) 从调度程序中选择一个线程
注意:实际上仅考虑链中最早的SubscribeOn调用。 以下示例使用subscribeOn方法:
Schedulers=Schedulers.newParallel("parallel-scheduler",4);// 创建一个由4个线程支持的并行线程
finalFlux<String>flux=Flux.range(1,2).map(i->10+i)// 此map运行在4个线程之一
.subscribeOn(s)// 把订阅时的初始化订阅线程进行切换
.map(i->"value "+i);// 此map所在的线程和前面的操作工所在的线程一致
newThread(()->flux.subscribe(System.out::println));//本来初始化应该在这个匿名线程执行的,可是因为subscribeOn的存在,切换到了4个线程之一。
处理错误
异常会沿着链一直向下传播,直到到订阅者的onError()方法,所以应该预处理或重写onError()方法
P.s.其实Reactor官方对于异常的处理有很多,我比较懒,就不看了,只看了最开头的建议。
处理器
处理器既当爹又当妈,既是发布者又是订阅者。
官方并不建议使用,因为使用难度大且适用于狭义的场景。与其直接使用Reactor处理器,不如直接调用一次sink()来获得处理器的连接槽,这是一个好习惯。
嗯...暂时跳过,日后再看。
测试
更新中...
调试Reactor
更新中...
监测Reactor指标参数
更新中...
高级特性和定义
更新中...
再附属上部分自己写的代码
importorg.reactivestreams.Subscription;importreactor.core.Disposable;importreactor.core.Disposables;importreactor.core.publisher.*;importreactor.core.scheduler.Scheduler;importreactor.core.scheduler.Schedulers;importreactor.util.context.Context;importjava.util.Arrays;importjava.util.List;importjava.util.concurrent.Callable;importjava.util.concurrent.Flow;importjava.util.function.BiConsumer;importjava.util.function.BiFunction;importjava.util.function.Consumer;/**
* @author SuanCaiYv
* @time 2020/3/11 上午12:21
*/publicclassMainpublicstaticvoidmain(String[]args)Flux<String>fluxStr1=Flux.just("Hello","Reactor","And","Me");// 直接push进去
List<String>listTemp=Arrays.asList("I","am","a","list");Flux<String>fluxStr2=Flux.fromIterable(listTemp);// 从可迭代的类型创建
Mono<String>monoStr1=Mono.empty();// 创建空时,不论泛型类型
Mono<String>monoStr2=Mono.just("text");Flux<Integer>fluxInt1=Flux.range(2,5);// 一个包含2, 3, 4, 5, 6五个数的Flux
/*
* 常见的静态方法还有
* fromArray()
* fromStream()
* merge()
* zip()
*//*
* 定义生成器
*/BiFunction<Integer,SynchronousSink<String>,Integer>biFunction1=newBiFunction<Integer,SynchronousSink<String>,Integer>()@OverridepublicIntegerapply(Integerinteger,SynchronousSink<String>stringSynchronousSink)stringSynchronousSink.next("No. "+integer);// 调用"连接槽"的next()方法进行插入值
if(integer>5)stringSynchronousSink.complete();returninteger+1;;/*
* 初始化用
*/Callable<Integer>callable1=newCallable<Integer>()@OverridepublicIntegercall()throwsExceptionreturn0;;// 生成序列, 另一个generate()是generate(Consumer<SynchronousSink<T>> generator);
Flux<String>fluxStr3=Flux.generate(callable1,biFunction1);Consumer<Integer>consumer3=t->System.out.println("No. "+t+"is done.");;// consumer3负责在调用了onComplete()之后进行清理
Flux<String>fluxStr4=Flux.generate(callable1,biFunction1,consumer3);Consumer<FluxSink<String>>consumer5=t->for(inti=0;i<5;++i)t.next("No. "+i);t.complete();;// 异步多线程创建一个序列
Flux<String>fluxStr5=Flux.create(consumer5,FluxSink.OverflowStrategy.BUFFER);fluxStr5.subscribe(System.out::println);// 和create区别就是它是单线程创建的
Flux<String>fluxStr6=Flux.push(consumer5,FluxSink.OverflowStrategy.BUFFER);BiConsumer<Integer,SynchronousSink<Integer>>biConsumer2=newBiConsumer<Integer,SynchronousSink<Integer>>()@Overridepublicvoidaccept(Integerinteger,SynchronousSink<Integer>integerSynchronousSink)if(integer>5)return;integerSynchronousSink.next(integer);;// handle是一个实例方法,可以进行过滤和转换,所以它也可以生成一个序列
Flux<Integer>fluxInt2=fluxInt1.handle(biConsumer2);fluxInt2.subscribe(System.out::println);fluxStr3.subscribe(newMyBaseSubscriber());classMyBaseSubscriberextendsBaseSubscriber<String>@OverrideprotectedvoidhookOnSubscribe(Subscriptionsubscription)System.out.println("我要开始订阅啦!");request(1);// 必须请求至少一个,不然会卡住不动
@OverrideprotectedvoidhookOnNext(Stringvalue)System.out.println("嘿!看我拿到了啥? "+value);request(1);
写在最后,鄙人能力有限,这里有一篇高质量的文章,我放在这里,供大家参考:使用 Reactor 进行反应式编程(IBM)
以上是关于Reactor响应式编程的主要内容,如果未能解决你的问题,请参考以下文章
浅谈java响应式编程以及Reactor 3框架(内有福利)
(17)Reactor的调试——响应式Spring的道法术器