java并发编程之Future.get() 在线程池配置RejectedExecutionHandler为ThreadPoolExecutor.DiscardPolicy策略时一直阻塞
Posted Dreamer who
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java并发编程之Future.get() 在线程池配置RejectedExecutionHandler为ThreadPoolExecutor.DiscardPolicy策略时一直阻塞相关的知识,希望对你有一定的参考价值。
1、我们先简单复现这种情况:
package com.sdcuike.java11;
import java.util.concurrent.*;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class Demo
public static void main(String[] args) throws ExecutionException, InterruptedException
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("rpc-pool-%d").setDaemon(true).build();
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 1, TimeUnit.HOURS, new SynchronousQueue<>(), threadFactory, new ThreadPoolExecutor.DiscardPolicy()
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
System.out.println("rejectedExecution");
super.rejectedExecution(r, executor);
);
threadPoolExecutor.submit(new Runnable()
@Override
public void run()
try
TimeUnit.HOURS.sleep(1);
catch (InterruptedException e)
e.printStackTrace();
);
Future<String> future = threadPoolExecutor.submit(new Callable<String>()
@Override
public String call() throws Exception
return "done";
);
String result = future.get();
System.out.println(result);
System.out.println("done....");
线程池中线程的配置为daemon线程(后台运行),当我们的main线程退出时,后台线程也会退出。会输出
done....
而现实的结果是:
main线程一直阻塞在future.get()调用。
2、why?
我们看下源码:java.util.concurrent.FutureTask#get()
public V get() throws InterruptedException, ExecutionException
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
FutureTask 内部有几种状态:
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
当状态state<=COMPLETING 即COMPLETING和NEW两种状态时,会一直阻塞:awaitDone(false, 0L)。
FutureTask的状态设置是在线程池ThreadPoolExecutor执行过程中所设置的,一般情况下线程池队列不满,即不会执行RejectedExecutionHandler处理不能处理的任务时,状态都会设置为state > COMPLETING的某个
状态 ,但线程池执行RejectedExecutionHandler的时候,java内置的拒绝策略有:
- CallerRunsPolicy
public static class CallerRunsPolicy implements RejectedExecutionHandler
/**
* Creates a @code CallerRunsPolicy.
*/
public CallerRunsPolicy()
/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e)
if (!e.isShutdown())
r.run();
这种策略会使用调用线程池执行任务的当前线程执行,java.util.concurrent.FutureTask#run() 执行肯定会设置新的完成状态。
- AbortPolicy
public static class AbortPolicy implements RejectedExecutionHandler
/**
* Creates an @code AbortPolicy.
*/
public AbortPolicy()
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e)
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
这种策略是线程池没配置的情况下使用的 默认策略,直接抛出异常,线程池根本就不会执行任务。
- DiscardOldestPolicy
public static class DiscardOldestPolicy implements RejectedExecutionHandler
/**
* Creates a @code DiscardOldestPolicy for the given executor.
*/
public DiscardOldestPolicy()
/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e)
if (!e.isShutdown())
e.getQueue().poll();
e.execute(r);
丢弃任务队列队头的任务,任何重试执行提交的新任务。状态肯定会设置。
- DiscardPolicy
public static class DiscardPolicy implements RejectedExecutionHandler
/**
* Creates a @code DiscardPolicy.
*/
public DiscardPolicy()
/**
* Does nothing, which has the effect of discarding task r.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e)
这种策略什么都不做,任务的状态自然是初始状态,永远不会更新任务的状态,导致java.util.concurrent.FutureTask#get() 一直阻塞。
所以,最好的情况下是不使用这种策略,或者使用java.util.concurrent.FutureTask#get(long, java.util.concurrent.TimeUnit) 带有超时的方法获取结果,或者重写策略,取消任务:
((FutureTask) r).cancel(true);
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
System.out.println("rejectedExecution");
((FutureTask) r).cancel(true)
super.rejectedExecution(r, executor);
3、解决一直阻塞的问题总结
1、重写DiscardPolicy,取消任务: ((FutureTask) r).cancel(true)。
2、不使用DiscardPolicy,建议还是用线程池默认的AbortPolicy,可以重写,打印日志,再调用super.rejectedExecution(r, executor),抛出默认异常行为。
3、使用java.util.concurrent.FutureTask#get(long, java.util.concurrent.TimeUnit) 带有超时的方法获取结果
以上是关于java并发编程之Future.get() 在线程池配置RejectedExecutionHandler为ThreadPoolExecutor.DiscardPolicy策略时一直阻塞的主要内容,如果未能解决你的问题,请参考以下文章
java并发编程之Future.get() 在线程池配置RejectedExecutionHandler为ThreadPoolExecutor.DiscardPolicy策略时一直阻塞