C++多线程
Posted CPP编程客
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了C++多线程相关的知识,希望对你有一定的参考价值。
线程同步要么保证操作数的不可切割性,要么保证指定代码的执行顺序,这样就能避免数据竞争。
原子操作保证了不可切割性,临界区,互斥量,事件等等都保证了执行顺序,所以在并发环境里才能安全地操作数据。C++中也提供了一些同步方法,和Windows中的方法概念相似,所以可以先看Windows的同步再来看C++的。
1. 原子库(Atomic)
C++11提供了std::atomic原子操作库,对于不同的硬件封装了不同的底层操作,所以atomic支持跨平台的原子操作。
前面写过,原子操作即不可切割,不可被打断,以此来防止数据竞争。Windows以InterlockedXXX系列函数来完成原子操作,C++提供的atomic包含了这些函数提供的原子操作,基本摘要如下:
1template<class _Ty>
2struct atomic : _Atomic_base<_Ty, sizeof (_Ty)>
3{
4public:
5 atomic() noexcept = default;
6 atomic(const atomic&) = delete;
7 atomic& operator=(const atomic&) = delete;
8 atomic& operator=(const atomic&) volatile = delete;
9
10public:
11 //是否无锁
12 bool is_lock_free() const volatile noexcept;
13 bool is_lock_free() const noexcept;
14
15 /*以下也皆有volatile和non-volatile两个版本,便只摘出non-volatile*/
16
17 //存值
18 void store(_Ty _Value, memory_order _Order = memory_order_seq_cst) noexcept;
19 //取值
20 _Ty load(memory_order _Order = memory_order_seq_cst) const noexcept;
21
22 //赋值,内部调用了store()
23 _Ty operator=(_Ty _Right) noexcept;
24 //内部调用的load()
25 operator _Ty() const noexcept;
26
27 //交换值
28 _Ty exchange(_Ty _Value, memory_order _Order = memory_order_seq_cst) noexcept;
29 //弱CAS操作
30 bool compare_exchange_weak(_Ty& _Exp, _Ty _Value,
31 memory_order _Order = memory_order_seq_cst) noexcept;
32 //强CAS操作
33 bool compare_exchange_strong(_Ty& _Exp, _Ty _Value,
34 memory_order _Order = memory_order_seq_cst) noexcept;
35
36}
它还针对整型和指针提供了特化版本,增加了一些专属操作。
针对整形特化版本:
1template<>
2struct atomic<_ITYPE> : _ATOMIC_ITYPE
3{
4public:
5 ...
6public:
7 /*增加的操作,同样只摘出non-volatile版本*/
8 _ITYPE operator++(int) noexcept;
9 _ITYPE operator--(int) noexcept;
10 _ITYPE operator++() noexcept;
11 _ITYPE operator--() noexcept;
12 _ITYPE operator+=(_ITYPE) noexcept;
13 _ITYPE operator-=(_ITYPE) noexcept;
14 _ITYPE operator&=(_ITYPE) noexcept;
15 _ITYPE operator|=(_ITYPE) noexcept;
16 _ITYPE operator^=(_ITYPE) noexcept;
17
18 //加法减法和与或异或操作,都返回原值
19 _ITYPE fetch_add(_ITYPE, memory_order = memory_order_seq_cst) noexcept;
20 _ITYPE fetch_sub(_ITYPE, memory_order = memory_order_seq_cst) noexcept;
21 _ITYPE fetch_and(_ITYPE, memory_order = memory_order_seq_cst) noexcept;
22 _ITYPE fetch_or(_ITYPE, memory_order = memory_order_seq_cst) noexcept;
23 _ITYPE fetch_xor(_ITYPE, memory_order = memory_order_seq_cst) noexcept;
24}
针对指针类型的特化:
1template<class _Ty>
2struct atomic<_Ty *> : _Atomic_address
3{
4public:
5 ...
6public:
7 _Ty *operator++(int) noexcept; //后++,都返回原值
8 _Ty *operator--(int) noexcept; //后--
9 _Ty *operator++() noexcept; //前++
10 _Ty *operator--() noexcept; //前--
11 _Ty *operator+=(ptrdiff_t _Right) noexcept;
12 _Ty *operator-=(ptrdiff_t _Right) noexcept;
13
14 /*增加的操作,同样只摘出non-volatile版本*/
15 //指针加偏移,返回原值
16 _Ty *fetch_add(ptrdiff_t _Value, memory_order _Order = memory_order_seq_cst) noexcept;
17 //指针减偏移,返回原值
18 _Ty *fetch_sub(ptrdiff_t _Value, memory_order _Order = memory_order_seq_cst) noexcept;
19}
其中最重要的操作是store()和load(),分别代表以原子的方式存储和读取aotmic内部的值。方便使用,其中重载了operatore=代表store(),operator _Ty()来代表load()。
使用起来非常简单,就像是使用普通类型那样,只不过这些操作都具有原子性:
1#include <iostream>
2#include <atomic>
3#include <cassert>
4
5int main()
6{
7 std::atomic<int> a(1016);
8 assert(a == 1016);
9 assert(++a == 1017);
10 assert(--a == 1016);
11
12 a.fetch_add(5); //加法
13 assert(a == 1021);
14 a.fetch_sub(5); //减法
15 assert(a == 1016);
16
17 a.exchange(10); //交换
18 assert(a == 10);
19
20 int num = 10;
21 a.compare_exchange_weak(num, 1016); //CAS操作
22 std::cout << "交换成功,返回" << a << std::endl; //1016
23 a.compare_exchange_strong(num, 1226);
24 std::cout << "交换失败,返回" << a << std::endl; //1016
25
26 std::atomic<bool> b(false);
27 b.store(true); //存值
28 std::cout << std::boolalpha << b.load() << std::endl;
29
30 std::atomic<char*> p(nullptr);
31 char szTemp[] = "pointer atomic";
32 p = szTemp; //存值
33 std::cout << p << std::endl; //读取
34 std::cout << p+2 << std::endl; //+2偏移
35 std::cout << p.fetch_add(3) << std::endl; //+3偏移,返回原值
36 std::cout << p << std::endl; //p已+3偏移
37
38 std::cin.get();
39 return 0;
40}
关于compare_exchange_weak()和compare_exchange_strong(),即比较交换(CAS)操作,看过前面win32原子同步的对这些操作都不陌生,对应的是InterlockedCompareExchange()。这里weak()版本比strong()版本的执行速度要快,但返回的值可能有误,可能执行成功了却返回false。
其它操作都很简单,就不多写了。我们可以安全地在线程中使用它们:
1#include <iostream>
2#include <atomic>
3#include <future>
4
5std::atomic<long> m_x(0);
6
7void ThreadFunc1()
8{
9 for (int i = 0; i < 100000; ++i)
10 {
11 ++m_x;
12 }
13}
14
15void ThreadFunc2()
16{
17 for (int i = 0; i < 100000; ++i)
18 {
19 --m_x;
20 }
21}
22
23int main()
24{
25 auto fu1 = std::async(ThreadFunc1);
26 auto fu2 = std::async(ThreadFunc2);
27 fu1.wait();
28 fu2.wait();
29
30 std::cout << m_x << std::endl; //is 0
31
32 std::cin.get();
33 return 0;
34}
我们发现在许多函数后面都提供了一个默认参数memory_order,因为编译器和CPU都可能会对指令进行各种优化,因此用这个来指定内存访问顺序。其值如下:
1typedef enum memory_order {
2 memory_order_relaxed,
3 memory_order_consume,
4 memory_order_acquire,
5 memory_order_release,
6 memory_order_acq_rel,
7 memory_order_seq_cst
8 } memory_order;
atomic中的memory_order默认都为memory_order_seq_cst,指定这个表示不允许CPU或编译器对代码进行任何优化,严格的保证了指令的顺序。若是指定memory_order_release,则不会对顺序作任何要求。
因为C是面向过程的,不支持对象,泛型这些操作,所以atomic类还提供了一些C类型的接口,如:
1atomic_bool
2atomic_char
3atomic_int
4atomic_size_t
很多基本类型都提供的有,这里就简单的提一下。
2. 互斥量(Mutex)
2.1 mutex
就不做过多介绍了,可以直接参考Win32线程同步中互斥量的,这里直接说用法。
1#include <iostream>
2#include <atomic>
3#include <future>
4#include <mutex>
5#include <cassert>
6
7long m_x;
8std::mutex mtx; //声明一个互斥量
9
10void ThreadFunc1()
11{
12 try
13 {
14 mtx.lock(); //锁定
15 for (int i = 0; i < 100000; ++i)
16 {
17 ++m_x;
18 }
19 mtx.unlock(); //解锁
20 }
21 catch (...)
22 {
23 mtx.unlock(); //捕获异常以保证解锁
24 }
25}
26
27void ThreadFunc2()
28{
29 try
30 {
31 mtx.lock(); //锁定
32 for (int i = 0; i < 100000; ++i)
33 {
34 --m_x;
35 }
36 mtx.unlock(); //解锁
37 }
38 catch (...)
39 {
40 mtx.unlock(); //捕获异常以保证解锁
41 }
42}
43
44int main()
45{
46 std::thread t1(ThreadFunc1);
47 std::thread t2(ThreadFunc2);
48 t1.join();
49 t2.join();
50
51 std::cout << m_x << std::endl;
52
53
54 std::cin.get();
55 return 0;
56}
当在一个地方对mutex进行锁定后,其它地方就无权执行,直到解锁,所以保护了资源同步访问。
若是不想阻塞,可以使用try_lock(),它用于尝试锁定互斥量,不论成功失败,都立即返回。
2.2 lock_guard
是的,mutex有其缺点,因为我们可能会忘记写unlock(),所以有了lock_guard来辅助它。
lock_guard是RAII型的,它会在构造时锁定mutex,析构时解锁mutex。
现在,应该这样使用它:
1void ThreadFunc1()
2{
3 std::lock_guard<std::mutex> lg(mtx);
4 for (int i = 0; i < 100000; ++i)
5 {
6 ++m_x;
7 }
8}
9
10void ThreadFunc2()
11{
12 std::lock_guard<std::mutex> lg(mtx);
13 for (int i = 0; i < 100000; ++i)
14 {
15 --m_x;
16 }
17}
2.3 timed_mutex
当不想因为lock而阻塞线程的时候,还可以使用timed_mutex。它提供了两个函数:
try_lock_for
try_lock_until
对于for和until的区别上篇已经说过了,还是直接看使用:
1std::timed_mutex tm; //声明一个互斥量
2
3void ThreadFunc1()
4{
5 //尝试锁定,等待100ms
6 auto locked = tm.try_lock_for(std::chrono::milliseconds(100));
7 if (locked)
8 {
9 for (int i = 0; i < 100000; ++i)
10 {
11 ++m_x;
12 }
13 tm.unlock(); //解锁
14 }
15}
同样也应该使用lock_guard,因为更加安全:
1std::timed_mutex tm; //声明一个互斥量
2
3void ThreadFunc1()
4{
5 //尝试锁定,等待100ms
6 auto locked = tm.try_lock_for(std::chrono::milliseconds(100));
7 if (locked)
8 {
9 //此时不会重复加锁
10 std::lock_guard<std::timed_mutex> lg(tm, std::adopt_lock);
11 for (int i = 0; i < 100000; ++i)
12 {
13 ++m_x;
14 }
15 }
16}
2.4 unique_mutex
unique_mutex()和lock_guard()相同,但它提供了更多灵活性,可以指定一些策略,说明什么时候,怎么锁定mutex等等。
unique_mutex()多提供了几个构造函数,可以指定锁定选项,其值可以如下:
1//不锁定,但持有所有权
2struct adopt_lock_t
3 { // indicates adopt lock
4 explicit adopt_lock_t() = default;
5 };
6
7//暂时不打算锁定
8struct defer_lock_t
9 { // indicates defer lock
10 explicit defer_lock_t() = default;
11 };
12
13//尝试锁定mutex,但不想阻塞(执行try_lock)
14struct try_to_lock_t
15 { // indicates try to lock
16 explicit try_to_lock_t() = default;
17 };
18
19//编译期常量标志
20_INLINE_VAR constexpr adopt_lock_t adopt_lock{};
21_INLINE_VAR constexpr defer_lock_t defer_lock{};
22_INLINE_VAR constexpr try_to_lock_t try_to_lock{};
如可以指定defer_lock:
1std::mutex mtx;
2std::unique_lock<std::mutex> ul(mtx, std::defer_lock);
3...
4ul.lock(); //现在再锁定
也可以指定一个时间段或时间点,那么到了指定时间才锁定mutex:
1std::mutex mtx;
2//1秒后再锁定
3std::unique_lock<std::mutex> ul(mtx, std::chrono::seconds(1));
unique_lock内部使用的是指针保存mutex,所以是可以拷贝的,它会保证同时只有一个lock拥有mutex。
unique_lock还比lock_guard多了一些操作,会用其它锁那么这些自然也会用了:
1void lock(); //锁定
2bool try_lock(); //尝试锁定
3void unlock(); //解锁
4bool owns_lock() const noexcept; //是否锁定
5void swap(unique_lock& _Other) noexcept; //交换两个lock
6_Mutex *release() noexcept; //返回指向mutex的指针并释放之
7bool owns_lock() const noexcept; //是否被锁定
2.5 recursive_mutex
有时我们需要在一个线程函数中调用其它线程函数,这会导致死锁,如在ThreadFunc2中调用ThreadFunc1:
1void ThreadFunc1()
2{
3 try
4 {
5 std::lock_guard<std::mutex> lg(tm);
6 for (int i = 0; i < 100000; ++i)
7 {
8 ++m_x;
9 }
10 }
11 catch (const std::system_error& e)
12 {
13 std::cout << e.what() << std::endl;
14 }
15
16}
17
18void ThreadFunc2()
19{
20 //加锁
21 std::lock_guard<std::mutex> lg(tm);
22 //调用ThreadFunc1
23 ThreadFunc1();
24}
当在ThreadFunc2中调用ThreadFunc1的时候,tm已被加锁,而ThreadFunc1中又尝试第二次加锁,这永远也不会成功,便成了死锁。这时会抛出system_error异常。
这种情况下可以使用递归锁,它允许同一线程多次锁定:
1std::recursive_mutex rm; //声明一个递归锁
2
3void ThreadFunc1()
4{
5 std::lock_guard<std::recursive_mutex> lg(rm);
6 for (int i = 0; i < 100000; ++i)
7 {
8 ++m_x;
9 }
10}
11
12void ThreadFunc2()
13{
14
15 std::lock_guard<std::recursive_mutex> lg(rm);
16 //ThreadFun1中二次锁定,没有问题
17 ThreadFunc1();
18}
3. 条件变量(Condition Variable)
条件变量可以实现线程间的通信,和事件同步有相似之处。可以在一个线程中等待另一个线程发生某个事件,即满足某个条件,然后线程再继续执行。
使用condition_variable需要包含< condition_variable>,它需要和mutex配合使用,触发和等待的线程中必须是同一个mutex,来看一个小例子:
1bool flag{false}; //标志(即条件)
2std::mutex mtx;
3std::condition_variable cv; //条件变量
4
5void ThreadFunc1()
6{
7 //添加作用域,以控制锁的生命周期
8 {
9 std::lock_guard<std::mutex> lg(mtx);
10 flag = true;
11 }
12
13 cv.notify_one(); //唤醒等待线程
14}
15
16void ThreadFunc2()
17{
18 std::unique_lock<std::mutex> ul(mtx)
19 cv.wait(ul, []{ return flag; }); //等待
20}
有时,我们需要在一个线程中做某些工作,另一个线程需要那个线程提供某些条件才能继续工作,此时就可以使用条件变量。
此处定义了一个条件标志flag,在ThreadFunc2中使用条件变量等待着flag为true时才继续向下执行。这个条件由ThreadFunc1提供,ThreadFunc1提供后使用notify_one()唤醒一个等待的线程。
可以发现在ThreadFunc1中使用的是lock_guard锁定mutex,而ThreadFunc2中却使用的却是unique_lock,这是必须的。因为wait()中若是不满足条件时需要经历解锁,阻塞,再加锁来使其它线程再得到执行,不然就会一直阻塞在这儿了。
wait()可能会在未有notify_one()唤醒的情况下返回,这种情况称为假醒,所以此处还加了一个lambda返回flag以进行二次确定,分开写就是这样的:
1if(flag)
2{
3 cv.wait(ul);
4}
若不进行二次验证,就是说我们的条件可能还并不满足,接着往下执行的可能会发生一些错误。
再来看一个例子:
1#include <iostream>
2#include <future>
3#include <condition_variable>
4#include <mutex>
5#include <stack>
6
7std::stack<char> stack; //存字符的栈变量
8std::mutex mtx;
9std::condition_variable convar; //条件变量
10
11void ThreadFunc1(char ch)
12{
13 for (int i = 0; i < 5; ++i)
14 {
15 //指定作用域以解锁
16 {
17 std::lock_guard<std::mutex> lg(mtx);
18 stack.push(ch + i); //字符入栈
19 }
20 convar.notify_one(); //唤醒线程
21 }
22}
23
24void ThreadFunc2()
25{
26 char ch;
27 for (;;)
28 {
29 {
30 std::unique_lock<std::mutex> ul(mtx);
31 //等待,并二次检验stack中是否有数据
32 convar.wait(ul, [] { return !stack.empty(); });
33 ch = stack.top(); //取出栈顶
34 stack.pop(); //弹栈
35 std::cout << ch << std::endl;
36 }
37 }
38}
39
40
41int main()
42{
43 std::async(std::launch::async, ThreadFunc1, 'A');
44 std::async(std::launch::async, ThreadFunc1, 'a');
45 std::async(std::launch::async, ThreadFunc1, '0');
46
47 std::async(std::launch::async, ThreadFunc2);
48 std::async(std::launch::async, ThreadFunc2);
49
50 std::cin.get();
51 return 0;
52}
这个例子中的ThreadFunc1用于往stack中加入数据,而ThreadFunc2用于从stack中读取数据。只有先加入了数据才能读取,所以在ThreadFunc2中使用条件变量等待,当ThreadFunc1中往栈中加入了数据后,使用notify_one()唤醒等待的线程。
输出结果如下:
除了这些,它还提供了notify_all()以唤醒所有的等待者,也提供了基于时间段和时间点的等待wait_for()和wait_until(),这些就不多说了。
以上是关于C++多线程的主要内容,如果未能解决你的问题,请参考以下文章