C++多线程

Posted CPP编程客

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了C++多线程相关的知识,希望对你有一定的参考价值。

线程同步要么保证操作数的不可切割性,要么保证指定代码的执行顺序,这样就能避免数据竞争。

原子操作保证了不可切割性,临界区,互斥量,事件等等都保证了执行顺序,所以在并发环境里才能安全地操作数据。C++中也提供了一些同步方法,和Windows中的方法概念相似,所以可以先看Windows的同步再来看C++的。

1. 原子库(Atomic)

C++11提供了std::atomic原子操作库,对于不同的硬件封装了不同的底层操作,所以atomic支持跨平台的原子操作。

前面写过,原子操作即不可切割,不可被打断,以此来防止数据竞争。Windows以InterlockedXXX系列函数来完成原子操作,C++提供的atomic包含了这些函数提供的原子操作,基本摘要如下:

 1template<class _Ty>
2struct atomic :
 _Atomic_base<_Ty, sizeof (_Ty)>
3{
4public:
5    atomic() noexcept = default;
6    atomic(const atomic&) = delete;
7    atomic& operator=(const atomic&) = delete;
8    atomic& operator=(const atomic&) volatile = delete;
9
10public:
11    //是否无锁
12    bool is_lock_free() const volatile noexcept;
13    bool is_lock_free() const noexcept;
14
15    /*以下也皆有volatile和non-volatile两个版本,便只摘出non-volatile*/
16
17    //存值
18    void store(_Ty _Value, memory_order _Order = memory_order_seq_cst) noexcept;
19    //取值
20    _Ty load(memory_order _Order = memory_order_seq_cst) const noexcept;
21
22    //赋值,内部调用了store()
23    _Ty operator=(_Ty _Right) noexcept;
24    //内部调用的load()
25    operator _Ty() const noexcept;
26
27    //交换值
28    _Ty exchange(_Ty _Value, memory_order _Order = memory_order_seq_cst) noexcept;
29    //弱CAS操作
30    bool compare_exchange_weak(_Ty& _Exp, _Ty _Value,
31        memory_order _Order = memory_order_seq_cst)
 noexcept
;
32    //强CAS操作
33    bool compare_exchange_strong(_Ty& _Exp, _Ty _Value,
34        memory_order _Order = memory_order_seq_cst)
 noexcept
;
35
36}

它还针对整型和指针提供了特化版本,增加了一些专属操作。

针对整形特化版本:

 1template<>
