Linux线程安全

Posted 小倪同学 -_-

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Linux线程安全相关的知识,希望对你有一定的参考价值。

文章目录

Linux线程互斥

进程线程间的互斥相关背景概念

  • 临界资源: 多线程执行流共享的资源就叫做临界资源
  • 临界区: 每个线程内部,访问临界资源的代码,就叫做临界区
  • 互斥: 任何时刻,互斥保证有且只有一个执行流进入临界区,访问临界资源,通常对临界资源起保护作用
  • 原子性: 不会被任何调度机制打断的操作,该操作只有两态,要么完成,要么未完成

下面模拟一个抢票系统,将总票数设计为全局变量,同时创建五个线程,让其抢票,当票被抢完后线程退出。

#include<stdio.h>
#include<unistd.h>
#include<pthread.h>

// 模拟抢票逻辑
int tickets=1000;// 临界资源

// tickets--,抢到了一张票
void* Route(void* args)

        while(1)
        		// 临界区
                if(tickets>0)
                        usleep(10000);// usleep来模拟抢票时间                        
                        printf("0x%x: get a ticket : %d\\n",pthread_self(),tickets);
                        tickets--;
                
                else   
                        break;
                
        
        printf("0x%x:quit,tickets: %d\\n",(int)pthread_self(),tickets);
        
        return NULL;


int main()

#define NUM 5
        pthread_t nums[NUM];
        for(int i=0;i<NUM;i++)
                pthread_create(nums+i,NULL,Route,NULL);
        
        for(int i=0;i<NUM;i++)
                pthread_join(nums[i],NULL);
        
        return 0;
 

运行后我们发现竟然出现了负数


代码中记录剩余票数的变量tickets就是临界资源,因为它被多个执行流同时访问,而判断tickets是否大于0、打印剩余票数以及–tickets这些代码就是临界区,因为这些代码对临界资源进行了访问。

为什么票数会出现负数的情况呢?

当if语句判断条件为真以后,代码可以并发的切换到其他线程。在一个线程进入休眠状态(unsleep)时可能会有其他线程进入该代码段。还有一点就是 - - tickets操作不是原子操作。

为什么–ticket不是原子操作?

我们对一个变量进行–,我们实际需要进行以下三个步骤:

  1. 将共享变量tickets从内存加载到CPU相关寄存器中。
  2. 在CPU内存中执行 - - (或++)操作。
  3. 将递减/递增完毕的值写会内存。


当执行- - 操作时,可能thread1刚把tickets的值读进CPU进行 - - 操作即刚完成步骤二,就被切走了,相当于从CPU上剥离下来,此时数据999被保存在上下文中。

与此同时hread2被调度了,由于thread1只进行了前两步操作,因此thread2此时看到tickets的值还是1000,而系统给thread2的时间片可能较多,导致thread2一次性执行了100次–才被切走,最终tickets由1000减到了900。

此时系统再把thread1恢复上来,执行最后一步操作,将数据999写回内存。剩余票数从900变为了1000。

互斥量mutex

  • 大部分情况,线程使用的数据都是局部变量,变量的地址空间在线程栈空间内,这种情况,变量归属单个线程,其他线程无法获得这种变量。
  • 但有时候,很多变量都需要在线程间共享,这样的变量称为共享变量,可以通过数据的共享,完成线程之间的交互.
  • 当多个线程并发的操作共享变量,可能会带来一些问题。

要解决操作共享变量带来的问题,需要做到三点:

  1. 代码必须要有互斥行为:当代码进入临界区执行时,不允许其他线程进入该临界区。
  2. 如果多个线程同时要求执行临界区的代码,并且临界区没有线程在执行,那么只能允许一个线程进入该临界区。
  3. 如果线程不在临界区中执行,那么该线程不能阻止其他线程进入临界区。

要做到这三点,本质上就是需要一把锁。Linux上提供的这把锁叫互斥量。

互斥量的接口

初始化互斥量有两种方法:

方法1,静态分配:

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER

方法2,动态分配:

int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrictattr);

参数:

  • mutex:需要初始化的互斥量。
  • attr:初始化互斥量的属性,一般设置为NULL即可。

返回值: 互斥量初始化成功返回0,失败返回错误码。

