多个生产者和消费者,但一个共享资源 - 只有一个线程在运行

Posted

技术标签:

【中文标题】多个生产者和消费者,但一个共享资源 - 只有一个线程在运行【英文标题】:Multiple producers and consumers but one shared reource - Only one thread is running 【发布时间】:2015-12-18 01:14:41 【问题描述】:

在这里,我创建了 2 个生产者线程和 2 个消费者线程。他们只在一个共享队列中放入和取出值。

问题是第一个生产者确实填写然后进入等待模式。

之后没有其他线程运行。请解释一下我错过了什么。

#include "mainwindow.h"
#include <QApplication>

#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
#include <sys/syscall.h>
#include <iostream>
#include <QDebug>


pthread_mutex_t mutexVariable     = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t  conditionVariable = PTHREAD_COND_INITIALIZER;

int numberOfActiveProducers;
int numberOfActiveConsumers;

QList <int> sharedQueue;
/*
 * `sharedQueue`'s size is assumed to be 10 ATM.
 * `sharedQueue` is supposed to be shared among two threads.
 * Producer threads will put the 1's in it, and Consumer threads will remove the 1's.
 * Assumption: `sharedQueue` can contain only 10 elements at a time.
 */

int sizeOfSharedQueue;

//  This function is run by the thread `Producer A`.
void *threadProducerAFunction (void *arg) 
    Q_UNUSED (arg);

    while (1) 
        qDebug () << "\nProducer A";

        pthread_mutex_lock (&mutexVariable);

        if (sharedQueue.length () < 10) 
            sharedQueue.push_back (1);
            qDebug () << "\nPushed by Producer A: Length of queue is: " << sharedQueue.length ();
        
        else 
            qDebug () << "\nProducer A has done its bit and is now in waiting mode. Length of queue is: " << sharedQueue.length ();
            pthread_cond_wait (&conditionVariable, &mutexVariable);
        

        pthread_mutex_unlock (&mutexVariable);
    

    return NULL;


//  This function is run by the thread `ProducerB`.
void *threadProducerBFunction (void *arg) 
    Q_UNUSED (arg);

    while (1) 
        qDebug () << "\nProducer B";

        pthread_mutex_lock (&mutexVariable);

        if (sharedQueue.length () < 10) 
            sharedQueue.push_back (1);
            qDebug () << "\nPushed by Producer B: Length of queue is: " << sharedQueue.length ();
        
        else 
            qDebug () << "\nProducer B has done its bit and is now in waiting mode. Length of queue is: " << sharedQueue.length ();
            pthread_cond_wait (&conditionVariable, &mutexVariable);
        

        pthread_mutex_unlock (&mutexVariable);
    

    return NULL;


//  This function is run by the thread `Consumer A`.
void *threadConsumerAFunction (void *arg) 
    Q_UNUSED (arg);

    while (1) 
        qDebug () << "\nConsumer A";

        pthread_mutex_lock (&mutexVariable);

        if (sharedQueue.length () > 0) 
            sharedQueue.pop_front ();
            qDebug () << "\nRemoved by thread Consumer A. Length of queue is: " << sharedQueue.length ();
        
        else 
            pthread_cond_signal (&conditionVariable);
            qDebug () << "\nSignal issued by thread Consumer A. Length of queue is: " << sharedQueue.length ();
        

        pthread_mutex_unlock (&mutexVariable);
       
    return NULL;


//  This function is run by the thread `Consumer B`.
void *threadConsumerBFunction (void *arg) 
    Q_UNUSED (arg);

    while (1) 
        qDebug () << "\nConsumer B";

        pthread_mutex_lock (&mutexVariable);

        if (sharedQueue.length () > 0) 
            sharedQueue.pop_front ();
            qDebug () << "\nRemoved by thread Consumer B. Length of queue is: " << sharedQueue.length ();
        
        else 
            pthread_cond_signal (&conditionVariable);
            qDebug () << "\nSignal issued by thread Consumer B. Length of queue is: " << sharedQueue.length ();
        

        pthread_mutex_unlock (&mutexVariable);
    
    return NULL;


int main (int argc, char *argv[]) 
    numberOfActiveProducers = 2;
    numberOfActiveConsumers = 2;
    sizeOfSharedQueue       = 10;

    // `sharedQueue` initialization by 0.
    for (int i = 0; i < sizeOfSharedQueue; i++) 
        sharedQueue.push_back (0);
    

    // Producer threads creation and joining
    pthread_t producerA;
    pthread_t producerB;

    if (pthread_create (&producerA, NULL, threadProducerAFunction, NULL)) 
        fprintf (stderr, "Error creating thread Producer A\n");
        return 1;
    

    if (pthread_join (producerA, NULL)) 
        fprintf (stderr, "Error joining thread Producer A\n");
        return 2;
    

    if (pthread_create (&producerB, NULL, threadProducerBFunction, NULL)) 
        fprintf (stderr, "Error creating thread Producer A\n");
        return 1;
    

    if (pthread_join (producerB, NULL)) 
        fprintf (stderr, "Error joining thread Producer B\n");
        return 2;
    

    // Consumer threads creation and joining
    pthread_t consumerA;
    pthread_t consumerB;

    if (pthread_create (&consumerA, NULL, threadConsumerAFunction, NULL)) 
        fprintf (stderr, "Error creating thread Consumer A\n");
        return 1;
    

    if (pthread_join (consumerA, NULL)) 
        fprintf (stderr, "Error joining thread Consumer A\n");
        return 2;
    

    if (pthread_create (&consumerB, NULL, threadConsumerBFunction, NULL)) 
        fprintf (stderr, "Error creating thread Consumer B\n");
        return 1;
    

    if (pthread_join (consumerB, NULL)) 
        fprintf (stderr, "Error joining thread Consumer B\n");
        return 2;
    

    QApplication a (argc, argv);
    MainWindow w;
    w.show ();

    return a.exec ();

