转载C++ 并发编程(从C++11到C++17)

Posted 字节卷动

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了转载C++ 并发编程(从C++11到C++17)相关的知识,希望对你有一定的参考价值。

原文地址:《C++ 并发编程(从C++11到C++17)》 by 保罗的酒吧

C++ 并发编程(从C++11到C++17)


自C++11标准以来,C++语言开始支持多线程模型。借助多线程模型,我们可以开发出更好的并发系统。本文以C++语言为例,讲解如何进行并发编程。并尽可能涉及C++11,C++14以及C++17中的主要内容。

为什么要并发编程

大型的软件项目常常包含非常多的任务需要处理。例如:对于大量数据的数据流处理,或者是包含复杂GUI界面的应用程序。如果将所有的任务都以串行的方式执行,则整个系统的效率将会非常低下,应用程序的用户体验会非常的差。

另一方面,自上个世纪六七十年代英特尔创始人之一 Gordon Moore 提出 摩尔定义 以来,CPU频率以每18个月翻一番的指数速度增长。但这一增长在最近的十年已经基本停滞,大家会发现曾经有过一段时间CPU的频率从3G到达4G,但在这之后就停滞不前了。因此最近的新款CPU也基本上都是3G左右的频率。相应的,CPU以更多核的形式在增长。目前的Intel i7有8核的版本,Xeon处理器达到了28核。并且,最近几年手机上使用的CPU也基本上是4核或者8核的了。

由此,掌握并发编程技术,利用多处理器来提升软件项目的性能将是软件工程师的一项基本技能。

本文以C++语言为例,讲解如何进行并发编程。并尽可能涉及C++11,C++14以及C++17中的主要内容。

并发与并行

并发(Concurrent)与并行(Parallel)都是很常见的术语。

Erlang之父Joe Armstrong曾经以人们使用咖啡机的场景为例描述了这两个术语。如下图所示:

  • 并发:如果多个队列可以交替使用某台咖啡机,则这一行为就是并发的。
  • 并行:如果存在多台咖啡机可以被多个队列交替使用,则就是并行。

这里队列中的每个人类比于计算机的任务,咖啡机类比于计算机处理器。因此:并发和并行都是在多任务的环境下的讨论。

更严格的来说:如果一个系统支持多个动作同时存在,那么这个系统就是一个并发系统。如果这个系统还支持多个动作(物理时间上)同时执行,那么这个系统就是一个并行系统。

你可能已经看出,“并行”其实是“并发”的子集。它们的区别在于是否具有多个处理器。如果存在多个处理器同时执行多个线程,就是并行。

在不考虑处理器数量的情况下,我们统称之为“并发”。

进程与线程

进程与线程是操作系统的基本概念。无论是桌面系统:MacOS,Linux,Windows,还是移动操作系统:androidios,都存在进程和线程的概念。

进程(英语:process),是指计算机中已运行的程序。进程为曾经是分时系统的基本运作单位。在面向进程设计的系统(如早期的UNIX,Linux 2.4及更早的版本)中,进程是程序的基本执行实体;

线程(英语:thread)是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。

– 维基百科

关于这两个概念在任何一本操作系统书上都可以找到定义。网上也有很多文章对它们进行了解释。因此这里不再赘述,这里仅仅提及一下它们与编程的关系。

对于绝大部分编程语言或者编程环境来说,我们所写的程序都会在一个进程中运行。一个进程至少会包含一个线程。这个线程我们通常称之为主线程。

在默认的情况下,我们写的代码都是在进程的主线程中运行,除非开发者在程序中创建了新的线程。

不同编程语言的线程环境会不一样,Java语言在很早就支持了多线程接口。(Java程序在Java虚拟机中运行,虚拟机通常还会包含自己特有的线程,例如垃圾回收线程。)。而对于javascript这样的语言来说,它就没有多线程的概念。

当我们只有一个处理器时,所有的进程或线程会分时占用这个处理器。但如果系统中存在多个处理器时,则就可能有多个任务并行的运行在不同的处理器上。

下面两幅图以不同颜色的矩形代表不同的任务(可能是进程,也可能是线程)来描述它们可能在处理器上执行的顺序。

下图是单核处理器的情况:

下面是四核处理器的情况:

