线程池的原理及实现

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程池的原理及实现相关的知识,希望对你有一定的参考价值。

1、线程池简介:

多线程技术主要解决处理器单元内多个线程执行的问题,它可以显著减少处理器单元的闲置时间,增加处理器单元的吞吐能力。

假设一个服务器完成一项任务所需时间为:T1 创建线程时间,T2 在线程中执行任务的时间,T3 销毁线程时间。

如果:T1 + T3 远大于 T2,则可以采用线程池,以提高服务器性能。

一个线程池包括以下四个基本组成部分:

  1、线程池管理器(ThreadPool):用于创建并管理线程池,包括 创建线程池,销毁线程池,添加新任务;

  2、工作线程(PoolWorker):线程池中线程,在没有任务时处于等待状态,可以循环的执行任务;

  3、任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行,它主要规定了任务的入口,任务执行完后的收尾工作,任务的执行状态等;

  4、任务队列(taskQueue):用于存放没有处理的任务。提供一种缓冲机制。
                
线程池技术正是关注如何缩短或调整T1,T3时间的技术,从而提高服务器程序性能的。它把T1,T3分别安排在服务器程序的启动和结束的时间段或者一些空闲的时间段,这样在服务器程序处理客户请求时,不会有T1,T3的开销了。线程池不仅调整T1,T3产生的时间段,而且它还显著减少了创建线程的数目,看一个例子:

假设一个服务器一天要处理50000个请求,并且每个请求需要一个单独的线程完成。在线程池中,线程数一般是固定的,所以产生线程总数不会超过线程池中线程的数目,而如果服务器不利用线程池来处理这些请求则线程总数为50000。一般线程池大小是远小于50000。所以利用线程池的服务器程序不会为了创建50000而在处理请求时浪费时间,从而提高效率。

线程池实现代码:

  1 package com.wb.thread;
  2 
  3 import java.util.LinkedList;
  4 import java.util.List;
  5 
  6 /**
  7  * 线程池类
  8  * @author wangbo
  9  *
 10  */
 11 public class ThreadPool {
 12     
 13     private static int worker_num = 5;//线程池中线程的个数,默认为5
 14     
 15     private WorkThread[] workthreads;//工作线程
 16     
 17     private static volatile int finished_task = 0;//未处理的任务
 18     
 19     private List<Runnable> taskQueue = new LinkedList<Runnable>();//任务队列
 20     
 21     private static ThreadPool threadPool;
 22     
 23     /**
 24      * 无参构造器,创建线程池
 25      */
 26     private ThreadPool(){
 27         this(5);
 28     }
 29     
 30     /**
 31      * 含参构造器,创建线程池
 32      * @param num
 33      */
 34     private ThreadPool(int num){
 35         worker_num = num;
 36         workthreads = new WorkThread[num];
 37         for (int i = 0; i < workthreads.length; i++) {
 38             workthreads[i] = new WorkThread();
 39             workthreads[i].start();//开启线程
 40         }
 41     }
 42     
 43     /**
 44      * 获得一个默认线程个数的线程池
 45      * @return
 46      */
 47     public static ThreadPool getThreadPool(){
 48         return getThreadPool(ThreadPool.worker_num);
 49     }
 50     
 51     /**
 52      * 获得一个指定线程个数的线程池
 53      * @param num
 54      * @return
 55      */
 56     public static ThreadPool getThreadPool(int num) {
 57         if (num <= 0) {
 58             num = ThreadPool.worker_num;
 59         }
 60         if (threadPool == null) {
 61             threadPool = new ThreadPool(num);
 62         }
 63         return threadPool;
 64     }
 65 
 66     /**
 67      * 将任务单个添加到队列
 68      * @param task
 69      */
 70     public void execute(Runnable task){
 71         synchronized (taskQueue) {
 72             taskQueue.add(task);
 73             taskQueue.notify();
 74         }
 75     }
 76     
 77     /**
 78      * 将任务批量添加到队列
 79      * @param tasks
 80      */
 81     public void execute(Runnable[] tasks){
 82         synchronized (taskQueue) {
 83             for (Runnable runnable : tasks) {
 84                 taskQueue.add(runnable);
 85             }
 86             taskQueue.notify();
 87         }
 88     }
 89     
 90     /**
 91      * 将任务批量添加到队列
 92      * @param tasks
 93      */
 94     public void execute(List<Runnable> tasks){
 95         synchronized (taskQueue) {
 96             for (Runnable runnable : tasks) {
 97                 taskQueue.add(runnable);
 98             }
 99             taskQueue.notify();
100         }
101     }
102     
103     /**
104      * 销毁线程池
105      */
106     public void destroy(){
107         //还有任务没有执行完
108         while(!taskQueue.isEmpty()){
109             try {
110                 Thread.sleep(10);
111             } catch (InterruptedException e) {
112                 e.printStackTrace();
113             }
114         }
115         //停止工作线程,且置为null
116         for (int i = 0; i < workthreads.length; i++) {
117             workthreads[i].stopWorker();
118             workthreads[i] = null;
119         }
120         threadPool = null;
121         taskQueue.clear();//清空队列
122     }
123     
124     /**
125      * 获取工作线程的个数
126      * @return
127      */
128     public int getWorkThreadNumber(){
129         return worker_num;
130     }
131     
132     /**
133      * 获取已完成任务数量
134      * @return
135      */
136     public int getFinishedTaskNumber(){
137         return finished_task;
138     }
139     
140     /**
141      * 获取未完成任务数量
142      * @return
143      */
144     public int getWaitTaskNumber(){
145         return taskQueue.size();
146     }
147     
148     /**
149      * 获取线程池信息
150      */
151     @Override
152     public String toString() {
153         return "工作线程数量:" + getWorkThreadNumber() 
154                 + ",已完成任务数量" + getFinishedTaskNumber()
155                 + ",未完成任务数量" + getWaitTaskNumber();
156                 
157     }
158 
159     /**
160      * 内部类,工作线程
161      * @author wangbo
162      *
163      */
164     private class WorkThread extends Thread{
165         
166         private boolean isRunning = true;//线程有效标志
167 
168         @Override
169         public void run() {
170             Runnable runnable = null;
171             while (isRunning) {
172                 synchronized (taskQueue) {
173                     //队列为空
174                     while (isRunning && taskQueue.isEmpty()) {
175                         try {
176                             taskQueue.wait(20);
177                         } catch (InterruptedException e) {
178                             e.printStackTrace();
179                         }
180                     }
181                     //队列不为空
182                     if (!taskQueue.isEmpty()) {
183                         runnable = taskQueue.remove(0);//去除任务
184                     }
185                 }
186                 if (runnable != null) {
187                     runnable.run();//执行任务
188                 }
189                 finished_task++;
190                 runnable = null;
191             }
192             
193         }
194         
195         /**
196          * 停止线程
197          */
198         public void stopWorker() {
199             isRunning = false;
200         }
201         
202     }
203 
204 }

