试图理解 pthread_cond_lock 和 pthread_cond_signal

Posted

技术标签:

【中文标题】试图理解 pthread_cond_lock 和 pthread_cond_signal【英文标题】:Trying to understand pthread_cond_lock and pthread_cond_signal 【发布时间】:2018-10-24 03:26:24 【问题描述】:

所以我试图了解 pthread_mutex_lock 的工作原理。

我目前的理解是,它会解锁互斥体,并使任何线程通过它进入睡眠状态。睡眠意味着线程处于非活动状态并且不消耗任何资源。

然后它等待信号从休眠状态变为阻塞状态,这意味着线程不能再更改任何变量。

thread 1:
    pthread_mutex_lock(&mutex);
    while (!condition)
        printf("Thread wating.\n");
        pthread_cond_wait(&cond, &mutex);
        printf("Thread awakened.\n");
        fflush(stdout);
   
   pthread_mutex_unlock(&mutex);

   pthread_cond_signal(&condVar);
   pthread_mutex_unlock(&mutex);

所以基本上在上面的示例中,循环运行并运行,每次迭代pthread_cond_wait 检查循环的条件是否为真。如果是,则发送cond_signal 并阻塞线程,因此它不能再操作任何数据。

我真的很难理解这个问题,我很感激一些关于它是如何工作的输入和反馈,以及我是否开始根据上面的内容理解这一点。

我已经查看了this 的帖子,但仍然遇到问题

【问题讨论】:

你的标题打错了:显然应该是pthread_cond_signal,而不是pthread_mutex_signal。我建议更改它,以确保更好地回答您的问题。 重要的是要认识到互斥体可以单独使用,而条件变量通常与互斥体一起使用。为了理解条件变量,首先需要理解互斥锁。如果您还没有,请退后一步,尝试按顺序学习这两件事。 哦,谢谢,我会更新标题。 【参考方案1】:

首先,总结一下:

pthread_mutex_lock(&mutex):

如果mutex 是空闲的,那么这个线程会立即抓取它。

如果mutex 被抓取,那么这个线程会一直等到mutex 空闲,然后抓取它。 

pthread_mutex_trylock(&mutex):

如果mutex 是空闲的,那么这个线程会抓取它。

如果mutex 被抓取,则调用会立即返回EBUSY。 

pthread_mutex_unlock(&mutex):

发布mutex。 

pthread_cond_signal(&cond):

唤醒一个等待条件变量cond的线程。 

pthread_cond_broadcast(&cond):

唤醒所有等待条件变量 cond 的线程。 

pthread_cond_wait(&cond, &mutex):

这必须用mutex 抓取。

调用线程将暂时释放mutex并等待cond

cond被广播,或者被发送信号并且这个线程恰好是被唤醒的那个,那么调用线程将首先重新抓取mutex,然后从调用中返回。

重要的是要注意,调用线程要么 一直抓到mutex 正在等待cond。中间没有间隔。


让我们看一个实际运行的示例代码。我们将按照 OP 的代码行创建它。

首先,我们将使用一个结构来保存每个工作函数的参数。由于我们希望在线程之间共享互斥锁和条件变量,因此我们将使用指针。

#define  _POSIX_C_SOURCE  200809L
#include <stdlib.h>
#include <pthread.h>
#include <limits.h>
#include <string.h>
#include <stdio.h>
#include <errno.h>

/* Worker function work. */
struct work 
    pthread_t        thread_id;
    pthread_mutex_t *lock;      /* Pointer to the mutex to use */
    pthread_cond_t  *wait;      /* Pointer to the condition variable to use */
    volatile int    *done;      /* Pointer to the flag to check */
    FILE            *out;       /* Stream to output to */
    long             id;        /* Identity of this thread */
    unsigned long    count;     /* Number of times this thread iterated. */
;

线程工作者函数接收到一个指向上述结构的指针。每个线程迭代循环一次,然后等待条件变量。当被唤醒时,如果完成标志仍然为零,则线程迭代循环。否则,线程退出。

/* Example worker function. */
void *worker(void *workptr)

    struct work *const work = workptr;

    pthread_mutex_lock(work->lock);

    /* Loop as long as *done == 0: */
    while (!*(work->done)) 
        /* *(work->lock) is ours at this point. */

        /* This is a new iteration. */
        work->count++;

        /* Do the work. */
        fprintf(work->out, "Thread %ld iteration %lu\n", work->id, work->count);
        fflush(work->out);

        /* Wait for wakeup. */
        pthread_cond_wait(work->wait, work->lock);
    

    /* *(work->lock) is still ours, but we've been told that all work is done already. */
    /* Release the mutex and be done. */
    pthread_mutex_unlock(work->lock);
    return NULL;

