线程池简单实现

Posted insaneXs

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程池简单实现相关的知识,希望对你有一定的参考价值。

实现了一个简化版的线程池。

实现线程池的关键有两个:一是阻塞队列,用于任务的存取,二是内部的线程对象如何持续性的执行任务,并在空闲时被回收。

线程池代码:

  1 package learnConcurrent;
  2 
  3 import java.util.ArrayList;
  4 import java.util.Collection;
  5 import java.util.LinkedList;
  6 import java.util.List;
  7 import java.util.concurrent.ArrayBlockingQueue;
  8 import java.util.concurrent.BlockingQueue;
  9 import java.util.concurrent.Callable;
 10 import java.util.concurrent.ExecutionException;
 11 import java.util.concurrent.ExecutorService;
 12 import java.util.concurrent.Future;
 13 import java.util.concurrent.TimeUnit;
 14 import java.util.concurrent.TimeoutException;
 15 import java.util.concurrent.atomic.AtomicBoolean;
 16 import java.util.concurrent.atomic.AtomicInteger;
 17 import java.util.concurrent.locks.ReentrantLock;
 18 
 19 public class MyThreadPool implements ExecutorService{
 20     //线程队列
 21     private List<Worker> workers;
 22     //任务队列
 23     private BlockingQueue<Runnable> rQueue;
 24     //线程池核心大小
 25     private int corePoolSize;
 26     //线程池最大大小
 27     private int maxPoolSize;
 28     //空闲线程最长存活时间
 29     private int keepAliveTime = 5;
 30 
 31     private static final int ALIVE = 0;
 32     
 33     private static final int SHUTDOMN = 1;
 34     
 35     private AtomicInteger state = new AtomicInteger(ALIVE);
 36     
 37     private ReentrantLock lock = new ReentrantLock();
 38     
 39     public MyThreadPool(int corePoolSize, int maxPoolSize){
 40         this.corePoolSize = corePoolSize;
 41         this.maxPoolSize = maxPoolSize;
 42         
 43         this.workers = new LinkedList<Worker>();
 44         //阻塞队列,最大容量为maxPoolSize
 45         this.rQueue = new ArrayBlockingQueue<Runnable>(maxPoolSize, true);
 46     }
 47     
 48     @Override
 49     public void execute(Runnable command) {
 50         //FIXME size在获取时和判断时 可能发生改变
 51         lock.lock();
 52         int size = workers.size();
 53         if(size < corePoolSize){//当线程池线程数小于核心数量时,增加线程
 54             addWorker();
 55         }else if(size < maxPoolSize && !rQueue.isEmpty()){//当线程大于核心数量且任务队列中任务排队时,增加线程
 56             addWorker();
 57         }
 58         lock.unlock();
 59         
 60         if(!isShutdown()){
 61             rQueue.offer(command);
 62         }
 63     }
 64     
 65     @Override
 66     public void shutdown() {
 67         //关闭线程池的简单实现,设置状态让任务队列不在接受任务,线程也会因为超时被回收
 68         //缺点时空闲的线程资源得不到立即释放
 69         state.set(SHUTDOMN);
 70     }
 71     
 72     /**
 73      * 立即停止线程池,试图停止正在活动的线程,返回还在等待的任务列表
 74      */
 75     @Override
 76     public List<Runnable> shutdownNow() {
 77         if(isShutdown())
 78             return null;
 79         state.set(SHUTDOMN);
 80         lock.lock();
 81         List<Runnable> restRunnable = new ArrayList<Runnable>();
 82         while(!rQueue.isEmpty()){
 83             restRunnable.add(rQueue.poll());
 84         }
 85         for(Worker w : workers){
 86             w.interrupt();
 87         }
 88         lock.unlock();
 89         return restRunnable;
 90     }
 91 
 92     @Override
 93     public boolean isShutdown() {
 94         return state.get() == ALIVE;
 95     }
 96 
 97     @Override
 98     public boolean isTerminated() {
 99         return isShutdown() && rQueue.isEmpty();
100     }
101 
102     @Override
103     public boolean awaitTermination(long timeout, TimeUnit unit)
104             throws InterruptedException {
105         // TODO Auto-generated method stub
106         return false;
107     }
108 
109     @Override
110     public <T> Future<T> submit(Callable<T> task) {
111         // TODO Auto-generated method stub
112         return null;
113     }
114 
115     @Override
116     public <T> Future<T> submit(Runnable task, T result) {
117         // TODO Auto-generated method stub
118         return null;
119     }
120 
121     @Override
122     public Future<?> submit(Runnable task) {
123         return null;
124     }
125 
126     @Override
127     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
128             throws InterruptedException {
129         return null;
130     }
131 
132     @Override
133     public <T> List<Future<T>> invokeAll(
134             Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
135             throws InterruptedException {
136         return null;
137     }
138 
139     @Override
140     public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
141             throws InterruptedException, ExecutionException {
142         return null;
143     }
144 
145     @Override
146     public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
147             long timeout, TimeUnit unit) throws InterruptedException,
148             ExecutionException, TimeoutException {
149         return null;
150     }
151     
152     private Runnable getTask(){
153         Runnable r = null;
154         try {
155             r = rQueue.poll(keepAliveTime, TimeUnit.SECONDS);
156         } catch (InterruptedException e) {
157             // TODO Auto-generated catch block
158             e.printStackTrace();
159         }
160         return r;
161     }
162     
163     private void addWorker(){
164         Worker w = new Worker();
165         w.start();
166         lock.lock();
167         workers.add(w);
168         lock.unlock();
169     }
170     
171     private void removeWorker(Worker w){
172         lock.lock();
173         workers.remove(w);
174         lock.unlock();
175     }
176     
177     class Worker extends Thread{
178         
179         private AtomicBoolean isAlive = new AtomicBoolean(true);
180         
181         private Runnable task;
182         
183         
184         @Override
185         public void run() {
186             while(isAlive.get()){
187                 //阻塞一定时间,超时则回收该线程
188                 task = getTask();
189                 if(task != null){
190                     task.run();
191                 }else{
192                     isAlive.set(false);
193                     
194                 }
195                 task = null;
196             }
197             System.out.println("remove worker");
198             removeWorker(this);
199         }
200         
201     }
202     
203     
204 }

