C++并发编程之三 并发操作的同步

Posted ZHAOCHENHAO-

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了C++并发编程之三 并发操作的同步相关的知识,希望对你有一定的参考价值。

文章目录

  1. 上个章节我们讨论了如何对共享数据的一个保护,通过std::lock_guard、std::unique_lock、初始化过程中使用std::call_once、std::once_flag、多个线程读取少量线程写入的时候使用std::shared_lock、std::unique_lock和std::shared_mutex或std::shared_timed_mutex搭配使用,还有单线程递归加锁的方式std::recursive_mutex的使用方法。
  2. 但是有时候我们不仅需要保护共享数据,还需要令独立线程上的行为同步。那么本章节我们就来讲一下线程同步的几种方法。

1. 等待事件或等待其他条件

有的时候需要各个线程之间协同操作,比如A线程需要等待B线程完成某一个功能之后才开始执行,那么有没有什么办法,B线程完成功能之后通知A线程一声,然后A线程接受到信号之后就开始工作呢?肯定是有的。

1.1 凭借条件变量等待条件成立

那么就开始介绍std::condition_variable 和 std::condition_variable_any的使用。
std::condition_variable 和 std::condition_variable_any 是 C++ 中用于多线程同步的两个类。它们都允许线程在等待某个条件变为真之前挂起自己,以免造成无谓的 CPU 时间浪费。

1.1.1 std::condition_variable

std::condition_variable 是 C++11 中引入的一个线程同步原语,用于在等待某个条件变为真之前挂起当前线程。使用 std::condition_variable 时,通常需要先定义一个 std::mutex,因为std::condition_variable仅限于与std::mutex一起使用,然后用 std::unique_lockstd::mutex 对其进行上锁,最后通过 std::condition_variable::wait() 解锁并且挂起当前线程:

#include <iostream>
#include <mutex>
#include <thread>
#include <condition_variable>
std::mutex mtx;
std::condition_variable cv;
bool ready = false;

void worker_thread() 
    // 等待条件变为真
    std::unique_lock<std::mutex> lck(mtx);
    while (!ready) 
        cv.wait(lck);
    

    // 条件已经变为真,执行一些操作
    // ...
    if (lck.owns_lock())
    
        std::cout << "lck has locked" << std::endl;
    


int main() 
    // 模拟某些操作
    std::this_thread::sleep_for(std::chrono::seconds(5));

    // 通知等待的线程条件已经变为真
    
        std::lock_guard<std::mutex> lck(mtx);
        ready = true;
    
    cv.notify_one();
    std::thread t(worker_thread);
    t.join();
    return 0;

在上面的代码中,worker_thread() 函数等待 ready 变为 true,如果当前 ready 的值为 false,则调用 cv.wait(lck) 挂起当前线程,同时释放 lck。当 cv.notify_one() 被调用时,cv.wait(lck) 会返回,worker_thread() 函数会重新获得 lck,然后执行一些操作。

需要注意的是,由于 std::condition_variable::wait() 可能会出现虚假唤醒(即没有被 notify 也会从 wait() 函数中返回),因此在使用 std::condition_variable 时,通常需要将等待条件的语句用循环包围起来,以确保条件变为真时不会错过信号。
当然也可以这样写:

#include <iostream>
#include <mutex>
#include <thread>
#include <condition_variable>
std::mutex mtx;
std::condition_variable cv;
bool ready = false;

void worker_thread() 
    // 等待条件变为真
    std::unique_lock<std::mutex> lck(mtx);
    cv.wait(lck, [&]() return ready; ); 

    // 条件已经变为真,执行一些操作
    // ...
    if (lck.owns_lock())
    
        std::cout << "lck has locked" << std::endl;
    


int main() 
    // 模拟某些操作
    std::this_thread::sleep_for(std::chrono::seconds(5));

    // 通知等待的线程条件已经变为真
    
        std::lock_guard<std::mutex> lck(mtx);
        ready = true;
    
    cv.notify_one();
    std::thread t(worker_thread);
    t.join();
    return 0;

在段代码里面使用了cv.wait(lock, [&]() return ready; )进行等待条件成立。正好在此介绍一下std::condition_variable::wait()函数的用法:

std::condition_variable::wait()是一个用于等待通知的函数,其使用方式如下:

