1小时让你掌握响应式编程,并入门Reactor
Posted 编程新说
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了1小时让你掌握响应式编程,并入门Reactor相关的知识,希望对你有一定的参考价值。
我看同步阻塞
“你知道什么是同步阻塞吗”,当然知道了。“那你怎么看它呢”,这个。。。
在同步阻塞的世界里,代码执行到哪里,数据就跟到哪里。如果数据很慢跟不上来,代码就停在那里等待数据的到来,然后再带着数据一起往下执行。
可以说是,代码执行和数据是结伴而行,不离不弃。执子之手与子偕老。让人老感动了。
如果还不太理解的话,可以认为代码执行其实就是一些行为动作,这些行为动作的目的就是为了获取/操作数据。
例如加法,这里的行为动作就是执行相加,数据就是加数和被加数。操作结果就是得到了另一个数据,即两个数的和。
只是在这个加法里,数据跑的特别快,(CPU的寄存器,能不快吗),我们几乎觉察不到执行动作在等数据的过程。怎么办呢,那就看一个能把它们拉开的例子。
那自然非数据库查询莫属了,既有网络I/O,又有磁盘I/O,肯定会慢一些。
假设我的业务是这样的,代码先去数据库查询一个用户,接着修改用户的密码,然后再更新回数据库,最后代码返回成功。
如果网速和数据库都很慢的话,可能是这样的。代码执行一个查询数据库动作,然后等啊等啊等,等的花都谢了,终于数据库把用户返回过来了,接着,代码飞快的修改了密码,并执行一个更新数据库的动作,然后又是等啊等啊等,等的花又开了,数据库终于回话了,更新成功。然后代码返回成功,全部执行完了。
所以同步阻塞代码的最大特点就是,带着数据上路,数据不到位就阻塞住。
最后来个小小的升华:
●所谓同步就是快的等慢的,然后一起往前走,表示的是目的。
●所谓阻塞就是想办法让快的停滞不前,等待慢的到来,表示的是手段。
一言以蔽之,同步是目的,阻塞是手段。
我看异步非阻塞
我看响应式
所谓响应式就是外界发生了变化,你要做出反应。所以响应式编程就是围绕着变化来构建的。
如何收集到原始变化,如何把这个变化告知相关处理者,处理者如何做出反应,做出反应的过程其实就是引发了新的变化,这个新的变化又该如何被收集,又该如何告知下一个处理者,如此往复,直至全部结束。
可以说整个自然界都是响应式的,因为它们都会对外界的变化或自身的变化产生反应。
先说人类,冷的时候加衣,饿的时候吃饭,病的时候去医院。看到绿色放松,看到蓝色镇定,看到红色易激动。
再说动植物,向日葵围绕太阳转叫趋光性,植物的根系朝水多的地方生长叫趋水性,鸽子可以磁场辨别方向,鲸鱼、海归都可以利用磁场记住自己走过的路。
所以响应式“本身”是一个很简单的模型,你给我一个变化,我做出一个反应。
动植物都有一套完善的感觉器官,能够感受到外界变化。同时他们又有超高的智商或完善的一套生物系统能够对这种变化作出反应。这是数万年甚至数千万年进化的结果,是基因决定的,所以看起来很自然。
再来看看编程界的响应式,也是这两个问题,一是如何知道外界的变化,二是如何对这种变化作出反应。
代码可是没有生命的,那就只能简单粗暴了。如何知道变化,那就让别人告诉你呗。如何做出反应,那就执行一段逻辑代码呗。
别人告诉你就等于异步回调/通知,执行的这段逻辑代码,可以是外界传入的,也可以是自己本身的一个方法。
现在明白了吧,异步非阻塞就是响应式。
最后来个小小升华:
所谓响应式就是一个概念,或是一种编程模式,它并不是一个知识,也不是一个技术。但它需要用到一个技术,那就是实现异步非阻塞的技术。
我看Reactor
在传统的编码中,会将逻辑处理代码写成方法,需要的数据由方法参数传入,处理过的数据由方法的返回值返回。
执行时以main方法为入口点启动,按照一定的顺序执行这些方法,数据依次流入流出每个方法,当所有的方法执行完时,数据也处理完了,就结束了。
整个过程是以逻辑代码的执行为主线,数据只是一个必须的参与者而已,因为代码要处理数据,如果数据不到位,代码就停下来不执行,等待数据的到来。
这就是典型的同步阻塞式的执行过程,非常简单,易于理解,而且代码也很好写。
到目前为止,我们提到的都是响应式的理论,那应该怎样去实现它呢,一时间还真没有头绪。
响应式是异步非阻塞,和同步阻塞应该是相对的。那我们不妨就拿响应式往同步阻塞上套一下,看看能得到什么有价值的发现。
响应式关注两点,变化和反应,而且是变化在前,反应在后。同步阻塞也关注两点,执行逻辑和数据,而且是执行逻辑在前,数据在后。
那就开始建立对应关系。因为“反应”是一系列行为动作,所以应该和“执行逻辑”对应。那“变化”只能和“数据”对应,其实这是对的,“数据”由不可用到可用,本身就是发生了一个“变化”。
这个对应关系建立的很完美,但是逻辑顺序却完全冲突。响应式是由变化主导反应,这很好理解,我都没有变化,你无须做出反应。同步阻塞是由执行逻辑主导数据,这也很好理解,我代码都没执行呢,根本不需要数据。
可见,它们的对应关系非常完美,但主导顺序完全相反,这就是一个非常非常有价值的发现。
因为我们只需把同步阻塞倒过来,就是实现响应式的大致方向。这样的推理貌似是对的,但实际当中是这样的吗?嗯,是这样的。
现在请大家和我一起扭转思维。原来以逻辑代码执行作为主线,数据作为参与者。现在以数据作为主线,逻辑代码执行作为参与者。说的再白一些,原来是数据传递到逻辑代码里,现在是逻辑代码传递到数据里。
有人也许会问,逻辑代码怎么传递?哈哈,Lambda表达式呀,函数式编程呀。
想象一下,有一个长长的管子,里面的水一直在流。
如果你想让水变成橙色的,只需在管子上开个口,加装一个可以持续投放橙色染料的装置,结果流经它的水都变成橙色的了。
如果你想让橙色的水变甜的话,只需在后面的管子上开个口,加装一个可以持续投放白糖的装置,结果流经它的水都变成甜的了。
同理,可以在后面继续加装投放柠檬酸的装置,让水变酸,在后面继续加装压入二氧化碳的装置,让水带气泡。
最后发现,自来水经过多道工序处理后变成了芬达。
如果把水流看作是数据流,把投放装置看作是逻辑代码,就变成了,数据先流入第一个逻辑代码,处理后再流入第二个逻辑代码,依次流下去直至结束。
这就是以数据作为主线,逻辑代码只是参与者,同时它也是Reactor实现响应式编程的原理,Spring官方使用的响应式类库就是Reactor。
其中,“以数据为主线”和“在变化时通知处理者”这两个功能Reactor库都已经实现了,我们需要做的就是“对变化做出反应”,即插入逻辑代码。
Reactor入门
在Reactor中,有两个非常重要的类,就是Mono和Flux,它们都是数据源,在它们内部都已经实现了“以数据为主线”和“在变化时通知处理者”这两个功能,而且还提供了方法让我们来插入逻辑代码用于“对变化做出反应”。
Mono表示0个或1个数据,Flux表示0到多个数据。先从简单的Mono开始。
设计一个简单的示例,首先创建一个数据源,只包含一个数据10,第一个处理就是加1,第二个处理就是奇偶性过滤,第三个处理就是把这个数据消费掉,然后就结束了。
为了清楚地看出来主线程执行的是哪些代码,工作线程执行的是哪些代码,特意打印了很多信息。
public static void main(String[] args) {
displayCurrTime(1);
displayCurrThreadId(1);
//创建一个数据源
Mono.just(10)
//延迟5秒再发射数据
.delayElement(Duration.ofSeconds(5))
//在数据上执行一个转换
.map(n -> {
displayCurrTime(2);
displayCurrThreadId(2);
displayValue(n);
delaySeconds(2);
return n + 1;
})
//在数据上执行一个过滤
.filter(n -> {
displayCurrTime(3);
displayCurrThreadId(3);
displayValue(n);
delaySeconds(3);
return n % 2 == 0;
})
//如果数据没了就用默认值
.defaultIfEmpty(9)
//订阅一个消费者把数据消费了
.subscribe(n -> {
displayCurrTime(4);
displayCurrThreadId(4);
displayValue(n);
delaySeconds(2);
System.out.println(n + " consumed, worker Thread over, exit.");
});
displayCurrTime(5);
displayCurrThreadId(5);
pause();
}
//显示当前时间
static void displayCurrTime(int point) {
System.out.println(point + " : " + LocalTime.now());
}
//显示当前线程Id
static void displayCurrThreadId(int point) {
System.out.println(point + " : " + Thread.currentThread().getId());
}
//显示当前的数值
static void displayValue(int n) {
System.out.println("input : " + n);
}
//延迟若干秒
static void delaySeconds(int seconds) {
try {
TimeUnit.SECONDS.sleep(seconds);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//主线程暂停
static void pause() {
try {
System.out.println("main Thread over, paused.");
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
}
以下是输出结果:
1 : 15:00:39.809
1 : 1
5 : 15:00:40.158
5 : 1
main Thread over, paused.
2 : 15:00:45.158
2 : 9
input : 10
3 : 15:00:47.160
3 : 9
input : 11
4 : 15:00:50.162
4 : 9
input : 9
9 consumed, worker Thread over, exit.
可以看到不到1秒钟时间主线程就执行完了。然后5秒后数据从数据源发射出来进入第一步处理,2秒后进入第二步处理,3秒后进入第三步处理,数据被消费掉,就结束了。其中主线程Id是1,工作线程Id是9。
这段代码其实是建立了一个数据通道,在通道的指定位置上插入处理逻辑,等待数据到来。
主线程执行的是建立通道的代码,主线程很快执行完,通道就建好了。此时只是一个空的通道,根本就没有数据。
在数据到来时,由工作线程执行每个节点的逻辑代码来处理数据,然后把数据传入下一个节点,如此反复直至结束。
所以,在写响应式代码的时候,心里一定要默念着,我所做的事情就是建立一条数据通道,在通道上指定的位置插入适合的逻辑处理代码。同时还要切记,主线程执行完时,只是建立了通道,并没有数据。
如果本文内容你没有看懂,那就多看几遍,保证能懂。如果你都看懂了,那恭喜你已经入门响应式编程了。
想更加完整的了解响应式,请看下面文章:
(完)
编程新说
10年架构师用独特的视角说技术
以上是关于1小时让你掌握响应式编程,并入门Reactor的主要内容,如果未能解决你的问题,请参考以下文章