【问题讨论】:

请与我们分享您获得的调试输出。 哦,在main 中创建每个线程后,您有一个pthread_join。这是行不通的,因为 pthread_join 在它等待的线程退出之前不会返回。你的线程都没有退出。所以当然,在创建第一个生产者线程之后甚至不会创建其他线程。 @kaylum 我应该纠正什么?我不应该写这样的连接吗? 将所有加入调用放在所有创建调用之后。或者将它们完全排除在外。仅当您真正关心线程退出后的结果时才需要它们(您不关心,因为它们没有退出)或者您想防止主线程过早退出(我认为您已经通过a.exec()). @kaylum 请写一个包含所有细节的正确答案。 【参考方案1】:

问题是在main 中的每个pthread_create 调用之后都有pthread_join 调用。 pthread_join 根据定义将阻塞,直到它正在等待的线程退出。由于没有子线程退出,结果是第一个pthread_join 调用将无限期阻塞,因此后续的pthread_create 调用都不会执行。

一种解决方法是删除所有pthread_join 调用。 pthread_join 通常用于等待并获取子线程的返回状态或同步主线程,使其在子线程完成之前不退出。所以在这种情况下实际上不需要那些pthread_join 调用,因为子线程不会退出,而主线程调用a.exec() 来执行阻止它退出的任务。

与实际问题无关,但我看到您基本上复制了每个线程的生产者和消费者代码。这是不必要的,因为可以将同一个线程函数传递给多个 pthread_create 调用(只要没有静态变量)。如果您想区分实例以进行调试,请使用线程 ID 或将不同的 arg 传递给每个线程以进行识别。

【讨论】:

那么我们什么时候需要为每个线程提供单独的函数呢?请举例。 @TheIndependentAquarius 当他们要做不同的事情时;-)。以一家大公司的 Web 服务器为例,它为每个连接生成一个线程。没有600个serveConnection[AA-ZZ]()函数单独编码……每个执行线程在函数中都有自己的一组局部变量,类似于递归调用同一个函数,当然还有自己的程序计数器。 @TheIndependentAquarius 我想不出一个实际的(非人为的)示例,说明您需要单独的函数,而线程在执行基本相同的工作。在这种情况下,几乎没有充分的理由拥有单独的功能。 arg 参数本质上是为了这个目的——如果需要,允许将状态传递给线程以参数化其行为。 @PeterA.Schneider 感谢您的解释。凯勒姆也感谢你。那么,对于所有人来说,函数的参数应该是什么?线程ID?你会展示一个小演示程序,其中一个功能可以被所有生产者访问吗?【参考方案2】:

您正在混合框架。

如果您已经在使用Qt,它提供了一系列线程类,让生活变得更轻松。

我已将您的代码转换为 Qt 等效项,它可以正常工作。

#include <QtCore/qthread.h>
#include <QtWidgets/QApplication>
#include <QtCore/qmutex.h>
#include <QtCore/qwaitcondition.h>
#include <QtCore/QList.h>
#include <QtCore/qdebug.h>

QMutex mutex;
QWaitCondition waitCondition;

int numberOfActiveProducers;
int numberOfActiveConsumers;

QList<int> sharedQueue;

class Producer : public QThread
public:
    Producer(QString const &label) : label(label) 

    void run() 
        forever 
            qDebug() << QString("\nProducer %1").arg(label);

            QMutexLocker locker(&mutex);

            if (sharedQueue.length() < 10)
                sharedQueue << 1;
                qDebug() << QString("\nPushed by Producer %1: Length of queue is: %2").arg(label).arg(sharedQueue.length());
             else 
                qDebug() << QString("\nProducer %1 has done its job and is now in waiting mode. Length of queue is: %2").arg(label).arg(sharedQueue.length());
                waitCondition.wait(&mutex);
            
        
    

private:
    QString label;
;

class Consumer : public QThread
public:
    Consumer(QString const &label) : label(label) 

    void run()
        forever 
            qDebug() << QString("\nConsumer %1").arg(label);

            QMutexLocker locker(&mutex);

            if (sharedQueue.length() > 0)
                sharedQueue.takeFirst();
                qDebug() << QString("\nRemoved by thread Consumer %1. Length of queue is: %2").arg(label).arg(sharedQueue.length());
             else 
                waitCondition.wakeAll();
                qDebug() << QString("\nSignal issued by thread Consumer %1. Length of queue is: %2").arg(label).arg(sharedQueue.length());
            
        
    

private:
    QString label;
;

int main(int argc, char **argv)
    numberOfActiveConsumers = 2;
    numberOfActiveProducers = 2;

    QCoreApplication a(argc, argv);

    Producer producerA("A");
    Producer producerB("B");
    Consumer consumerA("A");
    Consumer consumerB("B");

    producerA.start();
    producerB.start();
    consumerA.start();
    consumerB.start();

    return a.exec();

我必须补充一点,除非我需要直接访问线程,否则我通常不会将QThread 子类用于简单的函数。通常,我会将QRunnable 子类化,然后将对象赋予QThreadPool 以开始。

【讨论】:

以上是关于多个生产者和消费者,但一个共享资源 - 只有一个线程在运行的主要内容,如果未能解决你的问题,请参考以下文章

尝试锁定共享内存互斥体时出现分段错误

线程通信之生产者消费者模型

JAVA模拟生产者与消费者实例

一个生产者,多个消费者

基于Java 生产者消费者模式(详细分析)

生产者与消费者问题