Java 线程池拒绝策略

Posted FserSuN

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java 线程池拒绝策略相关的知识,希望对你有一定的参考价值。

0 前置依赖

本例中会依赖assertj-core完成单测,maven依赖如下:

    <!-- https://mvnrepository.com/artifact/org.assertj/assertj-core -->
    <dependency>
      <groupId>org.assertj</groupId>
      <artifactId>assertj-core</artifactId>
      <version>3.21.0</version>
      <scope>test</scope>
    </dependency>

1 Abort Policy

默认的策略,这个策略当任务达到限制后会抛出一RejectedExecutionException异常。

一般用于快速响应的接口,适合使用,通过快速产生异常及时发现问题。避免客户端调用已经超时,服务端还在继续处理任务的情况。

        @Test
    public void testAbortPolicy() 
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS,
            new SynchronousQueue<>(),
            new ThreadPoolExecutor.AbortPolicy());

        executor.execute(() -> waitFor(250));

        assertThatThrownBy(() -> executor.execute(() -> System.out.println("Will be rejected")))
            .isInstanceOf(RejectedExecutionException.class);
    

由于第一个任务耗时较长,因此提交第二个任务时线程池会抛出RejectedExecutionException异常

2 Caller-Runs Policy

与其它策略相比,该策略当任务达到限制后,会通过调用线程来执行任务。

@Test
    public void testCallersRunPolicy() 
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS,
            new SynchronousQueue<>(),
            new ThreadPoolExecutor.CallerRunsPolicy());

        executor.execute(() -> waitFor(25000));
        long startTime = System.currentTimeMillis();
        System.out.println("1 run");
        executor.execute(() -> waitFor(2000));
        long blockedDuration = System.currentTimeMillis() - startTime;
        System.out.println("2 run");
        executor.execute(() -> waitFor(2000));
        assertThat(blockedDuration).isGreaterThanOrEqualTo(500);
    

在提交首个任务后,执行器没法再接受新任务。因此调用线程将会阻塞,直到调用线程中任务执行完毕。

caller-runs policy 是一种简单的实现限流的方式。因此在一些消费型任务中可以考虑使用,而面向c端,高响应接口的不建议使用,避免客户端调用已经超时,服务端还在继续处理任务的情况。

3 Discard Policy

当新任务提交失败后,会将提交的新任务丢弃,且不会抛出异常。

@Test
public void testDiscardPolicy() throws Exception
     ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS,
         new SynchronousQueue<>(),
         new ThreadPoolExecutor.DiscardPolicy());

     executor.execute(() -> waitFor(100));

     BlockingQueue<String> queue = new LinkedBlockingDeque<>();
     executor.execute(() -> queue.offer("Discarded Result"));
     assertThat(queue.poll(200, MILLISECONDS)).isNull();
 

异常对于发现问题很重要,因此使用这种策略要谨慎考虑,看异常是否可忽略。

4 Discard-oldest

当提交任务达到限制后,移除队列头的任务,随后重新提交新任务。

@Test
public void testDiscardOldestPolicy() 
    ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS,
        new ArrayBlockingQueue<>(2),
        new ThreadPoolExecutor.DiscardOldestPolicy());

    executor.execute(() -> waitFor(100));

    BlockingQueue<String> queue = new LinkedBlockingDeque<>();
    executor.execute(() -> queue.offer("First"));
    executor.execute(() -> queue.offer("Second"));
    executor.execute(() -> queue.offer("Third"));
    executor.execute(() -> queue.offer("Fourth"));
    waitFor(150);

    List<String> results = new ArrayList<>();
    queue.drainTo(results);

    assertThat(results).containsExactlyInAnyOrder("Third","Fourth");

上述测试代码使用了一个有界阻塞队列,长度为2。

  • 第一个任务独占一个线程100毫秒

  • 随后第二个、第三个任务会成功加入线程队列

  • 当第4个、第5个任务到来时,根据Discard-oldest策略删除队列中最早提交的任务,从而为新任务腾出空间

因此最终结果是results数组中将会包含两个字符串 “Third"与"Fourth”。

这种策略和优先队列配合使用会存在一些问题。因为优先队列队头元素优先级最高。这种策略会导致最重要的任务被丢弃。

5 自定义策略

我们也可以通过实现RejectedExecutionHandler接口实现自定义拒绝策略。

class GrowPolicy implements RejectedExecutionHandler 

    private final Lock lock = new ReentrantLock();

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) 
        lock.lock();
        try 
            executor.setMaximumPoolSize(executor.getMaximumPoolSize() + 1);
         finally 
            lock.unlock();
        

        executor.submit(r);
    

ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS, 
  new ArrayBlockingQueue<>(2), 
  new GrowPolicy());

executor.execute(() -> waitFor(100));

BlockingQueue<String> queue = new LinkedBlockingDeque<>();
executor.execute(() -> queue.offer("First"));
executor.execute(() -> queue.offer("Second"));
executor.execute(() -> queue.offer("Third"));
waitFor(150);

List<String> results = new ArrayList<>();
queue.drainTo(results);

assertThat(results).contains("First", "Second", "Third");

本例中,当线程池饱和后,我们会增加max pool size,随后重新提交同一个任务。采用该策略,上例中任务拒绝后,第四个任务会被执行。

如果默认策略无法满足需求,可以通过自定义扩展机制进行扩展。

6 shutdown触发拒绝策略

这些拒绝策略除了在超过配置限制的执行器中触发,也会在调用shutdown方法后触发。

ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS, new LinkedBlockingQueue<>());
executor.shutdownNow();

assertThatThrownBy(() -> executor.execute(() -> ))
 .isInstanceOf(RejectedExecutionException.class);


ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS, new LinkedBlockingQueue<>());
executor.execute(() -> waitFor(100));
executor.shutdown();

assertThatThrownBy(() -> executor.execute(() -> ))
  .isInstanceOf(RejectedExecutionException.class);

例子中调用shutdown方法后,再次提交任务依然会触发拒绝策略。

以上是关于Java 线程池拒绝策略的主要内容,如果未能解决你的问题,请参考以下文章

Java线程池拒绝策略

Java八股系列——线程池拒绝策略

Java 并发编程线程池机制 ( 线程池阻塞队列 | 线程池拒绝策略 | 使用 ThreadPoolExecutor 自定义线程池参数 )

java多线程系类:JUC线程池:05之线程池原理(转)

Java线程池中的四种拒绝策略

Java并发多线程编程——线程池