template< class Predicate > 
void wait(std::unique_lock<std::mutex>& lock, Predicate pred ); 

其中,lock是一个已经加锁的互斥量(必须是 std::unique_lock 类型),pred是一个可调用对象,用于判断等待条件是否满足。函数执行时会自动释放锁,并阻塞当前线程直到被通知。当线程被通知后,函数会重新获得锁,并重新检查等待条件。如果等待条件满足,函数返回;否则,函数再次进入阻塞状态等待下一次通知。
也就是如果pred返回true的时候,wait不会阻塞线程,而当pred返回false的时候,线程会进入休眠状态也就是阻塞,那什么时候苏醒呢?等待下一次notify_one或者notify_all的时候,再次检查pred的状态,如果pred返回true,那么wait苏醒不会阻塞线程。所以notify只会对阻塞休眠状态的线程起作用。
当没有pred函数的时候,wait就直接进行阻塞进入休眠状态,直到notify_one或这notify_all进行唤醒线程.

1.1.2 std::condition_variable_any

std::condition_variable_any 是 C++11 中引入的另一个线程同步原语,它的作用与 std::condition_variable 相同,但是可以与任何可锁定的互斥量(std::mutex、std::shared_mutex、std::recursive_mutex)等等一起使用。使用 std::condition_variable_any 时,需要先定义一个互斥量,然后使用 std::unique_lock 对其进行上锁。

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>

std::mutex mtx;
std::condition_variable_any cv;
bool ready = false;

void worker_thread() 
    // 等待条件变为真
    
        std::unique_lock<std::mutex> lck(mtx);
        while (!ready) 
            cv.wait(lck);
        
    

    // 条件已经变为真,执行一些操作
    // ...
    std::cout << "shared data" << std::endl;


int main() 
    // 模拟某些操作
    std::this_thread::sleep_for(std::chrono::seconds(1));

    // 通知等待的线程条件已经变为真
    
        std::lock_guard<std::mutex> lck(mtx);
        ready = true;
    
    cv.notify_one();
    std::thread t(worker_thread);
    t.join();
    return 0;

注意:至于为什么能在main函数中使用std::lock_guard,而在worker_thread()函数中不能使用std::lock_guard,这是因为std::condition_variable或者std::condition_variable_any在等待操作的过程中要释放互斥量,如果使用 std::lock_guard,那么在等待操作期间就无法释放互斥量,从而无法满足 std::condition_variable 的要求。

std::unique_lock 比 std::lock_guard 更灵活,因为它允许在构造时不锁定互斥量,在析构时再解锁。这使得 std::unique_lock 可以用于实现一些更为复杂的同步操作,例如超时等待、可重入锁等。此外,std::unique_lock 还提供了一些额外的功能,例如手动锁定和解锁互斥量、转移互斥量的所有权等。

在使用 std::condition_variable或者std::condition_variable_any时,我们通常需要使用 std::unique_lock 来锁定互斥量,并在等待操作期间释放互斥量。这样可以让其他线程获得互斥量并修改共享变量,从而避免死锁的情况。

1.1.3 std::condition_variable和std::condition_variable_any之间的区别

std::condition_variable 和 std::condition_variable_any 都是用于多线程同步的 C++ 标准库类,它们的主要区别在于:

  • 适用范围:
    std::condition_variable 只能与 std::unique_lockstd::mutex 配合使用,而 std::condition_variable_any 可以与任何能够提供 lock() 和 unlock() 成员函数的互斥量配合使用,包括 std::mutex、std::recursive_mutex、std::shared_mutex 等等。
  • 实现细节:
    std::condition_variable_any 的实现可能比 std::condition_variable 更为复杂,因为它需要支持不同类型的互斥量,而且可能需要在等待队列中存储更多的信息来避免死锁和无效的等待。

综上所述,std::condition_variable 更为简单且更为常用,适用于绝大多数的同步场景。而 std::condition_variable_any 则更加灵活,适用于一些特殊的同步场景,例如需要使用不同类型的互斥量、需要跨线程进行等待和通知等。

1.2 使用future等待一次性事件发生