销毁互斥量

函数

int pthread_mutex_destroy(pthread_mutex_t *mutex);

参数: mutex:需要销毁的互斥量。
返回值: 互斥量销毁成功返回0,失败返回错误码。

注意点:

  • 使用PTHREAD_MUTEX_INITIALIZER初始化的互斥量不需要销毁。
  • 不要销毁一个已经加锁的互斥量。
  • 已经销毁的互斥量,要确保后面不会有线程再尝试加锁。

互斥量加锁和解锁

int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);

返回值: 成功返回0,失败返回错误号

调用pthread_mutex_lock时,可能会遇到以下情况:

  • 互斥量处于未锁状态,该函数会将互斥量锁定,同时返回成功
  • 发起函数调用时,其他线程已经锁定互斥量,或者存在其他线程同时申请互斥量,但没有竞争到互斥量,那么pthread_mutex_lock调用会陷入阻塞(执行流被挂起),等待互斥量解锁。

上文模拟抢票的代码可以通过加锁解决总票数出现负数的情况

#include<stdio.h>
#include<unistd.h>
#include<pthread.h>

// 模拟抢票逻辑
int tickets=1000;// 临界资源

pthread_mutex_t lock; // lock需要被所有线程先看到,本质也是一种临界资源,只需要保证lock,unlock是原子性

// tickets--,抢到了一张票
void* Route(void* args)

        while(1)
        		pthread_mutex_lock(&lock);// 下面代码,只能有我一个人运行
        		// 临界区
                if(tickets>0)
                        usleep(10000);// usleep来模拟抢票时间                        
                        printf("0x%x: get a ticket : %d\\n",pthread_self(),tickets);
                        tickets--;
                        pthread_mutex_unlock(&lock);
                
                else   
		                pthread_mutex_unlock(&lock);
                        break;
                
        
        printf("0x%x:quit,tickets: %d\\n",(int)pthread_self(),tickets);
        
        return NULL;


int main()

		pthread_mutex_init(&lock,NULL);// 初始化锁
#define NUM 5
        pthread_t nums[NUM];
        for(int i=0;i<NUM;i++)
                pthread_create(nums+i,NULL,Route,NULL);
        
        for(int i=0;i<NUM;i++)
                pthread_join(nums[i],NULL);
        
        pthread_mutex_destroy(&lock);// 释放锁
        return 0;
 

互斥量实现原理探究

经过上面的例子,大家已经意识到单纯的 i++ 或者 ++i 都不是原子的,有可能会有数据一致性问题

为了实现互斥锁操作,大多数体系结构都提供了swap或exchange指令,该指令的作用是把寄存器和内存单元的数据相交换,由于只有一条指令,保证了原子性,即使是多处理器平台,访问内存的总线周期也有先后,一个处理器上的交换指令执行时另一个处理器的交换指令只能等待总线周期。

下面是lock和unlock的伪代码:

可重入VS线程安全

概念

  • 线程安全: 多个线程并发同一段代码时,不会出现不同的结果。常见对全局变量或者静态变量进行操作,并且没有锁保护的情况下,会出现该问题。
  • 重入: 同一个函数被不同的执行流调用,当前一个流程还没有执行完,就有其他的执行流再次进入,我们称之为重入。一个函数在重入的情况下,运行结果不会出现任何不同或者任何问题,则该函数被称为可重入函数,否则,是不可重入函数。

注意: 线程安全讨论的是线程执行代码时是否安全,重入讨论的是函数被重入进入。

常见的线程不安全的情况

  • 不保护共享变量的函数
  • 函数状态随着被调用,状态发生变化的函数
  • 返回指向静态变量指针的函数
  • 调用线程不安全函数的函数。

常见的线程安全的情况

  • 每个线程对全局变量或者静态变量只有读取的权限,而没有写入的权限,一般来说这些线程是安全的。
  • 类或者接口对于线程来说都是原子操作。
  • 多个线程之间的切换不会导致该接口的执行结果存在二义性。

常见的不可重入的情况

  • 调用了malloc/free函数,因为malloc函数是用全局链表来管理堆的。
  • 调用了标准I/O库函数,标准I/O可以的很多实现都是以不可重入的方式使用全局数据结构。
  • 可重入函数体内使用了静态的数据结构。

