线程池队列满了之后任务丢弃,怎么解决(实战篇)
Posted go大鸡腿
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程池队列满了之后任务丢弃,怎么解决(实战篇)相关的知识,希望对你有一定的参考价值。
前言
最近有朋友去面了路客,面试官问的问题比较大,说实话很难细分。挑一道实战的来讲,线程池队列满了之后任务丢弃,怎么解决
开整
众所周知,线程池默认拒绝策略是丢弃并抛出异常,这样的话那溢出的任务就不见了,在实际业务中会受到影响。
优化:
- 改写拒绝策略,延迟任务重新投向线程池
- 打印对应任务参数,可以做塞回数据库,或者打印出来方便排查问题
代码
重新拒绝策略
static class RetryPolicy implements RejectedExecutionHandler {
private DelayQueue<PullJobDelayTask> queue = new DelayQueue<>();
private int i = 0;
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
queue.offer(new PullJobDelayTask(5, TimeUnit.SECONDS, r));
log.error("等待5秒...");
if(i>0){
throw new RejectedExecutionException();
}
CompletableFuture.runAsync(()-> {
log.info("新增线程池...");
while (true) {
try {
log.info("拉去任务...");
PullJobDelayTask task = queue.take();
executor.execute(task.runnable);
} catch (Exception e) {
log.error("抛出异常,{}", e);
}
}
});
i++;
throw new RejectedExecutionException();
}
}
实现了一个延迟任务,还有一个异步去拉取任务投放到线程池里头。
延迟任务
@Data
static class PullJobDelayTask implements Delayed {
private long scheduleTime;
private Runnable runnable;
public PullJobDelayTask(long scheduleTime, TimeUnit unit, Runnable runnable) {
this.scheduleTime = System.currentTimeMillis() + (scheduleTime > 0 ? unit.toMillis(scheduleTime) : 0);
this.runnable = runnable;
}
@Override
public long getDelay(@NotNull TimeUnit unit) {
return scheduleTime - System.currentTimeMillis();
}
@Override
public int compareTo(@NotNull Delayed o) {
return (int) (this.scheduleTime - ((PullJobDelayTask) o).scheduleTime);
}
}
验证
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1),Executors.defaultThreadFactory(),new RetryPolicy());
executor.execute(() -> {
try {
log.info("1");
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
executor.execute(() -> {
try {
log.info("2");
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
executor.execute(() -> {
try {
log.info("3");
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
try{
executor.execute(() -> {
try {
log.info("4");
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}catch (Exception e){
log.error("报错任务参数:{},异常:{}",3,e);
throw e;
}
}
问题
Q:为啥在拒绝策略里头还要throw new RejectedExecutionException();
A:可以通过抛出异常来捕获,然后打印任务的参数,不然.RetryPolicy#rejectedExecution里头runnable是拿不到参数的
Q:有没有其他方案
A:有的,比如说有些就不用使用延迟队列,比如说我们是从数据库读取到的任务,执行成功就修改执行的标识,如果不成功或者任务被拒绝了,它下次扫描还是会继续塞回去
Q:延迟队列如果宕机的话,任务也丢失了怎么办
A:这里的打印日志就很重要了,可以记录起来,或者加个hook回调钩子,在宕机的时候将这些数据写回数据库(kill -9 pid不会调用hook~)
以上是关于线程池队列满了之后任务丢弃,怎么解决(实战篇)的主要内容,如果未能解决你的问题,请参考以下文章