什么是反应式编程 (Reactive programming)? 这里有你想要了解的反应式编程

Posted Kotlin 开发者社区

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了什么是反应式编程 (Reactive programming)? 这里有你想要了解的反应式编程相关的知识,希望对你有一定的参考价值。

理解反应式编程

想象一下,如果他们告诉你:“因为你支付的是一整年的订阅费用,而现在这一年还没有结束,当这一年结束时,你肯定可以一次性完整地收到它们。”那么你会有多么惊讶。值得庆幸的是,这并非订阅的真正运作方式。报纸具有一定的时效性。在出版后,报纸需要及时投递,以确保在阅读它们时内容仍然是新鲜的。此外,当你在阅读最新一期的报纸时,记者们正在为未来的版本撰写内容,同时印刷机正在满速运转,印刷下一期的内容——一切都是并行的。在开发应用程序代码时,我们可以编写两种风格的代码,即命令式和反应式。

•命令式(Imperative)的代码:非常类似于上文所提的虚构的报纸订阅方式。它由一组任务组成,每次只运行一项任务,每项任务又都依赖于前面的任务。数据会按批次进行处理,在前一项任务还没有完成对当前数据批次的处理时,不能将这些数据递交给下一项处理任务。

•反应式(Reactive)的代码:非常类似于真实的报纸订阅方式。它定义了一组用来处理数据的任务,但是这些任务可以并行地执行。每项任务处理数据的一部分子集,并将结果交给处理流程中的下一项任务,同时继续处理数据的另一部分子集。

反应式编程简介

What

反应式编程(Reactive programming,Rx)最初来源于函数式语言里面的函数式反应编程(Functional Reactive programming,FRP)。后来随着微软.Net Framework增加了Reactive Extension而在主流语言中流行起来。

反应式编程是一种编程思想、编程方式,是为了简化并发编程而出现的。与传统的处理方式相比,它能够基于数据流中的事件进行反应处理。例如:a+b=c的场景,在传统编程方式下如果a、b发生变化,那么我们需要重新计算a+b来得到c的新值。而反应式编程中,我们不需要重新计算,a、b的变化事件会触发c的值自动更新。这种方式类似于我们在消息中间件中常见的发布/订阅模式。由流发布事件,而我们的代码逻辑作为订阅方基于事件进行处理,并且是异步处理的。

反应式编程中,最基本的处理单元是事件流(事件流是不可变的,对流进行操作只会返回新的流)中的事件。流中的事件包括正常事件(对象代表的数据、数据流结束标识)和异常事件(异常对象,例如Exception)。同时,只有当订阅者第一次发布者,发布者发布的事件流才会被消费,后续的订阅者只能从订阅点开始消费,但是我们可以通过背压、流控等方式控制消费。

常用的反应式编程实现类库包括:Reactor、RxJava 2,、Akka Streams、Vert.x以及Ratpack。本文基于Reactor (由于Reactor有Spring背书,同时反应式编程已经集成于Java 9)。

反应式编程与Java8提供的Streams有众多相似之处(尤其是API上),且提供了相互转化的API。但是反应式编程更加强调异步非阻塞,通过onComplete等注册监听的方式避免阻塞,同时支持delay、interval等特性。而Streams本质上是对集合的并行处理,并不是非阻塞的。

Why

反应式编程的核心是基于事件流、无阻塞、异步的,使用反应式编程不需要编写底层的并发、并行代码。并且由于其声明式编写代码的方式,使得异步代码易读且易维护。

How

基本概念

  • Flux,是Reactor中的一种发布者,包含0到N个元素的异步序列。通过其提供的操作可以生成、转换、编排序列。如果不触发异常事件,Flux是无限的。

  • Mono,是Reactor中的一种发布者,包含0或者1个的异步序列。可以用于类似于Runnable的场景。
    背压(backpressure),由订阅者声明的、限定本消费者可处理的流中的元素个数。

操作

所有的流都是不可变的,所以对流的操作都会返回一个新的流。