2struct atomic<_ITYPE> : _ATOMIC_ITYPE
3{
4public:
5    ...
6public:
7    /*增加的操作,同样只摘出non-volatile版本*/
8    _ITYPE operator++(intnoexcept;
9    _ITYPE operator--(intnoexcept;
10    _ITYPE operator++() noexcept;
11    _ITYPE operator--() noexcept;
12    _ITYPE operator+=(_ITYPE) noexcept;
13    _ITYPE operator-=(_ITYPE) noexcept;
14    _ITYPE operator&=(_ITYPE) noexcept;
15    _ITYPE operator|=(_ITYPE) noexcept;
16    _ITYPE operator^=(_ITYPE) noexcept;
17
18    //加法减法和与或异或操作,都返回原值
19    _ITYPE fetch_add(_ITYPE, memory_order = memory_order_seq_cst) noexcept;
20    _ITYPE fetch_sub(_ITYPE, memory_order = memory_order_seq_cst) noexcept;
21    _ITYPE fetch_and(_ITYPE, memory_order = memory_order_seq_cst) noexcept;
22    _ITYPE fetch_or(_ITYPE, memory_order = memory_order_seq_cst) noexcept;
23    _ITYPE fetch_xor(_ITYPE, memory_order = memory_order_seq_cst) noexcept;
24}

针对指针类型的特化:

 1template<class _Ty>
2struct atomic<_Ty *> :
 _Atomic_address
3{
4public:
5    ...
6public:
7    _Ty *operator++(intnoexcept;  //后++,都返回原值
8    _Ty *operator--(intnoexcept;  //后--
9    _Ty *operator++() noexcept;     //前++
10    _Ty *operator--() noexcept;     //前--
11    _Ty *operator+=(ptrdiff_t _Right) noexcept;
12    _Ty *operator-=(ptrdiff_t _Right) noexcept;
13
14    /*增加的操作,同样只摘出non-volatile版本*/
15    //指针加偏移,返回原值
16    _Ty *fetch_add(ptrdiff_t _Value, memory_order _Order = memory_order_seq_cst) noexcept;
17    //指针减偏移,返回原值
18    _Ty *fetch_sub(ptrdiff_t _Value, memory_order _Order = memory_order_seq_cst) noexcept;
19}

其中最重要的操作是store()和load(),分别代表以原子的方式存储和读取aotmic内部的值。方便使用,其中重载了operatore=代表store(),operator _Ty()来代表load()。

使用起来非常简单,就像是使用普通类型那样,只不过这些操作都具有原子性:

 1#include <iostream>
2#include <atomic>
3#include <cassert>
4
5int main()
6
{
7    std::atomic<int> a(1016);
8    assert(a == 1016);
9    assert(++a == 1017);
10    assert(--a == 1016);
11
12    a.fetch_add(5);  //加法
13    assert(a == 1021);
14    a.fetch_sub(5);  //减法
15    assert(a == 1016);
16
17    a.exchange(10);  //交换
18    assert(a == 10);
19
20    int num = 10;
21    a.compare_exchange_weak(num, 1016);  //CAS操作
22    std::cout << "交换成功,返回" << a << std::endl;  //1016
23    a.compare_exchange_strong(num, 1226);
24    std::cout << "交换失败,返回" << a << std::endl;  //1016
25
26    std::atomic<bool> b(false);
27    b.store(true);  //存值
28    std::cout << std::boolalpha << b.load() << std::endl;
29
30    std::atomic<char*> p(nullptr);
31    char szTemp[] = "pointer atomic";
32    p = szTemp;  //存值
33    std::cout << p << std::endl;               //读取
34    std::cout << p+2 << std::endl;             //+2偏移
35    std::cout << p.fetch_add(3) << std::endl;  //+3偏移,返回原值
36    std::cout << p << std::endl;               //p已+3偏移
37
38    std::cin.get();
39    return 0;
40}

关于compare_exchange_weak()和compare_exchange_strong(),即比较交换(CAS)操作,看过前面win32原子同步的对这些操作都不陌生,对应的是InterlockedCompareExchange()。这里weak()版本比strong()版本的执行速度要快,但返回的值可能有误,可能执行成功了却返回false。

其它操作都很简单,就不多写了。我们可以安全地在线程中使用它们:

 1#include <iostream>
2#include <atomic>
3#include <future>
4
5std::atomic<long> m_x(0);
6
7void ThreadFunc1()
8
{
9    for (int i = 0; i < 100000; ++i)
10    {
11        ++m_x;
12    }
13}
14
15void ThreadFunc2()
16
{
17    for (int i = 0; i < 100000; ++i)
18    {
19        --m_x;
20    }
21}
22
23int main()
24
{
25    auto fu1 = std::async(ThreadFunc1);
26    auto fu2 = std::async(ThreadFunc2);
27    fu1.wait();
28    fu2.wait();
29
30    std::cout << m_x << std::endl;  //is 0
31
32    std::cin.get();
33    return 0;
34}

我们发现在许多函数后面都提供了一个默认参数memory_order,因为编译器和CPU都可能会对指令进行各种优化,因此用这个来指定内存访问顺序。其值如下:

1typedef enum memory_order {
2    memory_order_relaxed,
3    memory_order_consume,
4    memory_order_acquire,
5    memory_order_release,
6    memory_order_acq_rel,
7    memory_order_seq_cst
8    } memory_order;

atomic中的memory_order默认都为memory_order_seq_cst,指定这个表示不允许CPU或编译器对代码进行任何优化,严格的保证了指令的顺序。若是指定memory_order_release,则不会对顺序作任何要求。

因为C是面向过程的,不支持对象,泛型这些操作,所以atomic类还提供了一些C类型的接口,如:

1atomic_bool
2atomic_char
3atomic_int
4atomic_size_t

很多基本类型都提供的有,这里就简单的提一下。


2. 互斥量(Mutex)

2.1 mutex

就不做过多介绍了,可以直接参考Win32线程同步中互斥量的,这里直接说用法。

 1#include <iostream>
2#include <atomic>
3#include <future>
4#include <mutex>
5#include <cassert>
6
7long m_x;
8std::mutex mtx;  //声明一个互斥量
9
10void ThreadFunc1()
11
{
12    try 
13    {
14        mtx.lock();  //锁定
15        for (int i = 0; i < 100000; ++i)
16        {
17            ++m_x;
18        }
19        mtx.unlock();  //解锁
20    }
21    catch (...)
22    {
23        mtx.unlock();  //捕获异常以保证解锁
24    }
25}
26
27void ThreadFunc2()
28
{
29    try 
30    {
31        mtx.lock();  //锁定
32        for (int i = 0; i < 100000; ++i)
33        {
34            --m_x;
35        }
36        mtx.unlock();  //解锁
37    }
38    catch (...)
39    {
40        mtx.unlock();  //捕获异常以保证解锁
41    }
42}
43
44int main()
45
{
46    std::thread t1(ThreadFunc1);
47    std::thread t2(ThreadFunc2);
48    t1.join();
49    t2.join();
50
51    std::cout << m_x << std::endl;
52
53
54    std::cin.get();
55    return 0;
56}

当在一个地方对mutex进行锁定后,其它地方就无权执行,直到解锁,所以保护了资源同步访问。

若是不想阻塞,可以使用try_lock(),它用于尝试锁定互斥量,不论成功失败,都立即返回。

2.2 lock_guard

是的,mutex有其缺点,因为我们可能会忘记写unlock(),所以有了lock_guard来辅助它。

lock_guard是RAII型的,它会在构造时锁定mutex,析构时解锁mutex。

现在,应该这样使用它:

 1void ThreadFunc1()
2
{
3    std::lock_guard<std::mutex> lg(mtx);
4    for (int i = 0; i < 100000; ++i)
5    {
6        ++m_x;
7    }
8}
9
10void ThreadFunc2()
11
{
12    std::lock_guard<std::mutex> lg(mtx);
13    for (int i = 0; i < 100000; ++i)
14    {
15        --m_x;
16    }
17}

2.3 timed_mutex

当不想因为lock而阻塞线程的时候,还可以使用timed_mutex。它提供了两个函数:

  • try_lock_for

  • try_lock_until

对于for和until的区别上篇已经说过了,还是直接看使用:

 1std::timed_mutex tm;  //声明一个互斥量
2
3void ThreadFunc1()
4
{
5    //尝试锁定,等待100ms
6    auto locked = tm.try_lock_for(std::chrono::milliseconds(100));
7    if (locked)
8    {
9        for (int i = 0; i < 100000; ++i)
10        {
11            ++m_x;
12        }
13        tm.unlock();  //解锁
14    }
15}

同样也应该使用lock_guard,因为更加安全:

 1std::timed_mutex tm;  //声明一个互斥量
2
3void ThreadFunc1()
4
{
5    //尝试锁定,等待100ms
6    auto locked = tm.try_lock_for(std::chrono::milliseconds(100));
7    if (locked)
8    {
9        //此时不会重复加锁
10        std::lock_guard<std::timed_mutex> lg(tm, std::adopt_lock);
11        for (int i = 0; i < 100000; ++i)
12        {
13            ++m_x;
14        }
15    }
16}

2.4 unique_mutex

unique_mutex()和lock_guard()相同,但它提供了更多灵活性,可以指定一些策略,说明什么时候,怎么锁定mutex等等。

unique_mutex()多提供了几个构造函数,可以指定锁定选项,其值可以如下:

 1//不锁定,但持有所有权
2struct adopt_lock_t
3    {
   // indicates adopt lock
4    explicit adopt_lock_t() default;
5    };
6
7//暂时不打算锁定
8struct defer_lock_t
9    {
   // indicates defer lock
10    explicit defer_lock_t() default;
11    };
12
13//尝试锁定mutex,但不想阻塞(执行try_lock)
14struct try_to_lock_t
15    {
   // indicates try to lock
16    explicit try_to_lock_t() default;
17    };
18
19//编译期常量标志
20_INLINE_VAR constexpr adopt_lock_t adopt_lock{};
21_INLINE_VAR constexpr defer_lock_t defer_lock{};
22_INLINE_VAR constexpr try_to_lock_t try_to_lock{};

如可以指定defer_lock:

1std::mutex mtx;
2std::unique_lock<std::mutex> ul(mtx, std::defer_lock);
3...
4ul.lock();  //现在再锁定

也可以指定一个时间段或时间点,那么到了指定时间才锁定mutex:

1std::mutex mtx;
2//1秒后再锁定
3std::unique_lock<std::mutex> ul(mtx, std::chrono::seconds(1));

unique_lock内部使用的是指针保存mutex,所以是可以拷贝的,它会保证同时只有一个lock拥有mutex。

unique_lock还比lock_guard多了一些操作,会用其它锁那么这些自然也会用了:

1void lock();                              //锁定
2bool try_lock();                          //尝试锁定
3void unlock();                            //解锁
4bool owns_lock() const noexcept;         //是否锁定
5void swap(unique_lock& _Other) noexcept
;  //交换两个lock
6_Mutex *release() noexcept;               //返回指向mutex的指针并释放之
7bool owns_lock() const noexcept;          //是否被锁定

2.5 recursive_mutex

有时我们需要在一个线程函数中调用其它线程函数,这会导致死锁,如在ThreadFunc2中调用ThreadFunc1:

 1void ThreadFunc1()
2
{
3    try
4    {
5        std::lock_guard<std::mutex> lg(tm);
6        for (int i = 0; i < 100000; ++i)
7        {
8            ++m_x;
9        }
10    }
11    catch (const std::system_error& e)
12    {
13        std::cout << e.what() << std::endl;
14    }
15
16}
17
18void ThreadFunc2()
19
{
20    //加锁
21    std::lock_guard<std::mutex> lg(tm);
22    //调用ThreadFunc1
23    ThreadFunc1();
24}

当在ThreadFunc2中调用ThreadFunc1的时候,tm已被加锁,而ThreadFunc1中又尝试第二次加锁,这永远也不会成功,便成了死锁。这时会抛出system_error异常。

这种情况下可以使用递归锁,它允许同一线程多次锁定:

 1std::recursive_mutex rm;  //声明一个递归锁
2
3void ThreadFunc1()
4
{
5    std::lock_guard<std::recursive_mutex> lg(rm);
6    for (int i = 0; i < 100000; ++i)
7    {
8        ++m_x;
9    }
10}
11
12void ThreadFunc2()
13
{
14
15    std::lock_guard<std::recursive_mutex> lg(rm);
16    //ThreadFun1中二次锁定,没有问题
17    ThreadFunc1();
18}

3. 条件变量(Condition Variable)

条件变量可以实现线程间的通信,和事件同步有相似之处。可以在一个线程中等待另一个线程发生某个事件,即满足某个条件,然后线程再继续执行。

使用condition_variable需要包含< condition_variable>,它需要和mutex配合使用,触发和等待的线程中必须是同一个mutex,来看一个小例子:

 1bool flag{false};  //标志(即条件)
2std::mutex mtx;
3std::condition_variable cv;  //条件变量
4
5void ThreadFunc1()
6
{
7    //添加作用域,以控制锁的生命周期
8    {
9        std::lock_guard<std::mutex> lg(mtx);
10        flag = true;
11    }
12
13    cv.notify_one();  //唤醒等待线程
14}
15
16void ThreadFunc2()
17
{
18    std::unique_lock<std::mutex> ul(mtx)
19    cv.wait(ul, []{ return flag; });  //等待
20}

有时,我们需要在一个线程中做某些工作,另一个线程需要那个线程提供某些条件才能继续工作,此时就可以使用条件变量。

此处定义了一个条件标志flag,在ThreadFunc2中使用条件变量等待着flag为true时才继续向下执行。这个条件由ThreadFunc1提供,ThreadFunc1提供后使用notify_one()唤醒一个等待的线程。

可以发现在ThreadFunc1中使用的是lock_guard锁定mutex,而ThreadFunc2中却使用的却是unique_lock,这是必须的。因为wait()中若是不满足条件时需要经历解锁,阻塞,再加锁来使其它线程再得到执行,不然就会一直阻塞在这儿了。

wait()可能会在未有notify_one()唤醒的情况下返回,这种情况称为假醒,所以此处还加了一个lambda返回flag以进行二次确定,分开写就是这样的:

1if(flag)
2{
3    cv.wait(ul);
4}

若不进行二次验证,就是说我们的条件可能还并不满足,接着往下执行的可能会发生一些错误。

再来看一个例子:

 1#include <iostream>
2#include <future>
3#include <condition_variable>
4#include <mutex>
5#include <stack>
6
7std::stack<charstack;  //存字符的栈变量
8std::mutex mtx;
9std::condition_variable convar;  //条件变量
10
11void ThreadFunc1(char ch)
12
{
13    for (int i = 0; i < 5; ++i)
14    {
15        //指定作用域以解锁
16        {
17            std::lock_guard<std::mutex> lg(mtx);
18            stack.push(ch + i);  //字符入栈
19        }
20        convar.notify_one();  //唤醒线程
21    }
22}
23
24void ThreadFunc2()
25
{
26    char ch;
27    for (;;)
28    {
29        {
30            std::unique_lock<std::mutex> ul(mtx);
31            //等待,并二次检验stack中是否有数据
32            convar.wait(ul, [] { return !stack.empty(); });
33            ch = stack.top();  //取出栈顶
34            stack.pop();       //弹栈
35            std::cout << ch << std::endl;
36        }
37    }
38}
39
40
41int main()
42
{
43    std::async(std::launch::async, ThreadFunc1, 'A');
44    std::async(std::launch::async, ThreadFunc1, 'a');
45    std::async(std::launch::async, ThreadFunc1, '0');
46
47    std::async(std::launch::async, ThreadFunc2);
48    std::async(std::launch::async, ThreadFunc2);
49
50    std::cin.get();
51    return 0;
52}

这个例子中的ThreadFunc1用于往stack中加入数据,而ThreadFunc2用于从stack中读取数据。只有先加入了数据才能读取,所以在ThreadFunc2中使用条件变量等待,当ThreadFunc1中往栈中加入了数据后,使用notify_one()唤醒等待的线程。

输出结果如下:

除了这些,它还提供了notify_all()以唤醒所有的等待者,也提供了基于时间段和时间点的等待wait_for()和wait_until(),这些就不多说了。


以上是关于C++多线程的主要内容,如果未能解决你的问题,请参考以下文章

多个用户访问同一段代码

在这个多线程 C++ 代码中是不是需要“易失性”?

线程学习知识点总结

多个请求是多线程吗

PySide 中的多线程提升 Python C++ 代码

多线程递归程序c++