pthread为啥规定cond要和mutex一起使用

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了pthread为啥规定cond要和mutex一起使用相关的知识,希望对你有一定的参考价值。

条件变量是由互斥变量保护的,锁住互斥变量后才能计算条件。 参考技术A pthread_cond_wait总和一个互斥锁结合使用。在调用pthread_cond_wait前要先获取锁。pthread_cond_wait函数执行时先自动释放指定的锁,然后等待条件变量的变化。在函数调用返回之前,自动将指定的互斥量重新锁住。
int pthread_cond_signal(pthread_cond_t * cond);
pthread_cond_signal通过条件变量cond发送消息,若多个消息在等待,它只唤醒一个。pthread_cond_broadcast可以唤醒所有。调用pthread_cond_signal后要立刻释放互斥锁,因为pthread_cond_wait的最后一步是要将指定的互斥量重新锁住,如果pthread_cond_signal之后没有释放互斥锁,pthread_cond_wait仍然要阻塞。

无论哪种等待方式,都必须和一个互斥锁配合,以防止多个线程同时请求pthread_cond_wait()(或pthread_cond_timedwait(),下同)的竞争条件(Race Condition)。mutex互斥锁必须是普通锁(PTHREAD_MUTEX_TIMED_NP)或者适应锁 (PTHREAD_MUTEX_ADAPTIVE_NP),且在调用pthread_cond_wait()前必须由本线程加锁 (pthread_mutex_lock()),而在更新条件等待队列以前,mutex保持锁定状态,并在线程挂起进入等待前解锁。在条件满足从而离开 pthread_cond_wait()之前,mutex将被重新加锁,以与进入pthread_cond_wait()前的加锁动作对应。

激发条件有两种形式,pthread_cond_signal()激活一个等待该条件的线程,存在多个等待线程时按入队顺序激活其中一个;而pthread_cond_broadcast()则激活所有等待线程。
下面是另一处说明:给出了函数运行全过程。 为什么在唤醒线程后要重新mutex加锁?
了解 pthread_cond_wait() 的作用非常重要 -- 它是 POSIX 线程信号发送系统的核心,也是最难以理解的部分。

首先,让我们考虑以下情况:线程为查看已链接列表而锁定了互斥对象,然而该列表恰巧是空的。这一特定线程什么也干不了 -- 其设计意图是从列表中除去节点,但是现在却没有节点。因此,它只能:

锁定互斥对象时,线程将调用 pthread_cond_wait(&mycond,&mymutex)。pthread_cond_wait() 调用相当复杂,因此我们每次只执行它的一个操作。

pthread_cond_wait() 所做的第一件事就是同时对互斥对象解锁(于是其它线程可以修改已链接列表),并等待条件 mycond 发生(这样当 pthread_cond_wait() 接收到另一个线程的“信号”时,它将苏醒)。现在互斥对象已被解锁,其它线程可以访问和修改已链接列表,可能还会添加项。 【要求解锁并阻塞是一个原子操作】

此时,pthread_cond_wait() 调用还未返回。对互斥对象解锁会立即发生,但等待条件 mycond 通常是一个阻塞操作,这意味着线程将睡眠,在它苏醒之前不会消耗 CPU 周期。这正是我们期待发生的情况。线程将一直睡眠,直到特定条件发生,在这期间不会发生任何浪费 CPU 时间的繁忙查询。从线程的角度来看,它只是在等待 pthread_cond_wait() 调用返回。

现在继续说明,假设另一个线程(称作“2 号线程”)锁定了 mymutex 并对已链接列表添加了一项。在对互斥对象解锁之后,2 号线程会立即调用函数 pthread_cond_broadcast(&mycond)。此操作之后,2 号线程将使所有等待 mycond 条件变量的线程立即苏醒。这意味着第一个线程(仍处于 pthread_cond_wait() 调用中)现在将苏醒。

现在,看一下第一个线程发生了什么。您可能会认为在 2 号线程调用 pthread_cond_broadcast(&mymutex) 之后,1 号线程的 pthread_cond_wait() 会立即返回。不是那样!实际上,pthread_cond_wait() 将执行最后一个操作:重新锁定 mymutex。一旦 pthread_cond_wait() 锁定了互斥对象,那么它将返回并允许 1 号线程继续执行。那时,它可以马上检查列表,查看它所感兴趣的更改。

来看一个例子(你是否能理解呢?):

In Thread1:
pthread_mutex_lock(&m_mutex);
pthread_cond_wait(&m_cond,&m_mutex);
pthread_mutex_unlock(&m_mutex);

In Thread2:
pthread_mutex_lock(&m_mutex);
pthread_cond_signal(&m_cond);
pthread_mutex_unlock(&m_mutex);

pthread_cond_wait 和 pthread_mutex_lock 优先级?

【中文标题】pthread_cond_wait 和 pthread_mutex_lock 优先级?【英文标题】:pthread_cond_wait and pthread_mutex_lock priority? 【发布时间】:2017-11-08 23:04:14 【问题描述】:

