Linux---多线程线程池

Posted 一棵灬胡杨树

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Linux---多线程线程池相关的知识,希望对你有一定的参考价值。

多线程

线程概念:线程就是进程中的一条执行流,负责代码的执行调度,在linux下线程执行流是通过pcb实现的,一个进程中可以有多个pcb,这些pcb共享进程中的大部分资源,所以线程被称为一个轻量级进程。

Notes:进程是系统进行资源分配的基本单元,线程是操作系统进行调度运行的基本单元,在linux下pcb相较于传统pcb较为轻量化(因为有很多都是共享的资源),因此linux下的线程实际上被称为轻量级进程。

Notes:
Linux下执行流是通过轻量级进程实现的。
1.进程是pcb,linux下是一个task_struct,程序运行的动态描述
2.然而linux下的执行流是通过pcb来实现的(一个进程中可以有多个pcb)
3.linux下的pcb是一个轻量级进程
4.线程是进程中的一条执行流,因此也说linux下的线程就是轻量级进程因此,进程是系统进行资源分配的基本单元,线程是系统调度运行的基本单元

线程间的独有与共享

独有:标识符,栈,寄存器,信号屏蔽字,errno(每个pcb都有)
共享:虚拟地址空间,IO信息,信号处理方式,工作路径

ps:线程之间的通信方式种类:管道、共享内存、消息队列、信号量、全局变量、函数传参

多进程与多线程进行多任务处理的优缺点

多线程的优点:
1.线程间通信更加灵活(包含进程间通信方式在内还有全局变量函数传参)
2.线程的创建与销毁成本更低
3.同一进程中的线程间切换调度成本更低

多进程的优点:应用场景----对于主程序安全性要求高的场景—shell/服务器
1.健壮性,稳定性高,独立性高
在多任务处理中,执行流不是越多越好,cpu资源有限,执行流多了,反而会增加切换调度成本

线程安全

概念:描述的是在线程中对于一个临界资源的访问操作是安全的。
实现:同步与互斥
同步:通过条件判断保证对临界资源访问的合理性
互斥:通过同一时间对临界资源同一时间的唯一访问保证访问安全性

  互斥的实现:  互斥锁(互斥量):就是只有0或1的计数器,标记临界资源的两种访问状态,在访问资源之前加锁---判断是否可访问,不可访问则阻塞
          在访问资源完毕后解锁---将资源状态置为可访问
          互斥锁本身就是一个临界资源,需要大家访问同一个互斥锁才能互斥,互斥锁本身计数操作是一个原子操作

操作流程+接口认识:

  1. 定义互斥锁变量:pthread_mutex_t mutex;
  2. 初始化互斥锁: (1) pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER
(2)intpthread_mutex_init(pthread_mutex_t *mutex,const pthread_mutexattr_t *attr)
  1. 访问临界资源之前加锁
    (1)int pthread_mutex_lock(pthread_mutex_t *mutex); --阻塞加锁
(2)int pthread_mutex_trylock(pthread_mutex_t *mutex)--尝试加锁,非阻塞,加不了锁则报错返回
  1. 访问临界资源完毕后解锁
int pthread_mutex_unock(pthread_mutex_t *mutex)
  1. 销毁互斥锁
int ptherad_mutex_destroy(pthread_mutex_t *mutex)

注意事项:
加锁尽量只保护临界资源的访问操作
加锁后在任意有可能退出线程之前的地方都要解锁

死锁

死锁:
概念:程序流程无法继续推进卡死的状态
产生:四个必要条件:
(1)互斥条件;—我加了锁,别人就不能加
(2)请求和保持条件;–加了锁A,请求锁B,锁B请求不到,不释放锁A
(3)不可剥夺条件;–我加的锁,只有我能解
(4)环路等待条件。–加了锁A请求锁B,对方加了锁B请求锁A

预防:破坏产生的必要条件:
1.保证多方加解锁顺序一致。
2.非阻塞加锁加锁失败后释放已有资源
避免:银行家算法----将系统运行分为两种状态:安全----非安全

同步的实现–条件变量

条件变量:提供一个pcb等待队列+以及使线程阻塞和唤醒阻塞线程接口
程序员自己在程序中进行判断,当前线程获取资源是否合理,不合理则调用阻塞接口使线程阻塞,其他线程促使条件满足之后调用唤醒阻塞的线程(唤醒的是pcb等待队列中的线程)

