在 C++ 中实现“临时可暂停”并发循环

Posted

技术标签:

【中文标题】在 C++ 中实现“临时可暂停”并发循环【英文标题】:Implementing a "temporarily suspendable" concurrent loop in C++ 【发布时间】:2021-11-19 14:35:01 【问题描述】:

我正在编写一个程序,它的主线程产生一个工作线程来执行一些工作,在无限循环中休眠一段时间,即工作线程执行:

void do_work() 
  for (;;) 
    // do some work
    
    std::this_thread::sleep_for(100ms);
  

现在,我还希望能够从主线程中暂时完全禁用该工作线程,即我想编写以下函数:

disable_worker():禁用工作线程 enable_worker():再次开启工作线程

我想出的是以下内容:

#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

using namespace std::literals::chrono_literals;

bool enabled;
std::mutex mtx;
std::condition_variable cond;

void disable_worker() 
  std::lock_guard<std::mutex> lock(mtx);
  enabled = false;


void enable_worker() 
  
    std::lock_guard<std::mutex> lock(mtx);
    enabled = true;
  

  cond.notify_one();


void do_work() 
  for (;;) 
    std::unique_lock<std::mutex> lock(mtx);
    cond.wait(lock, [] return enabled; );

    // ... do some work ...

    std::this_thread::sleep_for(100ms);
  



int main() 
  std::thread t(do_work);

  // ... enable/disable t as necessary ...

我想这可行(至少我不能发现任何问题),但是,我还想保证当enable_workerdisable_worker 中的任何一个返回(在主线程中)时,工作线程是保证阻塞条件变量或休眠,即不执行任何工作。如何在没有任何竞争条件的情况下实现这一点?

【问题讨论】:

@TedLyngmo:这只能部分解决我的问题,因为工作线程可能会在执行一些我想避免的工作期间暂停。从本质上讲,disable_worker 的语义应该是“完成你当前正在做的任何事情(如果有的话),然后阻塞直到你再次启用”。 是的,我刚刚意识到它甚至不是可移植的 pthread 调用 :-) std::atomic&lt;bool&gt; enabled;, while (true) if (enabled) /*do some work*/ std::this_thread::sleep_for(100ms); ? 我相信显示的代码已经满足了您的要求。 do_work 的主体在互斥体下运行,disable_workerenable_worker 也是如此。可以保证,在这些函数解锁互斥锁的那一刻,线程要么在lock(mtx) 上被阻塞,要么在cond.wait 上等待。它不能处于休眠状态(因为它在持有锁的同时休眠,从而阻塞了enable_workerdisable_worker;顺便说一句,这可能是不明智的)。 【参考方案1】:

这是一个带有队列计数器的并发门的 API,以及“休眠”使用它的想法。

struct SleepyDoorQueue 
  void UseDoor() 
    auto l = lock();
    ++queue_size;
    cv.notify_all();
    cv.wait( l, [&] return open;  );
    --queue_size;
  
  // sleeps for a while, then tries to open the door.
  // considered in queue while sleeping.
  template<class Rep, class Period>
  void SleepyUseDoor( const std::chrono::duration<Rep, Period>& rel_time ) 
    
      auto l = lock();
      ++queue_size;
      cv.notify_all();
    
    std::this_thread::sleep_for(rel_time);
    auto l = lock();
    cv.wait( l, [&] return open;  );
    --queue_size;
  
  void CloseDoor() 
    auto l = lock();
    open = false;
  
  void OpenDoor() 
    auto l = lock();
    open = true;
    cv.notify_all();
  
  void WaitForQueueSize(std::size_t n) const 
    auto l = lock();
    cv.wait(l, [&] return queue_size >= n;  );
  
  explicit SleepyDoorQueue( bool startOpened = true ):open(startOpened) 
private:
  std::condition_variable cv;
  mutable std::mutex m;
  std::size_t queue_size = 0;
  bool open = true;
  auto lock() const  return std::unique_lock(m); 
;

主线程关门,等待队列大小为 1 以确保工作线程不工作。

工作线程在休眠 100ms 后执行SleepyUseDoor 尝试打开它。

当工作线程可以工作时,主线程才开门。

如果有大量工作线程和控制器线程,这将是低效的,因为我对队列和开门消息使用相同的 cv。所以一个会导致其他线程虚假地唤醒。使用一个工作线程和一个控制器线程,消息在任何程度上都不会是虚假的。

我只在队列大小增加和开门时通知,但我故意发出超过 1 个通知(如果有人在等待队列大小更改并且开门者吃掉它,那会很糟糕)。


实际上,您可能可以用两扇门来实现这一点。

struct Door 
  // blocks until the door is open
  void UseDoor() const 
    auto l = lock();
    cv.wait(l, [&] return open; );
  
  // opens the door.  Notifies blocked threads trying to use the door.
  void OpenDoor() 
    auto l = lock();
    open = true;
    cv.notify_all();
  
  // closes the door.
  void CloseDoor() 
    auto l = lock();
    open = false;
  
  explicit Door(bool startOpen=true):open(startOpen) 
