将工作线程与主线程同步

Posted

技术标签:

【中文标题】将工作线程与主线程同步【英文标题】:Synchronize worker threads with a main thread 【发布时间】:2021-01-31 18:53:23 【问题描述】:

如果工作线程可以生成另一个任务,如何正确地?我使用 std::queue 来维护由互斥锁和原子变量保护的任务来跟踪繁忙的线程。不幸的是,我在执行结束时遇到了死锁。

我已经从我的项目中提取代码并创建了以下示例(您可以使用 g++ 或 MSVC 轻松编译它):

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <stdexcept>
#include <functional>
#include <stack>
#include <atomic>
#include <queue>

template <class T, class Compare>
class USort 
    using Task = std::pair<T*, T*>;
private:
    size_t m_ThreadsNum;
    std::atomic<bool> m_Finished;
    std::atomic<size_t> m_Busy;
    std::thread* m_Threads;
    std::queue<Task> m_Tasks;
    size_t m_Size;
    T* m_Data;
    Compare m_Comparator;
    std::condition_variable m_WaitFinished;
    std::condition_variable m_WaitSorter;
    std::mutex m_TaskQueueMutex;

private:
    const size_t THREAD_THRESHOLD = 1024;
    const size_t THREAD_POOL_THRESHOLD = 8192;


    bool HasTask() 
        std::unique_lock<std::mutex> lock(m_TaskQueueMutex);
        return m_Tasks.size() > 0;
    

    bool PopTask(T** L, T** R) 
        std::unique_lock<std::mutex> lock(m_TaskQueueMutex);

        if (m_Tasks.size() == 0) 
            *L = *R = nullptr;
            return false;
        

        *L = m_Tasks.front().first;
        *R = m_Tasks.front().second;
        m_Tasks.pop();

        return true;
    

    void PushTask(T* L, T* R) 
        std::unique_lock<std::mutex> lock(m_TaskQueueMutex);
        m_Tasks.emplace(std::pair<T*, T*>(L, R));
        m_WaitSorter.notify_one();
    

    void SortThread(size_t Id) 
        std::mutex sorter_mutex;
        for (;;) 
            std::unique_lock<std::mutex> lock(sorter_mutex);
            ///
            ///  ----------------------------------> some threads wait here
            /// 
            m_WaitSorter.wait(lock, [this]()  return m_Finished || HasTask(); );

            if (m_Finished) break;

            m_Busy++;

            T *left, *right;
            while (PopTask(&left, &right)) 
                Sort(left, right);
            

            if (--m_Busy == 0) 
                m_WaitFinished.notify_one();
            
        
    

    // just simulate work
    void Sort(T* Left, T* Right) 
        if (Right - Left > 10) 
            PushTask(Left, Right-10);
        
    

    void WaitForSortingIsFinished() 
        std::mutex finished;
        std::unique_lock<std::mutex> lock(finished);
        m_WaitFinished.wait(lock, [this]()  return m_Busy == 0 && !HasTask(); );
    

    void FinishThreads() 
        m_Finished = true;
        m_WaitSorter.notify_all();
    

    void ReleaseThreads() 
        if (m_Threads) 
            for (size_t i = 0; i < m_ThreadsNum; i++) 
                ///
                ///  ----------------------------------> main thread stuck here
                /// 
                m_Threads[i].join();
            
            delete[] m_Threads;
            m_Threads = nullptr;
        
    

public:
    USort(size_t NumberOfThreads = 0) : m_Comparator(Compare()) 
        if (NumberOfThreads == 0) 
            static const unsigned int max_concurrency = std::thread::hardware_concurrency();
            NumberOfThreads = max_concurrency;
            if (NumberOfThreads == 0) NumberOfThreads = 4;
        

        m_Finished = false;
        m_ThreadsNum = NumberOfThreads;
        m_Threads = nullptr;
    

    ~USort() 
        ReleaseThreads();
    

    void Sort(T* Data, size_t Size) 
        // build thread pool
        m_Threads = new std::thread[m_ThreadsNum];
        for (size_t i = 0; i < m_ThreadsNum; i++) 
            m_Threads[i] = std::thread(&USort::SortThread, this, i);
        

        // process data
        PushTask(Data, Data + Size - 1);
        WaitForSortingIsFinished();
        FinishThreads();
    