1.条件变量需要搭配互斥锁一起使用的。
接口介绍:
1.定义条件变量:pthread_cond_t cond;
2.初始化条件变量:pthread_cond_t cond = PTHREAD_COND_INITIALIZER

int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr);

3.阻塞接口:int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);

int pthread_cond_timewait(pthread_cond_t *cond, pthread_mutex_t *mutex,struct timespec *abst);//时间到了,自己报错返回

4.唤醒阻塞的线程:
(1) int pthread_cond_broadcast(pthread_cond_t *cond);唤醒所有
(2)int pthread_cond_signal(pthread_cond_t *cond)--唤醒至少一个
5.销毁条件变量:

int pthread_cond_destroy(pthread_cond_t *cond);

举例实现:

/生产者与消费者模型-----条件变量+互斥锁版本
#define  MAX_QUEUE 5

class BlockQueue
{
private:
    int _capacity;//队列容量
    std::queue<int> _queue;
    pthread_mutex_t _mutex;
    pthread_cond_t _cond_pro;
    pthread_cond_t _cond_cus;
public:
    BlockQueue(int que = MAX_QUEUE):_capacity(que)
    {
        pthread_mutex_init(&_mutex,nullptr);
        pthread_cond_init(&_cond_pro,nullptr);
        pthread_cond_init(&_cond_cus,nullptr);
    }
    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond_pro);
        pthread_cond_destroy(&_cond_cus);
    }
    bool Push(const int &data)
    {
        pthread_mutex_lock(&_mutex);
        while (_queue.size() == _capacity)
        {
            //阻塞
            pthread_cond_wait(&_cond_pro,&_mutex);
        }
        _queue.push(data);
        //唤醒
        pthread_cond_signal(&_cond_cus);
        pthread_mutex_unlock(&_mutex);
        return true;
    }
    bool Pop(int *data)
    {
        pthread_mutex_lock(&_mutex);
        while (_queue.empty())
        {
            pthread_cond_wait(&_cond_cus,&_mutex);
        }
        *data = _queue.front();
        _queue.pop();
        pthread_cond_signal(&_cond_pro);
        pthread_mutex_unlock(&_mutex);
    }
};


void *productor(void *arg)
{
    BlockQueue *q = (BlockQueue*)arg;
    int data = 0;
    while (1)
    {
        q->Push(data);
        printf("put data:%d\\n",data++);
    }
}

void *customer(void *arg)
{
    BlockQueue *q = (BlockQueue*)arg;
    while (1)
    {
        int data = 0;
        q->Pop(&data);
        printf("get data:%d\\n",data);
    }
}
int main()
{
    BlockQueue q;
    int ret;
    pthread_t ptid[4],ctid[4];
    for(auto & i : ptid)
    {
        ret = pthread_create(&i, nullptr,productor,&q);
        if (ret != 0)
        {
            printf("thread creat error\\n");
            return  -1;
        }
    }
    for(int i = 0;i < 4;++i)
    {
        pthread_join(ptid[i],nullptr);
        pthread_join(ctid[i],nullptr);
        ret = pthread_create(&ctid[i],nullptr,customer,&q);
        if (ret != 0)
        {
            printf("thread creat error\\n");
            return  -1;
        }
    }
    return 0;
}

同步的实现–信号量

本质:计数器+pcb等待队列
作用:用于实现进程或线程间的同步与互斥
操作:P操作 -1,V操作:+1
同步的实现:通过计数器对资源进行计数,获取资源前访问信号量进行P操作,产生资源后进行V操作
互斥的实现:初始化信号量计数为1,访问资源前进行P操作,访问资源完毕后进行V操作。

接口认识:
 头文件:#include <semaphore.h>
 定义信号量:sem_init
初始化信号量: int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:   sem:信号量变量 pshared:0用于线程,非0用于进程

P操作:

  int sem_wait(sem_t *sem); :计数-1,判断是否大于0,小于0则阻塞----阻塞操作

   int sem_trywait(sem_t *sem);----非阻塞操作

   int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);

V操作:

int sem_post(sem_t *sem);

销毁信号量:

int sem_destroy(sem_t *sem);

使用信号量实现生产者与消费者模型:
线程安全的环形队列: 1.read等于write时,表示没有数据节点;2 wirte下一个位置是read的时候,表示队列满了