测试代码:

 1 package com.wb.thread;
 2 
 3 public class ThreadPoolTest {
 4 
 5      public static void main(String[] args) {  
 6         // 创建3个线程的线程池  
 7         ThreadPool t = ThreadPool.getThreadPool(3);  
 8         t.execute(new Runnable[] { new Task(), new Task(), new Task() });  
 9         t.execute(new Runnable[] { new Task(), new Task(), new Task() });  
10         System.out.println(t);  
11         t.destroy();//所有线程都执行完成才destory  
12         System.out.println(t);  
13     }
14       
15     // 任务类  
16     static class Task implements Runnable {
17         
18         private static volatile int i = 1;
19         
20         @Override
21         public void run() {// 执行任务
22             System.out.println("任务 " + (i++) + " 完成");
23         }  
24     }  
25 
26 }

2、java类库中提供的线程池简介:

java.util.concurrent包提供了现成的线程池的实现。

技术分享技术分享

示例代码:

 1 package com.wb.thread;
 2 
 3 import java.util.concurrent.ExecutorService;
 4 import java.util.concurrent.Executors;
 5 /**
 6  * newCachedThreadPool()
 7  * 线程池为无限大,当执行第二个任务时第一个任务已经完成,会复用执行第一个任务的线程,而不用每次新建线程。
 8  * 有任务才会创建线程,空闲线程会被保留60s
 9  * @author wangbo
10  *
11  */
12 public class ThreadPoolExecutorTest1 {
13     
14     public static void main(String[] args) {
15         ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
16         for (int i = 0; i < 10; i++) {
17             final int index = i;
18             try {
19                 Thread.sleep(1000);
20             } catch (InterruptedException e) {
21                 e.printStackTrace();
22             }
23             cachedThreadPool.execute(new Runnable() {
24                 @Override
25                 public void run() {
26                     System.out.println(index);
27                     System.out.println(Thread.currentThread().getName());
28                 }
29             });
30         }
31     }
32 
33 }
 1 package com.wb.thread;
 2 
 3 import java.util.concurrent.ExecutorService;
 4 import java.util.concurrent.Executors;
 5 /**
 6  * newFixedThreadPool(int nThreads)
 7  * 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
 8  * 线程池中包含固定数目的线程,空闲线程会一直保留,参数nThreads表示设定线程池中线程的数目
 9  * @author wangbo
10  *
11  */
12 public class ThreadPoolExecutorTest2 {
13     
14     public static void main(String[] args) {
15         ExecutorService fixedThreadPool = Executors.newFixedThreadPool(2);
16         for (int i = 0; i < 10; i++) {
17             final int index = i;
18             fixedThreadPool.execute(new Runnable() {
19                 @Override
20                 public void run() {
21                     try {
22                         System.out.println(index);
23                         System.out.println(Thread.currentThread().getName());
24                         Thread.sleep(2000);
25                     } catch (InterruptedException e) {
26                         e.printStackTrace();
27                     }
28                 }
29             });
30         }
31     }
32 
33 }
 1 package com.wb.thread;
 2 
 3 import java.text.SimpleDateFormat;
 4 import java.util.Date;
 5 import java.util.concurrent.Executors;
 6 import java.util.concurrent.ScheduledExecutorService;
 7 import java.util.concurrent.TimeUnit;
 8 /**
 9  * newScheduledThreadPool(int corePoolSize)
10  * 线程池能按时间计划来执行任务,允许用户设定计划执行任务的时间。
11  * 参数corePoolSize设定线程池中线程最小数目,当任务较多时,线程池可能会创建更多的工作线程来执行任务。
12  * @author wangbo
13  *
14  */
15 public class ThreadPoolExecutorTest3 {
16     
17     public static void main(String[] args) {
18         
19         method1();
20         method2();
21         
22     }
23     
24     /**
25      * 延迟3s执行
26      */
27     private static void method1(){
28         System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
29         ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3);
30         scheduledThreadPool.schedule(new Runnable() {
31             public void run() {
32                 System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
33                 System.out.println("延迟2s执行");
34             }
35         }, 2, TimeUnit.SECONDS);
36     }
37     
38     /**
39      * 延迟2s执行后每3s执行一次
40      */
41     private static void method2() {
42         System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
43         ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3);
44         scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
45             public void run() {
46                 System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
47                 System.out.println("延迟2s执行后每3s执行一次");
48                 System.out.println(Thread.currentThread().getName());
49             }
50         }, 2, 3, TimeUnit.SECONDS);
51     }
52 
53 }
 1 package com.wb.thread;
 2 
 3 import java.util.concurrent.ExecutorService;
 4 import java.util.concurrent.Executors;
 5 /**
 6  * newSingleThreadExecutor(int nThreads)
 7  * 线程池中只有一个线程,它依次执行每个任务。
 8  * 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
 9  * @author wangbo
10  *
11  */
12 public class ThreadPoolExecutorTest4 {
13     
14     public static void main(String[] args) {
15         ExecutorService singleThreadPool = Executors.newSingleThreadExecutor();
16         for (int i = 0; i < 10; i++) {
17             final int index = i;
18             singleThreadPool.execute(new Runnable() {
19                 @Override
20                 public void run() {
21                     try {
22                         System.out.println(index);
23                         System.out.println(Thread.currentThread().getName());
24                         Thread.sleep(2000);
25                     } catch (InterruptedException e) {
26                         e.printStackTrace();
27                     }
28                 }
29             });
30         }
31     }
32 
33 }
 1 package com.wb.thread;
 2 
 3 import java.text.SimpleDateFormat;
 4 import java.util.Date;
 5 import java.util.concurrent.Executors;
 6 import java.util.concurrent.ScheduledExecutorService;
 7 import java.util.concurrent.TimeUnit;
 8 /**
 9  * newSingleThreadScheduledExecutor()
10  * 线程池中只有一个线程,它能按照时间计划执行每个任务。
11  * @author wangbo
12  *
13  */
14 public class ThreadPoolExecutorTest5 {
15     
16     public static void main(String[] args) {
17         
18         method1();
19         method2();
20         
21     }
22     
23     /**
24      * 延迟3s执行
25      */
26     private static void method1(){
27         System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
28         ScheduledExecutorService scheduledThreadPool = Executors.newSingleThreadScheduledExecutor();
29         scheduledThreadPool.schedule(new Runnable() {
30             public void run() {
31                 System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
32                 System.out.println("延迟2s执行");
33             }
34         }, 2, TimeUnit.SECONDS);
35     }
36     
37     /**
38      * 延迟2s执行后每3s执行一次
39      */
40     private static void method2() {
41         System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
42         ScheduledExecutorService scheduledThreadPool = Executors.newSingleThreadScheduledExecutor();
43         scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
44             public void run() {
45                 System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
46                 System.out.println("延迟2s执行后每3s执行一次");
47                 System.out.println(Thread.currentThread().getName());
48             }
49         }, 2, 3, TimeUnit.SECONDS);
50     }
51 
52 }

 



以上是关于线程池的原理及实现的主要内容,如果未能解决你的问题,请参考以下文章

线程池的原理及实现

线程池的原理及实现

线程池的原理及实现

线程池的原理及实现

java多线程总结五:线程池的原理及实现

Java线程池的使用及工作原理