ThreadPoolExecutor 线程池
Posted xieyanke
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ThreadPoolExecutor 线程池相关的知识,希望对你有一定的参考价值。
用途:
用于自动化管理线程, 开发人员只需要关注业务实现, 无需关注线程的管理, 降低开发要求
方法解释:
Executor //执行任务(若执行线程有任务 则进入任务队列等待工作线程拉取) 无返回值 void execute(Runnable command); ExecutorService //关闭线程池 不再接收新的任务 但是等待队列中的任务仍会执行完 void shutdown(); //关闭线程池 不再接收新的任务 并且等待队列中的任务不会被执行 List<Runnable> shutdownNow(); //返回线程池状态是否关闭 boolean isShutdown(); //返回线程是是否结束运行 只有在执行shutdown和shutdownNow 之后,并且已经执行结束才会返回true boolean isTerminated(); //等待执行完结 boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; //提交任务 类似execute() 区别是有返回结果对象 <T> Future<T> submit(Callable<T> task); //提交任务 若任务执行完成 返回当前给定的result <T> Future<T> submit(Runnable task, T result); //提交任务 基本和execute()方法一样, 虽然有返回Future<?>, 但实际是Future<Void>,无返回值 Future<?> submit(Runnable task); //批量提交任务 并且所有任务依旧会进入任务等候队列 在此批所有任务执行完成之后返回所有结果 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; //批量提交任务 并且所有任务依旧会进入任务等候队列 但是设有时间限制 如果超过给定时间仍有未执行完成的任务 则直接返回所有结果(此时可能会有些Future没有结果) <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException; //批量提交任务 并且所有任务依旧会进入任务等候队列 只要有一个执行完成则立即返回执行完成的那个任务结果 <T> T invokeAny(Collection<? extends Callable<T>> tasks)throws InterruptedException, ExecutionException; //批量提交任务 并且所有任务依旧会进入任务等候队列 只要有一个执行完成则立即返回执行完成的那个任务结果 若无任何返回则抛出超时异常 <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
使用举例:
execute(Runnable command)
public static void main(String[] args) throws Exception { ExecutorService executorService = new ThreadPoolExecutor(1, 10,0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); for (int i=1; i<5; i++){ int finalI = i; executorService.submit(()->{ System.out.println("打印"+finalI); }); } } 结果: 打印1 打印2 打印3 打印4
submit(Callable<T> task)
//提交任务 类似execute() 区别是有返回结果对象 public static void main(String[] args) throws Exception { ExecutorService executorService = new ThreadPoolExecutor(1, 10,0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); List<Future<Integer>> futureList = Lists.newArrayList(); for (int i=1; i<5; i++){ int finalI = i; Future<Integer> future = executorService.submit(new Callable<Integer>() { @Override public Integer call() throws Exception { Thread.sleep(200); return finalI; } }); futureList.add(future); } //等待所有提交的任务都执行完 Thread.sleep(5000); for (Future<Integer> future : futureList){ System.out.println("返回:"+future.get()); } } 结果: 返回:1 返回:2 返回:3 返回:4
shutdown()
public static void main(String[] args) throws Exception { ExecutorService executorService = new ThreadPoolExecutor(1, 10,0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); for (int i=1; i<5; i++){ int finalI = i; executorService.submit(()->{ try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("打印"+finalI); }); } executorService.shutdown(); System.out.println("线程池发起shutdown请求"); //等待线程池能执行完 Thread.sleep(5000); System.out.println(executorService.isShutdown()); } 结果: 线程池发起shutdown请求 打印1 打印2 打印3 打印4 true
shutdownNow()
public static void main(String[] args) throws Exception { ExecutorService executorService = new ThreadPoolExecutor(1, 10,0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); for (int i=1; i<5; i++){ int finalI = i; executorService.submit(()->{ try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("打印"+finalI); }); } executorService.shutdownNow(); System.out.println("线程池发起shutdownNow请求"); //等待线程池能执行完 Thread.sleep(5000); System.out.println(executorService.isShutdown()); } 结果: 线程池发起shutdownNow请求 java.lang.InterruptedException: sleep interrupted at java.lang.Thread.sleep(Native Method) at com.moredian.trade.biz.SocketServerTest.lambda$main$0(SocketServerTest.java:138) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) 打印1 true
类图:
数据结构图:
投递任务可以是Runnable Callable 下图举Runnable的一个例子,从投递,到进入队列,再到被worker线程消费
工作流程简述:
1 提交任务, exucude、submit、 invokeAll、 invokeAny
2 获取当前工作线程数与核心线程数进行对比, 若核心线程数大于当前工作线程数则添加新的工作线程, 直到核心线程数等于工作线程数
3 工作线程处理完任务后向任务队列获取新的任务执行, 若任务队列没有任务则阻塞当前工作线程(这也是为什么不需要重复创建线程的原因,是否永久阻塞主要看keepAlive),直至新的任务进任务队列
4 随着任务提交, 当等待队列已满时,若满足 条件1: 工作线程都在执行中, 条件2: 最大线程数(maximumPoolSize)大于核心线程 此时开辟新的工作线程, 直至工作线程等于最大线程数
5 当所有工作线程都参与任务处理,任务队列仍满队列时,新的任务将抛弃不执行
6 随着任务的执行, 已经没有新的任务进来, 工作线程中将只保留核心(阻塞等待), 非核心线程将被释放
关键源码分析:
submit(Runnable task)
public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; } //执行任务 public void execute(Runnable command) { //若执行任务为空 抛出异常 if (command == null) throw new NullPointerException(); //获取当前ctl的值 ctl中是高3位作为状态值,低28位作为线程总数值 //有对应的位运算来获取 一值多用 int c = ctl.get(); //若工作中线程数小于核心线程数 则增加一个工作线程 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } //当前线程池是否处于运行状态 且 将任务投递到任务队列是否成功(如果任务队列限定最大长度 则offer返回fase) if (isRunning(c) && workQueue.offer(command)) { //再次获取ctl的值 int recheck = ctl.get(); //若非线程池非运行中(可能是shutdown了), 将当前任务从等待队列中移除 刚放进去就移除 if (! isRunning(recheck) && remove(command)) //拒绝此任务 reject(command); //若工作线程数为0 增加一个工作线程 //这段代码看起来比较奇怪, 发生在核心线程也能被回收的场景(创建线程池时可以设置) else if (workerCountOf(recheck) == 0) addWorker(null, false); } //增加工作线程失败 拒绝任务 else if (!addWorker(command, false)) reject(command); } //添加一个工作线程 private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { //获取当前ctl值 int c = ctl.get(); //获取线程池运行状态 int rs = runStateOf(c); //下面这句代码非常难理解对照 !(1==1 && 2==2 && 3==3) == (1!=1 || 2!=2 || 3!=3) //将其改写 if(rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty())) //rs> SHUTDOWN表示线程池非运行状态 //总的来说就是如果非运行中 且 (状态不为shutdown 或 当前任务不等于空 或 工作队列为空) if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { //当前工作线程数 int wc = workerCountOf(c); //如果工作线程数大于容量 或者 //判断是否为核心线程 1若是核心线程不能大于核心线程总数 2若是非核心线程则不能大于最大线程数 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //工作线程数+1 并且跳出循环 if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl //若此时运行状态发生改变 则继续循环 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } //以下代码为 compareAndIncrementWorkerCount(c) = true的前提下 并且ctl值已经发生改变 运行线程数已经加1 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //新增一个工作线程 w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; //全局锁 mainLock.lock(); try { //当前运行状态 int rs = runStateOf(ctl.get()); //工作线程集合workers增加一个工作线程 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } //工作线程添加成功则启动当前工作线程 if (workerAdded) { //开始从任务队列取任务执行 t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
维持线程池中工作线程不释放原理
从新建worker开始 new Worker(firstTask), 下面代码重点关注标红字体, 原理是利用阻塞队列调用 poll和take 方法时, 若取不到值则会阻塞当前前程,直至阻塞队列重新添加数据(实现原理是基于AQS 可以看我之前 ReentrentLock Condition文章)
//线程保持原因 //new Worker(firstTask); Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; //重点关注newThread(this) this.thread = getThreadFactory().newThread(this); } //newThread(this) 那么直接找ThreadPoolExecutor的run()方法 public void run() { //往下看 runWorker(this); } //运行worker 其他的不看 只看getTask() final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } } //从任务队列中获取任务 private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { //关键点在这里 workQueue的方法poll和take都是阻塞方法 若取不到值则当前工作线程阻塞 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
总结:
线程池能保持线程持续运行的关键在于阻塞队列的使用, 主要做的是何时阻塞队列 何时把线程从阻塞中释放出来, 亮点是ctl一值多用
以上是关于ThreadPoolExecutor 线程池的主要内容,如果未能解决你的问题,请参考以下文章
高并发多线程基础之ThreadPoolExecutor源代码分析