private:
  std::condition_variable cv;
  mutable std::mutex m;
  bool open = true;
  auto lock() const  return std::unique_lock(m); 
;

工作线程这样做:

Door AmNotWorking(true);
Door CanWork(true);

void work() 
  for(;;) 
    canWork.UseDoor()
    AmNotWorking.CloseDoor();
    // work
    AmNotWorking.OpenDoor();
    std::this_thread::sleep_for(100ms);
  

控制器线程会:

void preventWork() 
  CanWork.CloseDoor();
  AmNotWorking.UseDoor();

void allowWork() 
  CanWork.OpenDoor();

但我在那里看到了竞争条件;在CanWork.UseDoor()AmNotWorking.OpenDoor() 之间;有人可以关闭CanWork 门然后阅读AmNotWorking 门。我们需要它是原子的。

  // Goes through the door when it is open.
  // atomically runs the lambda passed in while the
  // mutex is locked with checking the door state.
  // WARNING: this can cause deadlocks if you do the
  // wrong things in the lambda.
  template<class F>
  void UseDoor(F atomicWhenOpen) const 
    auto l = lock();
    cv.wait(l, [&] return open; );
    atomicWhenOpen();
  

当我们成功使用门时,它会执行原子操作。有点危险,但工作线程现在可以:

void work() 
  for(;;) 
    canWork.UseDoor([]AmNotWorking.CloseDoor(););
    // work
    AmNotWorking.OpenDoor();
    std::this_thread::sleep_for(100ms);
  

这保证了我们将“AmNotWorking”门关闭在同一个锁中,因为我们验证了“CanWork”门是打开的。

void preventWork() 
  CanWork.CloseDoor();
  AmNotWorking.UseDoor();

如果“use can work and close am working”操作发生在CanWork.CloseDoor()之前,我们将无法AmNotWorking.UseDoor()直到工作线程完成他们的工作。

如果发生在CanWork.CloseDoor()之后,那么AmNotWorking.UseDoor()就关闭了,所以我们再次等到工作线程不工作。

我们不能 CanWork.CloseDoor() 在使用 can work 门和关闭 AmNotWorking 之间,这是额外的原子 lambda 回调给我们的。


我们可能可以制作一个不那么危险的原语,但我不确定如何优雅地完成它。

也许是一个简单的信号量?

template<class T = std::ptrdiff_t>
struct Semaphore 
  void WaitUntilExactValue( T t ) const 
    auto l = lock();
    cv.wait( l, [&] return value==t; 
  
  void WaitUntilAtLeastValue( T t ) const 
    auto l = lock();
    cv.wait( l, [&] return value>=t; 
  
  void WaitUntilAtMostValue( T t ) const 
    auto l = lock();
    cv.wait( l, [&] return value<=t; 
  
  void Increment() 
    auto l = lock();
    ++value;
    cv.notify_all();
  
  void BoundedIncrement(T ceil) 
    auto l = lock();
    cv.wait(l, [&] return value+1 <= ceil; );
    ++value;
    cv.notify_all();
  
  void Decrement() 
    auto l = lock();
    --value;
    cv.notify_all();
  
  void BoundedDecrement(T floor) 
    auto l = lock();
    cv.wait(l, [&] return value-1 >= floor; );
    --value;
    cv.notify_all();
  
  explicit Semaphore( T in = 0 ):value(std::forward<T>(in)) 
private:
  std::condition_variable cv;
  mutable std::mutex m;
  T value = 0;
  auto lock() const; // see above
;

然后

Semaphore workLimit(1);

void work() 
  for(;;) 
    workLimit.BoundedDecrement(0);
    // work
    workLimit.Increment();
    std::this_thread::sleep_for(100ms);
  


void preventWork() 
  workLimit.Decrement();
  workLimit.WaitUntilExactValue(0);

void allowWork() 
  workLimit.Increment();

这里,workLimit 是此时允许有多少工人工作。开头是1

当工人正在工作但不允许工作时,它是-1。当它工作并被允许时,它是0。当它处于睡眠状态并允许工作时,它是1。当它处于休眠状态时(或者因为它处于休眠状态,或者有界递减)并且不允许工作,它是0

【讨论】:

我很好奇。这个类名是你自己想出来的吗? @Timo 哪一个,SleepyDoorQueue?是的。 Door?可能不是。您可以打开/关闭并让物品通过的门的想法似乎是一个非常明显的原始想法。也可以叫门? 是的 SleepyDoorQueue 听起来很棒......而且很有趣:D 感谢超级详细的回答。我会试着绕开它(这可能需要一段时间),如果可行的话,我会接受它。

以上是关于在 C++ 中实现“临时可暂停”并发循环的主要内容,如果未能解决你的问题,请参考以下文章

C#中实现并发的几种方法的性能测试

如何在 C++ 中实现观察者设计模式流数据?

如何在 GUI 中实现我的 while 循环以使用 Visual Studio 在 C/C++ 中保持按键

在c++中实现,如何创建一个char的数组

C++11 中的读/写多线程

求大佬详答在unity中实现按钮播放视频的步骤(类似于抖音,触碰视频屏幕就可以暂停和播放)