这还是你认识的Spring吗?

Posted 方家小白

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了这还是你认识的Spring吗?相关的知识,希望对你有一定的参考价值。

为什么创造webFlux?

1.需要非阻塞web堆栈来处理少量线程的并发性,并减少硬件资源进行扩展。2.函数式编程,它允许异步逻辑的生命性组合。在编程模型界别,java8使Spring webFlux能够提供功能性web端点以及带注解的控制器。

首先介绍下servlet

servlet由servlet container进行生命周期管理。container启动时构造servlet对象并调用servlet init()进行初始化;container关闭时调用servlet destory()销毁servlet;container运行时接受请求,并为每个请求分配一个线程(一般从线程池中获取空闲线程)然后调用service()。

servlet是一种简单的的网络编程模型。当请求进入servlet container时,servlet container就会为其绑定一个线程,在并发不高的场景下这种模型是适用的,但是一旦并发上升,线程数量就会上涨,而线程资源代价是昂贵的(上线文切换,内存消耗大)严重影响请求的处理时间。在一些简单的业务场景下,不希望为每个request分配一个线程,只需要1个或几个线程就能应对极大并发的请求,这种业务场景下servlet模型没有优势。

webFlux模型

Webflux模式替换了旧的Servlet线程模型。用少量的线程处理request和response io操作,这些线程称为Loop线程,而业务交给响应式编程框架处理,响应式编程是非常灵活的,用户可以将业务中阻塞的操作提交到响应式框架的work线程中执行,而不阻塞的操作依然可以在Loop线程中进行处理,大大提高了Loop线程的利用率。

这还是你认识的Spring吗?那我们选择springMVC还是Spring webFlux呢?这还是你认识的Spring吗?

对此,官网也是给出了一些建议:

1.如果你的SpringMvc程序运行正常,那就不用更改了.毕竟命令式编程是编写,理解和调试代码最简单的方法。 

2.如果您对使用Java 8 lambdas或Kotlin的轻量级功能web框架感兴趣,则使用Spring WebFlux功能web端点。对于具有较少复杂要求的较小应用或微服务,这也可能是更好的选择,可以从更大的透明度和控制中受益。 

3.在微服务架构中,您可以使用Spring MVC或Spring WebFlux控制器或Spring WebFlux功能端点的混合应用程序。在两个框架中支持相同的基于注释的编程模型,使得重新使用知识更容易,同时为正确的工作选择正确的工具。 

4. 如果你使用阻塞的持久性API(JPA,JDBC)或者使用的网络API。springMVC是常见架构的最佳选择, Reactor或者RxJava也是可以可行的,但是不能充分的利用非阻塞的web堆栈模型了。

接下来看下几个重要的知识点:

  • 响应式: 是指围绕变化做出反应的编程模式.非阻塞是被动的,在操作完成或数据可用时对通知(下文中的数据和事件)做出反应. 举个例子来讲就是excel中的加法公式。具有变化传递的特点。
    响应式的第二个特点就是基于数据流。

  • 数据流: 响应式编程的中的数据和事件的传输会以数据流的方式发出。

  • 声明式:

总的来说: 响应式编程是一种基础数据流和变化传递的声明式的编程范式。咱们先看下reactor。

Reactor

Reactor与Spring是兄弟项目,侧重于Server端的响应式编程,主要 artifact 是 reactor-core,这是一个基于 Java 8 的实现了响应式流规范 (Reactive Streams specification)的响应式库。(其实是放大的话的监听模式)

Reactor中的发布者(Publisher)由Flux和Mono两个类定义.

  • Flux: 代表一个包含0..N个元素的响应式序列。

  • Mono: 代表一个包含0个或者1个的元素的结果。

Publisher可以发出三种信号: 元素值,错误信号,完成信号. 错误信号和完成信号都是终止信号.

  • 元素值: 即返回的数据

  • 完成信号: 用于告知下游订阅者该数据正常结束.

  • 错误信号: 告知订阅者数据流已结束。如果没有发出任何一个元素值,直接发出一个完成/错误信号.表示这是一个空数据流. 如果没有错误信号或者完成信号,那就是一个无限数据流。

// 只有完成信号的空数据流
Flux.just();
Flux.empty();
Mono.empty();
Mono.justOrEmpty(Optional.empty());

// 只有错误信号的数据流
Flux.error(new Exception("some Error"));
Mono.eror(new Exception("some error"));

接下来是Flux和Mono的一个简单使用:

 public static void main(String[] args) {
Flux.just(1, 2, 3, 4, 5).subscribe(System.out::println);

System.out.println();

Mono.just(1).subscribe(System.out::println);

}

// 输出的结果是
1
2
3
4
5

1

其他API:

// 订阅并触发数据流
subscribe();
// 订阅并指定对正常数据元素如何处理
subscribe(Consumer<? super T> consumer);
// 订阅并定义对正常数据元素和错误信号的处理
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer);
// 订阅并定义对正常数据元素、错误信号和完成信号的处理
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer);
// 订阅并定义对正常数据元素、错误信号和完成信号的处理,以及订阅发生时的处理逻辑
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer,
Consumer<? super Subscription> subscriptionConsumer);

