C++ - 多线程 - 线程之间的通信

Posted

技术标签:

【中文标题】C++ - 多线程 - 线程之间的通信【英文标题】:C++ - Multi-threading - Communication between threads 【发布时间】:2017-01-06 12:11:33 【问题描述】:
#include <iostream>
#include <thread>
#include <condition_variable>
#include <queue>
#include <cstdlib>
#include <chrono>
#include <ctime>
#include <random>

using namespace std;

//counts every number that is added to the queue
static long long producer_count = 0;
//counts every number that is taken out of the queue
static long long consumer_count = 0;

void generateNumbers(queue<int> & numbers, condition_variable & cv, mutex & m, bool & workdone)
    while(!workdone) 
        unique_lock<std::mutex> lk(m);
        int rndNum = rand() % 100;
        numbers.push(rndNum);
        producer_count++;
        cv.notify_one();
     


void work(queue<int> & numbers, condition_variable & cv, mutex & m, bool & workdone) 
    while(!workdone) 
        unique_lock<std::mutex> lk(m);
        cv.wait(lk);
        cout << numbers.front() << endl;
        numbers.pop();
        consumer_count++;

     


int main() 
    condition_variable cv;
    mutex m;
    bool workdone = false;
    queue<int> numbers;

    //start threads
    thread producer(generateNumbers, ref(numbers), ref(cv), ref(m),     ref(workdone));
    thread consumer(work, ref(numbers), ref(cv), ref(m), ref(workdone));

    //wait for 3 seconds, then join the threads
    this_thread::sleep_for(std::chrono::seconds(3));
    workdone = true;

    producer.join();
    consumer.join();

    //output the counters
    cout << producer_count << endl;
    cout << consumer_count << endl;

    return 0;

大家好, 我尝试用 C++ 实现生产者-消费者模式。 生产者线程生成随机整数,将它们添加到队列中,然后通知消费者线程添加了新数字。

消费者线程等待通知,然后将队列的第一个元素打印到控制台并删除它。

我为添加到队列中的每个数字增加一个计数器,并为从队列中取出的每个数字增加一个计数器。

我希望两个计数器在程序完成后保持相同的值,但是差异很大。 代表加入队列的计数器始终在百万范围内(我上次测试中为 3871876),代表消费者从队列中取出数字的计数器始终低于 100k(我上次测试中为 89993)。

有人可以向我解释为什么会有如此巨大的差异吗? 我是否必须添加另一个条件变量,以便生产者线程也等待消费者线程? 谢谢!

【问题讨论】:

可能是生产者比消费者快一点,并且差异是由于线程加入后numbers 正好有producer_count - consumer_count 元素造成的吗? std::cout &lt;&lt; numbers.front() &lt;&lt; std::endl; 涉及大量工作,尤其是因为您不必要地 (?) 刷新每个数字的输出。 不应该workdoneatomic&lt;bool&gt; 之类的吗? 生产者和消费者之间存在竞争条件。恰好生产者比消费者更频繁地获取锁。 删除“工作”函数中的“while(!workdone)”。然后你会得到预期的结果。因为您希望队列为空。现在只有以下情况成立:producer_count == consumer_count + numbers.size()。 尝试while (!(workdone &amp;&amp; numbers.empty())) 让消费者继续,直到它应该退出并且队列为空。也许这就是你想要的行为。 【参考方案1】:

不需要第二个std::condition_variable,只需重复使用您拥有的那个。正如其他人所提到的,您应该考虑使用std::atomic&lt;bool&gt; 而不是普通的bool。但我必须承认,带有 -O3 的 g++ 并没有优化它。

#include <iostream>
#include <thread>
#include <condition_variable>
#include <queue>
#include <cstdlib>
#include <chrono>
#include <ctime>
#include <random>
#include <atomic>

//counts every number that is added to the queue
static long long producer_count = 0;
//counts every number that is taken out of the queue
static long long consumer_count = 0;

void generateNumbers(std::queue<int> & numbers, std::condition_variable & cv, std::mutex & m, std::atomic<bool> & workdone)

    while(!workdone.load())
    
        std::unique_lock<std::mutex> lk(m);
        int rndNum = rand() % 100;
        numbers.push(rndNum);
        producer_count++;
        cv.notify_one(); // Notify worker
        cv.wait(lk); // Wait for worker to complete
     


void work(std::queue<int> & numbers, std::condition_variable & cv, std::mutex & m, std::atomic<bool> & workdone)

    while(!workdone.load())
    
        std::unique_lock<std::mutex> lk(m);
        cv.notify_one(); // Notify generator (placed here to avoid waiting for the lock)
        cv.wait(lk); // Wait for the generator to complete
        std::cout << numbers.front() << std::endl;
        numbers.pop();
        consumer_count++;
     


