Dubbo之线程池设计

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Dubbo之线程池设计相关的知识,希望对你有一定的参考价值。

参考技术A

Dubbo的线程模型中可使用4种线程池

想深入了解线程池原理的同学,可以阅读我的 线程池那些事 专栏。

我们的线程主要执行2种逻辑,一是普通IO事件,比如建立连接,断开连接,二是请求IO事件,执行业务逻辑。
在Dubbo的Dispatcher扩展点会使用到这些线程池,Dispatcher这个扩展点用于决定Netty ChannelHandler中的那些事件在Dubbo提供的线程池中执行。

缓冲线程池,默认配置如下

就默认配置来看,和Executors创建的差不多,存在内存溢出风险。
NamedInternalThreadFactory主要用于修改线程名,方便我们排查问题。
AbortPolicyWithReport对拒绝的任务打印日志,也是方便排查问题。

从keepAliveTime的配置可以看出来,LimitedThreadPool线程池的特性是线程数 只会增加不会减少

Dubbo的默认线程池,固定200个线程,就配置来看和LimitedThreadPool基本一致。
如果一定要说区别,那就是FixedThreadPool等到创建完200个线程,再往队列放任务。而LimitedThreadPool是先放队列放任务,放满了之后才创建线程。

我们知道,当线程数量达到corePoolSize之后,只有当workqueue满了之后,才会增加工作线程。
这个线程池就是对这个特性做了优化,首先继承ThreadPoolExecutor实现EagerThreadPoolExecutor,对 当前线程池提交的任务数submittedTaskCount 进行记录。
其次是通过自定义TaskQueue作为workQueue,它会在提交任务时判断是否currentPoolSize<submittedTaskCount<maxPoolSize,然后通过workQueue的offer方法返回false导致增加工作线程。

为什么返回false会增加工作线程,我们回顾下ThreadPoolExecutor的execute方法

然后看下TaskQueue的offer方法逻辑

最后看下EagerThreadPoolExecutor是如何统计已提交的任务

一般来讲使用Dubbo的默认配置,我们公司的业务量还没到需要对线程池进行特殊配置的地步。本文主要目的是,通过一个成熟框架对线程池的配置点,指导我们在实际使用线程池中需要注意的点。

Java并发程序设计线程池之异常终止和正常关闭

1.1. 线程池中的线程的异常终止

如果线程池中的线程的任务代码发生异常导致线程终止,线程池会自动创建一个新线程。

对于各种类型的线程池,都是如此。以下代码在单个线程的线程池中抛出一个异常,可以发现后续任务中输出的每个tid的值都不相同。

 

 

ExecutorService  executorService = Executors.newSingleThreadExecutor();

for(int j=0;j<10;j++){

final int t = j;

executorService.execute( new Runnable(){

@Override

public void run() {

for(int i=0;i<4;i++){

System.out.println("t:" + t + "," + "i:" + i + ", tid:" + Thread.currentThread().getId());

int a = 3 /0;//产生/ by zero异常,线程终止。新任务将创建新线程。

}

}

});

}

 

 

输出信息中可以看到对每个任务(t),线程id(tid)都不同。

t:0,i:0, tid:8

t:1,i:0, tid:10

Exception in thread "pool-1-thread-1" t:2,i:0, tid:11

Exception in thread "pool-1-thread-2" Exception in thread "pool-1-thread-3" java.lang.ArithmeticException: / by zero

t:3,i:0, tid:12

at com.test.concurrence.ThreadPoolTest$3.run(ThreadPoolTest.java:58)

at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)

at java.lang.Thread.run(Unknown Source)

Exception in thread "pool-1-thread-4" java.lang.ArithmeticException: / by zero

at com.test.concurrence.ThreadPoolTest$3.run(ThreadPoolTest.java:58)

t:4,i:0, tid:13

at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)

at java.lang.Thread.run(Unknown Source)

java.lang.ArithmeticException: / by zero