测试代码:

 1 package learnConcurrent;
 2 
 3 
 4 public class ThreadPoolTest {
 5     static int taskNo = 0;
 6     public static void main(String[] args) throws InterruptedException {
 7         MyThreadPool pool = new MyThreadPool(2, 5);
 8         
 9         for(int i=0; i< 50; i++){
10             Task task = new Task(taskNo++);
11             pool.execute(task);
12             Thread.sleep((int)(Math.random() * 1000));
13         }
14         
15     }
16     
17 }
18 
19 class Task implements Runnable{
20     String str;
21     public Task(int taskNo){
22         str = "TaskNo:" + taskNo;
23     }
24     @Override
25     public void run() {
26         System.out.println(str + " start work ");
27         //DO SOMETHING
28         try {
29             Thread.sleep((int)(Math.random() * 1000));
30         } catch (InterruptedException e) {
31             // TODO Auto-generated catch block
32             e.printStackTrace();
33         }
34         
35         System.out.println(str + " done ");
36     }
37     
38 }

虽然是继承了ExecutorService对象,但是只实现了几个接口,设计上也可能有未考虑到的问题。

测试代码也很简陋,仅供参考。

以上是关于线程池简单实现的主要内容,如果未能解决你的问题,请参考以下文章

Java——线程池

Java线程池详解

Java线程池详解

Java 线程池详解

Linux简单线程池实现(带源码)

Motan在服务provider端用于处理request的线程池