线程资源同步——互斥量和条件变量

Posted vector6_

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程资源同步——互斥量和条件变量相关的知识,希望对你有一定的参考价值。

Linux 线程资源同步对象——互斥量和条件变量

Linux 互斥体

Linux 互斥体的用法和 Windows 的临界区对象用法很相似,一般也是通过限制多个线程同时执行某段代码来达到保护资源的目的。和接下来要介绍的信号量、条件变量一样,Linux 互斥体都实现在 NPTL (Native POSIX Thread Library)。在 NPTL 中我们使用数据结构 pthread_mutex_t 来表示一个互斥体对象(定义于 pthread.h 头文件中)。互斥体对象我们可以使用两种方式来初始化:

  • 使用 PTHREAD_MUTEX_INITIALIZER 直接给互斥体变量赋值
    示例代码如下:
    #include <pthread.h>
    

pthread_mutex_t mymutex = PTHREAD_MUTEX_INITIALIZER;


- 使用 pthread_mutex_init 函数初始化

如果互斥量是动态分配的或者需要给互斥量设置属性,则需要使用 `pthread_mutex_init` 函数来初始化互斥体,这个函数的签名如下:
```cpp
int pthread_mutex_init(pthread_mutex_t* restrict mutex, 
                     const pthread_mutexattr_t* restrict attr);

参数 mutex 即我们需要初始化的 mutex 对象的指针,参数 attr 是需要设置的互斥体属性,通常情况下,我们使用默认属性可以将这个参数设置为 NULL,后面我们会详细介绍每一种属性的用法。 pthread_mutex_init 代码示例如下:

#include <pthread.h>

pthread_mutex_t mymutex;
pthread_mutex_init(&mutex, NULL);

当我们不再需要一个互斥体对象时,可以使用 pthread_mutex_destroy 函数来销毁它, pthread_mutex_destroy 函数的签名如下:

int pthread_mutex_destroy(pthread_mutex_t* mutex);

参数 mutex 即我们需要销毁的互斥体对象,如果函数执行成功会返回 0,如果执行失败会返回一个错误码表面出错原因。这里我们需要注意两点:

  1. 使用 PTHREAD_MUTEX_INITIALIZER 初始化的互斥量无须销毁;

  2. 不要去销毁一个已经加锁或正在被条件变量使用的互斥体对象,当互斥量处于已加锁的状态或者正在和条件变量配合使用时,调用 pthread_mutex_destroy 函数会返回 EBUSY 错误。

在实际开发中,如果我们遵循正确的使用 mutex 的规范,如创建 mutex 对象后再对其加锁,加锁后才对其进行解锁操作,解锁后才做销毁操作,那么编码时一般不用考虑 pthread_mutex_init/pthread_mutex_destroy/pthread_mutex_lock/pthread_mutex_unlock 等函数的返回值。

对于互斥体的加锁和解锁操作我们一般使用以下三个函数:

int pthread_mutex_lock(pthread_mutex_t* mutex);
int pthread_mutex_trylock(pthread_mutex_t* mutex);
int pthread_mutex_unlock(pthread_mutex_t* mutex);

参数 mutex 设置为我们需要加锁和解锁的互斥体对象,上述函数执行成功返回 0,如果执行失败则返回一个错误码表示具体的出错原因。具体错误码,随互斥体对象的属性类型的不同而不同。

设置互斥体对象的属性需要创建一个 pthread_mutexattr_t 类型的对象,和互斥体对象一样,需要使用 pthread_mutexattr_init 函数初始化之,当不需要这个属性对象时,记得使用 pthread_mutexattr_destroy 去销毁它,这两个函数的签名如下:

int pthread_mutexattr_init(pthread_mutexattr_t* attr);
int pthread_mutexattr_destroy(pthread_mutexattr_t* attr);

使用 pthread_mutexattr_settype/pthread_mutexattr_gettype 设置或获取你想要的属性类型:

int pthread_mutexattr_settype(pthread_mutexattr_t* attr, int type);
int pthread_mutexattr_gettype(const pthread_mutexattr_t* restrict attr, int* restrict type);

属性类型一般有如下取值:

  • PTHREAD_MUTEX_NORMAL(普通锁)
    这是互斥体对象的默认属性(即上文中介绍的 pthread_mutex_init 第二个函数设置为 NULL)。当一个线程对一个普通锁加锁以后,其他线程会阻塞在 pthread_mutex_lock 调用处, 直到对互斥体加锁的线程释放了锁,我们来用一段实例代码来验证一下:
#include <pthread.h>
#include <stdio.h>
#include <errno.h>
#include <unistd.h>

pthread_mutex_t mymutex;
int             resourceNo = 0;

void* worker_thread(void* param)
{
    pthread_t threadID = pthread_self();

    printf("thread start, ThreadID: %d\\n", threadID);

    while (true)
    {
        pthread_mutex_lock(&mymutex);

        printf("Mutex lock, resourceNo: %d, ThreadID: %d\\n", resourceNo, threadID);
        resourceNo++;

        printf("Mutex unlock, resourceNo: %d, ThreadID: %d\\n", resourceNo, threadID);

        pthread_mutex_unlock(&mymutex);

        //休眠1秒
        sleep(1);
    }

    return NULL;
}
int main()
{
    pthread_mutexattr_t mutex_attr;
    pthread_mutexattr_init(&mutex_attr);
    pthread_mutexattr_settype(&mutex_attr, PTHREAD_MUTEX_NORMAL);
    pthread_mutex_init(&mymutex, &mutex_attr);

    //创建5个工作线程
    pthread_t threadID[5];

    for (int i = 0; i < 5; ++i)
    {
        pthread_create(&threadID[i], NULL, worker_thread, NULL);
    }

    for (int i = 0; i < 5; ++i)
    {
        pthread_join(threadID[i], NULL);
    }

    pthread_mutex_destroy(&mymutex);
    pthread_mutexattr_destroy(&mutex_attr);

    return 0;
}

上述代码创建了五个工作线程,由于使用了互斥体保护资源 resourceNo,所以每次在 pthread_mutex_lock 与 pthread_mutex_unlock 之间的输出都是连续的,一个线程必须完成了这个工作,其他线程才有机会获得执行这段代码的机会,当一个线程拿到锁后,其他线程会阻塞在 pthread_mutex_lock 处。

一个线程如果对一个已经加锁的普通锁再次使用 pthread_mutex_lock 加锁,程序会阻塞在第二次调用 pthread_mutex_lock 代码处。测试代码如下:

  #include <pthread.h>
  #include <stdio.h>
  #include <errno.h>
  #include <unistd.h>

  int main()
  {
      pthread_mutex_t mymutex;
      pthread_mutexattr_t mutex_attr;
      pthread_mutexattr_init(&mutex_attr);
      pthread_mutexattr_settype(&mutex_attr, PTHREAD_MUTEX_NORMAL);
      pthread_mutex_init(&mymutex, &mutex_attr);

      int ret = pthread_mutex_lock(&mymutex);
      printf("ret = %d\\n", ret);

      ret = pthread_mutex_lock(&mymutex);
      printf("ret = %d\\n", ret);



      pthread_mutex_destroy(&mymutex);
      pthread_mutexattr_destroy(&mutex_attr);

      return 0;
  }

在这种类型的情况,pthread_mutex_trylock 函数如果拿不到锁,不会阻塞,函数会立即返回,并返回 EBUSY 错误码。

  • PTHREAD_MUTEX_ERRORCHECK(检错锁)

    如果一个线程使用 pthread_mutex_lock 对已经加锁的互斥体对象再次加锁,pthread_mutex_lock 会返回 EDEADLK。
    我们验证一下线程对自己已经加锁的互斥体对象再次加锁是什么行为?

#include <pthread.h>
#include <stdio.h>
#include <errno.h>
#include <unistd.h>

int main()
{
    pthread_mutex_t mymutex;
    pthread_mutexattr_t mutex_attr;
    pthread_mutexattr_init(&mutex_attr);
    pthread_mutexattr_settype(&mutex_attr, PTHREAD_MUTEX_ERRORCHECK);
    pthread_mutex_init(&mymutex, &mutex_attr);

    int ret = pthread_mutex_lock(&mymutex);
    printf("ret = %d\\n", ret);

    ret = pthread_mutex_lock(&mymutex);
    printf("ret = %d\\n", ret);
    if (ret == EDEADLK)
    {
        printf("EDEADLK\\n");
    }

    pthread_mutex_destroy(&mymutex);
    pthread_mutexattr_destroy(&mutex_attr);

    return 0;
}

编译并运行程序,程序输出结果确实如上面所说:

[root@localhost testmultithread]# g++ -g -o test11 test.cpp -lpthread
[root@localhost testmultithread]# ./test11
ret = 0
ret = 35
EDEADLK

再来看一下,一个线程加锁,其他线程再次加锁的效果:

#include <pthread.h>
#include <stdio.h>
#include <errno.h>
#include <unistd.h>

pthread_mutex_t mymutex;

void* worker_thread(void* param)
{
    pthread_t threadID = pthread_self();

    printf("thread start, ThreadID: %d\\n", threadID);

    while (true)
    {
        int ret = pthread_mutex_lock(&mymutex);
        if (ret == EDEADLK)
        {
            printf("EDEADLK, ThreadID: %d\\n", threadID);
        } 
        else
            printf("ret = %d, ThreadID: %d\\n", ret, threadID);

        //休眠1秒
        sleep(1);
    }

    return NULL;
}

int main()
{
    pthread_mutexattr_t mutex_attr;
    pthread_mutexattr_init(&mutex_attr);
    pthread_mutexattr_settype(&mutex_attr, PTHREAD_MUTEX_ERRORCHECK);
    pthread_mutex_init(&mymutex, &mutex_attr);

    int ret = pthread_mutex_lock(&mymutex);
    printf("ret = %d\\n", ret);

    //创建5个工作线程
    pthread_t threadID[5];
    for (int i = 0; i < 5; ++i)
    {
        pthread_create(&threadID[i], NULL, worker_thread, NULL);
    }

    for (int i = 0; i < 5; ++i)
    {
        pthread_join(threadID[i], NULL);
    }

    pthread_mutex_destroy(&mymutex);
    pthread_mutexattr_destroy(&mutex_attr);

    return 0;
}

通过实验,如果互斥体的属性是 PTHREAD_MUTEX_ERRORCHECK,当前线程重复调用 pthread_mutex_lock 会直接返回 EDEADLOCK,其他线程如果对这个互斥体再次调用 pthread_mutex_lock 会阻塞在该函数的调用处。

  • PTHREAD_MUTEX_RECURSIVE(嵌套锁)

    该属性允许同一个线程对其持有的互斥体重复加锁,每次成功调用 pthread_mutex_lock 一次,该互斥体对象的锁引用计数就会增加一次,相反,每次成功调用 pthread_mutex_unlock 一次,锁引用计数就会减少一次,当锁引用计数值为 0 时允许其他线程获得该锁,否则其他线程调用 pthread_mutex_lock 时尝试获取锁时,会阻塞在那里。

总结下 Linux 下的互斥体对象的使用要点:

  • 虽然在上文演示了同一个线程对一个互斥体对象反复进行加锁,但实际开发中,我们需要用到这种场景的情形非常少。
  • 与 Windows 的临界区对象一样,一些有很多出口的逻辑中,为了避免因忘记调用 pthread_mutex_lock 出现死锁或者在逻辑出口处有大量解锁的重复代码出现,建议使用 RAII 技术将互斥体对象封装起来

Linux 条件变量

首先我们先来讨论一下为什么会存在条件变量这样一种机制。

为什么需要使用条件变量?

实际应用中,我们常常会有类似如下需求:

//以下是伪码,m的类型是pthread_mutex_t,并且已经初始化过了
int WaitForTrue()
{
    pthread_mutex_lock(&m);
    while (condition is false)        //条件不满足
    {
        pthread_mutex_unlock(&m);    //解锁等待其他线程改变condition
        sleep(n);                    //睡眠n秒
        //n秒后再次加锁验证条件是否满足
        pthread_mutex_lock(&m);
    }

    return 1;
}

这段逻辑的用途是我们需要反复判断一个多线程共享条件是否满足,一直到该条件满足为止,由于该条件被多个线程操作因此每次判断之前我们都需要进行加锁操作,判断完毕后需要进行解锁操作。但是上述逻辑存在严重的效率问题,假设我们解锁离开临界区后,此时由于其他线程修改了条件导致条件满足了,此时程序仍然需要睡眠 n 秒后才能得到反馈。因此我们需要这样一种机制:

某个线程 A 在条件不满足的情况下,主动让出互斥量,让其他线程去折腾,线程在此处等待,等待条件的满足;一旦条件满足,线程就可以被立刻唤醒。线程 A 之所以可以安心等待,依赖的是其他线程的协作,它确信会有一个线程在发现条件满足以后,将向它发送信号,并且让出互斥量。如果其他线程不配合(不发信号,不让出互斥量),这个主动让出互斥量并等待事件发生的线程 A 就真的要等到花儿都谢了。

这个例子解释了为什么需要条件等待,但是条件等待还不是条件变量的全部功能。

条件变量为什么要与互斥体对象结合

很多第一次学习 Linux 条件变量的读者会觉得困惑:为什么条件变量一定要与一个互斥体对象结合使用?我们来看下,假设条件变量不与互斥体对象结合的效果。

1 //m的类型是pthread_mutex_t,并且已经初始化过了,cv是条件变量
2 pthread_mutex_lock(&m)
3 while(condition_is_false)
4 {
5     pthread_mutex_unlock(&m);
6     //解锁之后,等待之前,可能条件已经满足,信号已经发出,但是该信号可能会被错过
7     cond_wait(&cv);
8     pthread_mutex_lock(&m);
9 }

上述代码中,假设线程 A 执行完第 5 行代码 pthread_mutex_unlock(&m); 后 CPU 时间片被剥夺,此时另外一个线程 B 获得该互斥体对象 m,然后发送条件信号,等线程 A 重新获得时间片后,由于该信号已经被错过了,这样可能会导致线程 A 在 第 7 行 cond_wait(&cv); 无限阻塞下去。

造成这个问题的根源是释放互斥体对象与条件变量等待唤醒不是原子操作,即解锁和等待这两个步骤必须是同一个原子性的操作以确保 cond_wait 唤醒之前不会有其他线程获得这个互斥体对象。

条件变量的使用

接下來正式介绍一下条件变量相关的系统 API 的使用方法。

条件变量的初始化和销毁可以使用如下 API 函数:

int pthread_cond_init(pthread_cond_t* cond, const pthread_condattr_t* attr);
int pthread_cond_destroy(pthread_cond_t* cond);

在 Linux 系统中 pthread_cond_t 即是条件变量的类型,当然和前面介绍的互斥体一样,你也可以使用如下方式去初始化一个条件变量:

pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

等待条件变量的满足可以使用如下 API 函数:

int pthread_cond_wait(pthread_cond_t* restrict cond, pthread_mutex_t* restrict mutex);
int pthread_cond_timedwait(pthread_cond_t* restrict cond, pthread_mutex_t* restrict mutex, const struct timespec* restrict abstime);

一般情况下如果条件变量代表的条件不会满足,调用 pthread_cond_wait 的线程会一直等待下去;pthread_cond_timedwaitpthread_cond_wait 非阻塞版本,它会在指定时间内等待条件满足,超过参数 abstime 设置的时间后 pthread_cond_timedwait 函数会立即返回。

注意:对于参数 abstime,正如其名字暗示的,这是一个 absolute time(绝对时间),也就是说,如果你打算让函数等待 5 秒,那么你应该先得到当前系统的时间,然后加上 5 秒计算出最终的时间作为参数 abstime 的值。

因调用 pthread_cond_wait 等待的线程可以被以下 API 函数唤醒:

int pthread_cond_signal(pthread_cond_t* cond);
int pthread_cond_broadcast(pthread_cond_t* cond);

pthread_cond_signal 一次唤醒一个线程,如果有多个线程调用 pthread_cond_wait 等待,具体哪个线程被唤醒是不确定的(可以认为是随机的);pthread_cond_broadcast 可以同时唤醒多个调用 pthread_cond_wait 等待的线程。前者相当于发送一次条件通知,后者广播一次条件通知。成功等待到条件信号,pthread_cond_signalpthread_cond_broadcast 返回 0,反之返回非0值,具体错误原因可以通过错误码 errno 获得。

条件变量最关键的一个地方就是需要清楚地记得 pthread_cond_wait 在条件满足与不满足时的两种行为,这是难点也是重点:

  • 当 pthread_cond_wait 函数阻塞时,它会释放其绑定的互斥体,并阻塞线程,因此在调用该函数前应该对互斥体有个加锁操作)。

  • 当收到条件信号时, pthread_cond_wait 会返回并对其绑定的互斥体进行加锁,因此在其下面一定有个对互斥体进行解锁的操作。

条件变量的虚假唤醒

上面将互斥量和条件变量配合使用的示例代码中有个很有意思的地方,就是用了 while 语句,醒来 之后要再次判断条件是否满足。

while (tasks.empty())
{                
    pthread_cond_wait(&mycv, &mymutex);
}

为什么不写成:

if (tasks.empty())
{                
    pthread_cond_wait(&mycv, &mymutex);
}

答案是不得不如此。因为可能某次操作系统唤醒 pthread_cond_waittasks.empty() 可能仍然为 true,言下之意就是操作系统可能会在一些情况下唤醒条件变量,即使没有其他线程向条件变量发送信号,等待此条件 变量的线程也有可能会醒来。我们将条件变量的这种行为称之为 虚假唤醒 (spurious wakeup)。因此将条件(判断 tasks.empty() 为true)放在一个 while 循环中意味着光唤醒条件变量不行,还必须条件满足程序才能继续执行正常的逻辑

这看起来这像是个bug,但它在 Linux 系统中是实实在在存在的。为什么会存在虚假唤醒呢?一个原因是 pthread_cond_wait 是 futex 系统调用,属于阻塞型的系统调用,当系统调用被信号中断的时候,会返回 -1,并且把 errno 错误码置为EINTR。很多这种系统调用为了防止被信号中断都会重启系统调用(即再次调用一次这个函数),代码如下:

pid_t r_wait(int *stat_loc)
{
    int retval;
    //wait函数因为被信号中断导致调用失败会返回-1,错误码是EINTR  
    //注意:这里的while循环体是一条空语句
    while(((retval = wait(stat_loc)) == -1 && (errno == EINTR));

    return retval;
}

但是 pthread_cond_wait 用途有点不一样,假设 pthread_cond_wait 函数被信号中断了,在 pthread_cond_wait 返回之后,到重新调用之前,pthread_cond_signal 或 pthread_cond_broadcast 可能已经调用过。一旦错失,可能由于条件信号不再产生,再次调用 pthread_cond_wait 将导致程序无限制地等待下去。为了避免这种情况,宁可虚假唤醒,也不能再次调用pthread_cond_wait,以免陷入无穷的等待中。

除了上面的信号因素外,还存在以下情况:条件满足了发送信号,但等到调用 pthread_cond_wait 的线程得到 CPU 资源时,条件又再次不满足了。

好在无论是哪种情况,醒来之后再次测试条件是否满足就可以解决虚假等待的问题。这就是使用 while 循环来判断条件,而不是使用 if 语句的原因。

条件变量信号丢失问题

上文中,我们介绍了,如果一个条件变量信号条件产生时(调用 pthread_cond_signalpthread_cond_broadcast),没有相关的线程调用 pthread_cond_wait 捕获该信号,那么该信号条件就会永久性地丢失了,再次调用pthread_cond_wait 会导致永久性的阻塞。这种情况在设计那些条件变量信号条件只会产生一次的逻辑中尤其需要注意,例如假设现在某个程序有一批等待条件变量的线程,和一个只产生一次条件变量信号的线程。为了让你的等待条件变量的线程能正常运行不阻塞,你的逻辑中一定要确保等待的线程在产生条件变量信号的线程发送条件信号之前调用 pthread_cond_wait

以上是关于线程资源同步——互斥量和条件变量的主要内容,如果未能解决你的问题,请参考以下文章

互斥量和条件变量

生产者-消费者问题:介绍POSIX线程的互斥量和条件变量的使用

线程同步与互斥详解

多线程之线程同步(互斥锁信号量条件变量和读写锁​)

多线程之线程同步(互斥锁信号量条件变量和读写锁​)

多线程之线程同步(互斥锁信号量条件变量和读写锁​)