万字详解Linux系列多线程(下)
Posted 山舟
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了万字详解Linux系列多线程(下)相关的知识,希望对你有一定的参考价值。
文章目录
前言
由于多线程部分内容过多,所以本文接着【万字详解Linux系列】多线程(上)向后介绍多线程相关的内容。
一、线程同步
1.概念
在保证数据安全的前提下,让线程按照某种特定的顺序访问临界资源,从而高效使用临界资源。
2.条件变量
条件变量就相当于实现进程互斥中的互斥量,是Linux下实现进程同步的一种机制。可以理解为描述临界资源是否就绪的一个数据化变量。
注意条件变量不保护临界资源,所以条件变量常和互斥量(锁)一起使用。
3.代码实现
(1)相关函数
//初始化条件变量
// 要初始化的条件变量 设置条件变量属性,可设置为NULL
int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);
//销毁条件变量
int pthread_cond_destroy(pthread_cond_t *cond);//传入条件变量的指针即可
//等待条件变量
// 在该条件变量上等待 互斥量,表示在等待的期间解锁
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
//等待的时候往往正处于临界区,这个函数在等待时将锁释放,当正在等待的线程被唤醒时又自动获得该锁
//唤醒某一个线程
int pthread_cond_signal(pthread_cond_t *cond);//传入条件变量的指针即可
//唤醒所有线程
int pthread_cond_broadcast(pthread_cond_t *cond);//传入条件变量的指针即可
(2)代码使用
下面通过代码来演示上面函数的使用方法,代码的大致功能是创建3个线程并通过主线程控制它们,每次输入字符时唤醒一个线程。
#include <cstdio>
#include <iostream>
#include <pthread.h>
using namespace std;
pthread_mutex_t lock;//创建互斥量
pthread_cond_t cond;//创建条件变量
void* Run(void* arg)
pthread_detach(pthread_self());
cout << (char*)arg << " create" << endl;
while(true)
pthread_cond_wait(&cond, &lock);//阻塞等待
cout << "thread " << pthread_self() << " is running ... " << endl;
int main()
pthread_mutex_init(&lock, nullptr);//初始化锁
pthread_cond_init(&cond, nullptr);//初始化条件变量
//创建三个线程
pthread_t t1,t2,t3;
pthread_create(&t1, nullptr, Run, (void*)"thread 1");
pthread_create(&t2, nullptr, Run, (void*)"thread 2");
pthread_create(&t3, nullptr, Run, (void*)"thread 3");
//主线程控制其余三个线程
while(true)
getchar();//每次收到输入就唤醒线程
pthread_cond_signal(&cond);//每次唤醒一个线程
pthread_mutex_destroy(&lock);//销毁锁
pthread_cond_destroy(&cond);//销毁条件变量
return 0;
结果如下:
注意在代码中没有对各线程进行任何的排序,但是在每次解除阻塞时它们显然是有序的(如黄框所示),这就是条件变量的作用。(还有要注意如果先输入一个字符、再按下回车,这时会一次性唤醒两个线程,因为回车本身在getchar时也被当做一个字符)
下面的代码使用pthread_cond_broadcast唤醒线程(仅有这里一处改动,剩下的与上面代码相同),它可以一次将所有线程都唤醒。
#include <cstdio>
#include <iostream>
#include <pthread.h>
using namespace std;
pthread_mutex_t lock;//创建互斥量
pthread_cond_t cond;//创建条件变量
void* Run(void* arg)
pthread_detach(pthread_self());
cout << (char*)arg << " create" << endl;
while(true)
pthread_cond_wait(&cond, &lock);//阻塞等待
cout << "thread " << pthread_self() << " is running ... " << endl;
int main()
pthread_mutex_init(&lock, nullptr);//初始化锁
pthread_cond_init(&cond, nullptr);//初始化条件变量
pthread_t t1,t2,t3;
pthread_create(&t1, nullptr, Run, (void*)"thread 1");
pthread_create(&t2, nullptr, Run, (void*)"thread 2");
pthread_create(&t3, nullptr, Run, (void*)"thread 3");
//主线程控制其余三个线程
while(true)
getchar();//每次收到输入就唤醒线程
pthread_cond_broadcast(&cond);//每次都唤醒所有线程
pthread_mutex_destroy(&lock);//销毁锁
pthread_cond_destroy(&cond);//销毁条件变量
return 0;
结果如下:
(3)关于pthread_cond_wait
从这个函数的命名中就可以看出,这是与条件变量相关的函数,但为什么它的第二个参数用到了互斥量呢?
因为条件等待是线程间同步的一种手段,如果只有一个线程,条件不满足,一直等下去都不会满足,所以必须要有一个线程通过某些操作,改变共享变量,使原先不满足的条件变得满足,并且通知等待在条件变量上的线程。但条件不会无缘无故的突然变得满足,必然会牵扯到共享数据的变化。所以一定要用互斥锁来保护,通过互斥锁来安全地获取和修改共享数据。
二、生产者消费者模型
1.什么是生产者消费者模型
生产者消费者模型是通过一个容器来解决生产者和消费者的强耦合问题,有解耦、支持并发等等优点,是处理多线程同步的一个经典的例子
生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,这样生产者生产完数据之后不用等待消费者处理,直接放进阻塞队列,同时消费者也不找生产者要数据,而是直接从阻塞队列里取。
阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
2.相关概念
(1)一个交易场所
通常是内存中的一段缓冲区,“交易”的内容就是数据。
(2)三种角色
生产者和消费者,这里特指特定的线程或进程。
仓库,这里指保存数据的缓冲区。
(3)三种关系
- 消费者与消费者:竞争关系,这里指互相竞争数据、互斥关系。
- 生产者与生产者:竞争关系,这里指互相竞争写入数据、互斥关系。
- 生产者与消费者:竞争关系(生产者写数据时消费者不能拿数据,消费者拿数据时生产者不能写数据,保证正确)、同步关系(多线程协同,保证高效)。
3.基于阻塞队列的单生产者、单消费者模型
(1)简介
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于:
- 当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素。
- 当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出
(以上的操作都是基于不同的线程来说的,在对阻塞队列进程操作时会被阻塞)
(2)代码实现
由于阻塞队列本身的实现代码量较大,所以我这里单独分出一个hpp来实现阻塞队列,其逻辑并没有很难,且在大部分代码后都附有注释:
#pragma once
#include <iostream>
#include <queue>
#include <unistd.h>
#include <pthread.h>
#include <stdlib.h>
using namespace std;
#define NUM 5//阻塞队列的大小
template<typename T>//模板
class BlockQueue//阻塞队列
public:
//给阻塞队列的容量一个缺省值
BlockQueue(int _capacity = NUM)
//初始化互斥量和条件变量
pthread_mutex_init(&lock, nullptr);
pthread_cond_init(&full, nullptr);
pthread_cond_init(&empty, nullptr);
capacity = _capacity;
//产生数据
void Push(const T& in)
pthread_mutex_lock(&lock);//访问临界区,加锁
//这里用while而不是if可以防止pthread_cond_wait调用失败导致伪唤醒
while(q.size() == capacity)//队列满
//队列已满,不能生产,等待直到q中可以存放新的数据
pthread_cond_wait(&full, &lock);
//代码运行到这里,说明q中有空间放新的数据,否则会一直在上面的if判断中等待
q.push(in);
pthread_mutex_unlock(&lock);//解锁
pthread_cond_signal(&empty);//唤醒消费者
//拿到数据
void Pop(T& out)
pthread_mutex_lock(&lock);//访问临界区,加锁
while(q.empty())//队列空
//队列为空,不能消费,等待直到q中有数据
pthread_cond_wait(&empty, &lock);
out = q.front();
q.pop();
pthread_mutex_unlock(&lock);//解锁
pthread_cond_signal(&full);//唤醒生产者
~BlockQueue()
//将所有的互斥量和条件变量销毁
pthread_mutex_destroy(&lock);
pthread_cond_destroy(&full);
pthread_cond_destroy(&empty);
private:
queue<T> q;//阻塞队列
int capacity;//队列中的数据个数达到capacity后不允许再放入
pthread_mutex_t lock;//互斥量,保证访问临界资源时安全
pthread_cond_t full;//条件变量,在队列满时不允许继续生产
pthread_cond_t empty;//条件变量,在队列空时不允许消费
;
下面是在main函数内创建两个进程并用阻塞队列来访问临界资源的代码,总体逻辑就是生产者不断产生随机数并放入阻塞队列,消费者不断从阻塞队列中拿到数据:
#include "blockQueue.hpp"
void *Consumer(void* arg)//消费者的处理
auto bq = (BlockQueue<int>*)arg;
while(true)
sleep(1);
int data = 0;
bq->Pop(data);//从阻塞队列中拿到数据
cout << "Consumer : " << data << endl;
void *Productor(void* arg)//生产者的处理
auto bq = (BlockQueue<int>*)arg;
while(true)
sleep(1);
int data = rand() % 100 + 1;//生成随机数
bq->Push(data);//放入阻塞队列
cout << "Productor : " << data << endl;
int main()
srand((unsigned long)time(nullptr));//创建一个随机数种子
BlockQueue<int>* bq = new BlockQueue<int>();
//创建两个线程
pthread_t con, pro;
pthread_create(&con, nullptr, Consumer, bq);
pthread_create(&pro, nullptr, Productor, bq);
pthread_join(con, nullptr);
pthread_join(pro, nullptr);
return 0;
运行结果如下,但由于在生产者和消费者各自的处理内每次的间隔都是1s,是同步的,所以现象并不明显:
下面让生产者每1s生产一个数据,但消费者没7s拿一次数据,由于阻塞队列的大小设为5,所以一开始生产者生产够5个数据后就会被阻塞,直到消费者从阻塞队列中拿数据;之后每次生产者生产1个数据就要再等6s消费者拿数据后才能再生产。这样修改后的现象会很明显。
上面两种情况如下列出,经对比现象会很明显。
三、POSIX信号量
1.简介
这里首先要说明一下,POSIX信号量和进程信号毫无关系,而是适用于多线程间的同步。在【万字详解Linux系列】进程间通信(IPC)中提到过,但因为那时还没有介绍多线程的相关内容,所以一笔带过。
信号量本质是一个描述临界资源中资源数目的计数器。申请到信号量对应着让计数器–,释放信号量对应着让计数器++。
申请到信号量的本质:拥有了使用特定资源的权限(而不是开始使用申请的资源)。
2.函数介绍
#include <semaphore.h>//头文件
//初始化信号量
// 要初始化的信号量 value是信号量初始值
int sem_init(sem_t *sem, int pshared, unsigned int value);
// pshared为零表示线程间共享,非零表示进程间共享
//当value为1时称为二元信号量,可看成互斥量(锁)
//销毁信号量
int sem_destroy(sem_t *sem);//传入要销毁的信号量即可
//等待信号量,本质是将信号量的值减1,这样就申请到了一个信号量
int sem_wait(sem_t *sem);//P操作
//发布信号量,本质是将信号量的值加1,这样就归还了一个信号量
int sem_post(sem_t *sem);//V操作
3.函数调用
这里用信号量来实现之前的“抢票”的逻辑。
#include <iostream>
#include <semaphore.h>
#include <pthread.h>
#include <string>
#include <unistd.h>
using namespace std;
//简单地封装一下信号量
class Sem
public:
Sem(int num = 1)
//0是线程间共享,value默认给1
sem_init(&sem, 0, num);
void P()
sem_wait(&sem);
void V()
sem_post(&sem);
~Sem()
sem_destroy(&sem);
private:
sem_t sem;
;
Sem sem(1);//给的value为1
int tickets = 2000;
void* GetTickets(void* arg)//每个线程要"抢票"
string name = (char*)arg;
while(true)
sem.P();//申请信号量
if(tickets > 0)
usleep(1000);
cout << name << " get tickets : " << tickets-- << endl;
sem.V();//归还信号量
else
sem.V();//归还信号量
break;
cout << name << " quit" << endl;
pthread_exit((void*)0);
int main()
//创建6个线程,让线程间的切换更频繁些,效果会更好
pthread_t tid1, tid2, tid3, tid4, tid5, tid6;
pthread_create(&tid1, nullptr, GetTickets, (void*)"thread 1");
pthread_create(&tid2, nullptr, GetTickets, (void*)"thread 2");
pthread_create(&tid3, nullptr, GetTickets, (void*)"thread 3");
pthread_create(&tid4, nullptr, GetTickets, (void*)"thread 4");
pthread_create(&tid5, nullptr, GetTickets, (void*)"thread 5");
pthread_create(&tid6, nullptr, GetTickets, (void*)"thread 6");
pthread_join(tid1, nullptr);
pthread_join(tid2, nullptr);
pthread_join(tid3, nullptr);
pthread_join(tid4, nullptr);
pthread_join(tid5, nullptr);
pthread_join(tid6, nullptr);
return 0;
这里使用信号量与互斥量不同的就是会出现同一个线程一次执行许多次任务的情况,部分现象如下:
4.基于环形队列的生产者消费者模型
下面是环形队列的实现,对于临界资源用信号量来管理。
#pragma once
#include <iostream>
#include <vector>
#include <semaphore.h>
#include <pthread.h>
#include <unistd.h>
#include <cstdlib>
#include <ctime>
using namespace std;
#define NUM 5//环形队列的容量
template<typename T>
class RingQueue
public:
RingQueue(int _cap = NUM)<以上是关于万字详解Linux系列多线程(下)的主要内容,如果未能解决你的问题,请参考以下文章