创建(数据流模型)

just,根据参数创建数据流
never,创建一个不会发出任何数据的无限运行的数据流
empty,创建一个不包含任何数据的数据流,不会无限运行。
error,创建一个订阅后立刻返回异常的数据流
concact,从多个Mono创建Flux
generate,同步、逐一的创建复杂流。重载方法支持生成状态。在方法内部的lambda中通过调用next和complete、error来指定当前循环返回的流中的元素(并不是return)。
create,支持同步、异步、批量的生成流中的元素。
zip,将多个流合并为一个流,流中的元素一一对应
delay,Mono方法,用于指定流中的第一个元素产生的延迟时间
interval,Flux方法,用于指定流中各个元素产生时间的间隔(包括第一个元素产生时间的延迟),从0开始的Long对象组成的流
justOrEmpty,Mono方法,用于指定当初始化时的值为null时返回空的流
defaultIfEmpty,Mono方法,用于指定当流中元素为空时产生的默认值
range,生成一个范围的Integer队列

转化(就是一些标准函数算子)

map,将流中的数据按照逻辑逐个映射为一个新的数据,当流是通过zip创建时,有一个元组入参,元组内元素代表zip前的各个流中的元素。
flatMap,将流中的数据按照逻辑逐个映射一个新的流,新的流之间是异步的。
take,从流中获取N个元素,有多个扩展方法。
zipMap,将当前流和另一个流合并为一个流,两个流中的元素一一对应。
mergeWith,将当前流和另一个流合并为一个流,两个流中的元素按照生成顺序合并,无对应关系。
join,将当前流和另一个流合并为一个流,流中的元素不是一一对应的关系,而是根据产生时间进行合并。
concactWith,将当前流和另一个流按声明顺序(不是元素的生成时间)链接在一起,保证第一个流消费完后再消费第二流
zipWith,将当前流和另一个流合并为一个新的流,这个流可以通过lambda表达式设定合并逻辑,并且流中元素一一对应
first,对于Mono返回多个流中,第一个产生元素的Mono。对于Flux,返回多个Flux流中第一个产生元素的Flux。
block,Mono和Flux中类似的方法,用于阻塞当前线程直到流中生成元素
toIterable,Flux方法,将Flux生成的元素返回一个迭代器
defer,Flux方法,用于从一个Lambda表达式获取结果来生成Flux,这个Lambda一般是线程阻塞的
buffer相关方法,用于将流中的元素按照时间、逻辑规则分组为多个元素集合,并且这些元素集合组成一个元素类型为集合的新流。
window,与buffer类似,但是window返回的流中元素类型还是流,而不是buffer的集合。
filter,顾名思义,返回负责规则的元素组成的新流
reduce,用于将流中的各个元素与初始值(可以设置)逐一累积,最终得到一个Mono。

其他

doOnXXX,当流发生XXX时间时的回调方法,可以有多个,类似于监听。XXX包括Subscribe、Next、Complete、Error等。
onErrorResume,设置流发生异常时返回的发布者,此方法的lambda是异常对象
onErrorReturn,设置流发生异常时返回的元素,无法捕获异常
then,返回Mono,跳过整个流的消费
ignoreElements,忽略整个流中的元素
subscribeOn,配合Scheduler使用,订阅时的线程模型。
publisherOn,配合Scheduler使用,发布时的线程模型。
retry,订阅者重试次数

异步 Web 框架

异步的Web框架能够以更少的线程获得更高的可扩展性,通常它们只需要与CPU核心数量相同的线程。通过使用所谓的事件轮询(event looping)机制(如图11.1所示),这些框架能够用一个线程处理很多请求,这样每次连接的成本会更低。

什么是反应式编程 (Reactive programming)? 这里有你想要了解的反应式编程

