重写线程池 execute 方法导致线程池“失效” 问题
Posted 明明如月学长
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了重写线程池 execute 方法导致线程池“失效” 问题相关的知识,希望对你有一定的参考价值。
一、背景
今天群里有个同学遇到一个看似很奇怪的问题,自定义 ThreadPoolTaskExecutor
子类,重写了 execute 方法,通过 execute 方法来执行任务时打印当前线程,日志显示任务一直在调用者线程里执行 (其实并不是),似乎线程池失效了。
二、场景复现
自定义 ThreadPoolTaskExecutor
子类
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
public class ThreadPoolTaskExecutorImpl extends ThreadPoolTaskExecutor
@Override
public void execute(Runnable command)
System.out.println("当前线程" + Thread.currentThread().getName());
super.execute(command);
编写测试代码:
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolDemo
public static void main(String[] args) throws InterruptedException
// 构造线程池
ThreadPoolTaskExecutorImpl executor = new ThreadPoolTaskExecutorImpl();
executor.initialize();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(20);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setThreadNamePrefix("[线程池的线程]");
// 执行任务
for (int i = 0; i < 5; i++)
executor.execute(() ->
System.out.println("测试");
);
TimeUnit.SECONDS.sleep(3);
executor.shutdown();
执行结果:
当前线程main
当前线程main
测试
测试
当前线程main
测试
当前线程main
当前线程main
测试
测试
由此断定:自定义的线程池失效,在 execute 方法中获取当前线程时,并没有出现我们定义的线程名称前缀的线程,仍然使用 main 线程来执行任务。
但是,真的是这样吗?
三、分析
由于很多同学没有认真思考过多线程的本质,会想当然地认为线程池的 execute
方法的所有代码都是在线程池创建的线程中执行,可是真的是这样吗?
我们知道在没有使用新线程的情况下,程序会使用当前线程(main 线程)顺序执行。
因此,在 org.example.thread.ThreadPoolTaskExecutorImpl#execute
中,打印的 “当前线程” 的代码仍然是在 main 方法中执行的。
进入 super.execute
方法
@Override
public void execute(Runnable task)
Executor executor = getThreadPoolExecutor();
try
executor.execute(task);
catch (RejectedExecutionException ex)
throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
这里的 executor.execute
方法实际上是 java.util.concurrent.ThreadPoolExecutor#execute
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current @code RejectedExecutionHandler.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* @code RejectedExecutionHandler, if the task
* cannot be accepted for execution
* @throws NullPointerException if @code command is null
*/
public void execute(Runnable command)
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize)
if (addWorker(command, true))
return;
c = ctl.get();
if (isRunning(c) && workQueue.offer(command))
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
else if (!addWorker(command, false))
reject(command);
小于核心线程数,会执行到 `addWorker ,此时才真正创建新的线程去执行任务:
/**
* Checks if a new worker can be added with respect to current
* pool state and the given bound (either core or maximum). If so,
* the worker count is adjusted accordingly, and, if possible, a
* new worker is created and started, running firstTask as its
* first task. This method returns false if the pool is stopped or
* eligible to shut down. It also returns false if the thread
* factory fails to create a thread when asked. If the thread
* creation fails, either due to the thread factory returning
* null, or due to an exception (typically OutOfMemoryError in
* Thread.start()), we roll back cleanly.
*
* @param firstTask the task the new thread should run first (or
* null if none). Workers are created with an initial first task
* (in method execute()) to bypass queuing when there are fewer
* than corePoolSize threads (in which case we always start one),
* or when the queue is full (in which case we must bypass queue).
* Initially idle threads are usually created via
* prestartCoreThread or to replace other dying workers.
*
* @param core if true use corePoolSize as bound, else
* maximumPoolSize. (A boolean indicator is used here rather than a
* value to ensure reads of fresh values after checking other pool
* state).
* @return true if successful
*/
private boolean addWorker(Runnable firstTask, boolean core)
// 省略部分代码
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try
//【关键1】 创建新的线程(run 方法)
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null)
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null))
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
finally
mainLock.unlock();
if (workerAdded)
//【关键2】启动创建的线程
t.start();
workerStarted = true;
finally
if (! workerStarted)
addWorkerFailed(w);
return workerStarted;
我们再看下 new Worker
时做了什么。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask)
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
//【关键】 使用线程工厂创建线程
this.thread = getThreadFactory().newThread(this);
//【关键】 重写 Runnable 的 run 方法,里面就封装了外面传入的 Runnable
/** Delegates main run loop to outer runWorker */
public void run()
runWorker(this);
final void runWorker(Worker w)
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try
while (task != null || (task = getTask()) != null)
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try
beforeExecute(wt, task);
Throwable thrown = null;
try
task.run();
catch (RuntimeException x)
thrown = x; throw x;
catch (Error x)
thrown = x; throw x;
catch (Throwable x)
thrown = x; throw new Error(x);
finally
afterExecute(task, thrown);
finally
task = null;
w.completedTasks++;
w.unlock();
completedAbruptly = false;
finally
processWorkerExit(w, completedAbruptly);
// 省略其他
因此
(1)自定义的 ThreadPoolTaskExecutorImpl
类里重写的 execute 方法里打印的当前线程实际还是调用者线程。
(2)外面传入的 Runnable 参数最终会在 Worker 现成的 run 方法中执行到。
四、解决之道
我们可以使用包装器模式处理下即可:
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
public class ThreadPoolTaskExecutorImpl extends ThreadPoolTaskExecutor
@Override
public void execute(Runnable command)
// 当前在父线程里
super.execute(() ->
// 这里在子线程里执行
// 可以在任务前打印下当前线程名称,线程池的状态等信息
System.out.println("当前线程" + Thread.currentThread().getName());
// 原始的任务
command.run();
);
测试代码:
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolDemo
public static void main(String[] args) throws InterruptedException
// 构造线程池
ThreadPoolTaskExecutorImpl executor = new ThreadPoolTaskExecutorImpl();
executor.initialize();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(20);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setThreadNamePrefix("[线程池的线程]");
// 执行任务
for (int i = 0; i < 5; i++)
executor.execute(() ->
System.out.println(Thread.currentThread().getName() + "-> 测试");
);
TimeUnit.SECONDS.sleep(3);
executor.shutdown();
输出结果:
当前线程[线程池的线程]2
当前线程[线程池的线程]1
[线程池的线程]1-> 测试
[线程池的线程]2-> 测试
当前线程[线程池的线程]3
[线程池的线程]3-> 测试
当前线程[线程池的线程]4
[线程池的线程]4-> 测试
当前线程[线程池的线程]5
[线程池的线程]5-> 测试
五、启示
5.1 关于提问
该同学提问非常模糊,甚至“反复修改问题”,最终给出关键代码截图,才真正理解问题是什么。
大家请教别人时,尽量能将问题转化为别人容易理解的表达方式。
大家请教别人时,一定自己先搞清楚问题究竟是什么,而不需要别人一再追问下,才不断逼近真实的问题。
大家请教别人时,最好能够有源码或者关键信息截图等。
5.2 现象与本质
我们使用线程池时,总是观察到我们传入的 Runnable 是在线程池中的线程执行的,我们是使用 execute 方法来执行的,但这并不意味着 execute 方法的所有步骤都是在线程池中的线程里执行的。
学习某个技术时,要真正理解技术的本质,而不是表象。
如调用线程的 start 方法才真正启动线程,在重写的 execute 方法第一行压根就没有创建新的线程,怎么会在新的线程里执行呢?
在实际开发和验证问题时,多进行代码调试,掌握高级的调试技巧,如调试时表达式计算、条件断点、移除栈帧回退调用等。
创作不易,如果本文对你有帮助,欢迎点赞、收藏加关注,你的支持和鼓励,是我创作的最大动力。
以上是关于重写线程池 execute 方法导致线程池“失效” 问题的主要内容,如果未能解决你的问题,请参考以下文章
12.ThreadPoolExecutor线程池原理及其execute方法