关于DeferredResult的思考

Posted skyice

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了关于DeferredResult的思考相关的知识,希望对你有一定的参考价值。

使用SpringBoot搭建web程序,里面内置了tomcat,一般都不会关心内部实现机制,上来就可以写程序,并且可以跑起来。但是是思考了每次的请求是如何工作的。
简单的来讲就是tomcat是将每次请求都将封装成一个Servlet,该Servlet来运行完业务逻辑代码,然后再有tomcat将信息返回给调用方。每个Servlet是同步的。即在该servlet的业务逻辑做完了然后才释放掉该Servlet。
但是servlet3提供了一个异步的机制,即每次请求过来之后,可以先释放该请求,但是会保存一些信息。业务逻辑由程序其他线程来处理,处理完成后将其值设置到DeferredResult里面。然后再由容器将返回值返回给前端。
这样做的好处:实现出现请求与业务IO分开,程序能够处理更多的请求。
网上可以找到其他例子来学习DeferredResult是如何运行的,即在请求内部开启一个线程来处理

@GetMapping
public DeferredResult<String> queryDevice(){
    DeferredResult<String> def = new DeferredResult<>();
    new Thread(()->{
        //处理业务逻辑
        def.setResult("处理后的结果");
    }).start();
    return def;
}

这样很好理解,但是不能这样做,为什么,因为每一次线程的创建销毁是消耗资源的,这样频繁的创建和销毁非常影响性能。这个时候,可以提使用线程池来处理,对是可以这样做的。是的,可以这样做,但是需要考虑到,在某一时刻,可能会产生几千个线程,这样是非常多的,如果加上tomcat创建的Servlet线程数,那确实挺消耗资源的。
上面已经有了一个可行的方案,这里提供我的一个思考,该思考是Java8新特性之后常用到的一个。

下面有三个类:

public abstract class Actor {

  public enum ActorType {
    ITC,  /* 立刻消费. */

    BLOCKING;  /* 阻塞.*/
  }
  /**
   * actor类型.
   */
  public ActorType type;
  /**
   * actor名.
   */
  public String name;
  public Actor(ActorType type) {
    this.type = type;
    this.name = this.getClass().getSimpleName();
  }
  /**
   * 任务消费
   */
  public void future(Consumer<Void> c) {
    if (this.type.ordinal() == ActorType.BLOCKING.ordinal()) {//阻塞
      ((ActorBlocking) this).push(c);
      return;
    } else {
      Misc.exeConsumer(c, null);
    }
  }
}
public class ActorBlocking extends Actor {

  /**
   * 等待处理的Consumer.
   */
  private ConcurrentLinkedQueue<Consumer<Void>> cs = new ConcurrentLinkedQueue<>();
  /**
   * 拥有线程的个数
   */
  private int tc = 1;

  /**
   * cs的size
   */
  private AtomicInteger size = new AtomicInteger(0);

  /**
   * 线程忙?
   */
  public volatile boolean busy = false;

  public ActorBlocking() {
    super(ActorType.BLOCKING);
  }

  /**
   * 添加任务.
   */
  public void push(Consumer<Void> c) {
    this.cs.add(c);
    this.size.incrementAndGet();
    synchronized (this) {//通知线程消费信息
      this.notify();
    }
  }

  /**
   * 线程忙?
   */
  public boolean isBusy() {
    return this.busy;
  }

  /**
   * 队列尺寸.
   */
  public int size() {
    return this.size.get();
  }

  public int getTc() {
    return tc;
  }

  public void setTc(int tc) {
    this.tc = tc < 1 ? 1 : tc;
  }

  /**
   * 启动线程
   */
  protected void start() {
    ActorBlocking ab = this;
    ExecutorService ex = Executors.newFixedThreadPool(this.tc);//创建线程池
    for (int i = 0; i < tc; i++) {
      ex.execute(() -> {
        while (true) {
          ab.run();
        }
      });
    }
  }

  /**
   * 抢占式消费任务
   */
  private void run() {
    Consumer<Void> c = this.cs.poll();
    if (c == null) {
      synchronized (this) {
        try {
          this.wait();
        } catch (InterruptedException e) {
        }
      }
      c = this.cs.poll();
    }
    if (c != null) /* 抢占式. */ {
      this.size.decrementAndGet();
      this.busy = true;
      Misc.exeConsumer(c, null);
      this.busy = false;
    }
  }
}
@Component
public class AppActorBlocking extends ActorBlocking {
   //可以设置CPU*2的     
  private int threadSize = 4;

  public AppActorBlocking() {
    this.setTc(threadSize);//设置线程数量
    this.start();
  }
}

该方法是工具Misc类总的方法:

 /**
   * 执行Consumer并将异常化解在内部.
   */
  public static final <T> boolean exeConsumer(Consumer<T> c, T t) {
    try {
      c.accept(t);
      return true;
    } catch (Exception e) {
      if (logger.isWarnEnabled()) {
        logger.warn("{}", Misc.trace(new Throwable()));
      }
      if (logger.isWarnEnabled()) {
        logger.warn("t: {}, e: {}", t, Misc.trace(e));
      }
      return false;
    }
  }

如何调用:

@Autowired
public AppActorBlocking appBlocking;

public void method(){
    appBlocking.future(v->{
        //处理逻辑代码
    });
}

上面的代码理解是所有的业务逻辑都是一个个Task,每一次请求过来,那么我就将业务逻辑代码生成一个Task,放入到队列中,然后由线程去取其中的任务来消费。
这里仅仅是换了一个思路,不是由线程池去创建线程来处理,而是创建几个线程,然后抢占式的去消费任务,而过来的每次请求,都会放入到队列中。

DeferredResult的异步处理能够提升一些服务器的性能,处理更多的连接数,但是一个WEB程序,处理连接数还与内置默认的tomcat相关(SpringBoot下还有其他容器),即tomcat默认的处理最大连接数为200,除了最大连接数,还有一个tomcat的最大处理线程数,如果该处设置小了,那么并发也一定会小,在设置这些之外,需要设置一个等待队列的大小,总有一些请求是不能被处理的,但又不能拒绝掉,否则用户体验特别不好,那么就进入到等待队列中,等tomcat有空闲的线程再来处理等待队列中的线程。

至于什么时候用到该DeferredResult,如果是访问量不大的程序,如管理系统,没必要使用到这个,毕竟没有访问量,反而增大了开发量,但是如果做了很好的封装,那么就没关系了,这个就考量各自程序员的水平了。







以上是关于关于DeferredResult的思考的主要内容,如果未能解决你的问题,请参考以下文章

使用DeferredResult实现异步处理REST服务示例

为啥 DeferredResult 在尝试使用 SSE 时以 setResult() 结束

在 Spring Boot 的 DeferredResult REST 方法中使用 Semaphore 服务

DeferredResult 实现长轮询

带有ajax请求的Spring deferredresult

DeferredResult使用方式和场景