在事件轮询中,所有事情都是以事件的方式来进行处理的,包括请求以及密集型操作(如数据库和网络操作)的回调。当需要执行成本高昂的操作时,事件轮询会为该操作注册一个回调,这样操作可以并行执行,而事件轮询则会继续处理其他的事件。当操作完成时,事件轮询机制会将其作为一个事件,这一点与请求是相同的。这样达到的效果就是,在面临大量负载的时候,异步Web框架能够以更少的线程实现更好的可扩展性,这样会减少线程管理的开销。Spring 5引入了一个非阻塞、异步的Web框架,该框架在很大程度上是基于Reactor项目的,能够解决Web应用和API中对更好的可扩展性的需求。接下来我们看一下Spring WebFlux:面向Spring的反应式Web框架。

当Spring团队思考如何向Web层添加反应式编程模型时,如果不在Spring MVC中做大量工作,显然很难实现这一点。这会在代码中产生分支以决定是否要以反应式的方式来处理请求。如果这样做,本质上就是将两个Web框架打包成一个,依靠if语句来区分反应式和非反应式。与其将反应式编程模型硬塞进Spring MVC中,还不如创建一个单独的反应式Web框架,并尽可能多地借鉴Spring MVC。这样,Spring WebFlux就应运而生了。Spring 5定义的完整Web开发技术栈如图11.2所示。


什么是反应式编程 (Reactive programming)? 这里有你想要了解的反应式编程

在图11.2的左侧,我们会看到Spring MVC技术栈,这是Spring框架2.5版本就引入的。SpringMVC 建立在Java Servlet API之上,因此需要Servlet容器(比如Tomcat)才能执行。

与之不同,Spring WebFlux(在图11.2的右侧,和Spring MVC系出同门,并且很多核心组件都是公用的)并不会绑定Servlet API,所以它构建在Reactive HTTP API之上,这个API与ServletAPI具有相同的功能,只不过是采用了反应式的方式。因为Spring WebFlux没有与Servlet API耦合,所以它的运行并不需要Servlet容器。它可以运行在任意非阻塞Web容器中,包括Netty、Undertow、Tomcat、Jetty或任意Servlet 3.1及以上的容器。

在图11.2中,最值得注意的是左上角,它代表了Spring MVC和Spring WebFlux公用的组件,主要用来定义控制器的注解。因为Spring MVC和Spring WebFlux会使用相同的注解,所以SpringWebFlux与Spring MVC在很多方面并没有区别。右上角的方框表示另一种编程模型,它使用函数式编程范式来定义控制器,而不是使用注解。

Spring MVC和Spring WebFlux之间最显著的区别在于函数式Web编程模型。

什么是反应式编程 (Reactive programming)? 这里有你想要了解的反应式编程

在使用Spring WebFlux时,我们需要添加Spring Boot WebFlux starter依赖项.

反应式宣言(The Reactive Manifesto)

反应式系统是:

响应:该系统及时响应,如果在所有可能的。响应能力是可用性和实用性的基石,但更重要的是,响应能力意味着可以快速发现问题并进行有效处理。响应系统专注于提供快速且一致的响应时间,建立可靠的上限,以便它们提供一致的服务质量。这种一致的行为又简化了错误处理,建立了最终用户的信心,并鼓励了进一步的交互。

弹性:面对故障时,系统保持响应能力。这不仅适用于高可用性,关键任务系统,任何非弹性的系统在发生故障后都将无响应。弹性是通过复制,遏制,隔离和委派实现的。故障包含在每个组件中,使组件彼此隔离,从而确保系统的各个部分可以发生故障并可以恢复而不会损害整个系统。每个组件的恢复都委派给另一个(外部)组件,并在必要时通过复制来确保高可用性。组件的客户端不承担处理其故障的负担。

弹性:系统在变化的工作负载下保持响应能力。无功系统可以通过增加或减少分配给这些输入的资源来对输入速率的变化做出反应。这意味着没有争用点或中央瓶颈的设计,从而具有分片或复制组件并在其中分配输入的能力。反应性系统通过提供相关的实时性能指标来支持预测性和反应性缩放算法。它们在商品硬件和软件平台上以经济高效的方式实现了弹性。