任务会在何时占有处理器,通常是由操作系统的调度策略决定的。在《Android系统上的进程管理:进程的调度》一文中,我们介绍过Linux的调度策略。

当我们在开发跨平台的软件时,我们不应当对调度策略做任何假设,而应该抱有“系统可能以任意顺序来调度我的任务”这样的想法。

并发系统的性能

开发并发系统最主要的动机就是提升系统性能(事实上,这是以增加复杂度为代价的)。

但我们需要知道,单纯的使用多线程并不一定能提升系统性能(当然,也并非线程越多系统的性能就越好)。从上面的两幅图我们就可以直观的感受到:线程(任务)的数量要根据具体的处理器数量来决定。假设只有一个处理器,那么划分太多线程可能会适得其反。因为很多时间都花在任务切换上了。

因此,在设计并发系统之前,一方面我们需要做好对于硬件性能的了解,另一方面需要对我们的任务有足够的认识。

关于这一点,你可能需要了解一下阿姆达尔定律了。对于这个定律,简单来说:我们想要预先意识到那些任务是可以并行的,那些是无法并行的。只有明确了任务的性质,才能有的放矢的进行优化。这个定律告诉了我们将系统并行之后性能收益的上限。

关于阿姆达尔定律在Linux系统监测工具sysstat介绍一文中已经介绍过,因此这里不再赘述。

C++与并发编程

前面我们已经了解到,并非所有的语言都提供了多线程的环境。

即便是C++语言,直到C++11标准之前,也是没有多线程支持的。在这种情况下,Linux/Unix平台下的开发者通常会使用POSIX Threads,Windows上的开发者也会有相应的接口。但很明显,这些API都只针对特定的操作系统平台,可移植性较差。如果要同时支持Linux和Windows系统,你可能要写两套代码。

相较而言,Java自JDK 1.0就包含了多线程模型。

这个状态在C++ 11标准发布之后得到了改变。并且,在C++ 14和C++ 17标准中又对并发编程机制进行了增强。

下图是最近几个版本的C++标准特性的线路图。

编译器与C++标准

编译器对于语言特性的支持是逐步完成的。想要使用特定的特性你需要相应版本的编译器。

下面两个表格列出了C++标准和相应编译器的版本对照:

  • C++标准与相应的GCC版本要求如下:
C++版本GCC版本
C++114.8
C++145.0
C++177.0
  • C++标准与相应的Clang版本要求如下:
C++版本GCC版本
C++113.3
C++143.4
C++175.0

默认情况下编译器是以较低的标准来进行编译的,如果希望使用新的标准,你需要通过编译参数-std=c++xx告知编译器,例如:

g++ -std=c++17 your_file.cpp -o your_program

测试环境

本文的源码可以到下载我的github上获取,地址:paulQuei/cpp-concurrency

你可以直接通过下面这条命令获取源码:

git clone https://github.com/paulQuei/cpp-concurrency.git

源码下载之后,你可以通过任何文本编辑器浏览源码。如果希望编译和运行程序,你还需要按照下面的内容来准备环境。

本文中的源码使用cmake编译,只有cmake 3.8以上的版本才支持C++ 17,所以你需要安装这个或者更新版本的cmake。

另外,截止目前(2019年10月)为止,clang编译器还不支持并行算法

但是gcc-9是支持的。因此想要编译和运行这部分代码,你需要安装gcc 9.0或更新的版本。并且,gcc-9还要依赖Intel Threading Building Blocks才能使用并行算法以及<execution>头文件。

具体的安装方法见下文。

具体编译器对于C++特性支持的情况请参见这里:C++ compiler support

安装好之后运行根目录下的下面这个命令即可:

 ./make_all.sh

它会完成所有的编译工作。

本文的源码在下面两个环境中经过测试,环境的准备方法如下。

MacOS

在Mac上,我使用brew工具安装gcc以及tbb库。

考虑到其他人与我的环境可能会有所差异,所以需要手动告知tbb库的安装路径。

读者需要执行下面这些命令来准备环境:

brew install gcc
brew install tbb

export tbb_path=/usr/local/Cellar/tbb/2019_U8/
./make_all.sh

注意,请通过运行g++-9命令以确认gcc的版本是否正确,如果版本较低,则需要通过brew命令将其升级到新版本:

