多线程多队列..如何管理它?

Posted

技术标签:

【中文标题】多线程多队列..如何管理它?【英文标题】:Multithreads multiqueue..how can manage it? 【发布时间】:2013-05-31 07:28:38 【问题描述】:

我必须创建一个程序,该程序使用线程通过 OpenCV 精心制作 10 张图像。为此,我认为将工作分解为 3 个线程,并使用 4 个队列来包含初始 10 个图像,以及精化期间的中间图像。

这是我必须做的图表:

现在,我认为为此我可以使用 std::queue 来管理队列,因此将队列对象传递给每个线程。 问题是:

1) 我必须使用 push、pop 方法线程安全地创建一个类“队列”,对吗?使用 mutex 变量重新定义多线程和同步...

2) Evey 线程在 2 个队列上工作:从一个队列 POP 和 PUSH 到另一个队列。所以每个线程都是消费者/生产者......如何将输入和输出中队列类的两个对象指针传递给每个线程? (有两个对象,所以有两个指针)?

使用我的代码,第二个线程看到输入队列总是空的。有人可以帮助我吗?给我一个关于这个问题的想法?

这是我的代码,不适用于两个线程(第二个线程看到输入队列总是空的:

 #include <opencv\cv.h>
 #include <opencv\highgui.h>
 #include <stdio.h>
 #include <windows.h>
 #include <process.h>
 #include <ctime>
 #include <queue>

 using namespace std;
 using namespace cv;


/* thread safe queue*/

template<typename T>
class coda_concorr

private:
    std::queue<T> la_coda;
    HANDLE mutex;
    public:
    bool complete;
    coda_concorr()
    
        mutex = CreateMutex(NULL,FALSE,NULL);
        complete = false;
    
    void push(T& data)
    
        WaitForSingleObject(mutex,INFINITE);
        la_coda.push(data);
        ReleaseMutex(mutex);
    
    bool vuota() const
    
        bool RetCode;
        WaitForSingleObject(mutex,INFINITE);
        RetCode= la_coda.empty();
        ReleaseMutex(mutex);
        return RetCode;
    

    bool try_pop(T& popped)
    
        WaitForSingleObject(mutex,INFINITE);
        if (la_coda.empty())
        
            ReleaseMutex(mutex);
            return false;
        
        popped = la_coda.front();
        la_coda.pop();
        ReleaseMutex(mutex);
        return true;
    
;

//packet passing to threads
struct Args

    coda_concorr<cv::Mat> in;
    coda_concorr<cv::Mat> out;
;

//grey decrease funct
void grey (void *param)
    Mat temp1,temp2;
    Args* arg = (Args*)param;
    if(!arg->in.vuota())
    while(arg->in.try_pop(temp1))
    cvtColor(temp1,temp2,CV_BGR2GRAY);
    arg->out.push(temp2);
        
    arg->out.complete=true;
    
    else
        Sleep(100);
    
    _endthread();

//threshold funct
void soglia(void *param)
    Mat temp1a,temp2a;
    Args* arg = (Args*)param;
    if(arg->in.vuota())
    while(arg->in.vuota())
            cout<<endl<<"Coda vuota"<<endl;
            Sleep(100);
        
    
    else
        while(arg->in.try_pop(temp1a))
        threshold(temp1a,temp2a,128,255,THRESH_BINARY);
        arg->out.push(temp2a);
            
     
     arg->out.complete=true;
    _endthread();

int main()

    coda_concorr<cv::Mat> ingresso;
    coda_concorr<cv::Mat> coda1;
    coda_concorr<cv::Mat> coda2;
    coda_concorr<cv::Mat> uscita;


    //in array
    Mat inn[10];
    Mat out;

    //assing images
    inn[0]=imread("C:/OPENCV/Test/imgtest/bird1.jpg",1);
    inn[1]=imread("C:/OPENCV/Test/imgtest/bird2.jpg",1);
    inn[2]=imread("C:/OPENCV/Test/imgtest/bird3.jpg",1);
    inn[3]=imread("C:/OPENCV/Test/imgtest/pig1.jpg",1);
    inn[4]=imread("C:/OPENCV/Test/imgtest/pig2.jpg",1);
    inn[5]=imread("C:/OPENCV/Test/imgtest/pig3.jpg",1);
    inn[6]=imread("C:/OPENCV/Test/imgtest/spider1.jpg",1);
    inn[7]=imread("C:/OPENCV/Test/imgtest/spider2.jpg",1);
    inn[8]=imread("C:/OPENCV/Test/imgtest/spider3.jpg",1);
    inn[9]=imread("C:/OPENCV/Test/imgtest/Nutella.jpg",1);



    Args dati,dati2;
    //populating queue 
    for(int i=0;i<=9;i++)
        dati.in.push(inn[i]);
    
//assing second queue   
dati.out=coda1;

    HANDLE handle1,handle2;

    handle1 = (HANDLE) _beginthread(grey,0,&dati);

    //share part that don't WORK
    dati2.in=coda1;
    dati2.out=coda2;

    handle2 = (HANDLE) _beginthread(soglia,0,&dati2);



    WaitForSingleObject(handle2,INFINITE);
    WaitForSingleObject(handle1,INFINITE);



    //output
    while (dati2.out.try_pop(out))
        imshow("immagine",out);
        waitKey(100);
    

    system("PAUSE");
    return 0;


提前感谢您的宝贵时间。

【问题讨论】:

您使用哪个线程库?你是什​​么操作系统? 如果你打算使用互斥体/条件变量来保护访问,你不需要使用你自己的队列来代替 std::queue。另一个避免队列需求的选项是使用类似ZeroMQ 的消息传递概念 ... 或切换到同时为 opencv (Scala + Java OpenCV wrapper) 提供参与者模型和包装器的语言。学习曲线将(非常?)陡峭,但预计在很长一段时间内会很有趣。 感谢大家...我正在使用 Visual C++ 2010 和 OpenCV。 【参考方案1】:

对于第 1 点,您确实需要一些同步对象,或者使用消息传递库,如 cmets 中所建议的那样。

对于第 2 点,您的多线程库可能会为您提供将参数传递给线程函数的能力。

声明一个包含对两个队列的引用的小类,一个in队列,一个out队列。 在您的 main 中,为每个线程实例化您需要的所有队列和一个小类的实例,并为每个线程传递 inout 队列. 启动线程,将小类的实例作为回调函数参数传递。您现在可以从每个线程访问 inout 队列。

编辑(因为你编辑了你的问题)

bool vuota() const

    WaitForSingleObject(mutex,INFINITE);
    return la_coda.empty();
    ReleaseMutex(mutex);

错了。

bool vuota() const

    WaitForSingleObject(mutex,INFINITE);
    bool tmp = la_coda.empty();
    ReleaseMutex(mutex);
    return tmp;

bool try_pop(T&amp; popped) 也是如此。 在离开函数之前检查您是否释放了互斥锁。

【讨论】:

谢谢...那么,制作我自己的类队列的解决方案是正确的(我的意思是你所谓的小类队列)..在小类中我只需要声明对2的引用像“公共”变量一样的队列,以及这两个队列的构造函数? 很确定这就是 M.R. 的建议,是的。我可能也会走这条路。您以后发现需要与线程通信的任何其他内容也可以塞入其中,并且它使某些“监视”活动可以访问队列,这些活动偶尔会轮询队列计数并显示它们以进行调试/优化工作。 是的,队列可以是public 变量(引用一些queueobject),也可以是带有访问器的private 变量。此外,in queue 可以声明为 const 以防止处理线程对输入队列进行不必要的修改。 好的,我会试试这个解决方案...如果我还有其他问题,请在此处发布我的问题!非常感谢大家!! 我无法竞争...我有更新问题要向您展示我的代码...请帮助我...【参考方案2】:

如果线程 2 等待线程 1 工作,而线程 3 等待线程 2,那么线程没有意义。一个线程运行所有三个函数是最好的方法。

现在,您假设线程 2 正在处理第一个图像,而 T1 可以处理第二个图像。但是……

在这种情况下

线程 2 和 3 必须等待第一张图片 如果任何线程总是比其他线程快,它将停止等待工作。

这里正确的做法是只使用两个队列(In/Out),并有多个线程来分析不同的图像。因此,T1, T2, .. Tn 将从 IN 中获取图像,对其进行完全处理,然后将结果存储在 OUT 中。

甚至可以摆脱结果队列,直接将它们发送到某个地方(文件、显示器等)。只需确保同步目标对象即可。

【讨论】:

这是一个有趣的想法..在您看来,有更多线程,但每个线程都能够完成所有详细工作,最好有更多线程相互传递图像以逐步详细说明?在执行时间方面,两种想法的差异,对你来说是什么?感谢您的其他观点... 这种方法更好地利用了处理器时间(没有线程在等待其他线程处理,只是为了一些输入),并且总体上更有意义。此外,它更简单(必须处理 5 个同步队列有点……棘手)。您可以在输入队列上使用信号量来指示新的(或更多)输入图像。 我已经更新了问题以向您展示我的代码和问题...如果可以请帮助我

以上是关于多线程多队列..如何管理它?的主要内容,如果未能解决你的问题,请参考以下文章

iOS 开发--多线程

多线程通用处理队列类

第二天-多线程

IOS_多线程

Java多线程-工具篇-BlockingQueue

java多线程 --ConcurrentLinkedQueue 非阻塞 线程安全队列