常见的可重入的情况

  • 不使用全局变量或静态变量。
  • 不使用malloc或者new开辟出的空间。
  • 不调用不可重入函数。
  • 不返回静态或全局数据,所有数据都由函数的调用者提供。
  • 使用本地数据,或者通过制作全局数据的本地拷贝来保护全局数据。

可重入与线程安全联系

  • 函数是可重入的,那就是线程安全的。
  • 函数是不可重入的,那就不能由多个线程使用,有可能引发线程安全问题。
  • 如果一个函数中有全局变量,那么这个函数既不是线程安全也不是可重入的。

可重入与线程安全区别

  • 可重入函数是线程安全函数的一种。
  • 线程安全不一定是可重入的,而可重入函数则一定是线程安全的。
  • 如果对临界资源的访问加上锁,则这个函数是线程安全的,但如果这个重入函数的锁还未释放则会产生死锁,因此是不可重入的。

死锁

概念

死锁是指在一组进程中的各个进程均占有不会释放的资源,但因互相申请被其他进程所站用不会释放的资源而处于的一种永久等待状态。

死锁四个必要条件

  • 互斥条件:一个资源每次只能被一个执行流使用
  • 请求与保持条件:一个执行流因请求资源而阻塞时,对已获得的资源保持不放
  • 不剥夺条件:一个执行流已获得的资源,在末使用完之前,不能强行剥夺
  • 循环等待条件:若干执行流之间形成一种头尾相接的循环等待资源的关系

避免死锁

  • 破坏死锁的四个必要条件
  • 加锁顺序一致
  • 避免锁未释放的场景
  • 资源一次性分配

Linux线程同步

当一个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了。例如一个线程访问队列时,发现队列为空,它只能等待,只到其它线程将一个节点添加到队列中。这种情况就需要用到条件变量。

同步概念与竞态条件

同步: 在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,叫做同步。
竞态条件: 因为时序问题,而导致程序异常,我们称之为竞态条件。

条件变量函数

初始化

int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict attr);

参数:

  • cond:要初始化的条件变
  • attr:初始化条件变量的属性,一般设置为NULL即可。

返回值: 条件变量初始化成功返回0,失败返回错误码。

销毁

int pthread_cond_destroy(pthread_cond_t *cond)

参数: cond:需要销毁的条件变量。
返回值: 条件变量销毁成功返回0,失败返回错误码。

等待

int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);

参数:

  • cond:要在这个条件变量上等待
  • mutex:互斥量,后面详细解释

返回值: 函数调用成功返回0,失败返回错误码。

唤醒

int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);
  • pthread_cond_signal函数用于唤醒等待队列中首个线程。
  • pthread_cond_broadcast函数用于唤醒等待队列中的全部线程。

参数: cond:在cond条件变量下等待的线程。
返回值: 函数调用成功返回0,失败返回错误码。

使用

让主线程创建三个新线程,用第三个线程控制前两个线程。利用条件变量让前两个线程循环打印。

#include<stdio.h>
#include<pthread.h>
#include<unistd.h>

pthread_cond_t cond;
pthread_mutex_t mutex;

void* r1(void* args)
        while(1)
                pthread_cond_wait(&cond,&mutex);
                printf("%s\\n",(char*)args);
        
        return NULL;


// 用线程2控制线程1
void* r2(void* args)
        while(1)
                pthread_cond_signal(&cond);
                sleep(1);
        


int main()

        pthread_cond_init(&cond,NULL);
        pthread_mutex_init(&mutex,NULL);

        pthread_t tid1,tid2,tid3;
        pthread_create(&tid1,NULL,r1,(void*)"我是线程A");
        pthread_create(&tid2,NULL,r1,(void*)"我是线程B");
        pthread_create(&tid3,NULL,r2,(void*)"我是控制");

        pthread_join(tid1,NULL);
        pthread_join(tid2,NULL);
        pthread_join(tid3,NULL);
        return 0;

线程一和线程二循环打印

我们也可以将代码中的pthread_cond_signal函数改为pthread_cond_broadcast,让线程每次同时唤醒线程一和线程二。

