Java 线程池拒绝策略
Posted FserSuN
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java 线程池拒绝策略相关的知识,希望对你有一定的参考价值。
0 前置依赖
本例中会依赖assertj-core完成单测,maven依赖如下:
<!-- https://mvnrepository.com/artifact/org.assertj/assertj-core -->
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>3.21.0</version>
<scope>test</scope>
</dependency>
1 Abort Policy
默认的策略,这个策略当任务达到限制后会抛出一RejectedExecutionException异常。
一般用于快速响应的接口,适合使用,通过快速产生异常及时发现问题。避免客户端调用已经超时,服务端还在继续处理任务的情况。
@Test
public void testAbortPolicy()
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS,
new SynchronousQueue<>(),
new ThreadPoolExecutor.AbortPolicy());
executor.execute(() -> waitFor(250));
assertThatThrownBy(() -> executor.execute(() -> System.out.println("Will be rejected")))
.isInstanceOf(RejectedExecutionException.class);
由于第一个任务耗时较长,因此提交第二个任务时线程池会抛出RejectedExecutionException异常
2 Caller-Runs Policy
与其它策略相比,该策略当任务达到限制后,会通过调用线程来执行任务。
@Test
public void testCallersRunPolicy()
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS,
new SynchronousQueue<>(),
new ThreadPoolExecutor.CallerRunsPolicy());
executor.execute(() -> waitFor(25000));
long startTime = System.currentTimeMillis();
System.out.println("1 run");
executor.execute(() -> waitFor(2000));
long blockedDuration = System.currentTimeMillis() - startTime;
System.out.println("2 run");
executor.execute(() -> waitFor(2000));
assertThat(blockedDuration).isGreaterThanOrEqualTo(500);
在提交首个任务后,执行器没法再接受新任务。因此调用线程将会阻塞,直到调用线程中任务执行完毕。
caller-runs policy 是一种简单的实现限流的方式。因此在一些消费型任务中可以考虑使用,而面向c端,高响应接口的不建议使用,避免客户端调用已经超时,服务端还在继续处理任务的情况。
3 Discard Policy
当新任务提交失败后,会将提交的新任务丢弃,且不会抛出异常。
@Test
public void testDiscardPolicy() throws Exception
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS,
new SynchronousQueue<>(),
new ThreadPoolExecutor.DiscardPolicy());
executor.execute(() -> waitFor(100));
BlockingQueue<String> queue = new LinkedBlockingDeque<>();
executor.execute(() -> queue.offer("Discarded Result"));
assertThat(queue.poll(200, MILLISECONDS)).isNull();
异常对于发现问题很重要,因此使用这种策略要谨慎考虑,看异常是否可忽略。
4 Discard-oldest
当提交任务达到限制后,移除队列头的任务,随后重新提交新任务。
@Test
public void testDiscardOldestPolicy()
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS,
new ArrayBlockingQueue<>(2),
new ThreadPoolExecutor.DiscardOldestPolicy());
executor.execute(() -> waitFor(100));
BlockingQueue<String> queue = new LinkedBlockingDeque<>();
executor.execute(() -> queue.offer("First"));
executor.execute(() -> queue.offer("Second"));
executor.execute(() -> queue.offer("Third"));
executor.execute(() -> queue.offer("Fourth"));
waitFor(150);
List<String> results = new ArrayList<>();
queue.drainTo(results);
assertThat(results).containsExactlyInAnyOrder("Third","Fourth");
上述测试代码使用了一个有界阻塞队列,长度为2。
-
第一个任务独占一个线程100毫秒
-
随后第二个、第三个任务会成功加入线程队列
-
当第4个、第5个任务到来时,根据Discard-oldest策略删除队列中最早提交的任务,从而为新任务腾出空间
因此最终结果是results数组中将会包含两个字符串 “Third"与"Fourth”。
这种策略和优先队列配合使用会存在一些问题。因为优先队列队头元素优先级最高。这种策略会导致最重要的任务被丢弃。
5 自定义策略
我们也可以通过实现RejectedExecutionHandler接口实现自定义拒绝策略。
class GrowPolicy implements RejectedExecutionHandler
private final Lock lock = new ReentrantLock();
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
lock.lock();
try
executor.setMaximumPoolSize(executor.getMaximumPoolSize() + 1);
finally
lock.unlock();
executor.submit(r);
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS,
new ArrayBlockingQueue<>(2),
new GrowPolicy());
executor.execute(() -> waitFor(100));
BlockingQueue<String> queue = new LinkedBlockingDeque<>();
executor.execute(() -> queue.offer("First"));
executor.execute(() -> queue.offer("Second"));
executor.execute(() -> queue.offer("Third"));
waitFor(150);
List<String> results = new ArrayList<>();
queue.drainTo(results);
assertThat(results).contains("First", "Second", "Third");
本例中,当线程池饱和后,我们会增加max pool size,随后重新提交同一个任务。采用该策略,上例中任务拒绝后,第四个任务会被执行。
如果默认策略无法满足需求,可以通过自定义扩展机制进行扩展。
6 shutdown触发拒绝策略
这些拒绝策略除了在超过配置限制的执行器中触发,也会在调用shutdown方法后触发。
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS, new LinkedBlockingQueue<>());
executor.shutdownNow();
assertThatThrownBy(() -> executor.execute(() -> ))
.isInstanceOf(RejectedExecutionException.class);
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS, new LinkedBlockingQueue<>());
executor.execute(() -> waitFor(100));
executor.shutdown();
assertThatThrownBy(() -> executor.execute(() -> ))
.isInstanceOf(RejectedExecutionException.class);
例子中调用shutdown方法后,再次提交任务依然会触发拒绝策略。
以上是关于Java 线程池拒绝策略的主要内容,如果未能解决你的问题,请参考以下文章
Java 并发编程线程池机制 ( 线程池阻塞队列 | 线程池拒绝策略 | 使用 ThreadPoolExecutor 自定义线程池参数 )