上面我们看到了std::condition_variable和std::condition_variable_any的使用,这种方式是一个线程A运行,另一个线程B在等待线程A释放信号之前,一直阻塞线程的运行。说明线程B依赖于线程A,如果线程A不释放,那么线程B一直阻塞。可是当我们遇到这种情况时,比如做饭的情形:我有没有办法一边煮饭,一边读书写博客呢?等我博客写完了之后,再去看一下饭有没有煮好。而不是非要我读完书再去一直等待煮饭,或者煮完饭再去写博客,这样的话就太浪费时间了。

1.2.1 从后台返回任务结果

当然是有办法的,我们可以使用std::future用于获取一个异步操作的结果。它可以让一个线程在执行一个耗时的操作时不必一直等待其完成,而是可以继续执行其他任务,并在需要时获取操作的结果。在使用std::future时,可以使用std::async函数或std::packaged_task对象来创建一个异步任务,该任务会在另一个线程中执行,并返回一个std::future对象,可以用来获取任务的结果。
示例如下:

#include <iostream>
#include <future>

int add(int x, int y) 

    return x + y;


int main() 
    // 使用std::async创建异步任务
    std::future<int> result = std::async(std::launch::async, add, 10, 20);
	//这里可以做其他事情,等事情做完了之后,查看一下异步线程的结果即可。
	//do other someting
	
    // 使用std::future获取异步任务的返回值
    int sum = result.get();

    // 输出异步任务的返回值
    std::cout << "The sum is: " << sum << std::endl;

    return 0;

在上面的例子中,我们定义了一个add函数,用于计算两个整数的和。然后我们使用std::async创建一个异步任务,并将add函数和两个整数10和20作为参数传递给它。

接着,我们使用std::future的get()函数获取异步任务的返回值,即add函数的返回值。最后,我们将异步任务的返回值打印出来,以确认异步任务已经成功完成并返回了正确的结果。

在这里我们简单介绍一下std::async()函数:
std::async()函数是C++11标准库中的一个函数,可以用来实现异步操作。它可以在一个新的线程或者一个线程池中执行一个函数,并返回一个std::future对象,可以用来获取函数执行的结果或者检查函数是否执行完成。

#include <iostream>
#include <future>

int main() 
    // 使用 std::async() 函数在一个新线程中执行函数
    std::future<int> future_result = std::async([]() 
        std::cout << "New thread running..." << std::endl;
        return 42;
    );

    // 在主线程中获取函数执行的结果
    int result = future_result.get();

    std::cout << "Result: " << result << std::endl;

    return 0; 
     

在上面的代码中,我们使用std::async()函数在一个新的线程中执行了一个匿名函数,该函数输出一条消息并返回一个整数值。std::async()函数返回一个std::future对象,我们可以使用get()方法获取函数执行的结果。最后,我们在主线程中输出了函数的返回值。

需要注意的是,std::async()函数有多种不同的执行策略,例如std::launch::async和std::launch::deferred等,它们会影响函数执行的方式和时间。
如下为std::launch::async 和 std::launch:deferred的详细说明:

  1. std::launch::async和std::launch::deferred都是std::async函数的执行策略参数,它们分别表示异步执行和延迟执行两种不同的执行方式。
  2. std::launch::async表示要求std::async函数在调用时立即在新的线程中执行函数,这种方式通常用于需要异步执行的任务,例如在后台处理一些耗时的计算任务。使用std::launch::async执行任务可以使主线程不必等待任务完成而继续执行其他工作。但需要注意的是,使用std::launch::async执行任务可能会导致程序使用的线程数增加,这可能会对系统资源产生一定的压力。
  3. std::launch::deferred表示要求std::async函数在调用get方法时才开始执行函数,这种方式通常用于需要延迟执行的任务。使用std::launch::deferred执行任务可以避免线程数增加,因为任务是在调用get方法时才开始执行,这时通常会在主线程中执行。但需要注意的是,使用std::launch::deferred执行任务可能会导致调用get方法时阻塞主线程,直到任务完成为止。

示例如下:

#include <iostream>
#include <future>

int compute() 
    std::cout << "Task started..." << std::endl;
    // 模拟一个耗时的操作
    std::this_thread::sleep_for(std::chrono::seconds(3));
    std::cout << "Task finished!" << std::endl;
    return 42; 

int main() 
    std::future<int> future_result1 = std::async(std::launch::async, compute);
    std::future<int> future_result2 = std::async(std::launch::deferred, compute);

    std::cout << "Do some other work..." << std::endl;

    int result1 = future_result1.get();
    int result2 = future_result2.get();

    std::cout << "Result1: " << result1 << std::endl;
    std::cout << "Result2: " << result2 << std::endl;

    return 0; 
   

