ThreadPoolExecutor
Posted zheaven
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ThreadPoolExecutor相关的知识,希望对你有一定的参考价值。
类的结构:
Executor
-ExecutorService
--AbstractExecutorService
---ThreadPoolExecutor
ThreadPoolExecutor七大构造参数:
package com.dwz.executors;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Demonstrates the seven constructor parameters of {@link ThreadPoolExecutor}.
 *
 * <p>Scenarios to try by editing the pool configuration and the number of submitted tasks:
 * <ol>
 *   <li>corePoolSize=1, maximumPoolSize=2, queue capacity=1: what happens when 3 tasks are submitted?</li>
 *   <li>corePoolSize=1, maximumPoolSize=2, queue capacity=5: what happens when 7 tasks are submitted?</li>
 *   <li>corePoolSize=1, maximumPoolSize=2, queue capacity=5: what happens when 8 tasks are submitted?</li>
 * </ol>
 */
public class ThreadPoolExecutorBuild {

    /** Pause between two pool snapshots so the monitor loop does not spin a CPU core. */
    private static final long POLL_INTERVAL_MILLIS = 100;

    /**
     * Creates the pool for scenario 1 and submits three sleeping tasks to it.
     *
     * <p>Constructor parameters, in order: {@code int corePoolSize}, {@code int maximumPoolSize},
     * {@code long keepAliveTime}, {@code TimeUnit unit}, {@code BlockingQueue<Runnable> workQueue},
     * {@code ThreadFactory threadFactory}, {@code RejectedExecutionHandler handler}.
     *
     * @return the executor with the three tasks already accepted
     */
    private static ExecutorService buildThreadPoolExecutor() {
        ExecutorService executorService = new ThreadPoolExecutor(
                1, 2, 30, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                Thread::new,
                new ThreadPoolExecutor.AbortPolicy());
        System.out.println("====The ThreadPoolExecutor create done.");

        executorService.execute(() -> sleepSeconds(100));
        executorService.execute(() -> sleepSeconds(10));
        executorService.execute(() -> sleepSeconds(10));
        return executorService;
    }

    /**
     * Prints the current thread's name and then sleeps.
     *
     * @param seconds how long to sleep, in seconds
     */
    private static void sleepSeconds(long seconds) {
        try {
            System.out.println("* " + Thread.currentThread().getName() + " *");
            TimeUnit.SECONDS.sleep(seconds);
        } catch (InterruptedException e) {
            // Restore the flag so the pool worker can observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }

    /**
     * Builds the pool and prints a snapshot (active count, core size, queue size, max size)
     * every time the active count or the queue size changes, until the pool has terminated.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) buildThreadPoolExecutor();
        // All tasks are already accepted; an orderly shutdown still runs them to completion
        // and lets the JVM exit afterwards instead of keeping the idle core thread alive.
        threadPoolExecutor.shutdown();

        int lastActiveCount = -1;
        int lastQueueSize = -1;
        while (true) {
            // Read the termination state first so the last snapshot is taken after termination.
            boolean terminated = threadPoolExecutor.isTerminated();

            // Take each reading once: printing and remembering the same values keeps them consistent.
            int activeCount = threadPoolExecutor.getActiveCount();
            int queueSize = threadPoolExecutor.getQueue().size();
            if (activeCount != lastActiveCount || queueSize != lastQueueSize) {
                System.out.println(activeCount);
                System.out.println(threadPoolExecutor.getCorePoolSize());
                System.out.println(queueSize);
                System.out.println(threadPoolExecutor.getMaximumPoolSize());
                lastActiveCount = activeCount;
                lastQueueSize = queueSize;
                System.out.println("==================================");
            }

            if (terminated) {
                return;
            }
            try {
                TimeUnit.MILLISECONDS.sleep(POLL_INTERVAL_MILLIS);
            } catch (InterruptedException e) {
                // The monitor was asked to stop: cancel the remaining work and leave.
                Thread.currentThread().interrupt();
                threadPoolExecutor.shutdownNow();
                return;
            }
        }
    }
}
ThreadPoolExecutor的关闭
1.使用shutdown()
package com.dwz.executors;

import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

/**
 * Demonstrates {@link ExecutorService#shutdown()} on a pool that still has work in flight.
 */
public class ThreadPoolExecutorTask {

    /** Number of tasks handed to the pool. */
    private static final int TASK_COUNT = 20;

    /**
     * Sleeps for ten seconds and then reports which thread finished which task.
     *
     * @param taskId index of the task, echoed in the output line
     */
    private static void runTask(int taskId) {
        try {
            TimeUnit.SECONDS.sleep(10);
            System.out.println(Thread.currentThread().getName() + "[ " + taskId + " ] finish done.");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * Submits the tasks, requests an orderly shutdown and prints the "over" banner.
     *
     * @param args unused
     * @throws InterruptedException declared for parity with the other variants of this demo
     */
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = new ThreadPoolExecutor(
                10, 20, 30, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(10),
                Thread::new,
                new ThreadPoolExecutor.AbortPolicy());

        for (int index = 0; index < TASK_COUNT; index++) {
            final int taskId = index;
            executorService.execute(() -> runTask(taskId));
        }

        /*
         * shutdown()
         * Example: of 20 workers, 10 are busy and 10 are idle when shutdown() is called.
         * The busy ones finish their work and then exit, the idle ones are interrupted,
         * and in the end all 20 threads are gone.
         * shutdown() itself returns immediately, so the banner below may be printed
         * before the tasks have finished.
         */
        executorService.shutdown();
        System.out.println("======================over=======================");
    }
}
shutdown() 只是发起关闭请求并立即返回,不会等待已提交的任务执行完,所以这样执行不能保证 over 最后打印;想让 over 最后输出,需要在 shutdown() 之后配合 awaitTermination 方法阻塞等待线程池终止
优化后的代码
executorService.shutdown();
// awaitTermination blocks until every task has finished, the timeout elapses, or the
// calling thread is interrupted; it returns false when the timeout was hit first.
if (!executorService.awaitTermination(1, TimeUnit.HOURS)) {
    System.err.println("Executor did not terminate within the timeout.");
}
System.out.println("======================over=======================");
2.使用shutdownNow()
package com.dwz.executors;

import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

/**
 * Demonstrates {@link ExecutorService#shutdownNow()} on a pool that still has work in flight.
 */
public class ThreadPoolExecutorTask {

    /**
     * Submits 20 sleeping tasks, stops the pool abruptly and prints the tasks that never started.
     *
     * @param args unused
     * @throws InterruptedException declared for parity with the other variants of this demo
     */
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = new ThreadPoolExecutor(
                10, 20, 30, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(10),
                Thread::new,
                new ThreadPoolExecutor.AbortPolicy());

        IntStream.range(0, 20).forEach(i ->
                executorService.execute(() -> {
                    try {
                        TimeUnit.SECONDS.sleep(10);
                        System.out.println(Thread.currentThread().getName() + "[ " + i + " ] finish done.");
                    } catch (InterruptedException e) {
                        // Restore the flag so the pool worker can observe the interruption.
                        Thread.currentThread().interrupt();
                        e.printStackTrace();
                    }
                })
        );

        /*
         * shutdownNow()
         * Of the 20 submitted tasks, 10 have started and 10 are still waiting in the queue.
         * shutdownNow() returns the 10 waiting tasks, interrupts every worker thread
         * and returns immediately.
         */
        // No null placeholder and no catch-all: a failure here should surface directly
        // instead of turning into a NullPointerException on the lines below.
        List<Runnable> runnableList = executorService.shutdownNow();

        System.out.println("======================over=======================");
        System.out.println(runnableList);
        System.out.println(runnableList.size());
    }
}
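即使使用了shutdown()和shutdownNow(),有时候线程也不能完全停止:shutdown() 不会中断正在执行的任务,shutdownNow() 也只是向工作线程发出中断,任务如果一直不结束或不响应中断,线程就会继续运行并阻止 JVM 退出。这时可以配合 setDaemon(true) 把工作线程设置为守护线程来使用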
package com.dwz.executors;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

/**
 * Runs never-ending tasks on daemon worker threads and then requests a shutdown.
 */
public class ThreadPoolExecutorLongTimeTask {

    /** Number of endless tasks submitted to the pool. */
    private static final int TASK_COUNT = 10;

    /**
     * Thread factory used by the pool.
     *
     * @param task the work the new thread will run
     * @return an unstarted daemon thread wrapping {@code task}
     */
    private static Thread newDaemonThread(Runnable task) {
        Thread worker = new Thread(task);
        // Daemon thread: it ends together with the main thread.
        worker.setDaemon(true);
        return worker;
    }

    /**
     * Submits the endless tasks and calls {@code shutdown()}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ExecutorService executorService = new ThreadPoolExecutor(
                10, 20, 30, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(10),
                ThreadPoolExecutorLongTimeTask::newDaemonThread,
                new ThreadPoolExecutor.AbortPolicy());

        for (int submitted = 0; submitted < TASK_COUNT; submitted++) {
            executorService.submit(() -> {
                while (true) {
                    TimeUnit.SECONDS.sleep(1);
                }
            });
        }

        executorService.shutdown();
        // Alternative to try: executorService.shutdownNow();
    }
}
以上是关于ThreadPoolExecutor的主要内容,如果未能解决你的问题,请参考以下文章
聊聊高并发(四十)解析java.util.concurrent各个组件(十六) ThreadPoolExecutor源代码分析
ThreadPoolExecutor().map 与 ThreadPoolExecutor().submit 有何不同?
Java中的线程池——ThreadPoolExecutor源代码分析
JDK1.7中的ThreadPoolExecutor源代码剖析