如何终止线程池中的所有预分配线程?

Posted

技术标签:

【中文标题】如何终止线程池中的所有预分配线程?【英文标题】:how to terminates all the preallocated threads in a threadpool? 【发布时间】:2020-11-13 22:19:15 【问题描述】:

我已经使用下面的结构创建了一个线程池,现在的问题是如何让所有预分配线程正确结束?

std::vector<pthread_t> preallocatedThreadsPool; // threadpool
std::queue<int> tcpQueue;  // a queue to hold my task

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t condition_var = PTHREAD_COND_INITIALIZER;

void* threadFunctionUsedByThreadsPool(void *arg);

main () 
    preallocatedThreadsPool.resize(preallocatThreadsNumber);
    for(pthread_t i : preallocatedThreadsPool) 
        pthread_create(&i, NULL, threadFunctionUsedByThreadsPool, NULL);
    


    pthread_mutex_lock(&mutex); // one thread mess with the queue at one time
 
    tcpQueue.push(task);

    pthread_cond_signal(&condition_var);
    pthread_mutex_unlock(&mutex);




void* threadFunctionUsedByThreadsPool(void *arg) 
    while (true) 
        pthread_mutex_lock(&mutex);
        if (tcpQueue.empty())   // can't get work from the queue then just wait
        pthread_cond_wait(&condition_var, &mutex); // wait for the signal from other thread to deal with client otherwise sleep
        task = tcpQueue.front();
        tcpQueue.pop(); 
        
        pthread_mutex_unlock(&mutex);
    
        if (task) 
            // do task
        
    
    return NULL;

我一直在寻找这个问题的日子仍然找不到一个像样的解决方案,我尝试过的最接近的一个是,当程序要退出时,将一个特殊项目推入队列,然后在 threadFunctionUsedByThreadsPool 中,当检测到这样的item,我会调用pthread_join,但是,当我使用gdb工具调试它时,那些预先分配的线程仍然存在,任何人都可以提供帮助,更好地使用一些代码,例如,我如何修改threadFunctionUsedByThreadsPool,以便我可以正确退出所有预先分配的线程? 非常感谢!!!

【问题讨论】:

您是否有意使用pthread_create 而不是C++ 线程? (即std::thread)? 【参考方案1】:

TLDR:您只需要一个线程安全变量,所有线程都可以检查工作项之间的退出条件。使用pthread_join 等待线程退出。

首先,让我们让线程函数中的 while 循环在条件变量方面正确。

而不是这个:

    pthread_cond_wait(&condition_var, &mutex); // wait for the signal from other thread to deal with client otherwise sleep
    task = tcpQueue.front();
    tcpQueue.pop();

在条件变量唤醒之前和之后检查队列的状态。虚假唤醒是真实存在的,不能保证另一个线程没有唤醒并抓取最后一个工作项。您绝对不想从空队列中弹出。

更好:

    while (tcpQueue.empty())   
        pthread_cond_wait(&condition_var, &mutex); // wait for the signal from other thread to deal with client otherwise sleep
    
    task = tcpQueue.front();
    tcpQueue.pop();

解决了这个问题后,我们可以引入一个新的全局布尔值来表示停止条件:

 bool stopCondition = false;

每当我们想告诉池中的所有线程停止时,我们可以将stopCondition 设置为true 并发出条件变量信号以提醒所有线程状态更改。读取或写入 stopCondition 应在锁定下完成。 (我想你也可以使用std::atomic&lt;bool&gt;

把它们放在一起,你的线程函数变成了这样:

void* threadFunctionUsedByThreadsPool(void* arg) 

    pthread_mutex_lock(&mutex);

    while (!stopCondition) 

        // wait for a task to be queued
        while (tcpQueue.empty() && !stopCondition)   
            pthread_cond_wait(&condition_var, &mutex); // wait for the signal from other thread to deal with client otherwise sleep
        

        if (stopCondition == false) 
            task = tcpQueue.front();
            tcpQueue.pop();

            // exit lock while operating on a task
            pthread_mutex_unlock(&mutex);

            if (task) 
                // do task
            

            // re-acquire the lock
            pthread_mutex_lock(&mutex);

        
 
    

    // release the lock before exiting the function
    pthread_mutex_unlock(&mutex);
    return NULL;

然后是一个辅助函数来通知所有线程退出并等待每个线程停止。请注意,我们使用pthread_cond_broadcast 通知所有线程从它们的条件变量等待中唤醒,而不是pthread_cond_signal,它只唤醒一个线程。

void stopThreadPool()


    // signal all threads to exit after they finish their current work item
    pthread_mutex_lock(&mutex);
        stopCondition = true;
        pthread_cond_broadcast(&condition_var); // notify all threads
    pthread_mutex_unlock(&mutex);

    // wait for all threads to exit
    for (auto& t : preAllocatedThreadsPool) 
        pthread_join(t, nullptr);
    
    preAllocatedThreadsPool.clear();

我刚刚发现的最后一个错误 - 您的 main 不是像您想象的那样初始化您的 preAllocatedThreadsPool 向量的属性。您正在制作 pthread_t 的副本,而不是在向量中实际使用句柄。

而不是这个:

for(pthread_t i : preallocatedThreadsPool) 

您的循环需要通过引用枚举:

更好:

for(pthread_t &i : preallocatedThreadsPool) 

【讨论】:

【参考方案2】:

发送一个任务,指示池线程将任务重新排队,然后终止。然后毒任务将运行池中的所有线程,将它们全部杀死。我使用了 null 作为毒药(即非法任务)——它在杀死最后一个线程时不需要被破坏。您可能希望在发送 null/whatever 之前清除任务队列。如果使用空值,则只需要在线程中进行空值检查,就在任务出队之后。

您只需要很少的额外代码,您不需要知道池中有多少线程并且它会工作:)

【讨论】:

以上是关于如何终止线程池中的所有预分配线程?的主要内容,如果未能解决你的问题,请参考以下文章

java线程池中正在执行的线程彻底停止

Java并发程序设计线程池之异常终止和正常关闭

如何在线程池中正确分配 pthread 空闲状态?

如何判断线程池中的线程是否全部执行完毕

如何判断线程池中的线程是否全部执行完毕

如何判断线程池中的线程是否全部执行完毕