int main() 
    std::condition_variable cv;
    std::mutex m;
    std::atomic<bool> workdone(false);
    std::queue<int> numbers;

    //start threads
    std::thread producer(generateNumbers, std::ref(numbers), std::ref(cv), std::ref(m), std::ref(workdone));
    std::thread consumer(work, std::ref(numbers), std::ref(cv), std::ref(m), std::ref(workdone));


    //wait for 3 seconds, then join the threads
    std::this_thread::sleep_for(std::chrono::seconds(3));
    workdone = true;
    cv.notify_all(); // To prevent dead-lock

    producer.join();
    consumer.join();

    //output the counters
    std::cout << producer_count << std::endl;
    std::cout << consumer_count << std::endl;

    return 0;

编辑:

为了避免零星的一对一错误,您可以使用这个:

#include <iostream>
#include <thread>
#include <condition_variable>
#include <queue>
#include <cstdlib>
#include <chrono>
#include <ctime>
#include <random>
#include <atomic>

//counts every number that is added to the queue
static long long producer_count = 0;
//counts every number that is taken out of the queue
static long long consumer_count = 0;

void generateNumbers(std::queue<int> & numbers, std::condition_variable & cv, std::mutex & m, std::atomic<bool> & workdone)

    while(!workdone.load())
    
        std::unique_lock<std::mutex> lk(m);
        int rndNum = rand() % 100;
        numbers.push(rndNum);
        producer_count++;
        cv.notify_one(); // Notify worker
        cv.wait(lk); // Wait for worker to complete
     


void work(std::queue<int> & numbers, std::condition_variable & cv, std::mutex & m, std::atomic<bool> & workdone)

    while(!workdone.load() or !numbers.empty())
    
        std::unique_lock<std::mutex> lk(m);
        cv.notify_one(); // Notify generator (placed here to avoid waiting for the lock)
        if (numbers.empty())
            cv.wait(lk); // Wait for the generator to complete
        if (numbers.empty())
            continue;
        std::cout << numbers.front() << std::endl;
        numbers.pop();
        consumer_count++;
     


int main() 
    std::condition_variable cv;
    std::mutex m;
    std::atomic<bool> workdone(false);
    std::queue<int> numbers;

    //start threads
    std::thread producer(generateNumbers, std::ref(numbers), std::ref(cv), std::ref(m), std::ref(workdone));
    std::thread consumer(work, std::ref(numbers), std::ref(cv), std::ref(m), std::ref(workdone));


    //wait for 3 seconds, then join the threads
    std::this_thread::sleep_for(std::chrono::seconds(1));
    workdone = true;
    cv.notify_all(); // To prevent dead-lock

    producer.join();
    consumer.join();

    //output the counters
    std::cout << producer_count << std::endl;
    std::cout << consumer_count << std::endl;

    return 0;

【讨论】:

虽然这修复了潜在的死锁,但它不会让消费者消费我认为预期的所有数字。 @nwp 可能时不时地,是的。 感谢您的回答!我进行了一些测试,计数器一直保持相同的值! 欢迎您。第一个解决方案几乎肯定不会进入死锁。考虑以下事件:worker 通知、generator 获取锁、generator 生成、generator 通知、worker 等待。如果发生这种情况(几乎肯定不会),但它会死锁。第二个关心是否更不可能......对于“几乎肯定”的定义en.m.wikipedia.org/wiki/Almost_surely【参考方案2】:

请注意,此代码可能无法正常工作。 workdone 变量被定义为常规布尔值 并且编译器假设它可以被安全地优化掉是完全合法的,因为它在代码块内永远不会改变。

如果你有一个混蛋的反应只是添加 volatile... 不,那也行不通。 您需要正确同步对 workdone 变量的访问,因为两个线程都在读取,而另一个线程(ui 线程)正在写入。 另一种解决方案是使用事件之类的东西而不是简单的变量。

但是对您的问题的解释。 两个线程具有相同的结束条件 (!workdone),但它们具有不同的持续时间,因此目前无法保证生产者和消费者以某种方式同步以随着时间的推移以相似数量的循环运行。

【讨论】:

以上是关于C++ - 多线程 - 线程之间的通信的主要内容,如果未能解决你的问题,请参考以下文章

多线程之间的通信~~~管道通道

Java 多线程与并发:内存模型

Java学习笔记46(多线程三:线程之间的通信)

多线程之间的通信

多线程之间的通信(等待唤醒机制Lock 及其它线程的方法)

Java多线程系列:线程的五大状态,以及线程之间的通信与协作