生产者消费者模型

为何要使用生产者消费者模型

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

生产者消费者模型的特点

生产者消费者模型是多线程同步与互斥的一个经典场景,其特点如下:

  • 三种关系: 生产者和生产者(互斥关系)、消费者和消费者(互斥关系)、生产者和消费者(互斥关系、同步关系)。
  • 两种角色: 生产者和消费者。
  • 一个交易场所: 通常指的是内存中的一段缓冲区。

生产者消费者模型优点

  • 解耦
  • 支持并发
  • 支持忙闲不均

基于BlockingQueue的生产者消费者模型

在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)。

用C++的queue模拟阻塞队列的生产消费模型

测试文件cp.cc

#include"block_queue.hpp"
#include<unistd.h>

#define NUM 10

void* consumer(void* c)

        BlockQueue<int> *bq=(BlockQueue<int>*)c;

        int out=0;
        while(true)
                bq->Get(&out);

                std::cout<<"consumer: "<<out<<std::endl;
        
        return nullptr;


void* producter(void* p)

        BlockQueue<int> *bq=(BlockQueue<int>*)p;

        int in=1;
        while(true)
                sleep(1);
                bq->Put(in);
                in++;
                in%=100;
                std::cout<<"producter: "<<in<<std::endl;
        
        return nullptr;


int main()

        BlockQueue<int>* bq=new BlockQueue<int>(NUM);
        pthread_t c,p;
        pthread_create(&c,nullptr,consumer,bq);
        pthread_create(&p,nullptr,producter,bq);

        pthread_join(p,nullptr);
        pthread_join(c,nullptr);

        delete bq;

        return 0;

头文件block_queue.hpp

#pragma once

#include<iostream>
#include<queue>
#include<pthread.h>

template<class T>
class BlockQueue
        private:
                int cap;
                std::queue<T> bq;
                pthread_mutex_t lock;
                pthread_cond_t have_space;// 生产者
                pthread_cond_t have_data;// 消费者
                bool IsFull()
                
                        return bq.size()==cap;
                
                bool IsEmpty()
                
                        return bq.size()==0;
                
        public:
                BlockQueue(int _cap):cap(_cap)
                
                        pthread_cond_init(&have_space,nullptr);
                        pthread_cond_init(&have_data,nullptr);
                        pthread_mutex_init(&lock,nullptr);
                
                void Put(const T &in)// const type&: 输入型参数
                
                        // 生产过程
                        pthread_mutex_lock(&lock);
                        while(IsFull())
                                // 生产者不应该再声产,应该等待
                                pthread_cond_wait(&have_space,&lock);// wait最后带一个锁,当程序挂起时,自动释放锁,当唤醒时,自动获取锁
                        

                        bq.push(in);
                        if(bq.size()>=cap/2)
                                pthread_cond_signal(&have_data);
                        
                        pthread_mutex_unlock(&lock);
                
                void Get(T* out)// Type*:输出型参数,type & :输入输出型参数
                
                        // 消费过程
                        pthread_mutex_lock(&lock);
                        while(IsEmpty())
                                pthread_cond_wait(&have_data,&lock);
                        
                        *out=bq.front();
                        bq.pop();
                        /*
                        if(!IsEmpty())
                                *out=bq.front();
                                bq.pop();
                        
                        else
                                std::cout<<"queue is empty! get() should block<<std::endl;
                        */
                        pthread_mutex_unlock(&lock);
                        pthread_cond_signal(&have_space);
                
                ~BlockQueue()
                
                        pthread_mutex_destroy(&lock);
                        pthread_cond_destroy(&have_space);
                        pthread_cond_destroy(&have_data);
                
;

运行结果

POSIX信号量

POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。

什么是信号量

信号量本质是描述临界资源数目的计数器。

什么时候使用信号量

当我们的临界资源是可以看作多份的情况下,是可以做到多个线程同时访问的,只要访问的区域不是同一个即可。

以上是关于Linux线程安全的主要内容,如果未能解决你的问题,请参考以下文章

Linux 线程间的同步与互斥

线程安全

多线程环境,线程安全知识点Violatile和synchronized

Linux___线程互斥与同步

说说java中的多线程

Linux并发与同步专题