java并发编程之Future.get() 在线程池配置RejectedExecutionHandler为ThreadPoolExecutor.DiscardPolicy策略时一直阻塞

Posted Dreamer who

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java并发编程之Future.get() 在线程池配置RejectedExecutionHandler为ThreadPoolExecutor.DiscardPolicy策略时一直阻塞相关的知识,希望对你有一定的参考价值。

     1、我们先简单复现这种情况:

package com.sdcuike.java11;

import java.util.concurrent.*;

import com.google.common.util.concurrent.ThreadFactoryBuilder;

public class Demo 

    public static void main(String[] args) throws ExecutionException, InterruptedException 
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("rpc-pool-%d").setDaemon(true).build();
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 1, TimeUnit.HOURS, new SynchronousQueue<>(), threadFactory, new ThreadPoolExecutor.DiscardPolicy() 
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) 
                System.out.println("rejectedExecution");

                super.rejectedExecution(r, executor);
            
        );

        threadPoolExecutor.submit(new Runnable() 
            @Override
            public void run() 
                try 
                    TimeUnit.HOURS.sleep(1);
                 catch (InterruptedException e) 
                    e.printStackTrace();
                
            
        );

        Future<String> future = threadPoolExecutor.submit(new Callable<String>() 

            @Override
            public String call() throws Exception 
                return "done";
            
        );

        String result = future.get();
        System.out.println(result);
        System.out.println("done....");
    


 

线程池中线程的配置为daemon线程(后台运行),当我们的main线程退出时,后台线程也会退出。会输出

done....

 

而现实的结果是:

 

main线程一直阻塞在future.get()调用。

 

2、why?

 

我们看下源码:java.util.concurrent.FutureTask#get()

public V get() throws InterruptedException, ExecutionException 
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    

FutureTask 内部有几种状态:

private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

当状态state<=COMPLETING 即COMPLETING和NEW两种状态时,会一直阻塞:awaitDone(false, 0L)。

FutureTask的状态设置是在线程池ThreadPoolExecutor执行过程中所设置的,一般情况下线程池队列不满,即不会执行RejectedExecutionHandler处理不能处理的任务时,状态都会设置为state > COMPLETING的某个

状态 ,但线程池执行RejectedExecutionHandler的时候,java内置的拒绝策略有:

  • CallerRunsPolicy
public static class CallerRunsPolicy implements RejectedExecutionHandler 
        /**
         * Creates a @code CallerRunsPolicy.
         */
        public CallerRunsPolicy()  

        /**
         * Executes task r in the caller's thread, unless the executor
         * has been shut down, in which case the task is discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) 
            if (!e.isShutdown()) 
                r.run();
            
        
    

这种策略会使用调用线程池执行任务的当前线程执行,java.util.concurrent.FutureTask#run() 执行肯定会设置新的完成状态。

 

  • AbortPolicy
public static class AbortPolicy implements RejectedExecutionHandler 
        /**
         * Creates an @code AbortPolicy.
         */
        public AbortPolicy()  

        /**
         * Always throws RejectedExecutionException.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) 
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        

这种策略是线程池没配置的情况下使用的 默认策略,直接抛出异常,线程池根本就不会执行任务。

 

  • DiscardOldestPolicy
public static class DiscardOldestPolicy implements RejectedExecutionHandler 
        /**
         * Creates a @code DiscardOldestPolicy for the given executor.
         */
        public DiscardOldestPolicy()  

        /**
         * Obtains and ignores the next task that the executor
         * would otherwise execute, if one is immediately available,
         * and then retries execution of task r, unless the executor
         * is shut down, in which case task r is instead discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) 
            if (!e.isShutdown()) 
                e.getQueue().poll();
                e.execute(r);
            
        
    

丢弃任务队列队头的任务,任何重试执行提交的新任务。状态肯定会设置。

 

  • DiscardPolicy
public static class DiscardPolicy implements RejectedExecutionHandler 
        /**
         * Creates a @code DiscardPolicy.
         */
        public DiscardPolicy()  

        /**
         * Does nothing, which has the effect of discarding task r.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) 
        
    

这种策略什么都不做,任务的状态自然是初始状态,永远不会更新任务的状态,导致java.util.concurrent.FutureTask#get() 一直阻塞。

 

 

所以,最好的情况下是不使用这种策略,或者使用java.util.concurrent.FutureTask#get(long, java.util.concurrent.TimeUnit) 带有超时的方法获取结果,或者重写策略,取消任务:

  ((FutureTask) r).cancel(true);
 

 @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) 
                System.out.println("rejectedExecution");
                ((FutureTask) r).cancel(true)
                super.rejectedExecution(r, executor);
            

 

3、解决一直阻塞的问题总结

 

1、重写DiscardPolicy,取消任务:  ((FutureTask) r).cancel(true)。

2、不使用DiscardPolicy,建议还是用线程池默认的AbortPolicy,可以重写,打印日志,再调用super.rejectedExecution(r, executor),抛出默认异常行为。

3、使用java.util.concurrent.FutureTask#get(long, java.util.concurrent.TimeUnit) 带有超时的方法获取结果

 

以上是关于java并发编程之Future.get() 在线程池配置RejectedExecutionHandler为ThreadPoolExecutor.DiscardPolicy策略时一直阻塞的主要内容,如果未能解决你的问题,请参考以下文章

java并发编程之Future.get() 在线程池配置RejectedExecutionHandler为ThreadPoolExecutor.DiscardPolicy策略时一直阻塞

Java并发编程-扩展可回调的Future

异步编程之Future和Listener

Java Future源码分析

java8 之CompletableFuture -- 如何构建异步应用

Java并发编程之美之并发编程线程基础