at com.test.concurrence.ThreadPoolTest$3.run(ThreadPoolTest.java:58)

at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)

t:5,i:0, tid:14

at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)

at java.lang.Thread.run(Unknown Source)

java.lang.ArithmeticException: / by zero

at com.test.concurrence.ThreadPoolTest$3.run(ThreadPoolTest.java:58)

at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)

at java.lang.Thread.run(Unknown Source)

t:6,i:0, tid:15

Exception in thread "pool-1-thread-6" Exception in thread "pool-1-thread-5" Exception in thread "pool-1-thread-7" java.lang.ArithmeticException: / by zero

at com.test.concurrence.ThreadPoolTest$3.run(ThreadPoolTest.java:58)

at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)

t:7,i:0, tid:16

at java.lang.Thread.run(Unknown Source)

java.lang.ArithmeticException: / by zero

t:8,i:0, tid:17 at com.test.concurrence.ThreadPoolTest$3.run(ThreadPoolTest.java:58)

at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)

 

at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)

at java.lang.Thread.run(Unknown Source)

Exception in thread "pool-1-thread-9" Exception in thread "pool-1-thread-8" java.lang.ArithmeticException: / by zero

t:9,i:0, tid:18 at com.test.concurrence.ThreadPoolTest$3.run(ThreadPoolTest.java:58)

at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)

 

at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)

at java.lang.Thread.run(Unknown Source)

Exception in thread "pool-1-thread-10" java.lang.ArithmeticException: / by zero

at com.test.concurrence.ThreadPoolTest$3.run(ThreadPoolTest.java:58)

at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)

at java.lang.Thread.run(Unknown Source)

java.lang.ArithmeticException: / by zero

at com.test.concurrence.ThreadPoolTest$3.run(ThreadPoolTest.java:58)

at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)

at java.lang.Thread.run(Unknown Source)

java.lang.ArithmeticException: / by zero

at com.test.concurrence.ThreadPoolTest$3.run(ThreadPoolTest.java:58)

at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)

at java.lang.Thread.run(Unknown Source)

 

 

1.2. 线程池的终止

shutdownNow()shutdown()都可以终止线程池,两者都可以终止线程池,阻止新任务的提交。

shutdownNow()会停止当前正在执行的任务,清除已提交但是尚未运行的任务,然后终止线程池。

shutdown()会等待已经提交的所有任务执行完毕才终止线程池。

ExecutorService  executorService = Executors.newFixedThreadPool(2);

for(int j=0;j<10;j++){

final int t = j;

executorService.execute( new Runnable(){

@Override

public void run() {

for(int i=0;i<4;i++){

String s = "t:" + t + "," + "i:" + i + ", tid:" + Thread.currentThread().getId();

System.out.println(s);

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

 

});

 

}

 

executorService.shutdownNow();

 

 

 

 

t:0,i:0, tid:8

end.

t:1,i:0, tid:9

java.lang.InterruptedException: sleep interrupted

t:0,i:1, tid:8

at java.lang.Thread.sleep(Native Method)

at com.test.concurrence.ThreadPoolTest$1.run(ThreadPoolTest.java:27)

at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)

at java.lang.Thread.run(Unknown Source)

java.lang.InterruptedException: sleep interrupted

at java.lang.Thread.sleep(Native Method)

at com.test.concurrence.ThreadPoolTest$1.run(ThreadPoolTest.java:27)

at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)

at java.lang.Thread.run(Unknown Source)

t:1,i:1, tid:9

t:0,i:2, tid:8

t:1,i:2, tid:9

t:0,i:3, tid:8

t:1,i:3, tid:9

 

以上是关于Dubbo之线程池设计的主要内容,如果未能解决你的问题,请参考以下文章

JDK线程池Tomcat线程池Dubbo线程池的不同

dubbo 线程池

记一次生产dubbo线程池耗尽的问题

dubbo-线程池监控

Dubbo 线程池源码解析

Java并发程序设计线程池之异常终止和正常关闭