要运行上述代码,我们还需要一个 main():

#ifndef  THREADS
#define  THREADS  4
#endif

int main(void)

    pthread_mutex_t  lock = PTHREAD_MUTEX_INITIALIZER;
    pthread_cond_t   wait = PTHREAD_COND_INITIALIZER;
    volatile int     done = 0;
    struct work      w[THREADS];

    char            *line = NULL, *p;
    size_t           size = 0;
    ssize_t          len  = 0;

    unsigned long    total;
    pthread_attr_t   attrs;
    int              i, err;

    /* The worker functions require very little stack, but the default stack
       size is huge. Limit that, to reduce the (virtual) memory use. */
    pthread_attr_init(&attrs);
    pthread_attr_setstacksize(&attrs, 2 * PTHREAD_STACK_MIN);

    /* Grab the mutex so the threads will have to wait to grab it. */
    pthread_mutex_lock(&lock);

    /* Create THREADS worker threads. */
    for (i = 0; i < THREADS; i++) 

        /* All threads use the same mutex, condition variable, and done flag. */
        w[i].lock = &lock;
        w[i].wait = &wait;
        w[i].done = &done;

        /* All threads output to standard output. */
        w[i].out = stdout;

        /* The rest of the fields are thread-specific. */
        w[i].id = i + 1;
        w[i].count = 0;

        err = pthread_create(&(w[i].thread_id), &attrs, worker, (void *)&(w[i]));
        if (err) 
            fprintf(stderr, "Cannot create thread %d of %d: %s.\n", i+1, THREADS, strerror(errno));
            exit(EXIT_FAILURE);  /* Exits the entire process, killing any other threads as well. */
        
    

    fprintf(stderr, "The first character on each line controls the type of event:\n");
    fprintf(stderr, "    e, q    exit\n");
    fprintf(stderr, "    s       signal\n");
    fprintf(stderr, "    b       broadcast\n");
    fflush(stderr);

    /* Let each thread grab the mutex now. */
    pthread_mutex_unlock(&lock);

    while (1) 
        len = getline(&line, &size, stdin);
        if (len < 1)
            break;

        /* Find the first character on the line, ignoring leading whitespace. */
        p = line;
        while ((p < line + len) && (*p == '\0' || *p == '\t' || *p == '\n' ||
                                    *p == '\v' || *p == '\f' || *p == '\r' || *p == ' '))
            p++;

        /* Do the operation mentioned */
        if (*p == 'e' || *p == 'E' || *p == 'q' || *p == 'Q')
            break;
        else
        if (*p == 's' || *p == 'S')
            pthread_cond_signal(&wait);
        else
        if (*p == 'b' || *p == 'B')
            pthread_cond_broadcast(&wait);
    

    /* It is time for the worker threads to be done. */
    pthread_mutex_lock(&lock);
    done = 1;
    pthread_mutex_unlock(&lock);

    /* To ensure all threads see the state of that flag,
       we wake up all threads by broadcasting on the condition variable. */
    pthread_cond_broadcast(&wait);

    /* Reap all threds. */
    for (i = 0; i < THREADS; i++)
        pthread_join(w[i].thread_id, NULL);

    /* Output the thread statistics. */
    total = 0;
    for (i = 0; i < THREADS; i++) 
        total += w[i].count;
        fprintf(stderr, "Thread %ld: %lu events.\n", w[i].id, w[i].count);
    
    fprintf(stderr, "Total: %lu events.\n", total);

    return EXIT_SUCCESS;

如果您将以上内容保存为example.c,您可以使用例如将其编译为examplegcc -Wall -O2 example.c -lpthread -o example.

要正确直观地掌握操作,请在终端中运行示例,并在其旁边的窗口中显示源代码,并在您提供输入时查看执行进度。

您甚至可以运行诸如 printf '%s\n' s s s b q | ./example 之类的命令来快速连续运行一系列事件,或者运行 printf 's\ns\ns\nb\nq\n' | ./example 等命令,从而缩短事件之间的时间。

经过一些实验,您有望发现并非所有输入事件都会导致它们各自的动作。这是因为退出事件(上面的q)不是同步的:它不会等待所有待处理的工作完成,而是告诉线程立即退出。这就是为什么即使对于完全相同的输入,事件的数量也会有所不同。

