C++并发与多线程 8_condition_variablewaitnotify_onenotify_all

Posted TianSong

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了C++并发与多线程 8_condition_variablewaitnotify_onenotify_all相关的知识,希望对你有一定的参考价值。

std::condition_variable

  • 条件变量是一个对象,该对象能够阻塞调用线程,直到被通知恢复;
  • 当调用其等待函数(wait,wait_for,wait_until)之一时,它使用 unique_lock (通过互斥锁)来锁定线程,该线程将保持阻塞状态,直到被另一个同在 condition_variable 对象上调用通知功能的线程唤醒为止;
  • condition_variable 类型的对象始终使用 unique_lock<mutex> 等待(有关可与任何类型的可锁定类型一起使用的替代方法,可参见 condition_variable_any)。
// condition_variable example
#include <iostream>           // std::cout
#include <thread>             // std::thread
#include <mutex>              // std::mutex, std::unique_lock
#include <condition_variable> // std::condition_variable

std::mutex mtx;
std::condition_variable cv;
bool ready = false;

void print_id (int id) {
  std::unique_lock<std::mutex> lck(mtx);
  while (!ready) cv.wait(lck);
  // ...
  std::cout << "thread " << id << \'\\n\';
}

void go() {
  std::unique_lock<std::mutex> lck(mtx);
  ready = true;
  cv.notify_all();
}

int main ()
{
  std::thread threads[10];
  // spawn 10 threads:
  for (int i=0; i<10; ++i)
    threads[i] = std::thread(print_id,i);

  std::cout << "10 threads ready to race...\\n";
  go();                       // go!

  for (auto& th : threads) th.join();

  return 0;
}

输出:

10 threads ready to race...
thread 5
thread 6
thread 0
thread 9
thread 4
thread 1
thread 3
thread 2
thread 8
thread 7

std::condition_variable::notify_one

void notify_one() noexcept;

  • 解锁当前正在等待此条件的其中一个线程;
  • 如果没有线程在等待,则该函数将不执行任何操作(不产生任何影响);
  • 如果超过一个线程在等待,则未指定选择哪个线程。
// condition_variable::notify_one
#include <iostream>           // std::cout
#include <thread>             // std::thread
#include <mutex>              // std::mutex, std::unique_lock
#include <condition_variable> // std::condition_variable

std::mutex mtx;
std::condition_variable produce,consume;

int cargo = 0;     // shared value by producers and consumers

void consumer () {
  std::unique_lock<std::mutex> lck(mtx);
  while (cargo==0) consume.wait(lck);
  std::cout << cargo << \'\\n\';
  cargo=0;
  produce.notify_one();
}

void producer (int id) {
  std::unique_lock<std::mutex> lck(mtx);
  while (cargo!=0) produce.wait(lck);
  cargo = id;
  consume.notify_one();
}

int main ()
{
  std::thread consumers[10],producers[10];
  // spawn 10 consumers and 10 producers:
  for (int i=0; i<10; ++i) {
    consumers[i] = std::thread(consumer);
    producers[i] = std::thread(producer,i+1);
  }

  // join them back:
  for (int i=0; i<10; ++i) {
    producers[i].join();
    consumers[i].join();
  }

  return 0;
}

输出:

1
2
3
4
5
7
6
9
10
8

std::condition_variable::notify_all

void notify_all() noexcept;

  • 解锁当前正在等待此条件的所有线程;
  • 如果没有线程在等待,则该函数不执行任何操作(不产生任何影响)。
// condition_variable::notify_all
#include <iostream>           // std::cout
#include <thread>             // std::thread
#include <mutex>              // std::mutex, std::unique_lock
#include <condition_variable> // std::condition_variable

std::mutex mtx;
std::condition_variable cv;
bool ready = false;

void print_id (int id) {
  std::unique_lock<std::mutex> lck(mtx);
  while (!ready) cv.wait(lck);
  // ...
  std::cout << "thread " << id << \'\\n\';
}

void go() {
  std::unique_lock<std::mutex> lck(mtx);
  ready = true;
  cv.notify_all();
}

