C++11 简单的生产者消费者多线程

Posted

技术标签:

【中文标题】C++11 简单的生产者消费者多线程【英文标题】:C++11 Simple Producer Consumer Multithreading 【发布时间】:2021-11-25 21:42:39 【问题描述】:

我正在尝试自学多线程,我在这里学习了本教程:https://www.classes.cs.uchicago.edu/archive/2013/spring/12300-1/labs/lab6/

如果您一直滚动到底部,则会出现一个生产者-消费者的示例 sn-p,它要求我们解决此代码中发现的竞争条件:

#include <iostream>
#include <thread>
#include <condition_variable>
#include <mutex>
#include <chrono>
#include <queue>
using namespace std;

int main() 
    int c = 0;
    bool done = false;
    queue<int> goods;

    thread producer([&]() 
        for (int i = 0; i < 500; ++i) 
            goods.push(i);
            c++;
        

        done = true;
    );

    thread consumer([&]() 
        while (!done) 
            while (!goods.empty()) 
                goods.pop();
                c--;
            
        
    );

    producer.join();
    consumer.join();
    cout << "Net: " << c << endl;

最后的净值应该是0,这是我的尝试:

#include <iostream>
#include <thread>
#include <condition_variable>
#include <mutex>
#include <chrono>
#include <queue>
#include <atomic>
using namespace std;


int main() 

    int c = 0;

    bool done = false;
    queue<int> goods;
    mutex mtx;  
    condition_variable cond_var;

    // thread to produce 500 elements
    thread producer([&]() 

        for (int i = 0; i < 500; ++i) 
            // lock critical secion
            unique_lock<mutex> lock(mtx);   

            goods.push(i);
            c++;
            lock.unlock();

            // notify consumer that data has been produced
            cond_var.notify_one();
        

        // notify the consumer that it is done
        done = true;
        cond_var.notify_one();


    );

    // thread to consume all elements
    thread consumer([&]() 

        while (!done) 
            unique_lock<mutex> lock(mtx);   
            while (!goods.empty()) 
                goods.pop();
                c--;
            
            // unlocks lock and wait until something in producer gets put
            cond_var.wait(lock);
        
    );

    producer.join();
    consumer.join();
    cout << "Net: " << c << endl;

我觉得我从根本上错过了一些东西。我相信我遇到的最大问题是 cond_var.wait() 的消费者,因为如果生产者将“完成”设置为 true,那么消费者将不会回到 while(!goods.empty())。我不确定如何解决它。

任何提示、解释甚至不同的方法都将不胜感激!

【问题讨论】:

我认为在生产者中设置done = true; 会导致消费者提前退出。如果您将producer.join(); 移动到thread consumer([ 的声明上方(模拟在消费者开始之前排队的所有工作),那么消费者将不会工作。 你独立对待消费者的两个状态。但实际上它们都需要定义一个“完成”。 “完成”和goods.empty() 必须同时为真才能完成。 unique_lock 有一个调用解锁的析构函数。所以你不需要在producer中显式调用它。 condition_variable 有一个包含继续测试的 wait()。 想想goods.pop();c--; 这是goods.pop();doSomeWork(); 的简化 在现实生活中,如果doSomeWork() 不是微不足道的,那么您的消费者正在阻止生产者执行任务,因为它在执行时持有锁它的工作。 【参考方案1】:

制作人:

thread producer([&]() 

    for (int i = 0; i < 500; ++i)
    
        
            // Just have a lock while interacting with shared items.
            unique_lock<mutex> lock(mtx);   
            goods.push(i);
            c++;
        
        cond_var.notify_one();
    

    // Lock to update shared state.
    unique_lock<mutex> lock(mtx);   
    done = true;
    cond_var.notify_one();
);

消费者

thread consumer([&]() 

    // This loop exits when
    //        done          => true
    //   AND  goods.empty() => true

    // Acquire lock before checking shared state.
    unique_lock<mutex> lock(mtx);

    while (!(done && goods.empty()))
    
        // Wait until there is something in the queue to processes
        // releasing lock while we wait.
        // Break out if we are done or goods is not empty.
        cond_var.wait(lock, [&]()return done || !goods.empty(););

        // You now have the lock again, so modify shared state is allowed
        // But there is a possibility of no goods being available.
        // So let's check before doing work.
        if (!goods.empty())
        
            goods.pop();
            c--;
        
    
);

或者,如果我们只是解决竞争条件。我们可以简单地检查done 的状态,并确保没有其他变量发生交互。

制作人:

thread producer([&]() 

    // The consumer is not allowed to touch goods
    // until you are finished. So just use with
    // no locks.
    for (int i = 0; i < 500; ++i)
    
        goods.push(i);
        c++;
    

    // Lock to update shared state.
    // Tell consumer we are ready for processing.
    unique_lock<mutex> lock(mtx);   
    done = true;
    cond_var.notify_one();
);

消费者

thread consumer([&]() 

    // Acquire lock before checking shared state.
    unique_lock<mutex> lock(mtx);
    cond_var.wait(lock, [&]()return done;);

    // We now know the consumer has finished all updates.
    // So we can simply loop over the goods and processes them
    while (!goods.empty())
    
        goods.pop();
        c--;
    
);

【讨论】:

感谢您的详细回答,我复制了第一种方法,但似乎消费者陷入了无限循环,它对您有用吗?我还有一个关于消费者的问题,在你提到 cond_var.wait(...) 之后我们现在又获得了锁,但我没有看到这种情况发生? condition_variable 在您调用wait() 时释放锁,但在wait() 调用返回您的代码之前重新获取锁。 wait() 前置条件函数总是令人困惑。也许我测试错了。 !done || goods.empty()。尝试:done || !goods.empty() 让我知道是否可以解决问题。 啊,我没意识到 wait() 重新获取了锁。将其更改为完成 || !goods.empty() 修复了它!我还必须在生产者中进行另一个小的更改,即在带有锁的花括号内包含 c++ 操作,否则最后 c 的值不正确。感谢大家的帮助,我现在明白多了!

以上是关于C++11 简单的生产者消费者多线程的主要内容,如果未能解决你的问题,请参考以下文章

C++11多线程编程——生产消费者模型之条件变量

day11(多线程,唤醒机制,生产消费者模式,多线程的生命周期)

C++11中多线程例子

C++11中多线程例子

如何用c++builder 编写多线程

多线程 c++11-ish 队列在 Windows 上失败