关于DeferredResult的思考
Posted skyice
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了关于DeferredResult的思考相关的知识,希望对你有一定的参考价值。
使用SpringBoot搭建web程序,里面内置了tomcat,一般都不会关心内部实现机制,上来就可以写程序,并且可以跑起来。但是是思考了每次的请求是如何工作的。
简单的来讲就是tomcat是将每次请求都将封装成一个Servlet,该Servlet来运行完业务逻辑代码,然后再有tomcat将信息返回给调用方。每个Servlet是同步的。即在该servlet的业务逻辑做完了然后才释放掉该Servlet。
但是servlet3提供了一个异步的机制,即每次请求过来之后,可以先释放该请求,但是会保存一些信息。业务逻辑由程序其他线程来处理,处理完成后将其值设置到DeferredResult里面。然后再由容器将返回值返回给前端。
这样做的好处:实现出现请求与业务IO分开,程序能够处理更多的请求。
网上可以找到其他例子来学习DeferredResult是如何运行的,即在请求内部开启一个线程来处理
@GetMapping
public DeferredResult<String> queryDevice(){
DeferredResult<String> def = new DeferredResult<>();
new Thread(()->{
//处理业务逻辑
def.setResult("处理后的结果");
}).start();
return def;
}
这样很好理解,但是不能这样做,为什么,因为每一次线程的创建销毁是消耗资源的,这样频繁的创建和销毁非常影响性能。这个时候,可以提使用线程池来处理,对是可以这样做的。是的,可以这样做,但是需要考虑到,在某一时刻,可能会产生几千个线程,这样是非常多的,如果加上tomcat创建的Servlet线程数,那确实挺消耗资源的。
上面已经有了一个可行的方案,这里提供我的一个思考,该思考是Java8新特性之后常用到的一个。
下面有三个类:
public abstract class Actor {
public enum ActorType {
ITC, /* 立刻消费. */
BLOCKING; /* 阻塞.*/
}
/**
* actor类型.
*/
public ActorType type;
/**
* actor名.
*/
public String name;
public Actor(ActorType type) {
this.type = type;
this.name = this.getClass().getSimpleName();
}
/**
* 任务消费
*/
public void future(Consumer<Void> c) {
if (this.type.ordinal() == ActorType.BLOCKING.ordinal()) {//阻塞
((ActorBlocking) this).push(c);
return;
} else {
Misc.exeConsumer(c, null);
}
}
}
public class ActorBlocking extends Actor {
/**
* 等待处理的Consumer.
*/
private ConcurrentLinkedQueue<Consumer<Void>> cs = new ConcurrentLinkedQueue<>();
/**
* 拥有线程的个数
*/
private int tc = 1;
/**
* cs的size
*/
private AtomicInteger size = new AtomicInteger(0);
/**
* 线程忙?
*/
public volatile boolean busy = false;
public ActorBlocking() {
super(ActorType.BLOCKING);
}
/**
* 添加任务.
*/
public void push(Consumer<Void> c) {
this.cs.add(c);
this.size.incrementAndGet();
synchronized (this) {//通知线程消费信息
this.notify();
}
}
/**
* 线程忙?
*/
public boolean isBusy() {
return this.busy;
}
/**
* 队列尺寸.
*/
public int size() {
return this.size.get();
}
public int getTc() {
return tc;
}
public void setTc(int tc) {
this.tc = tc < 1 ? 1 : tc;
}
/**
* 启动线程
*/
protected void start() {
ActorBlocking ab = this;
ExecutorService ex = Executors.newFixedThreadPool(this.tc);//创建线程池
for (int i = 0; i < tc; i++) {
ex.execute(() -> {
while (true) {
ab.run();
}
});
}
}
/**
* 抢占式消费任务
*/
private void run() {
Consumer<Void> c = this.cs.poll();
if (c == null) {
synchronized (this) {
try {
this.wait();
} catch (InterruptedException e) {
}
}
c = this.cs.poll();
}
if (c != null) /* 抢占式. */ {
this.size.decrementAndGet();
this.busy = true;
Misc.exeConsumer(c, null);
this.busy = false;
}
}
}
@Component
public class AppActorBlocking extends ActorBlocking {
//可以设置CPU*2的
private int threadSize = 4;
public AppActorBlocking() {
this.setTc(threadSize);//设置线程数量
this.start();
}
}
该方法是工具Misc类总的方法:
/**
* 执行Consumer并将异常化解在内部.
*/
public static final <T> boolean exeConsumer(Consumer<T> c, T t) {
try {
c.accept(t);
return true;
} catch (Exception e) {
if (logger.isWarnEnabled()) {
logger.warn("{}", Misc.trace(new Throwable()));
}
if (logger.isWarnEnabled()) {
logger.warn("t: {}, e: {}", t, Misc.trace(e));
}
return false;
}
}
如何调用:
@Autowired
public AppActorBlocking appBlocking;
public void method(){
appBlocking.future(v->{
//处理逻辑代码
});
}
上面的代码理解是所有的业务逻辑都是一个个Task,每一次请求过来,那么我就将业务逻辑代码生成一个Task,放入到队列中,然后由线程去取其中的任务来消费。
这里仅仅是换了一个思路,不是由线程池去创建线程来处理,而是创建几个线程,然后抢占式的去消费任务,而过来的每次请求,都会放入到队列中。
DeferredResult的异步处理能够提升一些服务器的性能,处理更多的连接数,但是一个WEB程序,处理连接数还与内置默认的tomcat相关(SpringBoot下还有其他容器),即tomcat默认的处理最大连接数为200,除了最大连接数,还有一个tomcat的最大处理线程数,如果该处设置小了,那么并发也一定会小,在设置这些之外,需要设置一个等待队列的大小,总有一些请求是不能被处理的,但又不能拒绝掉,否则用户体验特别不好,那么就进入到等待队列中,等tomcat有空闲的线程再来处理等待队列中的线程。
至于什么时候用到该DeferredResult,如果是访问量不大的程序,如管理系统,没必要使用到这个,毕竟没有访问量,反而增大了开发量,但是如果做了很好的封装,那么就没关系了,这个就考量各自程序员的水平了。
以上是关于关于DeferredResult的思考的主要内容,如果未能解决你的问题,请参考以下文章
使用DeferredResult实现异步处理REST服务示例
为啥 DeferredResult 在尝试使用 SSE 时以 setResult() 结束
在 Spring Boot 的 DeferredResult REST 方法中使用 Semaphore 服务