int main ()
{
  std::thread threads[10];
  // spawn 10 threads:
  for (int i=0; i<10; ++i)
    threads[i] = std::thread(print_id,i);

  std::cout << "10 threads ready to race...\\n";
  go();                       // go!

  for (auto& th : threads) th.join();

  return 0;
}

输出:

10 threads ready to race...
thread 7
thread 4
thread 3
thread 2
thread 0
thread 1
thread 6
thread 5
thread 8
thread 9

std::condition_variable::wait

unconditional (1)
void wait (unique_lock<mutex>& lck);

predicate (2)
template <class Predicate>
void wait (unique_lock<mutex>& lck, Predicate pred);
  • 当前线程(应以锁定lck的互斥对象)的执行被阻塞,直到得到通知;
  • 在阻塞线程的时刻,该函数自动调用 lck.unlock(), 从而允许其它锁定的线程继续执行;
  • 一旦得到通知(明确的由其它线程通知),该函数将取消阻塞并调用 lck.lock(), 使 lck 处于与调用该函数时相同的状态。然后函数返回(注意,最后一次互斥锁可能会在返回之前再次阻塞线程);
  • 通常,通过另一个线程对成员 notify_one 或 notify_all 的调用来通知该函数唤醒。但是某些实现可能会产生虚假的唤醒调用,而不会调用这些函数中的任何一个。因此,使用此功能的用户应确保满足其恢复条件;
  • 如果指定了 pred(2), 则该函数仅在 pred 返回 false 时调用 wait 才会阻塞当前线程,并且通知只能在线程变为 true 时才取消阻塞线程(这对检查虚假唤醒调用特别有用)

    • 此版本 (2) 的行为就像是实现为:while (!pred()) wait(lck);
参数说明
  • lck

    • 一个 unique_lock 对象,其互斥对象当前已被该线程锁定;
    • 该对象的所有等待成员函数的所有并发调用均应使用相同的基础互斥对象(由 lck.mutex()返回)。
  • pred

    • 可调用的对象或函数,不带任何参数,并返回可以作为 bool 值评估的值;

    反复调用它,直到评估值为 true。

// condition_variable::wait (with predicate)
#include <iostream>           // std::cout
#include <thread>             // std::thread, std::this_thread::yield
#include <mutex>              // std::mutex, std::unique_lock
#include <condition_variable> // std::condition_variable

std::mutex mtx;
std::condition_variable cv;

int cargo = 0;
bool shipment_available() {return cargo!=0;}

void consume (int n) {
  for (int i=0; i<n; ++i) {
    std::unique_lock<std::mutex> lck(mtx);
    cv.wait(lck,shipment_available);
    // consume:
    std::cout << cargo << \'\\n\';
    cargo=0;
  }
}

int main ()
{
  std::thread consumer_thread (consume,10);

  // produce 10 items when needed:
  for (int i=0; i<10; ++i) {
    while (shipment_available()) std::this_thread::yield();
    std::unique_lock<std::mutex> lck(mtx);
    cargo = i+1;
    cv.notify_one();
  }

  consumer_thread.join();

  return 0;
}

输出:

1
2
3
4
5
6
7
8
9
10

std::condition_variable::wait_for

unconditional (1)
template <class Rep, class Period>
  cv_status wait_for (unique_lock<mutex>& lck,
                      const chrono::duration<Rep,Period>& rel_time);

predicate (2)
template <class Rep, class Period, class Predicate>
       bool wait_for (unique_lock<mutex>& lck,
                      const chrono::duration<Rep,Period>& rel_time, Predicate pred);
  • 与 std:condition_variable::wait()类似,不过 wait_for 可以指定一个时间段,在当前线程收到通知或者指定的时间 rel_time 超时之前,该线程都会处于阻塞状态。而一旦超时或者收到了其它线程的通知, wait_for 返回,剩下的处理步骤和 wait() 类似;
  • wait_for 的重载版本 predicate (2) 的最后一个参数 pred 表示 wait_for 的预测条件,只有当 pred 条件为 false 时调用 wait() 才会阻塞当前线程,并且收到其它线程通知后只有当 pred 为 true 时才会解除阻塞,因此相当于如下代码:

return wait_until (lck, chrono::steady_clock::now() + rel_time, std::move(pred));