消息驱动:响应式系统依靠异步 消息传递在组件之间建立边界,以确保松散的耦合,隔离和位置透明性。此边界还提供了将故障委派为消息的方法。通过使用显式消息传递,可以通过成形和监视系统中的消息队列并在必要时施加背压来实现负载管理,弹性和流量控制。位置透明消息传递作为一种通信手段,使得故障管理有可能在整个集群或单个主机内以相同的构造和语义进行工作。不阻塞通信允许接收者仅在活动状态下消耗资源,从而减少了系统开销。

大型系统由较小的系统组成,因此取决于其组成部分的反应性。这意味着反应式系统将应用设计原则,以便这些属性可应用于所有级别的规模,从而使其可组合。世界上最大的系统依赖于基于这些属性的体系结构,每天满足数十亿人的需求。现在是时候从一开始就有意识地应用这些设计原则,而不是每次都重新发现它们。

英文阅读能力提升:

Reactive Systems are:

Reactive Systems are:
Responsive: The system responds in a timely manner if at all possible. Responsiveness is the cornerstone of usability and utility, but more than that, responsiveness means that problems may be detected quickly and dealt with effectively. Responsive systems focus on providing rapid and consistent response times, establishing reliable upper bounds so they deliver a consistent quality of service. This consistent behaviour in turn simplifies error handling, builds end user confidence, and encourages further interaction.

Resilient: The system stays responsive in the face of failure. This applies not only to highly-available, mission-critical systems — any system that is not resilient will be unresponsive after a failure. Resilience is achieved by replication, containment, isolation and delegation. Failures are contained within each component, isolating components from each other and thereby ensuring that parts of the system can fail and recover without compromising the system as a whole. Recovery of each component is delegated to another (external) component and high-availability is ensured by replication where necessary. The client of a component is not burdened with handling its failures.

Elastic: The system stays responsive under varying workload. Reactive Systems can react to changes in the input rate by increasing or decreasing the resources allocated to service these inputs. This implies designs that have no contention points or central bottlenecks, resulting in the ability to shard or replicate components and distribute inputs among them. Reactive Systems support predictive, as well as Reactive, scaling algorithms by providing relevant live performance measures. They achieve elasticity in a cost-effective way on commodity hardware and software platforms.

Message Driven: Reactive Systems rely on asynchronous message-passing to establish a boundary between components that ensures loose coupling, isolation and location transparency. This boundary also provides the means to delegate failures as messages. Employing explicit message-passing enables load management, elasticity, and flow control by shaping and monitoring the message queues in the system and applying back-pressure when necessary. Location transparent messaging as a means of communication makes it possible for the management of failure to work with the same constructs and semantics across a cluster or within a single host. Non-blocking communication allows recipients to only consume resources while active, leading to less system overhead.

Large systems are composed of smaller ones and therefore depend on the Reactive properties of their constituents. This means that Reactive Systems apply design principles so these properties apply at all levels of scale, making them composable. The largest systems in the world rely upon architectures based on these properties and serve the needs of billions of people daily. It is time to apply these design principles consciously from the start instead of rediscovering them each time.

参考资料

https://baike.baidu.com/item/%E5%8F%8D%E5%BA%94%E5%BC%8F%E7%BC%96%E7%A8%8B/19928105
https://blog.csdn.net/li_xiao_dai/article/details/80841642
https://www.reactivemanifesto.org
Spring 实战(第 5 版)


Kotlin 开发者社区

越是喧嚣的世界,越需要宁静的思考。


以上是关于什么是反应式编程 (Reactive programming)? 这里有你想要了解的反应式编程的主要内容,如果未能解决你的问题,请参考以下文章

Reactive(响应式)编程

干货 | Reactive模式优势与实战

函数式响应式编程-FRP简介

在Reactive Streams Specification 1.0发布之后,jdbc规范也会被反应吗?

反应式编程和消息队列的区别

你!学Java的,除了Spring你还知道什么