在上面的示例代码中,我们使用std::launch::async和std::launch::deferred分别创建了两个std::future对象,分别表示要异步执行和延迟执行一个函数compute。在主线程中,我们继续执行一些其他任务,并不必等待这些任务完成。最后,我们分别使用get方法获取这两个任务的结果,并输出结果到控制台。
在向std::async()向任务函数传递参数的时候与std::thread对象传递参数很类似:

  1. 定义一个函数,该函数将接受要传递的参数:
void task(int arg1, double arg2, const std::string& arg3) 
   // 任务代码 
   
  1. 调用std::async并将任务函数和参数传递给它:
std::future<void> fut = std::async(std::launch::async, task, 42, 3.14, "hello"); 
  1. 如果任务返回一个值,您可以使用std::future来获取该值:
std::future<int> fut = std::async(std::launch::async, task, 42, 3.14, "hello"); 
int result = fut.get(); 

这将启动异步任务并等待其完成。一旦任务完成,fut.get()将返回任务的返回值。如果任务没有返回值,您可以将std::future声明为std::future。
注意,如果函数是成员函数,则第一个参数应该是该函数的类实例指针。例如,如果task是类MyClass的成员函数,则调用应该像这样:

MyClass obj; std::future<void> fut = std::async(std::launch::async, &MyClass::task, &obj, 42, 3.14, "hello"); 

其中,&MyClass::task表示MyClass类的成员函数task,&obj表示MyClass类的实例对象指针。

1.2.2 关联future实例和任务

在前两篇文章中,我们异线程运行任务时都是使用std::thread,但是我们知道std::thread不返回任何结果,但我们有没有什么办法能知道std::thread异步线程中的执行结果呢?当然是可以的,在本章节中将引入std::packaged_task

std::packaged_task是一个模板类,可以将函数或可调用对象封装为一个可调用的future对象,使其能够在另一个线程中异步执行,并且可以获取异步执行结果。并且std::packaged_task对象是可调用对象,我们可以直接使用,还可以将其包装在std::function对象内,当作线程函数传递给std::thread对象,也可以传递给需要可调用对象的函数。

使用std::packaged_task的一般步骤如下:

  1. 创建一个std::packaged_task对象,并将要异步执行的函数或可调用对象作为参数传递给它的构造函数。
  2. 调用std::packaged_task::get_future方法获取一个future对象,它将在异步执行完成时接收结果。
  3. 将std::packaged_task对象传递给std::thread或std::async等异步执行函数,或者使用std::packaged_task::operator()调用函数。
  4. 在异步执行完成后,可以通过future::get方法获取异步执行结果。

以下是一个简单的示例代码,使用std::packaged_task异步执行一个函数并获取其结果:

#include <iostream>
#include <future>
#include <thread>

int square(int x) 
    return x * x;


int main() 
    std::packaged_task<int(int)> task(square);
    std::future<int> fut = task.get_future();
    std::thread th(std::move(task), 6);
    th.join();
    int result = fut.get();
    std::cout << "Result: " << result << std::endl;
    return 0;

在这个示例中,std::packaged_task对象被创建并传递给了一个新的线程。在调用std::thread构造函数时,需要使用std::move将std::packaged_task对象转移到新线程中。异步执行完成后,可以通过future::get方法获取结果,并将其输出到控制台。

需要注意的是,如果异步执行函数抛出异常,则future::get方法将重新抛出该异常。因此,在使用std::packaged_task时,需要考虑异常处理以及线程同步等问题。

为什么要使用std::move呢:

  1. 在向std::thread传递std::packaged_task对象时,需要使用std::move,因为std::thread的构造函数使用了右值引用来接收函数对象,而std::packaged_task是一个包装了可调用对象的类模板,其中包含了一个移动构造函数。

  2. 如果我们不使用std::move,则将尝试使用std::packaged_task的复制构造函数来创建一个新对象,并将其传递给std::thread的构造函数,这将导致编译错误,因为std::packaged_task的复制构造函数被删除了,只保留了移动构造函数。

  3. 因此,为了正确地将std::packaged_task对象传递给std::thread,需要使用std::move将其转换为右值,从而调用std::packaged_task的移动构造函数。

