Boost Threads Producer/Consumer 意外行为

Posted

技术标签:

【中文标题】Boost Threads Producer/Consumer 意外行为【英文标题】:Boost Threads Producer/Consumer unexpected behavior 【发布时间】:2012-07-11 21:45:02 【问题描述】:

我目前正在编写一个应用程序(使用 boost),它将有一个生产者抓取帧和一个消费者阅读帧。我在生产者中添加了一个睡眠语句来模拟抓取帧的时间。我希望消费者等待条件变量,并在生产者的第一次通知被唤醒以读取帧。但是,我在日志文件中看到的是消费者(主线程)在等待条件变量,但是在消费者退出等待读取帧之前,生产者会经历多次通知。

这是我的 Worker.h

class Worker 
static log4cxx::LoggerPtr m_log;

public:
    Worker();
    virtual ~Worker();

    void start();
    void stop();
    void getCurrentFrame(/*cv::Mat& frame*/);

private:
    void processFrames();

    volatile bool m_stopRequested;

    bool m_bFrameReady;
    boost::mutex m_mutex;
    boost::condition_variable condF;

    boost::shared_ptr<boost::thread> m_thread;
;

工人.cpp

LoggerPtr Worker::m_log(Logger::getLogger("fdx.Worker"));

Worker::Worker() 
    m_bFrameReady = false;

    LOG4CXX_INFO(m_log, "Worker() c-tor");

    m_stopRequested = false;



Worker::~Worker() 
    LOG4CXX_INFO(m_log, "Worker() d-tor");


void Worker::start()

    LOG4CXX_INFO(m_log, "Worker()::start()");
    assert(!m_thread);

    m_thread = boost::shared_ptr<boost::thread>(new boost::thread(&Worker::processFrames, this));

    LOG4CXX_WARN(m_log, "Worker()::start() thread[" << m_thread->get_id() << "] started!");


void Worker::stop()

    LOG4CXX_INFO(m_log, "Worker()::stop()");

    if(m_thread != NULL)
    
        LOG4CXX_INFO(m_log, "Worker()::stop() ThrId [" << m_thread->get_id() << "]");
        m_stopRequested = true;
        m_thread->join();
    
    else
    
        LOG4CXX_WARN(m_log, "Worker()::stop() The thread for this camera was never started.");
    

LOG4CXX_INFO(m_log, "Worker()::stop() thread stopped!");


void Worker::processFrames()

    LOG4CXX_WARN(m_log, "Worker()::processFrames() Thread[" << boost::this_thread::get_id() << "] starting...");

    int rc = 0;
    std::stringstream ss;

    while(!this->m_stopRequested)
    
        boost::mutex::scoped_lock lock(m_mutex);
        LOG4CXX_WARN(m_log, "Worker()::processFrames() Got a Write lock");

        m_bFrameReady = true;
        LOG4CXX_WARN(m_log, "Worker()::processFrames() Frame ready set to true");

        boost::this_thread::sleep(boost::posix_time::milliseconds(200));

        LOG4CXX_WARN(m_log, "Worker()::processFrames() Write Un-lock");

        lock.unlock();

        LOG4CXX_WARN(m_log, "Worker()::processFrames() Notify");

        condF.notify_one();
    


void Worker::getCurrentFrame()

    boost::mutex::scoped_lock lock(m_mutex);

    while(!this->m_bFrameReady)
    
        LOG4CXX_WARN(m_log, "Worker::getCurrentFrame() wait for Read lock");
        condF.wait(lock);
    

    LOG4CXX_WARN(m_log, "Worker::getCurrentFrame() Frame ready; Got a Read lock");

    m_bFrameReady = false;

    LOG4CXX_WARN(m_log, "Worker::getCurrentFrame() Frame ready set to false");

    LOG4CXX_WARN(m_log, "Worker::getCurrentFrame() Read Un-lock");
    lock.unlock();


main.cpp

LoggerPtr logger(Logger::getLogger("TCamApp"));

int main(int argc, char** argv)

int rc = 0;

char cwDir[FILENAME_MAX];

Worker* pWorker = NULL;

memset(cwDir, 0, sizeof(cwDir));
getcwd(cwDir, FILENAME_MAX);

std::cout << "Current Working Dir[" << cwDir << "]" << endl;

