Linux---多线程线程池
Posted 一棵灬胡杨树
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Linux---多线程线程池相关的知识,希望对你有一定的参考价值。
多线程
线程概念:线程就是进程中的一条执行流,负责代码的执行调度,在linux下线程执行流是通过pcb实现的,一个进程中可以有多个pcb,这些pcb共享进程中的大部分资源,所以线程被称为一个轻量级进程。
Notes:进程是系统进行资源分配的基本单元,线程是操作系统进行调度运行的基本单元,在linux下pcb相较于传统pcb较为轻量化(因为有很多都是共享的资源),因此linux下的线程实际上被称为轻量级进程。
Notes:
Linux下执行流是通过轻量级进程实现的。
1.进程是pcb,linux下是一个task_struct,程序运行的动态描述
2.然而linux下的执行流是通过pcb来实现的(一个进程中可以有多个pcb)
3.linux下的pcb是一个轻量级进程
4.线程是进程中的一条执行流,因此也说linux下的线程就是轻量级进程因此,进程是系统进行资源分配的基本单元,线程是系统调度运行的基本单元
线程间的独有与共享
独有:标识符,栈,寄存器,信号屏蔽字,errno(每个pcb都有)
共享:虚拟地址空间,IO信息,信号处理方式,工作路径
ps:线程之间的通信方式种类:管道、共享内存、消息队列、信号量、全局变量、函数传参
多进程与多线程进行多任务处理的优缺点
多线程的优点:
1.线程间通信更加灵活(包含进程间通信方式在内还有全局变量函数传参)
2.线程的创建与销毁成本更低
3.同一进程中的线程间切换调度成本更低
多进程的优点:应用场景----对于主程序安全性要求高的场景—shell/服务器
1.健壮性,稳定性高,独立性高
在多任务处理中,执行流不是越多越好,cpu资源有限,执行流多了,反而会增加切换调度成本
线程安全
概念:描述的是在线程中对于一个临界资源的访问操作是安全的。
实现:同步与互斥
同步:通过条件判断保证对临界资源访问的合理性
互斥:通过同一时间对临界资源同一时间的唯一访问保证访问安全性
互斥的实现: 互斥锁(互斥量):就是只有0或1的计数器,标记临界资源的两种访问状态,在访问资源之前加锁---判断是否可访问,不可访问则阻塞
在访问资源完毕后解锁---将资源状态置为可访问
互斥锁本身就是一个临界资源,需要大家访问同一个互斥锁才能互斥,互斥锁本身计数操作是一个原子操作
操作流程+接口认识:
- 定义互斥锁变量:
pthread_mutex_t mutex;
- 初始化互斥锁: (1)
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER
(2)intpthread_mutex_init(pthread_mutex_t *mutex,const pthread_mutexattr_t *attr)
- 访问临界资源之前加锁
(1)int pthread_mutex_lock(pthread_mutex_t *mutex); --阻塞加锁
(2)int pthread_mutex_trylock(pthread_mutex_t *mutex)--尝试加锁,非阻塞,加不了锁则报错返回
- 访问临界资源完毕后解锁
int pthread_mutex_unock(pthread_mutex_t *mutex)
- 销毁互斥锁
int ptherad_mutex_destroy(pthread_mutex_t *mutex)
注意事项:
加锁尽量只保护临界资源的访问操作
加锁后在任意有可能退出线程之前的地方都要解锁
死锁
死锁:
概念:程序流程无法继续推进卡死的状态
产生:四个必要条件:
(1)互斥条件;—我加了锁,别人就不能加
(2)请求和保持条件;–加了锁A,请求锁B,锁B请求不到,不释放锁A
(3)不可剥夺条件;–我加的锁,只有我能解
(4)环路等待条件。–加了锁A请求锁B,对方加了锁B请求锁A
预防:破坏产生的必要条件:
1.保证多方加解锁顺序一致。
2.非阻塞加锁加锁失败后释放已有资源
避免:银行家算法----将系统运行分为两种状态:安全----非安全
同步的实现–条件变量
条件变量:提供一个pcb等待队列+以及使线程阻塞和唤醒阻塞线程接口
程序员自己在程序中进行判断,当前线程获取资源是否合理,不合理则调用阻塞接口使线程阻塞,其他线程促使条件满足之后调用唤醒阻塞的线程(唤醒的是pcb等待队列中的线程)
1.条件变量需要搭配互斥锁一起使用的。
接口介绍:
1.定义条件变量:pthread_cond_t cond;
2.初始化条件变量:pthread_cond_t cond = PTHREAD_COND_INITIALIZER
int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr);
3.阻塞接口:int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
int pthread_cond_timewait(pthread_cond_t *cond, pthread_mutex_t *mutex,struct timespec *abst);//时间到了,自己报错返回
4.唤醒阻塞的线程:
(1) int pthread_cond_broadcast(pthread_cond_t *cond);唤醒所有
(2)int pthread_cond_signal(pthread_cond_t *cond)--唤醒至少一个
5.销毁条件变量:
int pthread_cond_destroy(pthread_cond_t *cond);
举例实现:
/生产者与消费者模型-----条件变量+互斥锁版本
#define MAX_QUEUE 5
class BlockQueue
{
private:
int _capacity;//队列容量
std::queue<int> _queue;
pthread_mutex_t _mutex;
pthread_cond_t _cond_pro;
pthread_cond_t _cond_cus;
public:
BlockQueue(int que = MAX_QUEUE):_capacity(que)
{
pthread_mutex_init(&_mutex,nullptr);
pthread_cond_init(&_cond_pro,nullptr);
pthread_cond_init(&_cond_cus,nullptr);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond_pro);
pthread_cond_destroy(&_cond_cus);
}
bool Push(const int &data)
{
pthread_mutex_lock(&_mutex);
while (_queue.size() == _capacity)
{
//阻塞
pthread_cond_wait(&_cond_pro,&_mutex);
}
_queue.push(data);
//唤醒
pthread_cond_signal(&_cond_cus);
pthread_mutex_unlock(&_mutex);
return true;
}
bool Pop(int *data)
{
pthread_mutex_lock(&_mutex);
while (_queue.empty())
{
pthread_cond_wait(&_cond_cus,&_mutex);
}
*data = _queue.front();
_queue.pop();
pthread_cond_signal(&_cond_pro);
pthread_mutex_unlock(&_mutex);
}
};
void *productor(void *arg)
{
BlockQueue *q = (BlockQueue*)arg;
int data = 0;
while (1)
{
q->Push(data);
printf("put data:%d\\n",data++);
}
}
void *customer(void *arg)
{
BlockQueue *q = (BlockQueue*)arg;
while (1)
{
int data = 0;
q->Pop(&data);
printf("get data:%d\\n",data);
}
}
int main()
{
BlockQueue q;
int ret;
pthread_t ptid[4],ctid[4];
for(auto & i : ptid)
{
ret = pthread_create(&i, nullptr,productor,&q);
if (ret != 0)
{
printf("thread creat error\\n");
return -1;
}
}
for(int i = 0;i < 4;++i)
{
pthread_join(ptid[i],nullptr);
pthread_join(ctid[i],nullptr);
ret = pthread_create(&ctid[i],nullptr,customer,&q);
if (ret != 0)
{
printf("thread creat error\\n");
return -1;
}
}
return 0;
}
同步的实现–信号量
本质:计数器+pcb等待队列
作用:用于实现进程或线程间的同步与互斥
操作:P操作 -1,V操作:+1
同步的实现:通过计数器对资源进行计数,获取资源前访问信号量进行P操作,产生资源后进行V操作
互斥的实现:初始化信号量计数为1,访问资源前进行P操作,访问资源完毕后进行V操作。
接口认识:
头文件:#include <semaphore.h>
定义信号量:sem_init
初始化信号量: int sem_init(sem_t *sem, int pshared, unsigned int value);
参数: sem:信号量变量 pshared:0用于线程,非0用于进程
P操作:
int sem_wait(sem_t *sem); :计数-1,判断是否大于0,小于0则阻塞----阻塞操作
int sem_trywait(sem_t *sem);----非阻塞操作
int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);
V操作:
int sem_post(sem_t *sem);
销毁信号量:
int sem_destroy(sem_t *sem);
使用信号量实现生产者与消费者模型:
线程安全的环形队列: 1.read等于write时,表示没有数据节点;2 wirte下一个位置是read的时候,表示队列满了
实现思路:信号量对资源数量进行统计
对生产者来说:空闲结点是资源 —一个信号量对于空闲结点进行计数
1.对空闲结点数量进行P操作(如果计数<0则阻塞)
2.加锁(在P操作之前加锁会造成死锁)
3.向队列添加数据arr[wirte] = data;
4.解锁
5.对数据结点数据进行V操作(多了一个数据结点)
对消费者来说:数据节点是资源 —一个信号量对数据结点进行计数
1.对数据结点进行P操作
2.加锁
3.出队数据
4.解锁
5.对空闲结点进行进行V操作
具体代码:
通过信号量实现线程安全的环形队列,最终实现生产者与消费者模型
#define MAX_QUEUE 5
#define MAX_THREAD 5
class RingQueue{
private:
std::vector<int> _queue;
int _capacity;
int _step_read; // 当前即将读取数据的位置的下标
int _step_write; // 当前即将写入数据的位置的下标
sem_t _sem_lock;// 用于实现互斥的信号量
// 使用这个计数器,实现对当前队列中的数据资源的数量进行计数;
// 如果<=0表示没有资源,则消费者会陷入等待
sem_t _sem_data;
// 使用这个计数器,实现对当前队列 中的空闲空间数量进行计数;
// 如果<=0表示队列满了,则生产者陷入等待
sem_t _sem_space;
public:
RingQueue(int max = MAX_QUEUE) : _queue(max), _capacity(max),
_step_read(0), _step_write(0){
//...初始化过程...
//互斥信号量的初始化
//pshare设置为0,表示当前用于线程间的同步互斥
//value信号量初值,初始化为1,数值最大不大于1,实现互斥
sem_init(&_sem_lock, 0, 1);
sem_init(&_sem_data, 0, 0);//数据资源初始为0
sem_init(&_sem_space, 0, max);//空闲空间初值就是节点数量
}
~RingQueue() {
//...销毁资源过程...
sem_destroy(&_sem_lock);
sem_destroy(&_sem_data);
sem_destroy(&_sem_space);
}
bool Push(int data) {
sem_wait(&_sem_space);//统计空间节点数量,自动判断是否有空闲空间,没有则阻塞
//互斥保护临界资源,注意上边的信号量不需要保护
//若保护了上边的信号量操作,反而回出问题:若先加锁再判断,有可能会陷入休眠而没解锁
sem_wait(&_sem_lock);
_queue[_step_write] = data;
_step_write = (_step_write + 1) % _capacity;
sem_post(&_sem_lock);
sem_post(&_sem_data);//入队数据之后,数据资源的数量增加一个,并且唤醒消费者
return true;
}
bool Pop(int *data) {
sem_wait(&_sem_data);//消费者判读数据资源数量,若<=0则回陷入等待 ,计数-1
sem_wait(&_sem_lock);
*data = _queue[_step_read];
_step_read = (_step_read + 1) % _capacity;
sem_post(&_sem_lock);
sem_post(&_sem_space);//空间资源多了一个,唤醒生产者
return true;
}
};
void *thr_consumer(void *arg)
{
RingQueue *q = (RingQueue*)arg;
while(1) {
//消费者处理数据
int data;
q->Pop(&data);
printf("消费者:%p 出队数据:%d\\n", pthread_self(), data);
}
return NULL;
}
void *thr_productor(void *arg)
{
RingQueue *q = (RingQueue*)arg;
int i = 0;
while(1) {
//生产者生产数据
q->Push(i);
printf("生产者:%p 入队数据:%d\\n", pthread_self(), i++);
}
return NULL;
}
int main()
{
//完成生产者与消费者模型
//创建生产者角色的线程以及消费者角色的线程
pthread_t ctid[MAX_THREAD], ptid[MAX_THREAD];
int ret , i;
RingQueue q;
//创建消费者线程
for (i = 0; i < MAX_THREAD; i++) {
ret = pthread_create(&ctid[i], NULL, thr_consumer, (void*)&q);
if (ret != 0) {
printf("thread create error\\n");
return -1;
}
}
//创建生产者线程
for (i = 0; i < MAX_THREAD; i++) {
ret = pthread_create(&ptid[i], NULL, thr_productor, (void*)&q);
if (ret != 0) {
printf("thread create error\\n");
return -1;
}
}
//等待所有线程退出
for (i = 0; i < MAX_THREAD; i++) {
pthread_join(ctid[i], NULL);
pthread_join(ptid[i], NULL);
}
return 0;
}
乐观锁和悲观锁
引用于这篇文章:面试必备之乐观锁与悲观锁,大家可以去看看
乐观锁总是假设最好的情况,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号机制和CAS算法实现。乐观锁适用于多读的应用类型,这样可以提高吞吐量,像数据库提供的类似于write_condition机制,其实都是提供的乐观锁。
悲观锁总是假设最坏的情况,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会阻塞直到它拿到锁(共享资源每次只给一个线程使用,其它线程阻塞,用完后再把资源转让给其它线程)。传统的关系型数据库里边就用到了很多这种锁机制,比如行锁,表锁等,读锁,写锁等,都是在做操作之前先上锁
从上面对两种锁的介绍,我们知道两种锁各有优缺点,不可认为一种好于另一种,像乐观锁适用于写比较少的情况下(多读场景),即冲突真的很少发生的时候,这样可以省去了锁的开销,加大了系统的整个吞吐量。但如果是多写的情况,一般会经常产生冲突,这就会导致上层应用会不断的进行retry,这样反倒是降低了性能,所以一般多写的场景下用悲观锁就比较合适。
线程池
线程池:一个或多个创建好的线程 + 线程安全的任务队列
其他线程将需要处理的任务,添加到线程池的任务队列中,线程池总的线程不断的从队列中获取任务进行处理。
应用:应用于有大量请求需要进行处理的场景
1.避免了峰值压力下资源耗尽的风险
2.避免大量频繁创建于销毁线程带来的时间成本
任务处理T = 创建线程T1 + 任务处理过程T2 + 销毁线程T3
实现:一个或多个创建好的线程 + 线程安全的任务队列
想要让线程池中的线程能够处理任务数据,就必须让线程池中的线程知道如何处理
1.在线程入口函数中直接定义好任务处理逻辑
2.添加任务的线程,添加任务时候,不但添加要处理的数据,顺便把这个任务数据的处理方法一起传入
线程池中的线程只需要调用处理函数传入处理数据即可
#include <cstdio>
#include <queue>
#include <cstdlib>
#include <unistd.h>
#include <pthread.h>
#define MAX_QUEUE 10
#define MAX_THREAD 5
typedef void(*handler_t)(int);
class ThreadTask{
private:
int _data;
handler_t _handler;
public:
ThreadTask() {}
ThreadTask(int data, handler_t handler):_data(data),
_handler(handler){}
void Run() {
_handler(_data);
}
};
class BlockQueue{
private:
int _capacity;//队列容量
std::queue<ThreadTask> _queue;
pthread_mutex_t _mutex;
pthread_cond_t _cond_pro;
pthread_cond_t _cond_cus;
public:
BlockQueue(int que = MAX_QUEUE):_capacity(que){
pthread_mutex_init(&_mutex, NULL);
pthread_cond_init(&_cond_pro, NULL);
pthread_cond_init(&_cond_cus, NULL);
}
~BlockQueue(){
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond_pro);
pthread_cond_destroy(&_cond_cus);
}
//
bool Push(const ThreadTask &data) {
pthread_mutex_lock(&_mutex);
while(_queue.size() == _capacity) {//队列满了
pthread_cond_wait(&_cond_pro, &_mutex);
}
_queue.push(data);
pthread_cond_signal(&_cond_cus);
pthread_mutex_unlock(&_mutex);
return true;
}
bool Pop(ThreadTask *data) {
pthread_mutex_lock(&_mutex);
while(_queue.empty() == true) {//队列为NULL
pthread_cond_wait(&_cond_cus, &_mutex);
}
*data = _queue.front();
_queue.pop();
pthread_cond_signal(&_cond_pro);
pthread_mutex_unlock(&_mutex);
}
};
class Linux多线程_(线程池,读者写者,自旋锁)