brew upgrade gcc

Ubuntu

Ubuntu上,通过下面的命令安装gcc-9

sudo add-apt-repository ppa:ubuntu-toolchain-r/test
sudo apt-get update
sudo apt install  gcc-9 g++-9

但安装tbb库就有些麻烦了。这是因为Ubuntu 16.04默认关联的版本是较低的,直接安装是无法使用的。我们需要安装更新的版本

联网安装的方式步骤繁琐,所以可以通过下载包的方式进行安装,我已经将这需要的两个文件放到的这里:

如果需要,你可以下载后通过apt命令安装即可:

sudo apt install ~/Downloads/libtbb2_2019~U8-1_amd64.deb 
sudo apt install ~/Downloads/libtbb-dev_2019~U8-1_amd64.deb 

线程

创建线程

创建线程非常的简单的,下面就是一个使用了多线程的Hello World示例:

// 01_hello_thread.cpp

#include <iostream>
#include <thread> // ①

using namespace std; // ②

void hello()  // ③
  cout << "Hello World from new thread." << endl;


int main() 
  thread t(hello); // ④
  t.join(); // ⑤

  return 0;

对于这段代码说明如下:

  1. 为了使用多线程的接口,我们需要#include <thread>头文件。
  2. 为了简化声明,本文中的代码都将using namespace std;
  3. 新建线程的入口是一个普通的函数,它并没有什么特别的地方。
  4. 创建线程的方式就是构造一个thread对象,并指定入口函数。与普通对象不一样的是,此时编译器便会为我们创建一个新的操作系统线程,并在新的线程中执行我们的入口函数。
  5. 关于join函数在下文中讲解。

thread可以和callable类型一起工作,因此如果你熟悉lambda表达式,你可以直接用它来写线程的逻辑,像这样:

// 02_lambda_thread.cpp

#include <iostream>
#include <thread>

using namespace std;

int main() 
  thread t([] 
    cout << "Hello World from lambda thread." << endl;
  );

  t.join();

  return 0;

为了减少不必要的重复,若无必要,下文中的代码将不贴出include指令以及using声明。

当然,你可以传递参数给入口函数,像下面这样:

// 03_thread_argument.cpp

void hello(string name) 
  cout << "Welcome to " << name << endl;


int main() 
  thread t(hello, "https://paul.pub");
  t.join();

  return 0;

不过需要注意的是,参数是以拷贝的形式进行传递的。因此对于拷贝耗时的对象你可能需要传递指针或者引用类型作为参数。但是,如果是传递指针或者引用,你还需要考虑参数对象的生命周期。因为线程的运行长度很可能会超过参数的生命周期(见下文detach),这个时候如果线程还在访问一个已经被销毁的对象就会出现问题。

join与detach

  • 主要API
API说明
join等待线程完成其执行
detach允许线程独立执行

一旦启动线程之后,我们必须决定是要等待直接它结束(通过join),还是让它独立运行(通过detach),我们必须二者选其一。如果在thread对象销毁的时候我们还没有做决定,则thread对象在析构函数出将调用std::terminate()从而导致我们的进程异常退出。

请思考在上面的代码示例中,thread对象在何时会销毁。

需要注意的是:在我们做决定的时候,很可能线程已经执行完了(例如上面的示例中线程的逻辑仅仅是一句打印,执行时间会很短)。新的线程创建之后,究竟是新的线程先执行,还是当前线程的下一条语句先执行这是不确定的,因为这是由操作系统的调度策略决定的。不过这不要紧,我们只要在thread对象销毁前做决定即可。

  • join:调用此接口时,当前线程会一直阻塞,直到目标线程执行完成(当然,很可能目标线程在此处调用之前就已经执行完成了,不过这不要紧)。因此,如果目标线程的任务非常耗时,你就要考虑好是否需要在主线程上等待它了,因此这很可能会导致主线程卡住。
  • detachdetach是让目标线程成为守护线程(daemon threads)。一旦detach之后,目标线程将独立执行,即便其对应的thread对象销毁也不影响线程的执行。并且,你无法再与之通信。

对于这两个接口,都必须是可执行的线程才有意义。你可以通过joinable()接口查询是否可以对它们进行join或者detach