我有多个读线程和一个写线程。如果我在其中一个读取线程上锁定互斥锁并从中发送广播,是否保证互斥锁将被等待 pthread_cond_wait() 的写线程锁定,或者是否有可能另一个正在等待 pthread_mutex_lock() 的读取线程将锁定互斥体?主要问题是 pthread_cond_wait() 是否优先于 pthread_mutex_lock()?

如果没有,我怎样才能实现互斥锁总是被 pthread_cond_broadcast() 上的写线程锁定?

例子

阅读线程:

pthread_mutex_lock(mutex);
pthread_cond_broadcast(cond);
pthread_mutex_unlock(mutex);

写线程:

pthread_mutex_lock(&mutex);
pthread_cond_wait(&cond, &mutex);

【问题讨论】:

【参考方案1】:

让我们假设两个线程,读和写,在同一时刻到达pthread_mutex_lock。所以,要么写线程在pthread_mutex_lock调用上获取互斥锁,要么读线程。

如果是写线程,读线程将等待pthread_mutex_lock。写入,通过调用pthread_cond_wait 释放mutex 并阻塞cond。它是原子完成的。因此,当读取线程被授予mutex 时,我们可以确定读取线程等待cond。因此,cond 上的广播到达写入线程,它不再等待cond,但 - 仍在pthread_cond_wait 的范围内 - 尝试锁定 mutex(保持为读取线程)。在广播cond 后,读线程释放mutex 并进入写线程。所以写线程最终从pthread_cond_wait退出,mutex被锁定。记得稍后解锁。

如果是读线程,写线程将等待pthread_mutex_lock,读线程将在cond上广播一个信号,然后释放mutex。之后,写线程在pthread_mutex_lock 上获取mutex 并立即在其中释放pthread_cond_wait 等待cond(请注意,先前的cond 广播对当前的pthread_cond_wait 没有影响)。在读取线程的下一次迭代中,它在mutex 上获得锁定,在cond 上发送广播并解锁mutex。这意味着写线程在cond 上向前移动并在mutex 上获得锁。

它是否回答了您关于优先级的问题?


评论后更新。

假设我们有一个线程(我们将其命名为A 以供将来参考)持有mutex 上的锁,并且很少有其他线程试图获取相同的锁。一旦第一个线程释放了锁,就无法预测哪个线程会获得锁。此外,如果A 线程有一个循环并试图重新获取mutex 上的锁,则它有可能被授予此锁而其他线程将继续等待。添加pthread_cond_wait 不会更改授予锁定范围内的任何内容。

