C ++中的条件变量问题

Posted

技术标签:

【中文标题】C ++中的条件变量问题【英文标题】:Issue with condition variables in C++ 【发布时间】:2021-10-25 03:53:39 【问题描述】:

我们已经实现了TaskRunner,它的函数将被不同的线程调用来启动、停止和发布任务。 TaskRunner 将在内部创建一个线程,如果队列不为空,它将从队列中弹出任务并执行它。 Start() 将检查线程是否正在运行。如果没有创建一个新线程。 Stop() 将加入线程。代码如下。

bool TaskRunnerImpl::PostTask(Task* task) 
  tasks_queue_.push_back(task);
  return true;


void TaskRunnerImpl::Start() 
  std::lock_guard<std::mutex> lock(is_running_mutex_);
  if(is_running_) 
    return;
  
  is_running_ = true;

  runner_thread_ = std::thread(&TaskRunnerImpl::Run, this);


void TaskRunnerImpl::Run() 
  while(is_running_) 
    if(tasks_queue_.empty()) 
      continue;
    
    Task* task_to_run = tasks_queue_.front();
    task_to_run->Run();
    tasks_queue_.pop_front();

    delete task_to_run;
  


void TaskRunnerImpl::Stop() 
  std::lock_guard<std::mutex> lock(is_running_mutex_);
  is_running_ = false;
  if(runner_thread_.joinable()) 
    runner_thread_.join();
  

此代码按预期工作。任务不断被推送,线程正在执行这些任务。我们现在要使用条件变量,否则线程将不断检查任务队列是否为空。我们实现如下。

线程函数 (Run()) 将等待条件变量。 如果有人发布任务,PostTask() 会发出信号。 Stop() 将在有人调用停止时发出信号。

实现代码如下。

bool TaskRunnerImpl::PostTask(Task* task, uint64_t delay_milliseconds) 
    std::lock_guard<std::mutex> taskGuard(m_task_mutex);
    tasks_queue_.push_back(task);
    m_task_cond_var.notify_one();
    INFO(" :  : ", __FUNCTION__, delay_milliseconds, tasks_queue_.size());
    return true;


void TaskRunnerImpl::Start() 
    INFO("", __FUNCTION__);
    std::lock_guard<std::mutex> taskGuard(m_task_mutex);

    if(!is_running_) 
        is_running_ = true;
        runner_thread_ = std::thread(&TaskRunnerImpl::Run, this);
    


void TaskRunnerImpl::Run() 
    while(true) 
        INFO(" : ", __FUNCTION__, 1);

        
            std::unique_lock<std::mutex> mlock(m_task_mutex);
            INFO(" : Locked Mutex", __FUNCTION__);
            m_task_cond_var.wait(mlock, [this]() 
                INFO(" : Checking Condition", __FUNCTION__);
                return !(is_running_ && tasks_queue_.empty());
            );


                INFO(" : Came out of wait", __FUNCTION__);
            if(!is_running_) 
                return;
            

           INFO(" : Escaped if cond", __FUNCTION__);
            if(!tasks_queue_.empty()) 
                INFO(" :  : ", __FUNCTION__, 2, tasks_queue_.size());    // NO LOGS AFTER THIS GETTING PRINTED
                Task* task_to_run = tasks_queue_.front();
                task_to_run->Run();
                INFO(" : Deleting Task", __FUNCTION__);
                tasks_queue_.pop_front();
                INFO(" : After Deletion : ", __FUNCTION__, tasks_queue_.size());
                delete task_to_run;
            
        INFO(" : Out of scope", __FUNCTION__);
        

        INFO(" : End of iteration", __FUNCTION__);
    

    INFO(" : returning", __FUNCTION__);


void TaskRunnerImpl::Stop() 
    
        std::lock_guard<std::mutex> taskGuard(m_task_mutex);
        is_running_ = false;
        INFO(" : Signalling STOP", __FUNCTION__);
        m_task_cond_var.notify_one();
    

    INFO(" : ", __FUNCTION__, 1);

    if(runner_thread_.joinable()) 
        runner_thread_.join();
    

不确定代码有什么问题。我得到以下输出。