管理当前线程

  • 主要API
APIC++标准说明
yieldC++11让出处理器,重新调度各执行线程
get_idC++11返回当前线程的线程 id
sleep_forC++11使当前线程的执行停止指定的时间段
sleep_untilC++11使当前线程的执行停止直到指定的时间点

上面是一些在线程内部使用的API,它们用来对当前线程做一些控制。

  • yield 通常用在自己的主要任务已经完成的时候,此时希望让出处理器给其他任务使用。
  • get_id 返回当前线程的id,可以以此来标识不同的线程。
  • sleep_for 是让当前线程停止一段时间。
  • sleep_untilsleep_for类似,但是是以具体的时间点为参数。这两个API都以chrono API(由于篇幅所限,这里不展开这方面内容)为基础。

下面是一个代码示例:

// 04_thread_self_manage.cpp

void print_time() 
  auto now = chrono::system_clock::now();
  auto in_time_t = chrono::system_clock::to_time_t(now);

  std::stringstream ss;
  ss << put_time(localtime(&in_time_t), "%Y-%m-%d %X");
  cout << "now is: " << ss.str() << endl;


void sleep_thread() 
  this_thread::sleep_for(chrono::seconds(3));
  cout << "[thread-" << this_thread::get_id() << "] is waking up" << endl;


void loop_thread() 
  for (int i = 0; i < 10; i++) 
    cout << "[thread-" << this_thread::get_id() << "] print: " << i << endl;
  


int main() 
  print_time();

  thread t1(sleep_thread);
  thread t2(loop_thread);

  t1.join();
  t2.detach();

  print_time();
  return 0;

这段代码应该还是比较容易理解的,这里创建了两个线程。它们都会有一些输出,其中一个会先停止3秒钟,然后再输出。主线程调用join会一直卡住等待它运行结束。

这段程序的输出如下:

now is: 2019-10-13 10:17:48
[thread-0x70000cdda000] print: 0
[thread-0x70000cdda000] print: 1
[thread-0x70000cdda000] print: 2
[thread-0x70000cdda000] print: 3
[thread-0x70000cdda000] print: 4
[thread-0x70000cdda000] print: 5
[thread-0x70000cdda000] print: 6
[thread-0x70000cdda000] print: 7
[thread-0x70000cdda000] print: 8
[thread-0x70000cdda000] print: 9
[thread-0x70000cd57000] is waking up
now is: 2019-10-13 10:17:51

一次调用

  • 主要API
APIC++标准说明
call_onceC++11即便在多线程环境下,也能保证只调用某个函数一次
once_flagC++11call_once配合使用

在一些情况下,我们有些任务需要执行一次,并且我们只希望它执行一次,例如资源的初始化任务。这个时候就可以用到上面的接口。这个接口会保证,即便在多线程的环境下,相应的函数也只会调用一次。

下面就是一个示例:有三个线程都会使用init函数,但是只会有一个线程真正执行它。

// 05_call_once.cpp

void init() 
  cout << "Initialing..." << endl;
  // Do something...


void worker(once_flag* flag) 
  call_once(*flag, init);


int main() 
  once_flag flag;

  thread t1(worker, &flag);
  thread t2(worker, &flag);
  thread t3(worker, &flag);

  t1.join();
  t2.join();
  t3.join();

  return 0;

我们无法确定具体是哪一个线程会执行init。而事实上,我们也不关心,因为只要有某个线程完成这个初始化工作就可以了。

请思考一下,为什么要在main函数中创建once_flag flag。如果是在worker函数中直接声明一个once_flag并使用行不行?为什么?

并发任务

下面以一个并发任务为示例讲解如何引入多线程。

任务示例:现在假设我们需要计算某个范围内所有自然数的平方根之和,例如[1, 10e8]

在单线程模型下,我们的代码可能是这样的:

// 06_naive_multithread.cpp

static const int MAX = 10e8; // ①
static double sum = 0; // ②

void worker(int min, int max)  // ③
  for (int i = min; i <= max; i++) 
    sum += sqrt(i);
  