让我引用 POSIX 规范的片段(参见 https://***.com/a/9625267/2989411 以供参考):

这些函数以原子方式释放互斥体并导致调用线程阻塞条件变量 cond; atomically 这里的意思是“原子地相对于另一个线程访问互斥锁,然后是条件变量”。也就是说,如果另一个线程能够在即将阻塞的线程释放后获得互斥锁,那么该线程中对 pthread_cond_broadcast() 或 pthread_cond_signal() 的后续调用将表现得就好像它是在 about-- to-block 线程已阻塞。

这只是标准对操作顺序的保证。向其他线程授予锁的顺序是相当不可预测的,它会根据一些非常细微的时序波动而变化。

只有互斥量相关的代码,请用下面的代码玩一下:

#define _GNU_SOURCE
#include <pthread.h>

#include <stdio.h>
#include <unistd.h>

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

void *th(void *arg) 
    int i;
    char *s = arg;
    for (i = 0; i < 10; ++i) 
        pthread_mutex_lock(&mutex);
        printf("%s %d\n", s, i);
        //sleep(1);
        pthread_mutex_unlock(&mutex);
#if 0
        pthread_yield();
#endif
    
    return NULL;


int main() 
    int i;
    for (i = 0; i < 10; ++i) 
        pthread_t t1, t2, t3;
        printf("================================\n");
        pthread_create(&t1, NULL, th, "t1");
        pthread_create(&t2, NULL, th, "     t2");
        pthread_create(&t3, NULL, th, "            t3");
        pthread_join(t1, NULL);
        pthread_join(t2, NULL);
        pthread_join(t3, NULL);
    
    return 0;

在一台机器(单 CPU)上,它总是显示从 t3 开始的整个循环,然后是 t2,最后是 t1。在另一个(2 个核心)上,线程的顺序更加随机,但几乎总是在将互斥锁授予其他线程之前显示每个线程的整个循环。很少有这样的情况:

t1 8
t1 9
            t3 0
     t2 0
     t2 1
     [removed other t2 output]
     t2 8
     t2 9
            t3 1
            t3 2

通过将#if 0 替换为#if 1 来启用pthread_yield,并观察结果并检查输出。对我来说,它的工作方式是两个线程交错显示它们的输出,然后第三个线程终于有机会工作了。添加另一个或更多线程。玩睡眠等。它确认了随机行为。

如果您想尝试一下,请编译并运行以下代码。这是单个生产者 - 多个消费者模型的示例。它可以使用两个参数运行:第一个是消费者线程的数量,第二个是生产数据系列的长度。如果没有给定参数,则有一个消费者线程和 120 个要处理的项目。我还建议在标记为 /* play here */ 的地方使用 sleep/usleep:更改参数的值,完全删除 sleep,在适当的时候将其移动到关键部分或替换为 pthread_yield 并观察行为变化。

#define _GNU_SOURCE
#include <assert.h>
#include <limits.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>

struct data_t 
    int seq;
    int payload;
    struct data_t *next;
;

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
struct data_t *first = NULL, *last = NULL;
int in_progress = 1;
int num_data = 120;

void push(int seq, int payload) 
    struct data_t *e;
    e = malloc(sizeof(struct data_t));
    e->seq = seq;
    e->payload = payload;
    e->next = NULL;
    if (last == NULL) 
        assert(first == NULL);
        first = last = e;
     else 
        last->next = e;
        last = e;
    


struct data_t pop() 
    struct data_t res = 0;
    if (first == NULL) 
        res.seq = -1;
     else 
        res.seq = first->seq;
        res.payload = first->payload;
        first = first->next;
        if (first == NULL) 
            last = NULL;
        
    
    return res;


void *producer(void *arg __attribute__((unused))) 
    int i;
    printf("producer created\n");
    for (i = 0; i < num_data; ++i) 
        int val;
        sleep(1); /* play here */
        pthread_mutex_lock(&mutex);
        val = rand() / (INT_MAX / 1000);
        push(i, val);
        pthread_mutex_unlock(&mutex);
        pthread_cond_signal(&cond);
        printf("prod %3d %3d signaled\n", i, val);
    
    in_progress = 0;
    printf("prod end\n");
    pthread_cond_broadcast(&cond);
    printf("prod end signaled\n");
    return NULL;


void *consumer(void *arg) 
    char c_id[1024];
    int t_id = *(int *)arg;
    sprintf(c_id, "%*s c %02d", t_id % 10, "", t_id);
    printf("%s created\n", c_id);
    while (1) 
        struct data_t item;
        pthread_mutex_lock(&mutex);
        item = pop();
        while (item.seq == -1 && in_progress) 
            printf("%s waits for data\n", c_id);
            pthread_cond_wait(&cond, &mutex);
            printf("%s got signal\n", c_id);
            item = pop();
        
        if (!in_progress && item.seq == -1) 
            printf("%s detected end of data.\n", c_id);
            pthread_mutex_unlock(&mutex);
            break;
        
        pthread_mutex_unlock(&mutex);
        printf("%s processing %3d %3d\n", c_id, item.seq, item.payload);
        sleep(item.payload % 10); /* play here */
        printf("%s processed  %3d %3d\n", c_id, item.seq, item.payload);
    
    printf("%s end\n", c_id);
    return NULL;


int main(int argc, char *argv[]) 
    int num_cons = 1;
    pthread_t t_prod;
    pthread_t *t_cons;
    int i;
    int *nums;
    if (argc > 1) 
        num_cons = atoi(argv[1]);
        if (num_cons == 0) 
            num_cons = 1;
        
        if (num_cons > 99) 
            num_cons = 99;
        
    
    if (argc > 2) 
        num_data = atoi(argv[2]);
        if (num_data < 10) 
            num_data = 10;
        
        if (num_data > 600) 
            num_data = 600;
        
    

    printf("Spawning %d consumer%s for %d items.\n", num_cons, num_cons == 1 ? "" : "s", num_data);
    t_cons = malloc(sizeof(pthread_t) * num_cons);
    nums = malloc(sizeof(int) * num_cons);
    if (!t_cons || !nums) 
        printf("Out of memory!\n");
        exit(1);
    
    srand(time(NULL));
    pthread_create(&t_prod, NULL, producer, NULL);

    for (i = 0; i < num_cons; ++i) 
        nums[i] = i + 1;
        usleep(100000); /* play here */
        pthread_create(t_cons + i, NULL, consumer, nums + i);
    

    pthread_join(t_prod, NULL);

    for (i = 0; i < num_cons; ++i) 
        pthread_join(t_cons[i], NULL);
    
    free(nums);
    free(t_cons);

    return 0;

我希望我已经消除了您的疑虑,并为您提供了一些代码来试验并获得对 pthread 行为的信心。

【讨论】:

感谢您的回答。无论如何,我不确定它是否绝对回答了我的问题。如果一个写线程正在等待 pthread_cond_wait() 并且我们有两个读线程会发生什么。一个读取线程设法锁定互斥锁并发送广播。释放此互斥体后,是否保证写线程在等待广播时会锁定互斥体(另一个读线程同时等待锁定互斥体)? @testtest1235 我用一些额外的信息和例子扩展了我的答案。

以上是关于pthread为啥规定cond要和mutex一起使用的主要内容,如果未能解决你的问题,请参考以下文章

pthread_cond_wait 和 pthread_mutex_lock 优先级?

关于pthread_cond_wait()的疑问

Linux中 条件变量为啥要用互斥锁来保护?

试图理解 pthread_cond_lock 和 pthread_cond_signal

pthread_mutex_t 和 pthread_cond_t 配合使用的简要分析

多线程队列模型