Java线程池详解
Posted kehoudaanxiangjie
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java线程池详解相关的知识,希望对你有一定的参考价值。
Java 中的线程池(ThreadPoolExecutor)我们都知道(不知道请自行搜索),它的执行机制简单讲就是多个线程不停的从队列里面取任务执行。但是我们可能遇到下面这样的场景:
我有一批数据要通过线程池来处理,处理过程中需要调用某个远程服务。但该服务存在调用频率限制,比如每秒钟最多调用 50 次,超过这个阈值将返回错误信息。
这是否意味着我们不应该用多线程了呢?不是,在这个场景中,我们要保证的是以间隔不低于 20ms 的频率发起请求,至于处理时间,不管是几百甚至几千毫秒,都不影响发起请求的频率,因此多线程是必要的。
默认的线程池(ThreadPoolExecutor)没有按固定频率执行任务的特性,有的同学可能会想到 ScheduledThreadPoolExecutor,但是很可惜这个类也不能用,别看它名字里面带了计划任务的特性,但这个是用来反复执行同一个任务的,而我们的场景是一个任务只执行一次。
当然也有的同学会想到一种方案,依旧使用 ScheduledThreadPoolExecutor,但是将任务队列外部化(即不使用 ScheduledThreadPoolExecutor 的内部任务队列),然后 ScheduledThreadPoolExecutor 的任务本身就是从外部队列取任务执行。
这种方案是可行的,但是抛开实现起来过于复杂不说,线程池的执行机制也会遭到破坏,比如说我们本来可以通过 shutdown()
和 awaitTermination()
来等待线程池队列全部执行完,令线程池安全关闭;但若任务队列外部化,这点就做不到了,因为线程池会立刻关闭,不会再处理外部队列中的剩余任务。
这里有一个相对简单的解决方案。好在 ThreadPoolExecutor 给我们提供了 beforeExecute()
这样一个扩展点,我们可以通过继承 ThreadPoolExecutor,覆写这个方法来实现执行频率的限制:
- 使每个线程在执行任务前延迟一段时间;
- 使用一个信号量来同步这段延迟,这样每个线程在执行任务前被这个信号量锁住,拿到锁后延迟一段时间再释放锁,然后再执行任务。
由此可见,这样的设计既实现了执行频率限制,又保持了任务执行本身的并行性,同时线程池的执行机制没有受到影响。
代码实现起来不复杂,如下:
public class FundThreadPoolExecutor extends ThreadPoolExecutor { private int fixedRateMillis; private final Semaphore fixedRateSemaphore = new Semaphore(1); // 设置执行频率限制的延迟时间(ms) public void setFixedRateMillis(int fixedRateMillis) { this.fixedRateMillis = fixedRateMillis; } public FundThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } public FundThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); } public FundThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); } public FundThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); } @Override protected void beforeExecute(Thread t, Runnable r) { if (this.fixedRateMillis > 0) { try { this.fixedRateSemaphore.acquire(); Thread.sleep(this.fixedRateMillis); } catch (InterruptedException e) { // ignore this } finally { this.fixedRateSemaphore.release(); } } } }
以上是关于Java线程池详解的主要内容,如果未能解决你的问题,请参考以下文章
newCacheThreadPool()newFixedThreadPool()newScheduledThreadPool()newSingleThreadExecutor()自定义线程池(代码片段