c++线程和promise

Posted

技术标签:

【中文标题】c++线程和promise【英文标题】:c++ Threads and promises 【发布时间】:2019-11-30 21:54:04 【问题描述】:

我有一个线程安全队列,它预先填充了主线程的工作;当我启动我的工作线程时,他们会从工作队列中弹出一个任务,但也可以将新任务推回工作队列。简要介绍一下代码:

auto work_queue = safe_queue;

static void handle_task(T task) 
  // process the task
  // might push a new task to a work queue using work_queue.push()


int main() 
  //some work is done to prepopulate work_queue

  auto handle_work = []()
    while (!work_queue.empty) 
      T task = work_queue.pop();
      handle_task(task);
    
  ;

  std::vector<std::thread> threads;
  for (int i = 0; i < NUM_OF_THREADS; i++) 
    threads.push_back(std::thread(processing));
  

  std::for_each(threads.begin(), threads.end(), [](std::thread &t) 
    t.join();
  


我了解此代码无法正常工作,因为在某些情况下,队列可能为空,而某些工作线程正在处理工作,而另一个线程进入,找不到工作并退出(尽管线程处理任务可能会推回新工作要排队)。我的问题是,如何防止线程像那样过早退出?使用std::promise&lt;void&gt; 允许线程与其他可能仍在工作的线程进行通信是否可行?如果是这样,多线程如何工作(我是 C++ 新手,只使用单线程的 Promise)?

【问题讨论】:

【参考方案1】:

我不认为std::promise&lt;void&gt; 可以在这里使用,因为它更像是一次性的。在未来设置结果后,它不能被取消设置,因此我们不能为同一个承诺等待两次。

可以执行以下操作(计数器必须是线程安全的,但我现在懒得这样做):

int finished_threads = 0;
auto handle_work = [&finished_threads]()
    bool this_finished = false;
    while (finished_threads < NUM_OF_THREADS) 
      while (!work_queue.empty) 
        if(this_finished) 
          this_finished = false;
          --finished_threads;  // evil
        
        T task = work_queue.pop();
        handle_task(task);
      
      if(!this_finished) 
        this_finished = true;
        ++finished_threads; // evil
      
    
  ;

这应该只在所有线程完成后退出(例如:它们不再处理任务并且队列中没有任务)。然后不再有新的任务被放入队列中。局部变量最小化了对共享内存的访问。

请注意,我在多线程编程方面的经验非常有限。

【讨论】:

@KaenbyouRin 我从 OP 获取了这部分代码,所以我认为他们考虑过这一点。我认为您的评论中没有遗漏,我有点不确定在哪里。 @KaenbyouRin work_queue 这里是线程安全队列,我只是没有包含 safe_queue 结构的代码,它是队列的线程安全版本。 @n314159 有道理,谢谢。我能问一下为什么你增加/减少finished_threads的行被标记为'evil'吗? 因为它们不是线程安全的,因为变量是共享的,但访问是不同步的。可能原子会解决这个问题。但我不确定你如何确保在一个线程检查它是否为空并且它从中弹出之间没有从你的队列中弹出。如果你同步这个块,那么增量也应该是安全的。考虑一下,您可能想更改代码,以便弹出由一个线程完成,然后触发工作线程以执行handle_task @meriam 请注意这一点,因为thread safe 不能保证一切。例如,队列中有一个任务。 Thread A 检查empty,得到false。然后Thread B 检查empty,也得到false。所以两个线程都尝试弹出,但队列中只有一个任务!此外,一个幼稚的 while 循环可以是 resource hogging...【参考方案2】:

我用std::condition_variable 尝试了一些完全不同的东西:

#include <vector>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <optional>
#include <algorithm>

constexpr int NUM_OF_THREADS = 5;
std::condition_variable input_cv;
std::condition_variable callback_cv;
std::optional<int> data ;
std::mutex m;
std::mutex callback_mutex;
int finished_threads = 0;

static void handle_task(int i)



struct worker 

    const int num;
    worker () = delete;
    void operator()() 
        while(true)
            std::unique_lock<std::mutex> lk(m);
            ++finished_threads; // protected by m
            callback_cv.notify_one(); // wake up main thread if it sleeps
            input_cv.wait(lk);
            if(!data)
                return;
            --finished_threads; // protected by m
            int local_data = *data;
            lk.unlock();

            handle_task(local_data);
        
    
;


struct safe_queue // dummy

    int pop () const  return 0; 
    bool empty() const return true;
;

void main_thread ()

    std::vector<std::thread> workers;
    safe_queue work_queue;
    for(int i = 0; i < NUM_OF_THREADS; ++i)
    
        workers.emplace_back(workeri);
    

    do
    
        do
        
            
                std::lock_guard<std::mutex> guard(m);
                data.emplace(work_queue.pop());
            
            input_cv.notify_one();
         while(!work_queue.empty() && finished_threads > 0)
        // If no thread has finished, we can wait for the next one to finish.

        std::unique_lock<std::mutex> lk(callback_mutex);
        callback_cv.wait(lk); // We wait on some thread to have finished
    while(finished_threads < NUM_OF_THREADS && !work_queue.empty()); // In either case, there remains something to do.

    data = ;
    input_cv.notify_all();

    std::for_each(begin(workers), end(workers), [](std::thread &t)  t.join(););



int main()


    std::thread t(main_thread);

    t.join();


不确定是否更好,但肯定更复杂^^。

【讨论】:

是否和在 safe_queue 本身中实现线程安全的 pop() 和 push() 一样?因为这就是我现在所拥有的 - 在 pop() 中,我等到队列变为非空,然后在 push() 中推送任务并使用条件变量来通知一些等待弹出的线程(当然两者都包括锁定) . 这也确保了所有线程在有工作的情况下都在工作并且没有ine过早退出。我认为,至少^^

以上是关于c++线程和promise的主要内容,如果未能解决你的问题,请参考以下文章

# Promise的简单理解和基本使用

# Promise的简单理解和基本使用

C++ 对象和线程

c++线程和promise

混合 C 和 C++ 的线程同步

C++ 持有多个线程