用于图像处理的基于生产者-消费者的多线程
Posted
技术标签:
【中文标题】用于图像处理的基于生产者-消费者的多线程【英文标题】:Producer-consumer based multi-threading for image processing 【发布时间】:2015-07-08 13:31:15 【问题描述】:更新:我在下面的回答中提供了问题的原因及其解决方案。
我想为图像处理任务实现基于生产者-消费者方法的多线程。对于我来说,Producer
线程应该抓取图像并将它们放入container
,而消费者线程应该从Container
线程中提取图像。我认为我应该使用queue
来实现container
。
我想按照SO answer 中的建议使用以下代码。但我对container
的实现以及将传入的图像放入Producer
线程中感到非常困惑。
问题:第一个consumer thread
显示的图片不包含完整的数据。而且,第二个consumer thread
从不显示任何图像。可能是由于某些竞争情况或锁定情况,第二个线程根本无法访问队列的数据。我已经尝试过使用Mutex
。
#include <vector>
#include <thread>
#include <memory>
#include <queue>
#include <opencv2/highgui.hpp>
#include <opencv2/core.hpp>
#include <opencv2/imgproc.hpp>
Mutex mu;
struct ThreadSafeContainer
queue<unsigned char*> safeContainer;
;
struct Producer
Producer(std::shared_ptr<ThreadSafeContainer> c) : container(c)
void run()
while(true)
// grab image from camera
// store image in container
Mat image(400, 400, CV_8UC3, Scalar(10, 100,180) );
unsigned char *pt_src = image.data;
mu.lock();
container->safeContainer.push(pt_src);
mu.unlock();
std::shared_ptr<ThreadSafeContainer> container;
;
struct Consumer
Consumer(std::shared_ptr<ThreadSafeContainer> c) : container(c)
~Consumer()
void run()
while(true)
// read next image from container
mu.lock();
if (!container->safeContainer.empty())
unsigned char *ptr_consumer_Image;
ptr_consumer_Image = container->safeContainer.front(); //The front of the queue contain the pointer to the image data
container->safeContainer.pop();
Mat image(400, 400, CV_8UC3);
image.data = ptr_consumer_Image;
imshow("consumer image", image);
waitKey(33);
mu.unlock();
std::shared_ptr<ThreadSafeContainer> container;
;
int main()
//Pointer object to the class containing a "container" which will help "Producer" and "Consumer" to put and take images
auto ptrObject_container = make_shared<ThreadSafeContainer>();
//Pointer object to the Producer...intialize the "container" variable of "Struct Producer" with the above created common "container"
auto ptrObject_producer = make_shared<Producer>(ptrObject_container);
//FIRST Pointer object to the Consumer...intialize the "container" variable of "Struct Consumer" with the above created common "container"
auto first_ptrObject_consumer = make_shared<Consumer>(ptrObject_container);
//SECOND Pointer object to the Consumer...intialize the "container" variable of "Struct Consumer" with the above created common "container"
auto second_ptrObject_consumer = make_shared<Consumer>(ptrObject_container);
//RUN producer thread
thread producerThread(&Producer::run, ptrObject_producer);
//RUN first thread of Consumer
thread first_consumerThread(&Consumer::run, first_ptrObject_consumer);
//RUN second thread of Consumer
thread second_consumerThread(&Consumer::run, second_ptrObject_consumer);
//JOIN all threads
producerThread.join();
first_consumerThread.join();
second_consumerThread.join();
return 0;
【问题讨论】:
您是否考虑过使用带有互斥锁的队列?我在多个项目中使用这种方法,效果很好。 许多框架都支持线程池类。你确定要自己做吗? @dumbak:我在多线程方面没有任何经验,所以我愿意接受任何建议。 @skm 我下面的回答与 dumbak 的建议相同。如果您查看我的 github 存储库中的代码,您应该会找到使用队列和互斥体解决方案的简单消费者生产者所需的一切。 @qzcx:谢谢,我去看看。 【参考方案1】:我在您的原始问题中没有看到实际问题,因此我将向您提供我在大学课程中用于实施生产者-消费者的参考资料。
http://cs360.byu.edu/static/lectures/winter-2014/semaphores.pdf
幻灯片 13 和 17 给出了生产者-消费者的好例子
我在我的 github 上发布的实验室中使用了这个: https://github.com/qzcx/Internet_Programming/tree/master/ThreadedMessageServer
如果你查看我的 server.cc,你可以看到我对生产者-消费者模式的实现。
请记住,使用这种模式,您无法切换等待语句的顺序,否则您可能会陷入死锁。
希望这有帮助。
编辑:
好的,这是我上面链接的代码中的消费者-生产者模式的摘要。生产者消费者背后的想法是有一种线程安全的方式将任务从“生产者”线程传递到“消费者”工作线程。在我的示例中,要做的工作是处理客户端请求。生产者线程 (.serve()) 监视传入的套接字,并将连接传递给消费者线程 (.handle()) 以处理传入的实际请求。此模式的所有代码都可以在 server.cc 中找到文件(在 server.h 中有一些声明/导入)。
为了简短起见,我省略了一些细节。一定要仔细检查每一行并了解发生了什么。查找我正在使用的库函数以及参数的含义。我在这里为您提供了很多帮助,但您还有很多工作要做才能完全理解。
制作人:
就像我上面提到的,整个生产者线程都在 .serve() 函数中。它做了以下事情
初始化信号量。由于操作系统不同,这里有两个版本。我在 OS X 上编程,但必须在 Linux 上交代码。由于信号量与操作系统相关联,因此了解如何在特定设置中使用信号量非常重要。 它为客户端设置了与之交谈的套接字。对您的应用程序不重要。 创建消费者线程。 监视客户端套接字并使用生产者模式将项目传递给消费者。这段代码在下面在.serve()函数的底部可以看到如下代码:
while ((client = accept(server_,(struct sockaddr *)&client_addr,&clientlen)) > 0)
sem_wait(clients_.e); //buffer check
sem_wait(clients_.s);
clients_.q->push(client);
sem_post(clients_.s);
sem_post(clients_.n); //produce
首先,您检查缓冲区信号量“e”以确保队列中有空间来放置请求。其次,获取队列的信号量“s”。然后将您的任务(在本例中为客户端连接)添加到队列中。释放队列的信号量。最后,使用信号量“n”向消费者发出信号。
消费者:
在 .handle() 方法中,您实际上只关心线程的开头。
while(1)
sem_wait(clients_.n); //consume
sem_wait(clients_.s);
client = clients_.q->front();
clients_.q->pop();
sem_post(clients_.s);
sem_post(clients_.e); //buffer free
//Handles the client requests until they disconnect.
消费者执行与生产者类似的操作,但方式相反。首先,消费者等待生产者在信号量“n”上发出信号。请记住,由于有多个消费者,因此哪个消费者最终获得此信号量是完全随机的。他们为它而战,但每个信号量的每个 sem_post 只有一个人可以通过这一点。其次,他们像生产者一样获取队列信号量。将第一项从队列中弹出并释放信号量。最后,它们在缓冲区信号量“e”上发出信号,表明缓冲区中现在有更多空间。
免责声明:
我知道信号量的名字很糟糕。它们与我教授的幻灯片相匹配,因为那是我学到的。我认为它们代表以下几点:
e for empty :如果队列已满,此信号量会阻止生产者将更多项目推入队列。 s for semaphore : 我最不喜欢的。但是我的教授的风格是为每个共享数据结构都有一个结构。在这种情况下,“clients_”是包含所有三个信号量和队列的结构。基本上,这个信号量是为了确保没有两个线程同时接触同一个数据结构。 n 表示队列中的项目数。【讨论】:
我查看了您的代码。因为,我不知道你想做什么,你的逻辑是什么,所以我真的很难理解它。如果您可以仅发布代码的相关部分(关于问题)并在其上放置一些 cmets,那就太好了。 应该注意的是,如果你想像哑巴克的回答一样依赖队列结构来检查空或满,你可以简化这段代码。但是,这需要线程在访问信号量之前不断等待并发出信号。我更喜欢上述方式,因为它有操作系统句柄何时唤醒或休眠线程。【参考方案2】:好的,所以让它尽可能简单。您将需要 2 个线程、互斥体、队列和 2 个线程处理函数。
Header.h
static DWORD WINAPI ThreadFunc_Prod(LPVOID lpParam);
static DWORD WINAPI ThreadFunc_Con(LPVOID lpParam);
HANDLE m_hThread[2];
queue<int> m_Q;
mutex m_M;
添加所有需要的东西,这些只是你需要的核心部分
Source.cpp
DWORD dwThreadId;
m_hThread[0] = CreateThread(NULL, 0, this->ThreadFunc_Prod, this, 0, &dwThreadId);
// same for 2nd thread
DWORD WINAPI Server::ThreadFunc_Prod(LPVOID lpParam)
cYourClass* o = (cYourClass*) lpParam;
int nData2Q = GetData(); // this is whatever you use to get your data
m_M.lock();
m_Q.push(nData2Q);
m_M.unlock();
DWORD WINAPI Server::ThreadFunc_Con(LPVOID lpParam)
cYourClass* o = (cYourClass*) lpParam;
int res;
m_M.lock();
if (m_Q.empty())
// bad, no data, escape or wait or whatever, don't block context
else
res = m_Q.front();
m_Q.pop();
m_M.unlock();
// do you magic with res here
在 main 的末尾 - 不要忘记使用 WaitForMultipleObjects
所有可能的示例都可以直接在 MSDN 中找到,因此对此有很好的评论。
第 2 部分:
好的,所以我相信标题是不言自明的,所以我会给你更多的来源描述。在您的源代码中的某个地方(甚至可以在构造函数中)您创建线程 - 如何创建线程的方式可能不同,但想法是相同的(在 win 中 - 线程在 posix 中创建后立即运行,您必须加入)。我相信你会在某个地方拥有一个启动你所有魔法的功能,让我们称之为MagicKicker()
在 posix 的情况下,在构造函数中创建线程并在 MagicKicker()
中加入 em,赢 - 在 MagicKicker()
中创建
比你需要声明(在标题中)两个函数,你的线程函数将被实现 ThreadFunc_Prod
和 ThreadFunc_Prod
,这里重要的魔法是你将对你的对象的引用传递给这个函数(因为线程基本上是静态),因此您可以轻松访问共享资源,如队列、互斥锁等...
这些功能实际上是在做这项工作。实际上,您的代码中包含您需要的所有内容,只需将其用作在 Producer 中添加例程即可:
int nData2Q = GetData(); // this is whatever you use to get your data
m_M.lock(); // locks mutex so nobody cant enter mutex
m_Q.push(nData2Q); // puts data from producer to share queue
m_M.unlock(); // unlock mutex so u can access mutex in your consumer
并将其添加到您的消费者:
int res;
m_M.lock(); // locks mutex so u cant access anything wrapped by mutex in producer
if (m_Q.empty()) // check if there is something in queue
// nothing in you queue yet OR already
// skip this thread run, you can i.e. sleep for some time to build queue
Sleep(100);
continue; // in case of while wrap
return; // in case that u r running some framework with threadloop
else // there is actually something
res = m_Q.front(); // get oldest element of queue
m_Q.pop(); // delete this element from queue
m_M.unlock(); // unlock mutex so producer can add new items to queue
// do you magic with res here
【讨论】:
谢谢,但由于我的知识有限,你提供的例子对我来说并不是一个简单的例子。我知道我的帖子示例中存在的普通 C++ 代码。 这看起来像 Windows 代码 - 不确定这是否是 OP 要求的。 @Mark - 是的,这里是 Win,但概念是一样的(如果没有 Win,你只需要加入线程) @skm - 添加更多描述如何使用 @MarkSetchell 好吧,如果他需要的话,我的版本适用于 Mac/Linux。【参考方案3】:我的问题中提到的问题是Consumer thread
显示的图像不包含完整的数据。 Consumer thread
显示的图像包含几个补丁,这表明它无法获取 Producer thread
生成的完整数据。
ANSWER 其背后的原因是Consumer thread
的while loop
内部声明了Mat image
。一旦第二轮while loop
开始,在while loop
中创建的Mat
实例就会被删除,因此Producer thread
永远无法访问在Consumer thread
中创建的Mat image
的数据。
解决方案:我应该这样做
struct ThreadSafeContainer
queue<Mat> safeContainer;
;
struct Producer
Producer(std::shared_ptr<ThreadSafeContainer> c) : container(c)
void run()
while(true)
// grab image from camera
// store image in container
Mat image(400, 400, CV_8UC3, Scalar(10, 100,180) );
mu.lock();
container->safeContainer.push(Mat);
mu.unlock();
std::shared_ptr<ThreadSafeContainer> container;
;
struct Consumer
Consumer(std::shared_ptr<ThreadSafeContainer> c) : container(c)
~Consumer()
void run()
while(true)
// read next image from container
mu.lock();
if (!container->safeContainer.empty())
Mat image= container->safeContainer.front(); //The front of the queue contain the image
container->safeContainer.pop();
imshow("consumer image", image);
waitKey(33);
mu.unlock();
std::shared_ptr<ThreadSafeContainer> container;
;
【讨论】:
以上是关于用于图像处理的基于生产者-消费者的多线程的主要内容,如果未能解决你的问题,请参考以下文章