std::stringstream ss;
ss << "" << cwDir << "/logs.properties";
std::cout << "logs.properties file[" << ss.str() << "]" << endl;

struct stat st;
if(!stat(ss.str().c_str(), &st))

    PropertyConfigurator::configure(ss.str());

else

    BasicConfigurator::configure();


LOG4CXX_INFO(logger, "Application [" << argv[0] << "] starting...");

pWorker = new Worker();
assert(pWorker);

pWorker->start();

for(int i = 0; i < 100; i++)

    pWorker->getCurrentFrame();

    LOG4CXX_INFO(logger, "Iteration [" << i << "]");


    //boost::this_thread::sleep(boost::posix_time::milliseconds(20));


pWorker->stop();

LOG4CXX_INFO(logger, "Application [" << argv[0] << "] stopping...");

return rc;

以下是我的日志文件的摘录:

2012-07-11 15:33:53,943 [0x7f5707bcf780] INFO  TCamApp - Application [/home/op/workspace/TestThreads/Debug/TestThreads] starting...
2012-07-11 15:33:53,944 [0x7f5707bcf780] WARN  fdx.Worker - Worker()::start() thread[0x15e4c50] started!
2012-07-11 15:33:53,944 [0x7f5707bcf780] WARN  fdx.Worker - Worker::getCurrentFrame() wait for Read lock
2012-07-11 15:33:53,944 [0x7f57059c1700] WARN  fdx.Worker - Worker()::processFrames() Thread[0x15e4c50] starting...
2012-07-11 15:33:53,944 [0x7f57059c1700] WARN  fdx.Worker - Worker()::processFrames() Got a Write lock
2012-07-11 15:33:53,944 [0x7f57059c1700] WARN  fdx.Worker - Worker()::processFrames() Frame ready set to true
2012-07-11 15:33:54,145 [0x7f57059c1700] WARN  fdx.Worker - Worker()::processFrames() Write Un-lock
2012-07-11 15:33:54,145 [0x7f57059c1700] WARN  fdx.Worker - Worker()::processFrames() Notify
2012-07-11 15:33:54,145 [0x7f57059c1700] WARN  fdx.Worker - Worker()::processFrames() Got a Write lock
2012-07-11 15:33:54,145 [0x7f57059c1700] WARN  fdx.Worker - Worker()::processFrames() Frame ready set to true
2012-07-11 15:33:54,345 [0x7f57059c1700] WARN  fdx.Worker - Worker()::processFrames() Write Un-lock
2012-07-11 15:33:54,345 [0x7f57059c1700] WARN  fdx.Worker - Worker()::processFrames() Notify
2012-07-11 15:33:54,345 [0x7f5707bcf780] WARN  fdx.Worker - Worker::getCurrentFrame() Frame ready; Got a Read lock
2012-07-11 15:33:54,345 [0x7f5707bcf780] WARN  fdx.Worker - Worker::getCurrentFrame() Frame ready set to false
2012-07-11 15:33:54,345 [0x7f5707bcf780] WARN  fdx.Worker - Worker::getCurrentFrame() Read Un-lock
2012-07-11 15:33:54,346 [0x7f5707bcf780] INFO  TCamApp - Iteration [0]
2012-07-11 15:33:54,346 [0x7f5707bcf780] WARN  fdx.Worker - Worker::getCurrentFrame() wait for Read lock
2012-07-11 15:33:54,346 [0x7f57059c1700] WARN  fdx.Worker - Worker()::processFrames() Got a Write lock
2012-07-11 15:33:54,346 [0x7f57059c1700] WARN  fdx.Worker - Worker()::processFrames() Frame ready set to true
2012-07-11 15:33:54,546 [0x7f57059c1700] WARN  fdx.Worker - Worker()::processFrames() Write Un-lock
2012-07-11 15:33:54,547 [0x7f57059c1700] WARN  fdx.Worker - Worker()::processFrames() Notify
2012-07-11 15:33:54,547 [0x7f57059c1700] WARN  fdx.Worker - Worker()::processFrames() Got a Write lock
2012-07-11 15:33:54,547 [0x7f57059c1700] WARN  fdx.Worker - Worker()::processFrames() Frame ready set to true
2012-07-11 15:33:54,747 [0x7f57059c1700] WARN  fdx.Worker - Worker()::processFrames() Write Un-lock
2012-07-11 15:33:54,747 [0x7f57059c1700] WARN  fdx.Worker - Worker()::processFrames() Notify
2012-07-11 15:33:54,747 [0x7f57059c1700] WARN  fdx.Worker - Worker()::processFrames() Got a Write lock
2012-07-11 15:33:54,747 [0x7f57059c1700] WARN  fdx.Worker - Worker()::processFrames() Frame ready set to true
2012-07-11 15:33:54,948 [0x7f57059c1700] WARN  fdx.Worker - Worker()::processFrames() Write Un-lock
2012-07-11 15:33:54,948 [0x7f57059c1700] WARN  fdx.Worker - Worker()::processFrames() Notify
2012-07-11 15:33:54,948 [0x7f57059c1700] WARN  fdx.Worker - Worker()::processFrames() Got a Write lock
2012-07-11 15:33:54,948 [0x7f57059c1700] WARN  fdx.Worker - Worker()::processFrames() Frame ready set to true
2012-07-11 15:33:55,148 [0x7f57059c1700] WARN  fdx.Worker - Worker()::processFrames() Write Un-lock
2012-07-11 15:33:55,149 [0x7f57059c1700] WARN  fdx.Worker - Worker()::processFrames() Notify
2012-07-11 15:33:55,149 [0x7f5707bcf780] WARN  fdx.Worker - Worker::getCurrentFrame() Frame ready; Got a Read lock
2012-07-11 15:33:55,149 [0x7f5707bcf780] WARN  fdx.Worker - Worker::getCurrentFrame() Frame ready set to false
2012-07-11 15:33:55,149 [0x7f5707bcf780] WARN  fdx.Worker - Worker::getCurrentFrame() Read Un-lock
2012-07-11 15:33:55,149 [0x7f5707bcf780] INFO  TCamApp - Iteration [1]