响应式编程中如何调试呢?

使用StepVerifier来测试。

使用案例:

private static void test2() {

StepVerifier.create(Flux.just(1, 2, 3, 4, 5, 6))
// 用于测试下一个期望的数据元素
.expectNext(1, 2, 3, 4, 5, 6)
// 用于校验下一个元素是否为完成信号
.expectComplete()
// 进行验证
.verify();

StepVerifier.create(Mono.error(new Exception("error")))
// 用于校验下一个元素是否为错误信号
.expectErrorMessage("error1")
.verify();
}

操作符

map: 映射
flatMap: 每个元素映射为流,然后将流汇总成一个流
filter: 过滤
zip: 合并
用于编程方式自定义生成数据流的create和generate等及其变体方法;
用于“无副作用的peek”场景的doOnNext、doOnError、doOncomplete、doOnSubscribe、doOnCancel等及其变体方法;
用于数据流转换的when、and/or、merge、concat、collect、count、repeat等及其变体方法;
用于过滤/拣选的take、first、last、sample、skip、limitRequest等及其变体方法;
用于错误处理的timeout、onErrorReturn、onErrorResume、doFinally、retryWhen等及其变体方法;
用于分批的window、buffer、group等及其变体方法;
用于线程调度的publishOn和subscribeOn方法。

调度器和线程模型。

reactor提供了Scheduler接口做调度器的顶层接口,Schedulers提供了静态方式创建线程执行。

private static void test1() {

// 获取当前线程
System.out.println("当前线程:" + Schedulers.immediate());

// 创建一个可重用的单线程,这个方法对所有调用者都提供同一个线程来使用, 直到该调度器被废弃
Scheduler single = Schedulers.single();
Scheduler.Worker worker = single.createWorker();
worker.schedule(() -> {
System.out.println(Thread.currentThread().getName());
});
single.start();
single.dispose();

// 创建一个独占的线程
Scheduler newSingle = Schedulers.newSingle("独占的线程");
Scheduler.Worker worker2 = single.createWorker();
worker2.schedule(() -> {
System.out.println(Thread.currentThread().getName());
});
newSingle.start();
newSingle.dispose();

// 弹性线程池(Schedulers.elastic())。它根据需要创建一个线程池,重用空闲线程。线程池如果空闲时间过长 (默认为 60s)就会被废弃。对于 I/O 阻塞的场景比较适用。Schedulers.elastic()能够方便地给一个阻塞 的任务分配它自己的线程,从而不会妨碍其他任务和资源;
Scheduler elastic = Schedulers.elastic();
elastic.schedule(() -> System.out.println(Thread.currentThread().getName()));

Scheduler parallel = Schedulers.parallel();
parallel.schedule(() -> {
System.out.println(Thread.currentThread().getName());
});

// 自定义线程池
// Schedulers.fromExecutorService()
}

注意下:

  • Schedulers.single()和Schedulers.newSingle()对应Executors.newSingleThreadExecutor();

  • Schedulers.elastic()和Schedulers.newElastic()对应Executors.newCachedThreadPool();

  • Schedulers.parallel()和Schedulers.newParallel()对应Executors.newFixedThreadPool();

Flux和Mono在异常时候的API

  • onErrorReturn: 在收到错误信号的时候提供一个缺省值。

  • onErrorResume: 在收到错误信号的时候提供一个新的数据流。

  • onErrorMap: 捕获异常信息。会包装成一个业务相关的异常交给后续的逻辑处理。

  • doOnError: 对于错误发生时,在不想改变它做出响应,并让错误继续传递下去,可以使用doOnError方法。

  • finally: 清理资源.(这个不知道怎么使用)

  • doFinally: 在结束时(onComplete,onError)触发,并且能够判断是什么类型的终止事件,以便进行针对性的操作。

  • retry: 重试。

回压(背压)

subscribe(Subscriber subscriber)

private static void test3() {
// Flux.range是一个特别快的Publisher
Flux.range(1, 6)
// 在每次请求的时候触发
.doOnRequest(n -> System.out.println(n))
// 自定义Subscriber
.subscribe(new BaseSubscriber<Integer>() {
// 在订阅的时候执行的操作
@Override
protected void hookOnSubscribe(Subscription subscription) {
System.out.println("Subscribed and make a request....");
// 订阅的时候,像上游请求一个元素
request(1);
}

// 在收到一个元素的时候操作
@Override
protected void hookOnNext(Integer value) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println("get value [" + value + "]");
// 处理完成后在请求一个元素
request(1);
}
});
}

具体背压的内容,我们后续进行详细解释,目前就暂且了解到这里。

扫描二维码

获取更多精彩

方家小白




以上是关于这还是你认识的Spring吗?的主要内容,如果未能解决你的问题,请参考以下文章

阿里四面:你知道Spring AOP创建Proxy的过程吗?

在8102年的今天,你清楚Spring 5.0的WebFlux吗?

这两个代码片段之间有区别吗?如果有,那又如何? [复制]

一张图帮你记忆,Spring Boot 应用在启动阶段执行代码的几种方式

「Spring 」「AOP 容器」不看源码就带你认识核心流程以及运作原理

对Spring PostConstruct注解的一点新认识