dubbo源码:ThreadPool

Posted CVWarrior

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了dubbo源码:ThreadPool相关的知识,希望对你有一定的参考价值。

ThreadPool

ThreadPool

dubbo搞了自己的一套spi线程池,目的就是为了更加根据url定制化。spi接口如下。默认使用的是fixed扩展名对应的扩展实例FixedThreadPool,方法带有@Adaptive注解,可以生成 ThreadPool$Adaptive,再根据@Adaptive注解里的值(THREADPOOL_KEY = threadpool),从url取出对应参数的值来决定使用哪种extName。

@SPI("fixed")
public interface ThreadPool {
   /**
    * Thread pool
    *
    * @param url URL contains thread parameter
    * @return thread pool
    */
   @Adaptive({THREADPOOL_KEY})
   Executor getExecutor(URL url);
}

FixedThreadPool

如下代码比较简单,核心线程和最大线程数默认是200。注意queues的判断使用不同的阻塞队列,这里说一下SynchronousQueue。

SynchronousQueue 也是一个队列来的,但它的特别之处在于它内部没有容器,一个生产线程,当它生产产品(即put的时候),如果当前没有人想要消费产品(即当前没有线程执行take),此生产线程必须阻塞,等待一个消费线程调用take操作,take操作将会唤醒该生产线程,同时消费线程会获取生产线程的产品(即数据传递),这样的一个过程称为一次配对过程(当然也可以先take后put,原理是一样的)。https://blog.csdn.net/yanyan19880509/article/details/52562039

public class FixedThreadPool implements ThreadPool {

   @Override
   public Executor getExecutor(URL url) {
       String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);// "dubbo"
       int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);// 200
       int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);// 0
       // 所谓的固定就是体现在核心线程数 = 最大线程数
       return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
               queues == 0 ? new SynchronousQueue<Runnable>() :
                      (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                              : new LinkedBlockingQueue<Runnable>(queues)),
               new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
  }
}

LimitedThreadPool

直接看注释。核心线程池和最大线程池默认为0和200,200满了+队列满了,走拒绝策略。

public class LimitedThreadPool implements ThreadPool {

   @Override
   public Executor getExecutor(URL url) {
       String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
       int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);// 0
       int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);// 200
       int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);// 0
       // 创建线程池
       return new ThreadPoolExecutor(
               cores,// 核心线程数
               threads,// 最大线程数
               Long.MAX_VALUE, // keepAlive时间-->直接最大值,一直工作,所以就是类上面注释提到的不会自动收缩
               TimeUnit.MILLISECONDS,// keepAlive单位
               queues == 0 ? new SynchronousQueue<Runnable>() : // 任务队列
                      (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                              : new LinkedBlockingQueue<Runnable>(queues)),
               // 线程工厂,name作为线程名称的前缀,注意使用的是Named[Internal]ThreadFactory,new 的是 InternalThread 线程,
               // 目的是要用改造版的 ThreadLocal (即InternalThreadLocal),必须要配合 InternalThread 线程使用,否则就会退化为原生的 ThreadLocal
               // InternalThreadLocal可以后面去了解。NamedInternalThreadFactory进去
               new NamedInternalThreadFactory(name, true),
               new AbortPolicyWithReport(name, url));// 拒绝策略,使用基于AbortPolicy自定义的一个拒绝策略,进去

  }

}

CachedThreadPool

核心线程和最大线程数默认是0和Integer.MAX_VALUE。这个线程池是自调优的。线程在空闲一分钟后将被回收(最小可以回缩到默认的0),新的线程将为即将到来的请求创建。

public class CachedThreadPool implements ThreadPool {

   @Override
   public Executor getExecutor(URL url) {
       String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
       int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);// 0
       int threads = url.getParameter(THREADS_KEY, Integer.MAX_VALUE); // 无限大
       int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES); // 0
       int alive = url.getParameter(ALIVE_KEY, DEFAULT_ALIVE); // 60s
       return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
               queues == 0 ? new SynchronousQueue<Runnable>() :
                      (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                              : new LinkedBlockingQueue<Runnable>(queues)),
               new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
  }
}



以上是关于dubbo源码:ThreadPool的主要内容,如果未能解决你的问题,请参考以下文章

Percona 8.0 ThreadPool源码解析

threadpool源码学习

ThreadPool线程池源码解析

dubbo 线程池

《Elasticsearch 源码解析与优化实战》第16章:ThreadPool模块分析

《Elasticsearch 源码解析与优化实战》第16章:ThreadPool模块分析