源码分析——线程池创建原理分析
Posted 喵喵7781
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了源码分析——线程池创建原理分析相关的知识,希望对你有一定的参考价值。
目录
Java review--线程池 我的这篇文章主要讲了为什么使用线程池,线程池包括哪些类型,还有简单的demo。接下来,我将分享的是线程池四种实现方式,对应的源码解析。源码是个非常好的东西,尤其是jdk的注释,真的让我感到震撼。第一,通俗易懂。第二,不用拾人牙慧,能看到最真实的东西。
线程池创建源码
1.newFixedThreadPool
创建一个重用固定数量线程的线程池,在共享的无界队列中运行。在任何时候,最多@code nThreads线程将被激活,共同处理任务。如果在所有线程都处于激活状态时提交了其他任务,他们将在队列中等待,直到线程可用。如果任何线程在执行期间因故障而终止,在shutdown之前,如果需要,新的线程将取代旧的线程执行后续任务。池中的线程将存在直到@link ExecutorServiceshutdown shutdown。
@param nThread池中的线程数
@return新创建的线程池
@throws IllegalArgumentException if @code nThreads <= 0
ThreadFactory:这个接口用来按照需要创建线程。通过线程工厂移除强制执行new Thread。确保应用采用独特的线程子类,优先级等。
/**
* Creates a thread pool that reuses a fixed number of threads
* operating off a shared unbounded queue. At any point, at most
* @code nThreads threads will be active processing tasks.
* If additional tasks are submitted when all threads are active,
* they will wait in the queue until a thread is available.
* If any thread terminates due to a failure during execution
* prior to shutdown, a new one will take its place if needed to
* execute subsequent tasks. The threads in the pool will exist
* until it is explicitly @link ExecutorService#shutdown shutdown.
*
* @param nThreads the number of threads in the pool
* @return the newly created thread pool
* @throws IllegalArgumentException if @code nThreads <= 0
*/
public static ExecutorService newFixedThreadPool(int nThreads)
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
/**
* Creates a thread pool that reuses a fixed number of threads
* operating off a shared unbounded queue, using the provided
* ThreadFactory to create new threads when needed.
* 同上
* @param threadFactory the factory to use when creating new threads
* @throws NullPointerException if threadFactory is null
* 同上
*/
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory)
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
/**
* An object that creates new threads on demand. Using thread factories
* removes hardwiring of calls to @link Thread#Thread(Runnable) new Thread,
* enabling applications to use special thread subclasses, priorities, etc.
*
* <p>
* The simplest implementation of this interface is just:
* <pre> @code
* class SimpleThreadFactory implements ThreadFactory
* public Thread newThread(Runnable r)
* return new Thread(r);
*
* </pre>
*
* The @link Executors#defaultThreadFactory method provides a more
* useful simple implementation, that sets the created thread context
* to known values before returning it.
* @since 1.5
* @author Doug Lea
*/
public interface ThreadFactory
/**
* Constructs a new @code Thread. Implementations may also initialize
* priority, name, daemon status, @code ThreadGroup, etc.
*
* @param r a runnable to be executed by new thread instance
* @return constructed thread, or @code null if the request to
* create a thread is rejected
*/
Thread newThread(Runnable r);
ThreadPoolExecutor:
使用给定的初始值创建一个新的@code ThreadPoolExecutor参数和默认线程工厂和被拒绝的执行处理程序。使用@link Executors工厂替代通用构造函数。
* @param corePoolSize 在池中保留的线程数,即使它们处于空闲状态,除非设置了@code allowCoreThreadTimeOut
* @param maximumPoolSize 线程池中允许的最大线程数
* @param keepAliveTime 当线程数量大于在执行的线程数量时,这是多余空闲线程等待新线程的最大允许存活时间。
* @param unit @code keepAliveTime参数的时间单位
* @param workQueue 该队列用于在任务执行之前保存任务。此队列仅包含@code Runnable @code execute方法提交的任务。
* @throws IllegalArgumentException如果下列条件之一成立:
@code corePoolSize <0
@code keepAliveTime <0 <br>
@code maximumPoolSize <= 0 <br>
@code maximumPoolSize <corePoolSize
如果@code workQueue为null,则@throws NullPointerException
/**
* Creates a new @code ThreadPoolExecutor with the given initial
* parameters and default thread factory and rejected execution handler.
* It may be more convenient to use one of the @link Executors factory
* methods instead of this general purpose constructor.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless @code allowCoreThreadTimeOut is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the @code keepAliveTime argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the @code Runnable
* tasks submitted by the @code execute method.
* @throws IllegalArgumentException if one of the following holds:<br>
* @code corePoolSize < 0<br>
* @code keepAliveTime < 0<br>
* @code maximumPoolSize <= 0<br>
* @code maximumPoolSize < corePoolSize
* @throws NullPointerException if @code workQueue is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue)
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
线程池对线程的创建,归根还是要到Thread的构造和创建。
翻译:
分配一个新的@code Thread对象,使其具有@code target作为其运行对象,具有指定的@code name作为其名称,并且属于@code group引用的线程组,并且具有指定的<i>stack size</ i>。
此构造函数与@link * #Thread(ThreadGroup,Runnable,String)相同,但它允许指定线程堆栈大小。堆栈大小是虚拟机为该线程分配的的近似字节数。 @code stackSize参数与高度平台相关。
在某些平台上,为。指定特别大的值 @code stackSize参数可以允许线程实现更大的递归深度,抛出@link StackOverflowError。同样,指定较低的值可能允许更多的线程同时存在,而不抛出@linkOutOfMemoryError(或其他内部错误)。<tt> stackSize </ tt>参数值体现了和最大递归深度和并发级别和平台依赖之间的关系。 在某些平台上, @code stackSize参数可能没有任何效果。
虚拟机可以免费处理@code stackSize参数作为建议。如果指定的值不合理地低对于平台,虚拟机可能会使用一些平台特定的最小值;如果指定的值太大不合理,虚拟机可能会使用某些特定于平台的最大值。同样,虚拟机可以自由舍入指定的按其认为合适的价值向高或向低调节(或完全忽略它)。
为@code stackSize参数指定零值导致此构造函数的行为与 @code Thread(ThreadGroup,Runnable,String)构造函数类似。
由于此行为的平台依赖性构造函数,在使用时应特别小心。执行给定计算所需的线程堆栈大小可能因JRE实施而异。有鉴于此变化,可能需要仔细调整堆栈大小参数,并且可能需要为每个JRE实现重复调整应用程序将运行。
实施说明:鼓励Java平台实施者记录他们的根据 @code stackSize参数调整后的行为。
* @param group
*线程组。如果@code null并且存在安全性管理,该小组取决于@linkplain * SecurityManager#getThreadGroup SecurityManager.getThreadGroup()
*如果没有安全管理或@code SecurityManager.getThreadGroup()返回组@code null。这个组设置为当前线程的线程组。
* @param target
*此线程启动的时候调用@code run方法的对象。如果@code null,则调用此线程的run方法。
* @param name
*新线程的名称
* @param stackSize
*新线程所需的堆栈大小,或者设置为零该参数将被忽略。
* @throws SecurityException
*如果当前线程无法在指定的线程组创建线程
/**
* Allocates a new @code Thread object so that it has @code target
* as its run object, has the specified @code name as its name,
* and belongs to the thread group referred to by @code group, and has
* the specified <i>stack size</i>.
*
* <p>This constructor is identical to @link
* #Thread(ThreadGroup,Runnable,String) with the exception of the fact
* that it allows the thread stack size to be specified. The stack size
* is the approximate number of bytes of address space that the virtual
* machine is to allocate for this thread's stack. <b>The effect of the
* @code stackSize parameter, if any, is highly platform dependent.</b>
*
* <p>On some platforms, specifying a higher value for the
* @code stackSize parameter may allow a thread to achieve greater
* recursion depth before throwing a @link StackOverflowError.
* Similarly, specifying a lower value may allow a greater number of
* threads to exist concurrently without throwing an @link
* OutOfMemoryError (or other internal error). The details of
* the relationship between the value of the <tt>stackSize</tt> parameter
* and the maximum recursion depth and concurrency level are
* platform-dependent. <b>On some platforms, the value of the
* @code stackSize parameter may have no effect whatsoever.</b>
*
* <p>The virtual machine is free to treat the @code stackSize
* parameter as a suggestion. If the specified value is unreasonably low
* for the platform, the virtual machine may instead use some
* platform-specific minimum value; if the specified value is unreasonably
* high, the virtual machine may instead use some platform-specific
* maximum. Likewise, the virtual machine is free to round the specified
* value up or down as it sees fit (or to ignore it completely).
*
* <p>Specifying a value of zero for the @code stackSize parameter will
* cause this constructor to behave exactly like the
* @code Thread(ThreadGroup, Runnable, String) constructor.
*
* <p><i>Due to the platform-dependent nature of the behavior of this
* constructor, extreme care should be exercised in its use.
* The thread stack size necessary to perform a given computation will
* likely vary from one JRE implementation to another. In light of this
* variation, careful tuning of the stack size parameter may be required,
* and the tuning may need to be repeated for each JRE implementation on
* which an application is to run.</i>
*
* <p>Implementation note: Java platform implementers are encouraged to
* document their implementation's behavior with respect to the
* @code stackSize parameter.
*
*
* @param group
* the thread group. If @code null and there is a security
* manager, the group is determined by @linkplain
* SecurityManager#getThreadGroup SecurityManager.getThreadGroup().
* If there is not a security manager or @code
* SecurityManager.getThreadGroup() returns @code null, the group
* is set to the current thread's thread group.
*
* @param target
* the object whose @code run method is invoked when this thread
* is started. If @code null, this thread's run method is invoked.
*
* @param name
* the name of the new thread
*
* @param stackSize
* the desired stack size for the new thread, or zero to indicate
* that this parameter is to be ignored.
*
* @throws SecurityException
* if the current thread cannot create a thread in the specified
* thread group
*
* @since 1.4
*/
public Thread(ThreadGroup group, Runnable target, String name,
long stackSize)
init(group, target, name, stackSize);
总结一下,创建线程最复杂的情况需要如下几个参数:线程组,线程目标对象,线程明,栈大小。之前做多线程项目的时候,重写过ThreadFactory,如下供大家参考。
static private class SDKDefaultThreadFactory implements ThreadFactory
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
SDKDefaultThreadFactory()
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-sdk-batchDownload-thread-";
public Thread newThread(Runnable r)
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
2.newCachedThreadPool
创建一个根据需要创建新线程数的线程池,但是可重用以前构造好的线程。这些池通常会提高许多短期异步任务的程序的性能。调用@code execute将重用以前构建的线程(如果有)。如果没有可用的现有线程,则为新的线程将被创建并将其添加到池中。有线程的未使用60秒被终止并从中删除缓存。因此,一个长时间闲置的线程池将会不消耗任何资源。请注意,线程池具有类似属性,但不同的实现细节(例如,超时参数)可以使用@link ThreadPoolExecutor构造函数创建。
@return新创建的线程池
/**
* Creates a thread pool that creates new threads as needed, but
* will reuse previously constructed threads when they are
* available. These pools will typically improve the performance
* of programs that execute many short-lived asynchronous tasks.
* Calls to @code execute will reuse previously constructed
* threads if available. If no existing thread is available, a new
* thread will be created and added to the pool. Threads that have
* not been used for sixty seconds are terminated and removed from
* the cache. Thus, a pool that remains idle for long enough will
* not consume any resources. Note that pools with similar
* properties but different details (for example, timeout parameters)
* may be created using @link ThreadPoolExecutor constructors.
*
* @return the newly created thread pool
*/
public static ExecutorService newCachedThreadPool()
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
/**
* Creates a thread pool that creates new threads as needed, but
* will reuse previously constructed threads when they are
* available, and uses the provided
* ThreadFactory to create new threads when needed.
* @param threadFactory the factory to use when creating new threads
* @return the newly created thread pool
* @throws NullPointerException if threadFactory is null
*/
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory)
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
3.newSingleThreadExecutor
创建一个使用单个工作线程操作的Executor关闭无限队列。 (但请注意,如果单身线程在执行之前由于执行失败而终止关闭,如果需要执行,新的将取代它进行后续任务。)保证任务执行顺序,任何一个任务都不会超过一个给定时间。不同于其他等同线程池@code newFixedThreadPool(1)返回的executor是保证不可重新配置以使用其他线程。
@return新创建的单线程Executor
/**
* Creates an Executor that uses a single worker thread operating
* off an unbounded queue. (Note however that if this single
* thread terminates due to a failure during execution prior to
* shutdown, a new one will take its place if needed to execute
* subsequent tasks.) Tasks are guaranteed to execute
* sequentially, and no more than one task will be active at any
* given time. Unlike the otherwise equivalent
* @code newFixedThreadPool(1) the returned executor is
* guaranteed not to be reconfigurable to use additional threads.
*
* @return the newly created single-threaded Executor
*/
public static ExecutorService newSingleThreadExecutor()
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
4.newScheduledThreadPool
创建一个线程池,可以定时命令在a之后运行给予延迟,或定期执行。
@param corePoolSize要保留在池中的线程数,即使它们处于空闲状态
@return新创建的预定线程池
@throws IllegalArgumentException如果@code corePoolSize <0
/**
* Creates a thread pool that can schedule commands to run after a
* given delay, or to execute periodically.
* @param corePoolSize the number of threads to keep in the pool,
* even if they are idle
* @return a newly created scheduled thread pool
* @throws IllegalArgumentException if @code corePoolSize < 0
*/
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
return new ScheduledThreadPoolExecutor(corePoolSize);
/**
* Creates a thread pool that can schedule commands to run after a
* given delay, or to execute periodically.
* @param corePoolSize the number of threads to keep in the pool,
* even if they are idle
* @param threadFactory the factory to use when the executor
* creates a new thread
* @return a newly created scheduled thread pool
* @throws IllegalArgumentException if @code corePoolSize < 0
* @throws NullPointerException if threadFactory is null
*/
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory)
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
/**
* Creates a new @code ScheduledThreadPoolExecutor with the
* given core pool size.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless @code allowCoreThreadTimeOut is set
* @throws IllegalArgumentException if @code corePoolSize < 0
*/
public ScheduledThreadPoolExecutor(int corePoolSize)
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
/**
* Creates a new @code ScheduledThreadPoolExecutor with the
* given initial parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless @code allowCoreThreadTimeOut is set
* @param threadFactory the factory to use when the executor
* creates a new thread
* @throws IllegalArgumentException if @code corePoolSize < 0
* @throws NullPointerException if @code threadFactory is null
*/
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory)
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
因为DelayedWorkQueue的解释很长,所以该静态内部类的注释只截取了部分。
专门的延迟队列。即使它只能保存RunnableScheduledFutures,这个类必须必须声明为BlockingQueue <Runnable>
以用于与TPE声明进行啮合。
DelayedWorkQueue 是基于堆的数据结构,类似于那些 DelayQueue和PriorityQueue,除了每个ScheduledFutureTask还将其索引记录到堆数组中。这消除了在取消时找到任务的需求,大大加快了删除速度(从O到O) (n)TO O(log n)),并减少垃圾保留,否则将发生等待元素在清除之前上升到顶部。但是因为队列也可能保存不是ScheduledFutureTasks的RunnableScheduledFutures,我们不保证有这些索引可用,在这些case中我们回退到LINEAR搜索。(我们希望大多数任务不会被装饰,并且更快的case会更常见。)所有堆操作必须记录INDEX变化 - 主要是在siftUp和siftDown.Upon删除,一个任务的
heapIndex设置为-1。请注意,ScheduledFutureTasks可以在队列中最多出现一次(对于其他类型的任务或工作队列,可能不是正确的),所以是独一无二的由heapIndex标识。
/**
* Specialized delay queue. To mesh with TPE declarations, this
* class must be declared as a BlockingQueue<Runnable> even though
* it can only hold RunnableScheduledFutures.
*/
static class DelayedWorkQueue extends AbstractQueue<Runnable>
implements BlockingQueue<Runnable>
/*
* A DelayedWorkQueue is based on a heap-based data structure
* like those in DelayQueue and PriorityQueue, except that
* every ScheduledFutureTask also records its index into the
* heap array. This eliminates the need to find a task upon
* cancellation, greatly speeding up removal (down from O(n)
* to O(log n)), and reducing garbage retention that would
* otherwise occur by waiting for the element to rise to top
* before clearing. But because the queue may also hold
* RunnableScheduledFutures that are not ScheduledFutureTasks,
* we are not guaranteed to have such indices available, in
* which case we fall back to linear search. (We expect that
* most tasks will not be decorated, and that the faster cases
* will be much more common.)
*
* All heap operations must record index changes -- mainly
* within siftUp and siftDown. Upon removal, a task's
* heapIndex is set to -1. Note that ScheduledFutureTasks can
* appear at most once in the queue (this need not be true for
* other kinds of tasks or work queues), so are uniquely
* identified by heapIndex.
*/
5.四种线程池创建对比
挨个分析之后,对比四种线程池创建方式:
相同点:都使用了ThreadPoolExecutor进行创建。参数为核心线程数,最大线程个数,线程存活时间,时间单位,线程队列,线程工厂。
不同点:参数内容不一样。
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory)
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory)
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
public static ExecutorService newSingleThreadExecutor()
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory)
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
public ScheduledThreadPoolExecutor(int corePoolSize)
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
线程池类型 | corePoolSize | maximumPoolSize | keepAliveTime | unit | workQueue | threadFactory |
---|---|---|---|---|---|---|
newFixedThreadPool | n | n | 0 | 毫秒 | LinkedBlockingQueue | 可选 |
newCachedThreadPool | 0 | 2的31次方-1 | 60 | 秒 | SynchronousQueue | 可选 |
newSingleThreadExecutor | 1 | 1 | 0 | 毫秒 | LinkedBlockingQueue | 无 |
newScheduledThreadPool | n | 2的31次方-1 | 0 | 微秒 | DelayedWorkQueue | 可选 |
队列名称 | LinkedBlockingQueue | SynchronousQueue | DelayedWorkQueue |
---|---|---|---|
概念(by jdk文档) | 一个基于已链接节点的、范围任意的 阻塞队列。此队列按 FIFO(先进先出)排序元素。队列的头部 是在队列中时间最长的元素。队列的尾部 是在队列中时间最短的元素。新元素插入到队列的尾部,并且队列获取操作会获得位于队列头部的元素。链接队列的吞吐量通常要高于基于数组的队列,但是在大多数并发应用程序中,其可预知的性能要低。 可选的容量范围构造方法参数作为防止队列过度扩展的一种方法。如果未指定容量,则它等于 此类及其迭代器实现 | 一种阻塞队列,其中每个插入操作必须等待另一个线程的对应移除操作 ,反之亦然。同步队列没有任何内部容量,甚至连一个队列的容量都没有。不能在同步队列上进行 peek,因为仅在试图要移除元素时,该元素才存在;除非另一个线程试图移除某个元素,否则也不能(使用任何方法)插入元素;也不能迭代队列,因为其中没有元素可用于迭代。队列的头 是尝试添加到队列中的首个已排队插入线程的元素;如果没有这样的已排队线程,则没有可用于移除的元素并且 poll() 将会返回 null。对于其他 Collection 方法(例如 contains),SynchronousQueue 作为一个空 collection。此队列不允许 null 元素。 同步队列类似于 CSP 和 Ada 中使用的 rendezvous 信道。它非常适合于传递性设计,在这种设计中,在一个线程中运行的对象要将某些信息、事件或任务传递给在另一个线程中运行的对象,它就必须与该对象同步。 对于正在等待的生产者和使用者线程而言,此类支持可选的公平排序策略。默认情况下不保证这种排序。但是,使用公平设置为 true 所构造的队列可保证线程以 FIFO 的顺序进行访问。 此类及其迭代器实现 | 静态的内部类,在jdk上面没有找到 详情参考4的翻译 |
以上是关于源码分析——线程池创建原理分析的主要内容,如果未能解决你的问题,请参考以下文章