响应式编程前生今世-从规范到实例
Posted 如何在3年拿到50K
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了响应式编程前生今世-从规范到实例相关的知识,希望对你有一定的参考价值。
一、Servlet容器的乌云
Thread per Request Model
servlet容器声明一个线程池处理http request,每当有一个请求进来时,tomcat就会分配一个线程处理该请求的整个生命周期,这意味着一个服务器能同时处理的请求数量和线程池的大小是相等的。
2,每个线程会消耗一定的内存(通常1M),线程池增加时消耗的内存就会越大
3,微服务架构下,应用服务分布在不同服务器,服务器之间通过同步的 request/response模型内部通信也会阻塞,存在巨大的资源浪费。
Blocking can be wastefull
1:多线程
多线程可以提高对cpu资源的利用率,增加整体的io吞吐量。
- 高并发环境下,多线程切换会消耗cpu资源。
- 应对高并发的多线程开发比较难,有些问题难以发现(指令重排)
- 线程本身也会消耗一定内存(1M),更多线程以为更多资源消耗
2:非阻塞
异步不一定能补救callback导致回调接口膨胀,future增加使用难度。
$.ajax({
type: "POST",
url: "/url/path",
data: "",
success: function(msg){
//sucess
}
});
- CallBack Hell
二、升级之路
2.1 lambda表达式(jdk8)
java8 引入函数式编程 定义函数作为一等公民可以复制给变量,作为参数返回值进行传递。lambda是java引入函数式编程的一次革命式尝试。
函数式接口
只包含一个抽象方法
@FunctionalInterface
interface GreetingService
{
void sayMessage(String message);
}
内置接口
函数接口 | 参数类型 | 返回类型 | 用途 |
---|---|---|---|
Consumer | T | void | void accept(T t) |
Supplier | 无 | T | 返回类型为T的对象 T get() |
Function<T,R> | T | R | 对类行为T的对象操作 R apply(T t) |
Predicate | T | boolean | boolean test(T t) |
2.2 java stream(jdk8)
java stream是jdk8和lambda同时提出的抽象层新特性,可以通过声明式的方式处理流式数据。
- 有序队列 steam代表了有序的元素序列,stream只处理元素并不存储元素
- 数据源 Collections, Arrays, I/O 序列
- 聚合操作符 filter, map, limit, reduce, find, match 等等
- Pipelining − 管道操作和终止操作
- 自动迭代器 steam操作会自动迭代内部元素
2.3 FLowAPI(jdk9)
当使用Java1.9时, Reactive Streams已成为官方Java 9 API的一部分,Java9中Flow类下的内容与Reactive Streams完全一致。
不要想太多,仅仅是一些准守响应式编程规范的api而已
https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/Flow.html
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.*;
import java.util.concurrent.SubmissionPublisher;
/**
* @ClassName FlowDemo
* @Description 利用jdk9中的FlowAPI演示
* @Author gl
* @Version 1.0
*/
public class FlowDemo {
public static void main(String[] args) throws InterruptedException {
//1, 定义发布者
SubmissionPublisher<Integer> publisher = new SubmissionPublisher();
//2, 定义订阅者
Flow.Subscriber<Integer> subscriber = new Flow.Subscriber<Integer>() {
//控制器,订阅关系的合约
Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
//建立订阅关系时调用
this.subscription=subscription;
subscription.request(10);//获取元素
}
@Override
public void onNext(Integer item) {
//获取到元素时执行
System.out.println("当前元素: "+item);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
subscription.request(10);//获取元素数量
}
@Override
public void onError(Throwable throwable) {
//处理元素时抛出异常
System.out.println("处理异常: "+throwable.getMessage());
subscription.cancel();//停止生产
}
@Override
public void onComplete() {
//处理结束时执行
System.out.println("complete");
}
};
// 绑定发布者和订阅者
publisher.subscribe(subscriber);
// 生产数据
for (int i = 0; i <1000 ; i++) {
publisher.submit(i);
//submit是阻塞方法,只要生产者缓冲池满了,就会阻塞等待缓冲池中元素被消耗
System.out.println("生产数据: "+i);
}
publisher.close();
Thread.sleep(100000);
}
}
2.4 The Reactive Manifesto
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-i4OUBlr9-1620015685928)(E://youdao//1601442318(1)].jpg)
三、 Reactive Stream
3.1 Reactive programming概念
总结起来,响应式编程(reactive programming)是一种基于数据流(data stream)和变化传递(propagation of change)的声明式(declarative)的编程范式。
- 创建数据池
- 声明管道
- 以stream的方式传递和处理变化
3.2 ReactiveStream Specification(重要)
2013年末的时候, Kaazing, Lightbend, Netflix, Pivotal, Red Hat, Twitter 等公司的工程师们共同发起了关于制定“响应式流规范(Reactive Stream Specification)”的倡议和讨论。对响应式流做了定义,并对实现的目标,范围和核心API做了规范和定义,响应式流规范是很多具体实现框架的总纲。
https://github.com/reactive-streams/reactive-streams-jvm
3.2.1 目标
provide a standard for asynchronous stream processing with non-blocking backpressure.
提供一个异步的,非阻塞的,具备背压机制的标准。
3.2.2 定义
- process a potentially unbounded number of elements
- in sequence,
- asynchronously passing elements between components,
- with mandatory non-blocking backpressure.
3.2.3 核心组件
- 核心API
- 工具包The Technology Compatibility Kit (TCK)
实现着可以完全按照自己的方式实现,只要能够通过TCK的测试。
3.3 核心API解读(重要)
- Publisher
- Subscriber
- Subscription
- Processor
https://github.com/reactive-streams/reactive-streams-jvm
3.3.1 Publisher 发布者
Publisher是能够发出元素的发布者。根据Subcriber的请求发布有序的没有边界的数据流。仅提供一个方法让订阅者能够注册到publisher。
public interface Publisher<T> {
void subscribe(Subscriber<? super T> var1);
}
通过Publisher.subscribe(Subscriber)方法可以调用下面方法
onSubscribe onNext* (onError | onComplete)?
3.3.2 Subscriber 订阅者
Subcriber接收元素并处理,当执行发布者的subscribe方法时,就会调用Subcriber的onSucribe方法,并且传入Subcription,借助Subcription可以向发布者请求n个元素或者取消发布。
public interface Subscriber<T> {
//订阅时触发,必须传参Subcription
void onSubscribe(Subscription var1);
//获取下一个元素信号是触发
void onNext(T var1);
//publisher元素产生异常时,发射任务终止
void onError(Throwable var1);
//所有元素已经发射和处理完毕后调用
void onComplete();
}
3.3.3 Subscription 控制器
发布者和订阅者的中间人,其持有的方法可以用于控制发布者的元素发射速率,背压
也是基于其实现
public interface Subscription {
void request(long var1); //请求指定数量的元素
void cancel(); //让publisher停止发射任务
}
Subscription.request and Subscription.cancel 只能在Subcriber内部调用,被一个Publisher和Subcriber共享
3.3.4 Processor 流程控制器
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
同时具备Subcriber和publisher的特性,可以看做是两者的结合体,根据场景有不同的Processor
3.4 Reactive Stream实现框架
ReactiveX
ReactiveX是Reactive Extensions的缩写,一般简写为Rx。Rx是一个编程模型,目标是提供一致的编程接口,帮助开发者更方便的处理异步数据流。RxJava只是其在java语言方向的实现库。
https://github.com/ReactiveX/RxJava
RxJava
- RxJava is a Java VM implementation of ReactiveX (Reactive Extensions): a library for composing asynchronous and event-based programs by using observable sequences.
- Rxjava支持jdk6以上版本的基于JVM实现的多种语言环境,例如:Groovy, Clojure, JRuby, Kotlin and Scala。
https://github.com/ReactiveX/RxJava/wiki
Project Reactor
- Reactor是Pivotal旗下的项目,与大名鼎鼎的Spring是兄弟关系,是Spring Spring Framework 5.0和WebFlux的“御用”响应式编程实现。
- Reactor is a fourth-generation reactive library, based on the Reactive Streamsspecification, for building non-blocking applications on the JVM
- Project Reactor是第四代运行在JVM上的完全非阻塞响应式库,以“背压”的形式管理数据处理,提供了可组合的异步序列APIFlux和Mono。
https://projectreactor.io/
http://ypk1226.com/2019/08/02/reactive/project-reactor/
Others implementations
- Akka Streams完全实现了Reactive Streams规范,但Akka Streams API与Reactive Streams API完全分离。
- Ratpack是一组用于构建现代高性能HTTP应用程序的Java库。Ratpack使用Java 8,Netty和Reactive原则。可以将RxJava或Reactor与Ratpack一起使用。
- Vert.x是一个Eclipse Foundation项目,它是JVM的多语言事件驱动的应用程序框架。Vert.x中的反应支持与Ratpack类似。Vert.x允许我们使用RxJava或其Reactive Streams API的实现。
四、概念区分
declarative VS imperative
- imperative命令式强调How,命令“机器”如何去做事情(how),这样不管你想要的是什么(what),它都会按照你的命令实现。
List<String> name = new ArrayList<>();
List<User> userList = Arrays.asList(new User("zhangsan",33),new User("wangwu",33));
for (User user : userList) {
if(user.getAge()>30){
name.add(user.getName());
}
}
- declaretive 告诉“机器”你想要的是什么(what),让机器决定如何去做(how)。
SELECT * FROM users where age >30
- 当描述的目标变复杂时,声明式语言也不可避免变得更命令式,通过描述过程来描述更多细节
- 通过适当的封装、组件化,命令式也可以变成目标导向,变得更加“声明式”
参考链接:
声明式和命令式的区别
Java Stream VS Reactive Stream
- java Stream是一种同步的API,存在I/O阻塞。
- java stream不具备完善的流的控制能力(ex:backpressure)
而reactive steam是一种异步非阻塞和强大流控制能力的的数据流。
阻塞VS非阻塞 && 异步VS同步
- 阻塞和非阻塞反映的是调用者的状态,当调用者调用了服务提供者的方法后,如果一直在等待结果返回,否则无法执行后续的操作,那就是阻塞状态;如果调用之后直接返回,从而可以继续执行后续的操作,那可以理解为非阻塞的。
- 同步和异步反映的是服务提供者的能力,当调用者调用了服务提供者的方法后,如果服务提供者能够立马返回,并在处理完成后通过某种方式通知到调用者,那可以理解为异步的;否则,如果只是在处理完成后才返回,或者需要调用者再去主动查询处理是否完成,就可以理解为是同步的。
Observer观察者模式 VS iterator迭代器模式
- Iterator 是基于 “拉取”(pull)方式的
- 响应式流是基于“推送”(push)方式的。
响应式系统 VS 响应式编程
- 响应式系统 是一种架构风格architectural style,它允它允许许多独立的应用结合在一起成为一个单元,共同响应它们所处的环境。一般是基于消息驱动的。
- 响应式编程是一种异步编程范式的子集,一般是事件驱动。响应式编程的接口一般是下面2者之一。
- Callback-based—匿名的间接作用side-effecting回调函数被绑定在事件源event sources上,当事件被放入数据流dataflow chain中时,回调函数被调用。
- 声明式的Declarative——通过函数的组合,通常是使用一些固定的函数,像 map、 filter、 fold 等等。
五 从Reactor到WebFLux
5.1 Reactor maven依赖的引入
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.3.1.RELEASE</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<version>3.3.1.RELEASE</version>
</dependency>
5.2 Flux 和 Mono
- Flux 代表0-N个元素
- Mono 代表0-1个元素
Flux和Mono都可以发出三种“数据信号”:元素值、错误信号、完成信号,错误信号和完成信号都是终止信号,完成信号用于告知下游订阅者该数据流正常结束,错误信号终止数据流的同时将错误传递给下游订阅者。
5.3 操作符Operators
操作符是reactor编程中重要的组成,通过操作符之间的配合,可以处理stream流中的一个个元素,加工后得到最终的输出。reactive操作符种类很多,可以参考ReactiveX的文档或者APIDOC来学习。
- https://www.kancloud.cn/luponu/rxjava_zh/974452
- https://projectreactor.io/docs/core/release/api/
创建数据流
@Test
public void testCreateStream(){
Flux.just(1,3,3,"3"); //可以重复,可以是不同了类型
Mono.just(1); //只能包含一个或者没有
List<String> list = Arrays.asList("1","2","3","4","5");
Flux<String> stringFlux = Flux.fromStream(list.stream());
//订阅前什么都不会发生
//只有subscribe()方法调用的时候才会触发数据流
stringFlux.subscribe((item)-> System.out.println(item));
//订阅者的方式2
Flux.just(1,3,3,5,5,new Exception(),5,8)
.subscribe(System.out::println,
System.out::println,
()-> System.out.println("输出完成")
);
}
变换操作
map 从一个元素转换为另外一个元素
/**
* 变换操作符map演示
*/
@Test
public void testMap(){
StepVerifier.create(Flux.range(1, 6)
.map(i -> i * i)) // 2
.expectNext(1, 5, 9, 16, 25, 36)
.expectComplete()
.verify();
}
- 过滤操作
- 结合操作
- 错误处理
- 辅助操作
- 条件和布尔
- 异步操作
等等,可以参考reactor的api文档
5.4 线程调度
Scheduler
Reactor调度线程池与Executors对比
当前线程:schedulers.immediate()
Thread.currentThread()
单个线程:Scheduler.single()
ScheduledThreadPoolExeccutor(core1)
弹性线城池Scheduler.elastic()
ScheduledThreadPoolExecutor
并行线程池:Schedulers.parallel()
ScheduledThreadPoolExecutor
/**
* 调度测试
*/
@SneakyThrows
@Test
public void testScheduler(){
CountDownLatch countDownLatch = new CountDownLatch(1);
Flux.range(1,100)
.publishOn(Schedulers.elastic())
.doOnNext((item)-> System.out.println(Thread.currentThread().getName()+"pub|"+item))
.subscribeOn(Schedulers.newSingle("单个线程"))
.publishOn(Schedulers.immediate())
.doOnNext((item)-> System.out.println(Thread.currentThread().getName()+"sub|"+item))
.subscribe(System.out::println, null, countDownLatch::countDown);
countDownLatch.await(10, TimeUnit.SECONDS);
}
subscribeOn和PublishOn
publishOn会影响链中其后的操作符, subscribeOn无论出现在什么位置,都只影响源头的执行环境,也就是range方法是执行在单线程中的,直至被第一个publishOn切换调度器之前,所以range后的map也在单线程中执行。
5.5 异常处理
通过subcribe中错误处理方法处理
通过声明式编程方式处理
/**
* 异常处理
*/
@Test
public void testException(){
Flux.range(1,10)
.map(x->x/(3-x))
.subscribe(System.out::println,(e)->{System.err.println(e.getMessage());},()->System.out.println("complete"));
Flux.range(1,10)
.map(x->x/(3-x))
.doOnNext(System.out::println) //正常打印
// .onErrorResume((item)->Flux.just(33)) //错误时替换为33
.doOnError(System.err::println) //错误时打印
.doOnComplete(()->System.out.println("complete2"))
.subscribe();
}
5.6 背压BackPressure
订阅者能够向上游反馈流量请求的机制就叫做回压(backpressure)。
下游处理速度比upsteam慢,及整个数据流处于push状态下。一个Subcriber可以通过request(n)的方式从主动从上游获取指定个数元素,从而控制数据流程。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-84ZqeNKG-1620015685931)(E://youdao//v2-5015b17395b979ccb8d85a0bb5c1d740_720w.jpg)]
@Test
public void testBackPressure(){
Flux.range(1,10)
.subscribe(new Subscriber<Integer>() {
Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
//订阅时触发
this.subscription= subscription;
subscription.request(1);//请求一个元素
}
@Override
public void onNext(Integer integer) {
//获取到元素时触发
System.out.println("当前元素:"+integer+""+new Date());
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
subscription.request(2);
}
@Override
public void onError(Throwable throwable) {
System.out.println("on error");
}
@Override
public void onComplete() {
System.out.println("complete");
}
});
}
5.7 Sink的概念
虽然我们之前已经学习了Flux.just和create方法,但是sink概念的提出可以进一步帮助编程解耦,与Processor配合实现响应式编程的最佳实践。
5.7.1 SynchronousSink 同步数据池
SynchronousSink是一种同步sink,通过Flux.generate方法可以利用该数据池发送数据
@Test
public void sinkTest1(){
Flux.generate(()->2,(index,sink)->{
sink.next(index+" | "+new Date());
if(index==5){
sink.complete();
}
return index+1;
},(item)-> System.out.println("complete = finally"+item))
.doOnNext(System.out::print)
.subscribe();
}
5.7.2 FluxSink
create生成数据流的方式既可以是同步的,也可以是异步的,利用的就是FluxSink池。异步的sink池可以持续发射元素,订阅者监控元素的发射并获取和处理。Fluxsink池可以极大增加编码的灵活性。
@SneakyThrows
@Test
public void fluxSinkTest(){
final Flux<Integer> flux = Flux.<Integer> create(fluxSink -> {
//NOTE sink:class reactor.core.publisher.FluxCreate$SerializedSink
log.info("sink:{}",fluxSink.getClass());
while (true) {
log.info("sink next");
fluxSink.next(ThreadLocalRandom.current().nextInt());
}
}, FluxSink.OverflowStrategy.BUFFER);
//NOTE flux:class reactor.core.publisher.FluxCreate,prefetch:-1
log.info("flux:{},prefetch:{}",flux.getClass(),flux.getPrefetch());
flux.subscribe(e -> {
log.info("subscribe:{}",e);
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
});
TimeUnit.MINUTES.sleep(20);
}
5.8 Hot and Cold
所谓冷热的概念主要区别在于stream流如何响应Subcribers
- cold, 对于一个stream,每个新的Subcriber都会接收到所有的元素。
- hot, stream序列并不会因为Subcriber改变而重头开始,每个新的Subcriber订阅后只能读取订阅开始时刻的剩余stream data。注:个别具备自身缓存的除外
注:hot stream甚至会在没有订阅者的情况下依然发射数据,违反了“nothing happens before you subscribe” 规则
六、WebFLux first View
6.1 定义
WebFlux是spring5.0 reactive技术栈推出的,实现了Reactive Stream 的响应式web编程框架。支持背压和非阻塞IO,可以在Netty(默认),Undertow,Servlet3.1的容器上运行。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-JzVXyXyx-1620015685937)(E://youdao//1601436274(1)].jpg)
https://docs.spring.io/spring-framework/docs/5.0.20.BUILD-SNAPSHOT/spring-framework-reference/web-reactive.html#spring-webflux
6.2 Why Webflux?
- 与servlet API解耦,规避其中同步io,阻塞Io,从而利用更少的资源满足非阻塞的方式实现的web的高并发和扩展性需求。
- 函数式编程,就像jdk1.5退出注解编程一样,在jdk1.8中已经推出了lambda表达式。webFLux推出了函数式编程和注解编程两种编程范式。
- The main purpose of Reactive Streams is to allow the subscriber to control how fast or how slow the publisher will produce data.
6.3 webflux对于响应式的定义
- I/O事件驱动和非阻塞。
- 非阻塞背压,调用者可以控制生产的速率,避免生产过剩造成阻塞。(阻塞是一种浪费)
6.4 WebFlux编程模型
- @Controller注解编程,webFlux几乎支持所有springMVC的注解,可以无感的迁移MVC工程到WebFlux。
- Functional Endpoints 函数式编程,基于lambda的轻量级函数式编程
6.5 适用范围
下图给出了springMVC和SpringWebFLux的不同和相同点,总的来说虽然两者底层有巨大的不同,但两者的设计和架构师可持续的,在工作中需要合理搭配,有选择性的使用。
- 没有必要大费周章的全部迁移到webFlux
- 如果仅仅时远程调用,可以使用非阻塞的WebClient
6.6 并发模型
- 对于传统的springMVC,当前的线程可能会阻塞,所以servelt使用一个很大的线程池来避免线程的阻塞
- 对于springFlux而言,默认是不会阻塞线程的,所以非阻塞的服务器会使用一个较小的固定线程池处理请求。
6.7 HttpStreaming & SSE
注:这点和springMVC是一致的,并不是新的事务,这里只是介绍下WebFlux下Server Sent Event的形式
@RestController
public class SseController {
@GetMapping(value = "/sse",produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> sendSSE(){
Flux<String> interval = Flux.interval(Duration.ofSeconds(1)).map((item)->{
return item +" | "+ LocalDateTime.now();
});
return interval;
}
}
## 结果
data:0 | 2020-09-30T11:20:07.344377600
data:1 | 2020-09-30T11:20:08.326811800
data:2 | 2020-09-30T11:20:09.326301100
data:3 | 2020-09-30T11:20:10.325746200
data:4 | 2020-09-30T11:20:11.326232500
data:5 | 2020-09-30T11:20:12.326215900
6.8 WebFlux VS webMVC性能对比
webflux
@RestController
public class HelloController {
@GetMapping("/helloFlux")
public Mono<String> hello(){
Mono<String> hello_webFLux = Mono.just("hello webFLux").delayElement(Duration.ofMillis(1000));
return hello_webFLux;
}
}
MVC
@SneakyThrows
@GetMapping("/helloMVC")
public String hello2(){
Thread.sleep(1000);
return "helloMVC";
}
结论:单从测试数据来看WebFLux在吞吐量上有明显的提升,但这个不能说明太多问题,需要更加严谨的思考测试手段
结尾
reactive和non-blocking并不会让应用跑的更快,相反为了维持非阻塞会轻微的加大资源的消耗。响应式编程的可以确定的有点在于,可以利用更少的资源消耗和固定的线程控制
- Reactive and non-blocking generally do not make applications run faster
- using the WebClient to execute remote calls in parallel
- it requires more work to do things the non-blocking way and that can increase slightly the required processing time.
- The key expected benefit of reactive and non-blocking is the ability to scale with a small, fixed number of threads and less memory.
参考链接及资源
- webflux 官方文档
https://docs.spring.io/spring-framework/docs/5.0.20.BUILD-SNAPSHOT/spring-framework-reference/web-reactive.html#spring-webflux - project reactor api文档
https://projectreactor.io/docs/core/release/api/ - ReactiveX中文文档 https://github.com/mcxiaoke/RxDocs/blob/master/Intro.md
- ReactiveX官网 http://reactivex.io/intro.html
- Project Reactor 官网 https://projectreactor.io/
- Reactor 3 Reference Guide
文章一直在硬盘不记得参考过那些博客了
以上是关于响应式编程前生今世-从规范到实例的主要内容,如果未能解决你的问题,请参考以下文章