(另外,如果您在条件变量上发出信号并立即对其进行广播,线程往往只会被唤醒一次。)

您可以通过延迟退出来缓解这种情况,例如使用(printf '%s\n' s s b s s s ; sleep 1 ; printf 'q\n' ) | ./example.

但是,还有更好的方法。条件变量不适用于可数事件;它真的很像旗帜。信号量会更好,但是你应该小心不要溢出信号量;它只能是从 0 到 SEM_VALUE_MAX,包括 0 到 SEM_VALUE_MAX。 (因此,您可以使用信号量来表示 pending 作业的数量,但可能不是每个/所有线程工作人员完成的迭代次数。)线程中待办工作的队列泳池时尚,是最常见的方法。

【讨论】:

非常感谢。这是非常全面和有用的。【参考方案2】:

pthread_cond_wait() 仅表示当前线程应释放互斥锁,然后等待某个条件。这里的诀窍是两者都以原子方式发生,因此不会发生线程已释放互斥锁但尚未等待条件或已在等待条件但尚未释放互斥锁的情况。要么两者都发生,要么都没有发生。

pthread_cond_signal() 只是唤醒当前正在等待信号条件的任何线程。被唤醒的线程要做的第一件事是再次获取互斥锁,如果它无法获取它(例如,由于信号线程当前拥有互斥锁),它将阻塞直到它可以。如果多个线程都在等待条件,pthread_cond_signal() 只是唤醒其中一个,哪个没有定义。如果要唤醒所有等待的线程,必须改用pthread_cond_broadcast();但当然它们不会同时运行,因为现在它们每个人都首先需要获取互斥锁,并且只能一个接一个地运行。

pthread_cond_t 没有状态。如果您发出一个没有线程正在等待的条件的信号,那么什么都不会发生。这不会在内部设置一个标志,如果稍后在某些线程调用pthread_cond_wait(),它将立即被唤醒,因为有一个挂起的信号。 pthread_cond_signal() 只唤醒已经在等待的线程,这意味着这些线程必须在你调用pthread_cond_signal() 之前调用pthread_cond_wait()

这里有一些简单的示例代码。首先是一个读者线程:

// === Thread 1 ===

// We want to process an item from a list.
// To make sure the list is not altered by one
// thread while another thread is accessing it,
// it is protected by a mutex.
pthread_mutex_lock(&listLock);

// Now nobody but us is allowed to access the list.
// But what if the list is empty?
while (list->count == 0) 
    // As long as we hold the mutex, no other thread
    // thread can add anything to the list. So we
    // must release it. But we want to know as soon
    // as another thread has changed it.
    pthread_cond_wait(&listCondition, &listLock);

    // When we get here, somebody has signaled the
    // condition and we have the mutex again and
    // thus are allowed to access the list. The list
    // may however still be empty, as another thread
    // may have already consumed the new item in case
    // there are multiple readers and all are woken 
    // up, thus the while-loop. If the list is still
    // empty, we just go back to sleep and wait again.


// If we get here, the list is not empty.
processListItem(list);

// Finally we release the mutex again.
pthread_mutex_unlock(&listLock);

然后是一个作家线程:

// === Thread 2 ===

// We want to add a new item to the list.
// To make sure that nobody is accessing the
// list while we do, we need to obtain the mutex.
pthread_mutex_lock(&listLock);

// Now nobody but us is allowed to access the list.
// Check if the list is empty.
bool listWasEmpty = (list->count == 0);

// We add our item.
addListItem(list, newItem);

// If the list was empty, one or even multiple
// threads may be waiting for us adding an item.
// So we should wake them up here.
if (listWasEmpty) 
    // If any thread is waiting for that condition,
    // wake it up as now there is an item to process.
    pthread_cond_signal(&listCondition);


// Finally we must release the mutex again.
pthread_mutex_unlock(&listLock);

编写代码以便可以有任意数量的读取器/写入器线程。仅在列表为空时发出信号 (listWasEmpty) 只是一种性能优化,如果您始终在添加项目后发出条件信号,代码也将正常工作。

【讨论】:

谢谢,这真的很有帮助。

以上是关于试图理解 pthread_cond_lock 和 pthread_cond_signal的主要内容,如果未能解决你的问题,请参考以下文章

试图理解 CMTime 和 CMTimeMake

试图理解 C++ 中的堆栈和堆

试图理解这个动态递归子集和的时间复杂度

试图理解 DependencyProperty

试图理解 TranslationInView

试图理解删除实体框架中的子实体