void serial_task(int min, int max)  // ④
  auto start_time = chrono::steady_clock::now();
  sum = 0;
  worker(0, MAX);
  auto end_time = chrono::steady_clock::now();
  auto ms = chrono::duration_cast<chrono::milliseconds>(end_time - start_time).count();
  cout << "Serail task finish, " << ms << " ms consumed, Result: " << sum << endl;

这段代码说明如下:

  1. 通过一个常量指定数据范围,这个是为了方便调整。
  2. 通过一个全局变量来存储结果。
  3. 通过一个任务函数来计算值。
  4. 统计任务的执行时间。

这段程序输出如下:

Serail task finish, 6406 ms consumed, Result: 2.10819e+13

很显然,上面单线程的做法性能太差了。我们的任务完全是可以并发执行的。并且任务很容易划分。

下面我们就尝试以多线程的方式来改造原先的程序。

改造后的程序如下:

// 06_naive_multithread.cpp

void concurrent_task(int min, int max) 
  auto start_time = chrono::steady_clock::now();

  unsigned concurrent_count = thread::hardware_concurrency(); // ①
  cout << "hardware_concurrency: " << concurrent_count << endl;
  vector<thread> threads;
  min = 0;
  sum = 0;
  for (int t = 0; t < concurrent_count; t++)  // ②
    int range = max / concurrent_count * (t + 1);
    threads.push_back(thread(worker, min, range)); // ③
    min = range + 1;
  
  for (auto& t : threads) 
    t.join(); // ④
  

  auto end_time = chrono::steady_clock::now();
  auto ms = chrono::duration_cast<chrono::milliseconds>(end_time - start_time).count();
  cout << "Concurrent task finish, " << ms << " ms consumed, Result: " << sum << endl;

这段代码说明如下:

  1. thread::hardware_concurrency()可以获取到当前硬件支持多少个线程并行执行。
  2. 根据处理器的情况决定线程的数量。
  3. 对于每一个线程都通过worker函数来完成任务,并划分一部分数据给它处理。
  4. 等待每一个线程执行结束。

很好,似乎很简单就完成了并发的改造。然后我们运行一下这个程序:

hardware_concurrency: 16
Concurrent task finish, 6246 ms consumed, Result: 1.78162e+12

很抱歉,我们会发现这里的性能并没有明显的提升。更严重的是,这里的结果是错误的。

要搞清楚为什么结果不正确我们需要更多的背景知识。

我们知道,对于现代的处理器来说,为了加速处理的速度,每个处理器都会有自己的高速缓存(Cache),这个高速缓存是与每个处理器相对应的,如下图所示:

事实上,目前大部分CPU的缓存已经不只一层。

处理器在进行计算的时候,高速缓存会参与其中,例如数据的读和写。而高速缓存和系统主存(Memory)是有可能存在不一致的。即:某个结果计算后保存在处理器的高速缓存中了,但是没有同步到主存中,此时这个值对于其他处理器就是不可见的。

事情还远不止这么简单。我们对于全局变量值的修改:sum += sqrt(i);这条语句,它并非是原子的。它其实是很多条指令的组合才能完成。假设在某个设备上,这条语句通过下面这几个步骤来完成。它们的时序可能如下所示:

在时间点a的时候,所有线程对于sum变量的值是一致的。

但是在时间点b之后,thread3上已经对sum进行了赋值。而这个时候其他几个线程也同时在其他处理器上使用了这个值,那么这个时候它们所使用的值就是旧的(错误的)。最后得到的结果也自然是错的。

竞争条件与临界区

当多个进程或者线程同时访问共享数据时,只要有一个任务会修改数据,那么就可能会发生问题。此时结果依赖于这些任务执行的相对时间,这种场景称为竞争条件(race condition)

访问共享数据的代码片段称之为临界区(critical section)。具体到上面这个示例,临界区就是读写sum变量的地方。

要避免竞争条件,就需要对临界区进行数据保护。

很自然的,现在我们能够理解发生竞争条件是因为这些线程在同时访问共享数据,其中有些线程的改动没有让其他线程知道,导致其他线程在错误的基础上进行处理,结果自然也就是错误的。

那么,如果一次只让一个线程访问共享数据,访问完了再让其他线程接着访问,这样就可以避免问题的发生了。

接下来介绍的API提供的就是这样的功能。

互斥体与锁

mutex

