如何终止线程池中的所有预分配线程?
Posted
技术标签:
【中文标题】如何终止线程池中的所有预分配线程?【英文标题】:how to terminates all the preallocated threads in a threadpool? 【发布时间】:2020-11-13 22:19:15 【问题描述】:我已经使用下面的结构创建了一个线程池,现在的问题是如何让所有预分配线程正确结束?
std::vector<pthread_t> preallocatedThreadsPool; // threadpool
std::queue<int> tcpQueue; // a queue to hold my task
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t condition_var = PTHREAD_COND_INITIALIZER;
void* threadFunctionUsedByThreadsPool(void *arg);
main ()
preallocatedThreadsPool.resize(preallocatThreadsNumber);
for(pthread_t i : preallocatedThreadsPool)
pthread_create(&i, NULL, threadFunctionUsedByThreadsPool, NULL);
pthread_mutex_lock(&mutex); // one thread mess with the queue at one time
tcpQueue.push(task);
pthread_cond_signal(&condition_var);
pthread_mutex_unlock(&mutex);
void* threadFunctionUsedByThreadsPool(void *arg)
while (true)
pthread_mutex_lock(&mutex);
if (tcpQueue.empty()) // can't get work from the queue then just wait
pthread_cond_wait(&condition_var, &mutex); // wait for the signal from other thread to deal with client otherwise sleep
task = tcpQueue.front();
tcpQueue.pop();
pthread_mutex_unlock(&mutex);
if (task)
// do task
return NULL;
我一直在寻找这个问题的日子仍然找不到一个像样的解决方案,我尝试过的最接近的一个是,当程序要退出时,将一个特殊项目推入队列,然后在 threadFunctionUsedByThreadsPool 中,当检测到这样的item,我会调用pthread_join,但是,当我使用gdb工具调试它时,那些预先分配的线程仍然存在,任何人都可以提供帮助,更好地使用一些代码,例如,我如何修改threadFunctionUsedByThreadsPool,以便我可以正确退出所有预先分配的线程? 非常感谢!!!
【问题讨论】:
您是否有意使用pthread_create
而不是C++ 线程? (即std::thread
)?
【参考方案1】:
TLDR:您只需要一个线程安全变量,所有线程都可以检查工作项之间的退出条件。使用pthread_join
等待线程退出。
首先,让我们让线程函数中的 while 循环在条件变量方面正确。
而不是这个:
pthread_cond_wait(&condition_var, &mutex); // wait for the signal from other thread to deal with client otherwise sleep
task = tcpQueue.front();
tcpQueue.pop();
在条件变量唤醒之前和之后检查队列的状态。虚假唤醒是真实存在的,不能保证另一个线程没有唤醒并抓取最后一个工作项。您绝对不想从空队列中弹出。
更好:
while (tcpQueue.empty())
pthread_cond_wait(&condition_var, &mutex); // wait for the signal from other thread to deal with client otherwise sleep
task = tcpQueue.front();
tcpQueue.pop();
解决了这个问题后,我们可以引入一个新的全局布尔值来表示停止条件:
bool stopCondition = false;
每当我们想告诉池中的所有线程停止时,我们可以将stopCondition
设置为true
并发出条件变量信号以提醒所有线程状态更改。读取或写入 stopCondition 应在锁定下完成。 (我想你也可以使用std::atomic<bool>
)
把它们放在一起,你的线程函数变成了这样:
void* threadFunctionUsedByThreadsPool(void* arg)
pthread_mutex_lock(&mutex);
while (!stopCondition)
// wait for a task to be queued
while (tcpQueue.empty() && !stopCondition)
pthread_cond_wait(&condition_var, &mutex); // wait for the signal from other thread to deal with client otherwise sleep
if (stopCondition == false)
task = tcpQueue.front();
tcpQueue.pop();
// exit lock while operating on a task
pthread_mutex_unlock(&mutex);
if (task)
// do task
// re-acquire the lock
pthread_mutex_lock(&mutex);
// release the lock before exiting the function
pthread_mutex_unlock(&mutex);
return NULL;
然后是一个辅助函数来通知所有线程退出并等待每个线程停止。请注意,我们使用pthread_cond_broadcast
通知所有线程从它们的条件变量等待中唤醒,而不是pthread_cond_signal
,它只唤醒一个线程。
void stopThreadPool()
// signal all threads to exit after they finish their current work item
pthread_mutex_lock(&mutex);
stopCondition = true;
pthread_cond_broadcast(&condition_var); // notify all threads
pthread_mutex_unlock(&mutex);
// wait for all threads to exit
for (auto& t : preAllocatedThreadsPool)
pthread_join(t, nullptr);
preAllocatedThreadsPool.clear();
我刚刚发现的最后一个错误 - 您的 main
不是像您想象的那样初始化您的 preAllocatedThreadsPool
向量的属性。您正在制作 pthread_t 的副本,而不是在向量中实际使用句柄。
而不是这个:
for(pthread_t i : preallocatedThreadsPool)
您的循环需要通过引用枚举:
更好:
for(pthread_t &i : preallocatedThreadsPool)
【讨论】:
【参考方案2】:发送一个任务,指示池线程将任务重新排队,然后终止。然后毒任务将运行池中的所有线程,将它们全部杀死。我使用了 null 作为毒药(即非法任务)——它在杀死最后一个线程时不需要被破坏。您可能希望在发送 null/whatever 之前清除任务队列。如果使用空值,则只需要在线程中进行空值检查,就在任务出队之后。
您只需要很少的额外代码,您不需要知道池中有多少线程并且它会工作:)
【讨论】:
以上是关于如何终止线程池中的所有预分配线程?的主要内容,如果未能解决你的问题,请参考以下文章