实现思路:信号量对资源数量进行统计
对生产者来说:空闲结点是资源 —一个信号量对于空闲结点进行计数
1.对空闲结点数量进行P操作(如果计数<0则阻塞)
2.加锁(在P操作之前加锁会造成死锁)
3.向队列添加数据arr[wirte] = data;
4.解锁
5.对数据结点数据进行V操作(多了一个数据结点)
对消费者来说:数据节点是资源 —一个信号量对数据结点进行计数
1.对数据结点进行P操作
2.加锁
3.出队数据
4.解锁
5.对空闲结点进行进行V操作

具体代码:

通过信号量实现线程安全的环形队列,最终实现生产者与消费者模型
#define MAX_QUEUE 5
#define MAX_THREAD 5
class RingQueue{
private:
    std::vector<int> _queue;
    int _capacity;
    int _step_read; // 当前即将读取数据的位置的下标
    int _step_write; // 当前即将写入数据的位置的下标
    sem_t _sem_lock;//  用于实现互斥的信号量
    // 使用这个计数器,实现对当前队列中的数据资源的数量进行计数;
    // 如果<=0表示没有资源,则消费者会陷入等待
    sem_t _sem_data;
    // 使用这个计数器,实现对当前队列 中的空闲空间数量进行计数;
    // 如果<=0表示队列满了,则生产者陷入等待
    sem_t _sem_space;
public:
    RingQueue(int max = MAX_QUEUE) : _queue(max), _capacity(max),
                                     _step_read(0), _step_write(0){
        //...初始化过程...
        //互斥信号量的初始化
        //pshare设置为0,表示当前用于线程间的同步互斥
        //value信号量初值,初始化为1,数值最大不大于1,实现互斥
        sem_init(&_sem_lock, 0, 1);
        sem_init(&_sem_data, 0, 0);//数据资源初始为0
        sem_init(&_sem_space, 0, max);//空闲空间初值就是节点数量
    }
    ~RingQueue() {
        //...销毁资源过程...
        sem_destroy(&_sem_lock);
        sem_destroy(&_sem_data);
        sem_destroy(&_sem_space);
    }
    bool Push(int data) {
        sem_wait(&_sem_space);//统计空间节点数量,自动判断是否有空闲空间,没有则阻塞
        //互斥保护临界资源,注意上边的信号量不需要保护
        //若保护了上边的信号量操作,反而回出问题:若先加锁再判断,有可能会陷入休眠而没解锁
        sem_wait(&_sem_lock);
        _queue[_step_write] = data;
        _step_write = (_step_write + 1) % _capacity;
        sem_post(&_sem_lock);
        sem_post(&_sem_data);//入队数据之后,数据资源的数量增加一个,并且唤醒消费者

        return true;

    }
    bool Pop(int *data) {
        sem_wait(&_sem_data);//消费者判读数据资源数量,若<=0则回陷入等待 ,计数-1
        sem_wait(&_sem_lock);
        *data = _queue[_step_read];
        _step_read = (_step_read + 1) % _capacity;
        sem_post(&_sem_lock);
        sem_post(&_sem_space);//空间资源多了一个,唤醒生产者

        return true;
    }
};

void *thr_consumer(void *arg)
{
    RingQueue *q = (RingQueue*)arg;
    while(1) {
        //消费者处理数据
        int data;
        q->Pop(&data);
        printf("消费者:%p 出队数据:%d\\n", pthread_self(), data);
    }
    return NULL;
}
void *thr_productor(void *arg)
{
    RingQueue *q = (RingQueue*)arg;
    int i = 0;
    while(1) {
        //生产者生产数据
        q->Push(i);
        printf("生产者:%p 入队数据:%d\\n", pthread_self(), i++);
    }
    return NULL;
}
int main()
{
    //完成生产者与消费者模型
    //创建生产者角色的线程以及消费者角色的线程
    pthread_t ctid[MAX_THREAD], ptid[MAX_THREAD];
    int ret , i;
    RingQueue q;

    //创建消费者线程
    for (i = 0; i < MAX_THREAD; i++) {
        ret = pthread_create(&ctid[i], NULL, thr_consumer, (void*)&q);
        if (ret != 0) {
            printf("thread create error\\n");
            return -1;
        }
    }
    //创建生产者线程
    for (i = 0; i < MAX_THREAD; i++) {
        ret = pthread_create(&ptid[i], NULL, thr_productor, (void*)&q);
        if (ret != 0) {
            printf("thread create error\\n");
            return -1;
        }
    }
    //等待所有线程退出
    for (i = 0; i < MAX_THREAD; i++) {
        pthread_join(ctid[i], NULL);
        pthread_join(ptid[i], NULL);
    }
    return 0;
}

