线程池&任务
Posted 空方块
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程池&任务相关的知识,希望对你有一定的参考价值。
1.线程池
1.无限制创建线程的缺点
#1:线程生命周期的开销。线程的创建与关闭不是“免费”的。实际的开销依据不同的平台不一样,而且创建线程是需要时间的,会带来请求的延迟。如果没处理一个请求创建一个线程就会大量消耗计算资源。
#2:资源消耗量。活动线程会消耗系统资源,尤其是内存,当可运行的线程多于可用的处理器数,线程则会空闲。大量的空闲线程占用更多的内存,给垃圾回收器带来压力,而且在竞争CPU资源时,会产生额外的开销。
#3:稳定性。限制可创建线程的数目。限制的数目依不同平台耳钉,同样受到JVM的启动参数、Thread的构造函数中请求的栈大小等因素的影响,以及底层操作系统线程的限制。
1.1.Executor框架
public interface Executor
void execute(Runnable command);
Executor接口虽然简单,但它可以用于异步任务执行,而且支持很多不同类型的任务执行策略。
1.2.执行策略
将任务的提交与任务的执行体进行解耦,价值在于给定任务执行执行策略。
1.3.创建线程池
类库提供了一个灵活的线程池实现和一些有用的预设配置。可以通过调用Executors的某个静态工厂方法来创建线程池:
newFixThreadPool:创建给定长的线程池。每提交一个任务就创建一个线程,直到达到池的最大长度,这是线程池会保持长度不再变化,如果一个线程由于非预期Exception结束,线程池会补充一个新的线程。队列使用无限的LinkedBlockingQueue。
newCachedThreadPool:创建一个可缓存的线程池。如果当前线程池的长度超过处理的需求,则会灵活回收线程;如果线程不足以满足需求,则会灵活增添新的线程,而并不会对池的长度作任何限制。队列使用SynchronousQueue。
newSingleThreadExecutor:创建一个单线程的Executor,如果该线程异常结束,会有另一个取代它。队列使用无限的LinkedBlockingQueue。
newScheduledThreadPool:创建一个定长的线程池,而且支持定时以及周期性的任务执行。
1.4.任务与执行策略的隐性耦合
#1:依赖性任务。任务依赖时序、其他任务的结果与边界效应,隐性为执行策略带来了约束。
#2:采用线程限制的任务。任务线程不安全时,Executor从一个单线程改为一个多线程线程池,就会失去线程安全性。
#3:对响应时间敏感的任务。将一些执行时间较长的任务提交到只有少量线程的线程池,线程池管理的服务响应性会被削弱。
#4:使用ThreadLocal。thread-local值的生命周期被线程的生命周期所限制,在池的某线程中使用才有意义,线程池的线程会发生回收、添加,这时Thread-loca值会一起回收。
NOTE:在线程池中任务依赖于其他任务的执行,此时俩个任务恰好在单线程的线程池中执行,有可能线程会发生无限期的阻塞,等待依赖任务的完成,依赖任务阻塞在等待队列里,这时就发生了线程饥饿死锁。
1.5.定制线程池的大小
线程池合理的长度取决于未来提交的任务类型和所部署系统的特征。
线程池过大,那么线程对稀缺的CPU和内存资源竞争,会导致内存的高使用量,还可能会耗尽资源。
线程池过小,对于可用的资源没有很好利用,会对吞吐量造成损失。:
定义:
Ncpu = CPU的数量
Ucpu = 目标CPU的使用率,[0,1]
W / C = 等待时间 / 计算时间
为了保持处理器达到期望的使用率,最优的池的大小等于:
N = Ncpu * Ucpu * ( 1 + W / C)
Note:
#1:Ncpu = Runtime.getRuntime().availableProcessors();
#2:计算密集型任务,一般分配Ncpu + 1 个线程的线程池来获得最优的利用率(防止某线程发生报错而暂停,刚好有“额外”的线程,来保证CPU周期不会中断工作)。
#3:对于I/O和其他阻塞操作的任务,不是所有的线程都会在所有的时间被调度,因此需要一个更大的池。
1.6.配置ThreadPoolExecutor
1.61.线程的创建与销毁
/**
* Creates a new @code ThreadPoolExecutor with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless @code allowCoreThreadTimeOut is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the @code keepAliveTime argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the @code Runnable
* tasks submitted by the @code execute method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* @code corePoolSize < 0<br>
* @code keepAliveTime < 0<br>
* @code maximumPoolSize <= 0<br>
* @code maximumPoolSize < corePoolSize
* @throws NullPointerException if @code workQueue
* or @code threadFactory or @code handler is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) ...
#1:核心池大小是目标的大小,所有核心线程并非立即开始,而已等到有任务提交的时刻,慢慢补充,除非调用@Code prestartAllCoreThreads()。
#2:池的大小等于核心池的大小,并且直到工作队列充满前,不会创建更多的线程,但不会超过@Code maxinumPoolSize。
#3:如果一个非核心池线程已经闲置时间超过存活时间,它将成为回收的候选名单。
#4:切忌勿设置核心池大小为0为达到所有工作线程都最终会销毁的目的。如果工作队列不是使用SynchronousQueue,会引起任务不会开始,因为工作队列没有充满,线程池不会创建新的线程去处理任务,可以设置@Code allowCoreThreadTimeOut()来使核心池的线程也加入超时销毁的范围内。
1.62.队列任务的管理
ThreadPoolExecutor允许提供BlockingQueue来持有等待执行的任务。任务队列有3种基本方法:无限队列、有限队列和同步移交。
#1:newFixThreadPool和newSingleThreadExecutor都是默认使用一个无限的LinkedBlockingQueue。任务提交速度超过线程池的处理速度,队列会无限制增加。
#2:稳妥的资源管理可使用有限队列,如ArrayBlockingQueue和有限的LinkedBlockingQueue以及PriorityBlockingQueue。对于一个有界队列,队列的长度和池的大小要一起调节。
#3:庞大或者无限的池,可使用SynchronousQueue,完全绕开队列,直接提交到工作者线程。newCachedThreadPool就使用了SynchronousQueue。
#4:LinkedBlockingQueue和ArrayBlockingQueue都是FIFO队列,任务执行顺序跟达到顺序保持一致,对执行顺序进行控制可以使用PriorityBlockingQueue。
#5:如果任务彼此独立,有限的线程池比较合理。如果任务存在相互依赖,为防止线程饥饿死锁,可使用无限的池进行避免。
1.63.饱和策略
#1:AbortPolicy:直接抛出RejectedExecutionException异常。默认使用。
#2:CallerRunsPolicy:只用调用者所在线程来运行任务。
#3:DiscardOldestPolicy:丢弃队列里最老的一个任务,并执行当前任务。
#4:DiscardPolicy:不处理,直接丢弃新提交的任务。
1.64.线程池监听
可以继承ThreadPoolExecutor并重写beforeExecute、afterExecute、terminated方法。
线程出现异常,面临死亡时,会告之JVM自己会挂掉,设置异常捕获,可以帮助查找原因:
Thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler()
public void uncaughtException(Thread t, Throwable e)
logger.error(t.getThreadGroup().getName() + "-" + t.getName() + "出错:", e);
);
2.任务
2.1.延迟的、并具有周期性的任务
Timer工具管理任务的延迟执行以及周期执行。但是Timer存在缺陷,应该考虑使用ScheduledThreadPoolExecutor作为代替。
Timer的缺陷:
#1:只创建唯一的线程执行所有提交的TimerTask。如果一个TimerTask的执行比较耗时,会导致其他TimerTask的失效准确性的问题。例如TimerTask a每10ms执行一次,TimerTask b每40ms执行一次,b比较耗时,b完成后,a要么会连续被调用4次,要么完全丢失4次调用(取决于他是否按照固定频率或延迟进行调度)。
#2:如果TimerTask抛出未检出异常,而Timer线程并不捕获异常,这会终止timer线程,而且Timer也不会自我重新恢复线程的执行,误认为整个Timer取消了。因此已经提交的TimerTask不会被执行,新的TimerTask也不会被执行。
2.2.可携带结果的任务:Callable和Future
public interface Callable<V>
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
public interface Future<V>
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
ExecutorService中所有submit方法都返回一个Future,因此可以将一个Runnable或Callable提交给executor,然后得到一个Future,用它可以重新获得任务执行的结果或者取消任务。
FutureTask实现了RunnableFuture,而RunnableFuture继承了Runnable和Callable接口。
2.21.Future取消任务
Future<Object> f = executor.submit(task);
try
Object result = f.get(1000L, TimeUnit.MILLISECONDS);
catch (TimeoutException e)
// 下面任务会取消
catch (InterruptedException e)
throw new Error(e.getCause());
catch (ExecutionException e)
throw new Error(e.getCause());
finally
// 如果已经结束了,无害的
f.cancel(true);
2.3.异类并行任务的局限性
public class Image
private final ExecutorService executor = Executors.newFixedThreadPool(10);
void readPage(final List<String> urls)
Callable<List<String>> task = new Callable<List<String>>()
@Override
public List<String> call() throws Exception
List<String> ret = new ArrayList<>();
for (String url : urls)
String result = url;
// result = download from url
return ret;
;
Future<List<String>> f = executor.submit(task);
List<String> result;
try
result = f.get();
for (String re : result)
// do something
catch (InterruptedException e)
Thread.currentThread().interrupt();// 声明线程中断状态
f.cancel(true);// 取消任务
catch (ExecutionException e)
throw new Error(e);
#1:代码的
复杂度提高了,但是
性能并没有有多大的
提高,大量相互独立且同类的任务进行并发处理,会将程序的任务量分配到不同的任务中,才能有真正的提升。
#2:就算将每个url下载资源作为一个独立的task提交到线程池去执行,也并不能很好地提升性能。因为不同task的处理时间是不一样的,for-each去get时有可能前面的没有处理完,后面的处理完了,后面的task被阻塞了。
ExecutorCompletionService可以处理以上的问题:
public class Image
private final ExecutorCompletionService<Object> executor = new ExecutorCompletionService<>(Executors.newFixedThreadPool(10));
void readPage(final List<String> urls)
for (final String url : urls)
Callable<Object> task = new Callable<Object>()
@Override
public String call() throws Exception
String result = url;
// result = download from url
return result;
;
executor.submit(task);
Future<Object> f;
try
for (int i = 0; i < urls.size(); i++)
f = executor.take();
// do something
catch (InterruptedException e)
Thread.currentThread().interrupt();// 声明线程中断状态
#1:多个ExecutorCompletionService可以共享同一个Executor。
#2:记录下提交给CompletionService的任务的个数,然后计算出获得了多少个已完成的结果,即使使用的是共享的Executor,也能知晓什么时候批处理任务的所有结果全部获得。
#3:批处理任务时,获取结果时,优先返回已经处理完的结果。
以上是关于线程池&任务的主要内容,如果未能解决你的问题,请参考以下文章