;

template <class T, class Compare>
void usort(T* Data, size_t Size, size_t NumberOfThreads = 0) 
    USort<T, Compare> mt_sorter(NumberOfThreads);
    mt_sorter.Sort(Data, Size);



const size_t ARR_SIZE = 0x00010000;


struct comp 
    bool operator()(const int& L, const int& R) const 
        return L < R;
    
;

int main()

    int* arr = new int[ARR_SIZE];
    for (int i = 0; i < ARR_SIZE; i++) 
        arr[i] = rand() % 3200000;
    

    usort<int, comp>(arr, ARR_SIZE, 16);

    delete[] arr;

    return 0;

问题是,在我的示例中,线程并不总是完成。 m_WaitSorter.wait() 中不时有一些线程挂起,因此m_Threads[i].join(); 中的主线程挂起。逻辑缺陷在哪里。为什么调用FinishThreads() 没有完成所有线程?

编辑: 基本上我想实现多线程排序算法。

    主线程创建线程池,将第一个任务(排序整个数组)推送到任务队列,等待排序完成 池线程接受任务,将其划分为更小的任务(1-3)。其中一项任务立即由当前池线程处理,其他任务被推送到队列中 池线程必须在整个数据集排序后才能完成(队列中没有任务并且所有池线程都处于挂起状态) 当排序完成后,主线程应该被唤醒 主线程应该完成挂起的线程

因此,从我的角度来看,我需要两个 conditional_variabes,在主线程中带有谓词“所有线程都挂起 && 在队列中没有任务”,在池线程中需要“在队列中有任务 || 完成线程”。

【问题讨论】:

你实际上误用了同步对象,思想一定不能满足同步对象的接口。条件变量必须以某种方式与它所使用的对象和互斥体连接。请更准确地解释您愿意实现的目标,您要解决的任务是什么? 好的,我已经详细阐述了我的问题 【参考方案1】:

好的,我已经仔细阅读了文档,并在我的代码中发现了一个错误。对 notify_one()notify_all()wait() 的调用必须通过相同的 mutext 进行控制。考虑到这一点,我更新并简化了我的代码:

    bool WaitAndPopTask(T** L, T** R) 
        std::unique_lock<std::mutex> lock(m_TaskQueueMutex);
        m_WaitSorter.wait(lock, [this]()  return m_Finished || !m_Tasks.empty(); );

        if (m_Finished) return false;

        m_Busy++;

        *L = m_Tasks.front().first;
        *R = m_Tasks.front().second;
        m_Tasks.pop();

        return true;
    

    void SortThread(size_t Id) 
        for (;;) 
            T *left, *right;
            if (!WaitAndPopTask(&left, &right)) break;

            Sort(left, right);

            std::lock_guard<std::mutex> lk(m_TaskQueueMutex);
            if (--m_Busy == 0 && m_Tasks.empty()) 
                FinishThreads();
            
        
    

    void Sort(T* Data, size_t Size) 
        // build thread pool
        m_Threads = new std::thread[m_ThreadsNum];
        for (size_t i = 0; i < m_ThreadsNum; i++) 
            m_Threads[i] = std::thread(&USort::SortThread, this, i);
        

        // process data
        PushTask(Data, Data + Size - 1);
        ReleaseThreads();
    

【讨论】:

以上是关于将工作线程与主线程同步的主要内容,如果未能解决你的问题,请参考以下文章

golang中四种方式实现子goroutine与主协程的同步

同步线程 - InterlockedExchange

Qt多线程——方式1

工作线程中的同步与异步 ioctl

同步主线程和工作线程

访问 List 项时 UI 和 Worker 线程同步