dubbo源码:ThreadPool
Posted CVWarrior
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了dubbo源码:ThreadPool相关的知识,希望对你有一定的参考价值。
ThreadPool
ThreadPool
dubbo搞了自己的一套spi线程池,目的就是为了更加根据url定制化。spi接口如下。默认使用的是fixed扩展名对应的扩展实例FixedThreadPool,方法带有@Adaptive注解,可以生成 ThreadPool$Adaptive,再根据@Adaptive注解里的值(THREADPOOL_KEY = threadpool),从url取出对应参数的值来决定使用哪种extName。
@SPI("fixed")
public interface ThreadPool {
/**
* Thread pool
*
* @param url URL contains thread parameter
* @return thread pool
*/
@Adaptive({THREADPOOL_KEY})
Executor getExecutor(URL url);
}
FixedThreadPool
如下代码比较简单,核心线程和最大线程数默认是200。注意queues的判断使用不同的阻塞队列,这里说一下SynchronousQueue。
SynchronousQueue 也是一个队列来的,但它的特别之处在于它内部没有容器,一个生产线程,当它生产产品(即put的时候),如果当前没有人想要消费产品(即当前没有线程执行take),此生产线程必须阻塞,等待一个消费线程调用take操作,take操作将会唤醒该生产线程,同时消费线程会获取生产线程的产品(即数据传递),这样的一个过程称为一次配对过程(当然也可以先take后put,原理是一样的)。https://blog.csdn.net/yanyan19880509/article/details/52562039
public class FixedThreadPool implements ThreadPool {
@Override
public Executor getExecutor(URL url) {
String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);// "dubbo"
int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);// 200
int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);// 0
// 所谓的固定就是体现在核心线程数 = 最大线程数
return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
}
LimitedThreadPool
直接看注释。核心线程池和最大线程池默认为0和200,200满了+队列满了,走拒绝策略。
public class LimitedThreadPool implements ThreadPool {
@Override
public Executor getExecutor(URL url) {
String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);// 0
int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);// 200
int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);// 0
// 创建线程池
return new ThreadPoolExecutor(
cores,// 核心线程数
threads,// 最大线程数
Long.MAX_VALUE, // keepAlive时间-->直接最大值,一直工作,所以就是类上面注释提到的不会自动收缩
TimeUnit.MILLISECONDS,// keepAlive单位
queues == 0 ? new SynchronousQueue<Runnable>() : // 任务队列
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
// 线程工厂,name作为线程名称的前缀,注意使用的是Named[Internal]ThreadFactory,new 的是 InternalThread 线程,
// 目的是要用改造版的 ThreadLocal (即InternalThreadLocal),必须要配合 InternalThread 线程使用,否则就会退化为原生的 ThreadLocal
// InternalThreadLocal可以后面去了解。NamedInternalThreadFactory进去
new NamedInternalThreadFactory(name, true),
new AbortPolicyWithReport(name, url));// 拒绝策略,使用基于AbortPolicy自定义的一个拒绝策略,进去
}
}
CachedThreadPool
核心线程和最大线程数默认是0和Integer.MAX_VALUE。这个线程池是自调优的。线程在空闲一分钟后将被回收(最小可以回缩到默认的0),新的线程将为即将到来的请求创建。
public class CachedThreadPool implements ThreadPool {
@Override
public Executor getExecutor(URL url) {
String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);// 0
int threads = url.getParameter(THREADS_KEY, Integer.MAX_VALUE); // 无限大
int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES); // 0
int alive = url.getParameter(ALIVE_KEY, DEFAULT_ALIVE); // 60s
return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
}
以上是关于dubbo源码:ThreadPool的主要内容,如果未能解决你的问题,请参考以下文章