1.2.3 创建std::promise

上面我们提到std::packaged_task可以将函数封装成一个可被调用的对象,然后传入到其他线程中,进行异步线程的通信。除此之外,还有其他的方式可以异步通信,将std::promise<T>和std::future<T>进行相关联。

std::promise 是 C++11 中的一个类,用于在多线程编程中的异步操作中,通过分离了值的提供者和使用者,实现线程间的通信。

通常情况下,在一个异步任务中,线程 A 生成一个结果,线程 B 会等待这个结果,然后使用它进行后续操作。如果没有合适的同步机制,线程 B 可能会在线程 A 完成前一直等待,这可能导致线程 B 一直阻塞,从而使程序变得不可用。

使用 std::promise 可以解决这个问题。std::promise 允许线程 A 异步生成结果,而线程 B 则可以在需要结果时等待它的到来,这就使得线程 B 可以不必阻塞等待结果,而可以继续执行其他任务。具体来说,线程 A 可以使用 std::promise 对象的 set_value() 方法来设置一个结果值,而线程 B 可以使用 std::promise 对象的 get_future() 方法来获取一个关联的 std::future 对象,然后使用 std::future 对象的 get() 方法来等待并获取结果值。

当需要在两个线程之间进行通信并交换数据时,可以使用 std::promise 和 std::future 实现。以下是一个简单的例子:

#include <iostream>
#include <thread>
#include <future>

void do_something(std::promise<int> &&prms) 
    std::cout << "Inside thread A, doing something..." << std::endl;
    int result = 42;
    prms.set_value(result); //设置结果值


int main() 
    std::cout << "Starting thread A..." << std::endl;
    std::promise<int> prms;
    std::future<int> ftr = prms.get_future(); //获取 future 对象
    std::thread t(do_something, std::move(prms));
    
    std::cout << "Waiting for result in main thread..." << std::endl;
    int result = ftr.get(); //等待结果
    std::cout << "Result received: " << result << std::endl;
    
    t.join();
    return 0;

在这个例子中,do_something() 函数表示一个需要在单独线程中执行的异步任务,它使用 std::promise 对象设置了一个结果值。在主线程中,首先创建了一个 std::promise 对象,并通过调用其 get_future() 方法获得了一个 std::future 对象。接着,主线程创建了一个新的线程并将 std::promise 对象作为参数传递给线程函数 do_something()。在 do_something() 函数内部,它使用 set_value() 方法设置了一个结果值。

在主线程中,它调用了 ftr.get() 方法等待异步任务完成并获取结果值。因为 std::future 对象已经和 std::promise 对象相关联,所以它能够获得由线程 A 生成的结果值。最后,主线程打印出结果值,并等待线程 A 完成。

那么当有多个std::promise<T>和多个std::promise<T>相对应的时候,该怎么处理呢?比如在单线程内处理多个连接。

#include <iostream>
#include <vector>
#include <future>
#include <chrono>

void handle_connection(int connection_id, std::promise<int>& p) 
    // 模拟连接处理
    std::cout << "Handling connection " << connection_id << "..." << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(1));
    int result = connection_id * 2;
    p.set_value(result);


int main() 
    std::vector<std::future<int>> futures;
    std::vector<std::promise<int>> promises;

    // 创建多个连接和相应的promise对象
    for (int i = 0; i < 5; i++) 
        promises.emplace_back

Java并发编程系列之三JUC概述

上篇文章为解决多线程中出现的同步问题引入了锁的概念,上篇文章介绍的是Synchronized关键字锁,本篇文章介绍更加轻量级的锁Lock接口及引出JUC的相关知识。 本文不力争阐释清楚JUC框架的所有内容,而是站在一定的高度下,了解Juc下包的设计与实现。 一、LOCK锁概述 实现同步的另外一种方式

以上是关于C++并发编程之三 并发操作的同步的主要内容,如果未能解决你的问题,请参考以下文章

Java并发编程系列之三JUC概述

Java并发专题之三Java线程同步

Java并发编程系列之三JUC概述

java并发编程之三--CyclicBarrier的使用

手把手写C++服务器(16):服务端多线程并发编程入门精讲

Java并发编程实战 第15章 原子变量和非阻塞同步机制