TaskRunnerImpl.cpp:34:INFO: Start
TaskRunnerImpl.cpp:45:INFO: Run : 1
TaskRunnerImpl.cpp:49:INFO: Run : Locked Mutex
TaskRunnerImpl.cpp:51:INFO: operator() : Checking Condition
TaskRunnerImpl.cpp:29:INFO: PostTask : 0 : 1
TaskRunnerImpl.cpp:29:INFO: PostTask : 0 : 2
TaskRunnerImpl.cpp:51:INFO: operator() : Checking Condition
TaskRunnerImpl.cpp:56:INFO: Run : Came out of wait
TaskRunnerImpl.cpp:61:INFO: Run : Escaped if cond
TaskRunnerImpl.cpp:63:INFO: Run : 2 : 2

这意味着在执行任务之前打印日志,之后就没有日志了。通常 PostTask() 将被连续调用以将任务发布到队列中。但是使用新代码在任务运行后没有日志。所以我假设线程函数持有互斥锁并且 PostTask() 无法将任务推送到队列中。但是无法理解为什么执行任务后没有日志。如果我恢复到原始代码,则代码按预期工作。如果代码有任何问题,谁能告诉我。

【问题讨论】:

tasks_queue_ 如果 std 队列意味着您的“精细”代码充满了 UB。发布minimal reproducible example 因为在第二个版本中访问tasks_queue_ 总是在持有互斥锁时完成,@yakk,我看不出有这个问题。第一个版本充满了 UB,不仅因为这个,还因为 is_running_ 也没有正确同步。但是,无论如何,minimal reproducible example 是必需的。 另外,你的 INFO 函数可能对代码有副作用,因为要正确输出到 cout 或类似的东西,你需要同步。 无论如何,架构方面,写一个条件值互斥锁拥有线程安全队列。它应该支持 push 和 pop,并在模板类型 T 上工作。pop 休眠直到有东西要弹出。也许添加一个中止方法,并让 pop 返回一个可选的(如果中止则为空)。然后将线程管理代码移出它。您将几乎免费获得许多 popers 和 pushers,并且由于结构的原因,队列的争用也将免费消失。 【参考方案1】:

您可能遇到了死锁。在许多地方,您的代码使互斥锁锁定的时间比要求的要长。

启动线程时,通常不需要锁。

停止时也不行。为此,您可以使用原子布尔值。

最后,当您运行每个任务时(即task_to_run-&gt;Run();),您绝对应该没有锁。

如果您在 99% 的时间内都锁定了互斥锁,那么为什么不在主线程中做所有事情呢?如前所述,当互斥锁被锁定时,您不能在运行任务时发布任何任务。所以你发布任务的线程会一直等到当前任务完成。

因此,当任务队列不为空时,您的发布线程几乎不会有任何进展。

显然,您希望将顶部任务放在局部变量中并将该项目从队列中弹出,然后释放锁,然后运行任务。类似的东西:

if(!tasks_queue_.empty())

    std::unique_ptr<Task> task_to_run(tasks_queue_.front());
    tasks_queue_.pop_front();
    mlock.unlock();
    task_to_run->Run();

任何好的 C++ 书籍都应该解释这一点。

如果您对多线程很认真,那么C++ Concurrency in action 是一本不错的读物。

顺便说一句,如果你可以使用 C++ 20,你可以通过使用 jthread 和取消令牌来简化代码。

您可能想阅读的另一本书是Concurrency with Modern C++或一本关于 C++20 的书。

顺便说一句,您的队列也应该存储std::unique_ptr。否则,在调用Stop 后,您需要清理代码来删除任务。它有助于编写异常安全的代码,而不需要太多额外的代码,例如显式 try/catch 或清理循环。

即使bool TaskRunnerImpl::PostTask(Task* task) 也应该替换为bool TaskRunnerImpl::PostTask(std::unique_ptr &lt;Task&gt; task),然后你应该std::move 你的任务。

【讨论】:

谢谢。但我不明白“停止时不需要锁”。如果我不使用锁和条件变量,线程如何知道它必须退出。线程将愉快地等待条件变量。 @kadina 我说过不要使用锁但我从来没有说过不要使用条件变量。我已经说过要 (1) 删除锁并 (2) 使用原子变量(显然是 is_running_)。

以上是关于C ++中的条件变量问题的主要内容,如果未能解决你的问题,请参考以下文章

C/C++ 和其他语言中的条件变量使用模式

使用条件变量超时的读取器/写入器锁定

R If then do - 如何根据条件创建变量

C POSIX 线程的互斥体/条件的工作方式因变量所在的位置而异

C ++:for循环中的多个退出条件(多个变量):AND -ed或OR -ed?

使用r中的长格式data.table中的两个变量按条件改变变量