乐观锁和悲观锁

引用于这篇文章:面试必备之乐观锁与悲观锁,大家可以去看看
乐观锁总是假设最好的情况,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号机制和CAS算法实现。乐观锁适用于多读的应用类型,这样可以提高吞吐量,像数据库提供的类似于write_condition机制,其实都是提供的乐观锁。

悲观锁总是假设最坏的情况,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会阻塞直到它拿到锁(共享资源每次只给一个线程使用,其它线程阻塞,用完后再把资源转让给其它线程)。传统的关系型数据库里边就用到了很多这种锁机制,比如行锁,表锁等,读锁,写锁等,都是在做操作之前先上锁

从上面对两种锁的介绍,我们知道两种锁各有优缺点,不可认为一种好于另一种,像乐观锁适用于写比较少的情况下(多读场景),即冲突真的很少发生的时候,这样可以省去了锁的开销,加大了系统的整个吞吐量。但如果是多写的情况,一般会经常产生冲突,这就会导致上层应用会不断的进行retry,这样反倒是降低了性能,所以一般多写的场景下用悲观锁就比较合适。

线程池

线程池:一个或多个创建好的线程 + 线程安全的任务队列
其他线程将需要处理的任务,添加到线程池的任务队列中,线程池总的线程不断的从队列中获取任务进行处理。

  应用:应用于有大量请求需要进行处理的场景 
       1.避免了峰值压力下资源耗尽的风险
       2.避免大量频繁创建于销毁线程带来的时间成本 

任务处理T = 创建线程T1 + 任务处理过程T2 + 销毁线程T3

  实现:一个或多个创建好的线程 + 线程安全的任务队列

         想要让线程池中的线程能够处理任务数据,就必须让线程池中的线程知道如何处理

           1.在线程入口函数中直接定义好任务处理逻辑
           2.添加任务的线程,添加任务时候,不但添加要处理的数据,顺便把这个任务数据的处理方法一起传入
              线程池中的线程只需要调用处理函数传入处理数据即可
#include <cstdio>
#include <queue>
#include <cstdlib>
#include <unistd.h>
#include <pthread.h>

#define MAX_QUEUE 10
#define MAX_THREAD 5

typedef void(*handler_t)(int);
class ThreadTask{
    private:
        int _data;
        handler_t _handler;
    public:
        ThreadTask() {}
        ThreadTask(int data, handler_t handler):_data(data),
            _handler(handler){}
        void Run() {
            _handler(_data);
        }
};
class BlockQueue{
    private:
        int _capacity;//队列容量
        std::queue<ThreadTask> _queue;
        pthread_mutex_t _mutex;
        pthread_cond_t _cond_pro;
        pthread_cond_t _cond_cus;
    public:
        BlockQueue(int que = MAX_QUEUE):_capacity(que){
            pthread_mutex_init(&_mutex, NULL);
            pthread_cond_init(&_cond_pro, NULL);
            pthread_cond_init(&_cond_cus, NULL);
        }  
        ~BlockQueue(){
            pthread_mutex_destroy(&_mutex);
            pthread_cond_destroy(&_cond_pro);
            pthread_cond_destroy(&_cond_cus);
        }
        //
        bool Push(const ThreadTask &data) {
            pthread_mutex_lock(&_mutex);
            while(_queue.size() == _capacity) {//队列满了
                pthread_cond_wait(&_cond_pro, &_mutex);
            }
            _queue.push(data);
            pthread_cond_signal(&_cond_cus);
            pthread_mutex_unlock(&_mutex);
            return true;
        }
        bool Pop(ThreadTask *data) {
            pthread_mutex_lock(&_mutex);
            while(_queue.empty() == true) {//队列为NULL
                pthread_cond_wait(&_cond_cus, &_mutex);
            }
            *data = _queue.front();
            _queue.pop();
            pthread_cond_signal(&_cond_pro);
            pthread_mutex_unlock(&_mutex);
        }
};

class Linux多线程_(线程池,读者写者,自旋锁)

Linux---多线程线程池

Linux---多线程线程池

Linux---多线程线程池

LINUX多线程(线程池,单例模式,线程安全,读者写者模型)

Linux下C/C++ 手写一个线程池-