线程池之ThreadPoolExecutor
Posted 小LUA
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程池之ThreadPoolExecutor相关的知识,希望对你有一定的参考价值。
所属包:
java.util.concurrent.ThreadPoolExecutor
类关系:
public class ThreadPoolExecutor extends AbstractExecutorService
1. 继承关系
ThreadPoolExecutor 继承了一个抽象类:AbstractExecutorService
public abstract class AbstractExecutorService implements ExecutorService
而这个AbstractExecutorService实现了一个接口:ExecutorService
public interface ExecutorService extends Executor
这个ExecutorService接口又继承了一个类:Executor
public interface Executor
可以看出:
Executor是一个顶层接口,它的子接口ExecutorService继承了它(其实还有一个子接口: ScheduledExecutorService),抽象类AbstractExecutorService实现了这个子接口ExecutorService,最终ThreadPoolExecutor 继承了抽象类AbstractExecutorService并且同时实现了子接口ExecutorService。
2. 构造方法
最简单的一个构造方法:
/** * Creates a new {@code ThreadPoolExecutor} with the given initial * parameters and default thread factory and rejected execution handler. * It may be more convenient to use one of the {@link Executors} factory * methods instead of this general purpose constructor. * * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @param maximumPoolSize the maximum number of threads to allow in the * pool * @param keepAliveTime when the number of threads is greater than * the core, this is the maximum time that excess idle threads * will wait for new tasks before terminating. * @param unit the time unit for the {@code keepAliveTime} argument * @param workQueue the queue to use for holding tasks before they are * executed. This queue will hold only the {@code Runnable} * tasks submitted by the {@code execute} method. * @throws IllegalArgumentException if one of the following holds:<br> * {@code corePoolSize < 0}<br> * {@code keepAliveTime < 0}<br> * {@code maximumPoolSize <= 0}<br> * {@code maximumPoolSize < corePoolSize} * @throws NullPointerException if {@code workQueue} is null */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
有五个参数:
corePoolSize - 即使空闲时仍保留在池中的线程数,除非设置 allowCoreThreadTimeOut maximumPoolSize - 池中允许的最大线程数 keepAliveTime - 当线程数大于核心时,这是多余的空闲线程在终止之前等待新任务的最大时间。 unit - keepAliveTime参数的时间单位 workQueue - 在执行任务之前用于保存任务的队列。 该队列将仅保存execute方法提交的Runnable任务
实际上它调用了同类的另外一个“全能”构造方法,通过 this() 的形式,最后两个参数用的默认值
另外一个构造方法:
/** * Creates a new {@code ThreadPoolExecutor} with the given initial * parameters. * * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @param maximumPoolSize the maximum number of threads to allow in the * pool * @param keepAliveTime when the number of threads is greater than * the core, this is the maximum time that excess idle threads * will wait for new tasks before terminating. * @param unit the time unit for the {@code keepAliveTime} argument * @param workQueue the queue to use for holding tasks before they are * executed. This queue will hold only the {@code Runnable} * tasks submitted by the {@code execute} method. * @param threadFactory the factory to use when the executor * creates a new thread * @param handler the handler to use when execution is blocked * because the thread bounds and queue capacities are reached * @throws IllegalArgumentException if one of the following holds:<br> * {@code corePoolSize < 0}<br> * {@code keepAliveTime < 0}<br> * {@code maximumPoolSize <= 0}<br> * {@code maximumPoolSize < corePoolSize} * @throws NullPointerException if {@code workQueue} * or {@code threadFactory} or {@code handler} is null */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
参数:
corePoolSize - 即使空闲时仍保留在池中的线程数,除非设置 allowCoreThreadTimeOut maximumPoolSize - 池中允许的最大线程数 keepAliveTime - 当线程数大于内核时,这是多余的空闲线程在终止前等待新任务的最大时间。 unit - keepAliveTime参数的时间单位 workQueue - 用于在执行任务之前使用的队列。 这个队列将仅保存execute方法提交的Runnable任务。 threadFactory - 执行程序创建新线程时使用的工厂 handler - 执行被阻止时使用的处理程序,因为达到线程限制和队列容量
3. 如何使用
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class MyThreadPoolExecutor { /** * 使用有界队列: * 1、当线程数小于corePoolSize时,创建线程执行任务。 * 2、当线程数大于等于corePoolSize并且workQueue没有满时,放入workQueue中 * 3、线程数大于等于corePoolSize并且当workQueue满时,新任务新建线程运行,线程总数要小于maximumPoolSize * 4、当线程总数等于maximumPoolSize并且workQueue满了的时候执行handler的rejectedExecution。也就是拒绝策略。 * * ThreadPoolExecutor默认有四个拒绝策略: * 1、ThreadPoolExecutor.AbortPolicy() 直接抛出异常RejectedExecutionException * 2、ThreadPoolExecutor.CallerRunsPolicy() 直接调用run方法并且阻塞执行 * 3、ThreadPoolExecutor.DiscardPolicy() 直接丢弃后来的任务 * 4、ThreadPoolExecutor.DiscardOldestPolicy() 丢弃在队列中队首的任务 * * * */ public static void main(String[] args) { ThreadPoolExecutor pool = new ThreadPoolExecutor( 1, 2, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(3)); Runnable[] runs = new Runnable[6]; for (int i = 0; i < runs.length; i++) { runs[i] = new MyTask(i); } pool.execute(runs[0]); //线程1个,队列0个 pool.execute(runs[1]); //线程1个,队列1个 pool.execute(runs[2]); //线程1个,队列2个 pool.execute(runs[3]); //线程1个,队列3个 pool.execute(runs[4]); //线程2个,队列3个 pool.execute(runs[5]); //线程2个,队列3个,拒绝第六个 pool.shutdown(); } }
线程:
public class MyTask implements Runnable { private int id; public MyTask(int id) { this.id = id; } @Override public void run() { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Task-" + id); } }
这个就是最简单的一个使用方法了。ps:最后那个(线程1个,队列0个....)指的是,你仅仅执行runs[0];runs[0]+runs[1];runs[0]+runs[1]+runs[2];....的时候,任务被放到哪里。
但是推荐使用这种方式创建线程池:
package cn.ying.thread.pool; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class MyTest { public static void main(String[] args) { ExecutorService pool = Executors.newFixedThreadPool(5); for (int i = 0; i < 10; i++) { pool.execute(new MyTask(i)); } pool.shutdown(); } public void note(){ Executors.newCachedThreadPool(); //无界线程池,可以进行自动线程回收 // public static ExecutorService newCachedThreadPool() { // return new ThreadPoolExecutor(0, Integer.MAX_VALUE, // 60L, TimeUnit.SECONDS, // new SynchronousQueue<Runnable>());//SynchronousQueue:长度为1的队列 // } Executors.newFixedThreadPool(10); //创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程。 // public static ExecutorService newFixedThreadPool(int nThreads) { // return new ThreadPoolExecutor(nThreads, nThreads, // 0L, TimeUnit.MILLISECONDS, // new LinkedBlockingQueue<Runnable>()); // } Executors.newScheduledThreadPool(10); //创建一个单线程执行程序,它可安排在给定延迟后运行命令或者定期地执行 // public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { // return new ScheduledThreadPoolExecutor(corePoolSize); // } // public ScheduledThreadPoolExecutor(int corePoolSize) { // super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, // new DelayedWorkQueue()); // } Executors.newSingleThreadExecutor(); //创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程 // public static ExecutorService newSingleThreadExecutor() { // return new FinalizableDelegatedExecutorService // (new ThreadPoolExecutor(1, 1, // 0L, TimeUnit.MILLISECONDS, // new LinkedBlockingQueue<Runnable>())); // } } }
main方法里面就是相关代码,而note方法则是简单介绍几种方法,注释部分是源码。
就先写这么多吧,以后再补充
以上是关于线程池之ThreadPoolExecutor的主要内容,如果未能解决你的问题,请参考以下文章