开发并发系统的目的主要是为了提升性能:将任务分散到多个线程,然后在不同的处理器上同时执行。这些分散开来的线程通常会包含两类任务:

  1. 独立的对于划分给自己的数据的处理
  2. 对于处理结果的汇总

其中第1项任务因为每个线程是独立的,不存在竞争条件的问题。而第2项任务,由于所有线程都可能往总结果(例如上面的sum变量)汇总,这就需要做保护了。在某一个具体的时刻,只应当有一个线程更新总结果,即:保证每个线程对于共享数据的访问是“互斥”的。mutex 就提供了这样的功能。

mutexmutual exclusion(互斥)的简写。

APIC++标准说明
mutexC++11提供基本互斥设施
timed_mutexC++11提供互斥设施,带有超时功能
recursive_mutexC++11提供能被同一线程递归锁定的互斥设施
recursive_timed_mutexC++11提供能被同一线程递归锁定的互斥设施,带有超时功能
shared_timed_mutexC++14提供共享互斥设施并带有超时功能
shared_mutexC++17提供共享互斥设施

很明显,在这些类中,mutex是最基础的API。其他类都是在它的基础上的改进。所以这些类都提供了下面三个方法,并且它们的功能是一样的:

方法说明
lock锁定互斥体,如果不可用,则阻塞
try_lock尝试锁定互斥体,如果不可用,直接返回
unlock解锁互斥体

这三个方法提供了基础的锁定和解除锁定的功能。使用lock意味着你有很强的意愿一定要获取到互斥体,而使用try_lock则是进行一次尝试。这意味着如果失败了,你通常还有其他的路径可以走。

在这些基础功能之上,其他的类分别在下面三个方面进行了扩展:

  • 超时timed_mutexrecursive_timed_mutexshared_timed_mutex的名称都带有timed,这意味着它们都支持超时功能。它们都提供了try_lock_fortry_lock_until方法,这两个方法分别可以指定超时的时间长度和时间点。如果在超时的时间范围内没有能获取到锁,则直接返回,不再继续等待。
  • 可重入recursive_mutexrecursive_timed_mutex的名称都带有recursive。可重入或者叫做可递归,是指在同一个线程中,同一把锁可以锁定多次。这就避免了一些不必要的死锁。
  • 共享shared_timed_mutexshared_mutex提供了共享功能。对于这类互斥体,实际上是提供了两把锁:一把是共享锁,一把是互斥锁。一旦某个线程获取了互斥锁,任何其他线程都无法再获取互斥锁和共享锁;但是如果有某个线程获取到了共享锁,其他线程无法再获取到互斥锁,但是还有获取到共享锁。这里互斥锁的使用和其他的互斥体接口和功能一样。而共享锁可以同时被多个线程同时获取到(使用共享锁的接口见下面的表格)。共享锁通常用在读者写者模型上。

使用共享锁的接口如下:

方法说明
lock_shared获取互斥体的共享锁,如果无法获取则阻塞
try_lock_shared尝试获取共享锁,如果不可用,直接返回
unlock_shared解锁共享锁

接下来,我们就借助刚学到的mutex来改造我们的并发系统,改造后的程序如下:

// 07_mutex_lock.cpp

static const int MAX = 10e8;
static double sum = 0;

static mutex exclusive;

void concurrent_worker(int min, int max) 
  for (int i = min; i <= max; i++) 
    exclusive.lock(); // ①
    sum += sqrt(i);
    exclusive.unlock(); // ②
  


void concurrent_task(int min, int max) 
  auto start_time = chrono::steady_clock::now();

  unsigned concurrent_count = thread::hardware_concurrency();
  cout << "hardware_concurrency: " << concurrent_count << endl;
  vector<thread> threads;
  min = 0;
  sum = 0;
  for (int t = 0; t < concurrent_count; t++) 
    int range = max / concurrent_count * (t + 1);
    threads.push_back(threadC++并发编程(C++11到C++17)

C++ 并发编程(C++11 到 C++17 )

C++11 并发编程基础:并发并行与C++多线程

我的C/C++语言学习进阶之旅收集关于MODERN C++ 11/14/17/20/23 的一些资料

我的C/C++语言学习进阶之旅收集关于MODERN C++ 11/14/17/20/23 的一些资料

C++并发编程(C++11)