// condition_variable::wait_for example
#include <iostream>           // std::cout
#include <thread>             // std::thread
#include <chrono>             // std::chrono::seconds
#include <mutex>              // std::mutex, std::unique_lock
#include <condition_variable> // std::condition_variable, std::cv_status

std::condition_variable cv;

int value;

void read_value() {
  std::cin >> value;
  cv.notify_one();
}

int main ()
{
  std::cout << "Please, enter an integer (I\'ll be printing dots): \\n";
  std::thread th (read_value);

  std::mutex mtx;
  std::unique_lock<std::mutex> lck(mtx);
  while (cv.wait_for(lck,std::chrono::seconds(1))==std::cv_status::timeout) {
    std::cout << \'.\' << std::endl;
  }
  std::cout << "You entered: " << value << \'\\n\';

  th.join();

  return 0;
}

输出:

Please, enter an integer (I\'ll be priniting dots):
.
.
2
You entered: 2

std::condition_variable::wait_until

unconditional (1)
template <class Clock, class Duration>
  cv_status wait_until (unique_lock<mutex>& lck,
                        const chrono::time_point<Clock,Duration>& abs_time);

predicate (2)
template <class Clock, class Duration, class Predicate>
       bool wait_until (unique_lock<mutex>& lck,
                        const chrono::time_point<Clock,Duration>& abs_time,
                        Predicate pred);
  • 与 std::condition_variable::wait_for 类似,但是 wait_until 可以指定一个时间点,在当前线程收到通知或者指定的时间点 abs_time 超时之前,该线程都会处于阻塞状态。而一旦超时或者收到了其他线程的通知,wait_until 返回,剩下的处理步骤和 wait_until() 类似;
  • 另外,wait_until 的重载版本predicte(2)的最后一个参数 pred 表示 wait_until 的预测条件,只有当 pred 条件为 false 时调用 wait() 才会阻塞当前线程,并且在收到其他线程的通知后只有当 pred 为 true 时才会被解除阻塞,因此相当于如下代码:
while (!pred())
  if ( wait_until(lck,abs_time) == cv_status::timeout)
    return pred();
return true;
参数:abs_time
  • 线程将停止阻塞的时间点,以允许函数返回;
  • time_point是代表特定绝对时间的对象。
#include <iostream>
#include <atomic>
#include <condition_variable>
#include <thread>
#include <chrono>
using namespace std::chrono_literals;
 
std::condition_variable cv;
std::mutex cv_m;
std::atomic<int> i{0};
 
void waits(int idx)
{
    std::unique_lock<std::mutex> lk(cv_m);
    auto now = std::chrono::system_clock::now();
    if(cv.wait_until(lk, now + idx*100ms, [](){return i == 1;}))
        std::cerr << "Thread " << idx << " finished waiting. i == " << i << \'\\n\';
    else
        std::cerr << "Thread " << idx << " timed out. i == " << i << \'\\n\';
}
 
void signals()
{
    std::this_thread::sleep_for(120ms);
    std::cerr << "Notifying...\\n";
    cv.notify_all();
    std::this_thread::sleep_for(100ms);
    i = 1;
    std::cerr << "Notifying again...\\n";
    cv.notify_all();
}
 
int main()
{
    std::thread t1(waits, 1), t2(waits, 2), t3(waits, 3), t4(signals);
    t1.join(); 
    t2.join();
    t3.join();
    t4.join();
}

输出:

Thread 1 timed out. i == 0
Notifying...
Thread 2 timed out. i == 0
Notifying again...
Thread 3 finished waiting. i == 1

以上是关于C++并发与多线程 8_condition_variablewaitnotify_onenotify_all的主要内容,如果未能解决你的问题,请参考以下文章

C++并发与多线程 4_创建多个线程数据共享问题分析

C++并发与多线程 10_shared_futureautomic

C++并发与多线程 9_asyncfuturepackaged_taskpromise

C++并发与多线程 3_线程传参数详解,detach 注意事项

C++并发与多线程 2_线程启动结束,创建线程多种方法,join,detach

C++并发与多线程 11_std::atomic叙谈std::launch(std::async) 深入