从日志中可以看出,主线程等待读取,但另一个线程会在主线程退出它的 wait() 之前产生多个通知。

我研究了一些,并认为我的编码正确,但它的行为不像我预期的那样。我将不胜感激有关解决方案的任何建议。谢谢。

【问题讨论】:

除非你在一个非常特定的平台上工作,比如 x86,并且想将自己限制在这样的平台上,volatile is not useful for multithreaded programming。 感谢您的信息。我会删除它。 好吧,用别的东西代替它。 :) 喜欢atomic&lt;bool&gt;(或原子标志)。 【参考方案1】:

这是意料之中的,因为生产者线程在互斥锁锁定的情况下处于休眠状态。一旦它醒来,它就会通知消费者并再次锁定它。无法保证谁将锁定互斥锁是“公平的”。

您似乎试图实现的是一个异步队列。它通常包含 2 个条件变量:一个在队列满时抑制生产者,另一个在队列为空时抑制消费者。无论生产或消费队列中的项目需要多长时间,互斥锁只会在推送/弹出操作期间被锁定 - 这应该非常快。

您的 sleep 语句可能只是使您的操作系统的调度程序偏向于为生产者线程提供更多优先级。将睡眠移出临界区,以模拟推送操作外部的处理,您应该会看到消费者线程的响应速度更快。

在相关说明中,您可以将哨兵对象(即特殊值,如指针队列上的空指针)推送到队列中,而不是轮询原子变量以终止必须停下来。

【讨论】:

很棒的信息。将睡眠移出关键部分,一切都按预期运行。这让我想到了从视频设备检索帧的代码,并且文件句柄上有一个 select() 调用,并且超时。我将代码更改为仅锁定缓冲区复制操作,而不是整个检索帧。我真的是您对使用哨兵对象进行终止的想法。再次感谢!!

以上是关于Boost Threads Producer/Consumer 意外行为的主要内容,如果未能解决你的问题,请参考以下文章

Boost Threads Producer/Consumer 意外行为

我可以使用 Boost Signals2 和 Threads 在 C++ 中创建软件看门狗定时器线程吗?

用 VS2012 安装 boost 1.52

在禁用中断时中断 boost::thread

在 boost::thread 线程中使用异常

C ++ Boost:2个线程之间的变量同步