话说Spring 5里的WebFlux到底是个啥?
Posted 有事没事折腾几下
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了话说Spring 5里的WebFlux到底是个啥?相关的知识,希望对你有一定的参考价值。
在聊webflux之前, 应该先说说响应式编程. 那么什么是响应式编程呢?
响应式编程
响应式编程是异步的编程方式, 它更关注数据流和数据变化的传递. 换言之, 开发者能够很容易的表示静态或动态的数据流, 并且根据其依赖关系在传递时自动的发生变化.
那么它与之前的有什么不同呢?
a = b + c * d
同样的语句, 如果是之前命令式的编程, 那么这行代码只会执行一次, 在此之后, a的值就是固定的了.
而响应式编程是不同的, 这行代码不只是一个命令, 而是一个模型, b, c, d是它的输入, a是它的输出, 那么a的值就会随着b, c, d的变化而变化.
甚至你可以想象成一个excel表, 以上4个变量分别代表4个格子, 而这行代码就是它们的关联关系. 那么是不是只要b, c, d中的格子里的值变化了, a格子里的值就自动变化了?
对于spring boot 2.x来说, 响应式技术栈是其主打的核心特性. 响应式是思路, 是范式, 是标准, 而技术核心是Reactive Streams.
Reactive Streams
Reactive Streams其实就是关于流的异步处理, 能够做到:
处理可能无限的数据
按顺序处理
异步传递
非阻塞背压
Reactive Streams必须提供以下组件:
发布者(生产者), publisher
订阅者(消费者), Subscriber
订阅, Subscription
处理者, Processor
这在JDK 9中实际上就是4个接口, 这里我就不把源码拿过来了, 非常的简单.
这里只提几个需要注意的地方:
API要求所有的元素处理(onNext调用)或终止调用(onError,onComplete)禁止阻塞发布者, 但是每一个on开头的方法都能同步处理事件, 也能异步的处理事件, 响应式流为了能够在一个非阻塞,异步,动态的推拉式流的限制内, 实现灵活性来管理资源和调度,混合异步和同步处理, 这些接口定义的所有方法返回值都是void.
在底层设计中, 缓冲区必须有界, 如果无限制, 很容易出现OOM错误, 但背压又是强制的(下面有讲), 使用无界缓冲区能够避免背压带来的问题. 那么这就是一个矛盾.
一般说来, 只有在当一个队列可能无界增长时,此时也是发布者端比订阅者端保持一个更高的速率,且持续了一段较大的时间,才会发生缓冲区有界, 但是背压又出现的情况.
这里假设请求的总数为P, 已经处理的数目为N, 那么能接收的最大数量就是P - N, 在订阅者缓冲区的数目为B, 那么如果有新来的请求, 这时就产生了临界值, P - B - N, 这个值必须要让发布者知道, 那么新增的请求要么等待, 要么抛弃. 这时对于订阅者来说, 接收一个请求, 就可以申请获取另一个请求, 那么这个需求就和响应是对等的, 通过需求的方式获取数据, 而这个响应的花销就是分期偿还的.
总体说来, 这种事儿就是一个协作成果, 大家商量好怎么处理, 就能取得不错的效果.
背压
这里提一下背压. 背压的直观理解就是下游出了问题, 得不到解决时, 这个问题就会向上, 影响上游.
为什么要这么说? 因为流式数据, 特别是在线数据, 大小的量是无法预知的, 在异步系统中, 就可能造成大量的资源消耗, 响应式流的主要目标是控制横穿一个异步边界的流数据的交换, 这就相当于两个线程池在互换数据, 得要确保接收方不会被强迫缓冲任意数量的数据. 那么这个时候, 背压就产生了.
总结一下, 背压其实是一种策略,使得发布者拥有无限制的缓冲区存储元素,用于确保发布者发布元素太快时,不会去压制订阅者. 这也是响应式编程的好处, 它可以平衡请求和响应率, 也就是说, 当响应堵塞时, 请求也会堵塞. 因此, 可以这么说, 响应式 = 异步 + 背压.
异步
同步异步的概念, 这里不再多说了, 相信大家都是很了解的. 这里说一说不一样的.
我们都知道, tomcat默认的最大线程是200个, 假设我这时来了200个线程, 都特别的耗时, 这时候再来一个请求, 那肯定处理不了, 如果不做特别的处理, 就会丢失请求.
在spring mvc中, 请求不会丢失, 但一次只能处理大约200个. 来个例子:
@RestController
@RequestMapping("/mvc")
public class MvcController {
private StudentDao studentDao;
@Autowired
public MvcController(StudentDao studentDao) {
this.studentDao = studentDao;
}
@PostMapping("/find")
public Student find(int age) {
System.out.println(age);
return studentDao.find(age);
}
}
以下模拟了一个dao
@Component
public class StudentDao {
public Student find(int age) {
Student student = new Student();
student.setAge(age);
student.setGender("M");
student.setName("Michael");
if (age > 0) {
try {
Thread.sleep(60000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return student;
}
}
用python来调用:
import requests
from multiprocessing import Pool
#url = "http://127.0.0.1:9292/mvc/find"
url = "http://127.0.0.1:9090/flux/find"
def req(age):
r = requests.post(url, params={"age": age})
return r.text
if __name__ == "__main__":
pool = Pool(processes=400)
x = range(0, 400)
results = pool.map(req, x)
print(results)
你会发现: 400个并发请求, 一次大概只能处理200个, 再过一会儿剩下200个再被处理. 这是因为程序运行在servlet 3.1+的容器上, 如果不是, 则后面200个请求会被抛弃.
servlet 3.1+的一个重要新特性就是异步处理的支持.
WebFlux
WebFlux返回的类型有两个, Mono
和Flux
, 如果返回到值只有0个或1个, 那么用Mono
, 如果是多个的话, 就用Flux
@PostMapping("/find")
public Mono<Student> find(int age) {
System.out.println(age);
return Mono.just(studentDao.find(age));
}
可以明显的发现, 过了最初的等待期后, 请求是不断被处理的, 并没有像MVC那种明显断开的感受.
小结
在一个微服务架构中,可以选择混合使用Spring MVC和WebFlux。它们都支持基于注解的编程模型,这对于代码的复用至关重要。
向非阻塞、函数式以及声明式编程的转变是一个陡峭的学习曲线
Spring MVC controllers 也可以直接调用其他的响应式组件
Spring MVC如果能运行的很好, 那么没有必要去改变。虽然因为历史原因,命令式编程大部分都是阻塞式的,但它在编码、理解和调试方面都是最容易的方式,并且背后有丰富的依赖库支撑.
webflux主要是运行在netty上, 当然也可以运行在tomcat里, 但是请注意, 只能运行在servlet 3.1+的容器里.
如果在持久层面上也需要响应式的数据访问, 那么也必须提供异步驱动, 如mongoDB, Redis. 当然也可以通过线程异步集成
在API网关层面上, 由于网关是反向代理, 有鉴权限流等处理, 在高并发和潜在高延迟的场景, API网关要实现高性能高吞吐量的一个基本要求就是全链路异步, 不能阻塞线程, 所以Zuul 1.x网关本质就是一个servlet, 每一个HTTP请求都会对应一个线程来处理, 对于IO操作还会再起一个工作线程来执行. 操作完成前, 工作线程是阻塞的.
以上是关于话说Spring 5里的WebFlux到底是个啥?的主要内容,如果未能解决你的问题,请参考以下文章