Qt 多线程 QThreads 保持 TCP 连接并被重用

Posted

技术标签:

【中文标题】Qt 多线程 QThreads 保持 TCP 连接并被重用【英文标题】:Qt multithreading QThreads that keep a TCP connection and are reused 【发布时间】:2015-04-30 19:03:42 【问题描述】:

我真的不确定如何解决这个问题,所以我先解释一下。 我需要运行多个线程,每个线程都通过 TCPSocket 连接到某个应用程序,到目前为止没有问题。该应用程序非常耗时,这就是为什么我希望它在多个线程上并行运行,并且每个线程都与之通信。计算完成后,我想将结果发送到另一个收集结果的线程。 为此我写了一个 Worker 类:

class Worker : public QObject 
    Q_OBJECT

public:
    Worker();
    Worker(int port);
    ~Worker();
    QTcpSocket* sock;
    void insert();

public slots:
    void connect();
    void process(const int &id, const QString &param, const int &arity);

signals:
    void ready();
    void finished(const int &id, const int &consistent, const QString &result);
    void error(QString err);
;

现在 superThread 应该处理一个巨大的文件,并且需要将它分散到线程中,然后接收并处理结果。到目前为止,我的方法是在 main() 中连接的另一个 superThread,如下所示:

QThread* superThread = new QThread();
supWorker* super = new supWorker();
for (int i = 0; i < nrWorkers; i++)
    Worker* worker = new Worker(portRange+i);
    QThread* workerThread = new QThread();
    QThread::connect(workerThread, SIGNAL(started()), worker, SLOT(connect()));
    worker->moveToThread(workerThread);
    workerThread->start();
    QThread::connect(super, SIGNAL(process(int, QString, int)), worker, SLOT(process(int,QString,int)));
    QThread::connect(worker, SIGNAL(finished(int, int, QString)), super, SLOT(handleResult(int, int, QString)));

这种方式的问题显然是我只能将信号发送到所有连接的线程。我想要 superThread 做的是只向其中一个线程发送参数。我不知道如何处理连接,以便只有一个工作线程接收它?

非常感谢任何帮助或架构想法,在此先感谢。

【问题讨论】:

对不起,但我认为这对 SO 来说是一种广泛的方式。 抱歉,编辑希望现在更具体一点。 【参考方案1】:

不确定我的想法是否 100% 正确,但为什么不将工作线程数组传递给超级线程,保留一个表示当前活动线程索引的索引,仅将信号连接到那个,调度需要时发出信号,等待完成,断开信号,推进索引并重复?如果将序列化信号分派给线程是您真正想要的,这可能会起作用。

编辑

好吧,我真的拼命制作了一个基于 Qt 的示例,该示例实现了所需的工作流程并将其放在 github 上。

#pragma once

#include <QThread>
#include <QApplication>
#include <QMetaType>
#include <QTimer>

#include <vector>
#include <memory>
#include <cstdio>
#include <algorithm>

struct Work

    int m_work;
;

struct Result

    int m_result;
    int m_workerIndex;
;

Q_DECLARE_METATYPE(Work);
Q_DECLARE_METATYPE(Result);

class Worker : public QThread

    Q_OBJECT

public:
    Worker(int workerIndex) : m_workerIndex(workerIndex)
    
        moveToThread(this);
        connect(this, SIGNAL(WorkReceived(Work)), SLOT(DoWork(Work)));
        printf("[%d]: worker %d initialized\n", reinterpret_cast<int>(currentThreadId()), workerIndex);
    

    void DispatchWork(Work work)
    
        emit WorkReceived(work);
    

public slots:
    void DoWork(Work work)
    
        printf("[%d]: worker %d received work %d\n", reinterpret_cast<int>(currentThreadId()), m_workerIndex, work.m_work);
        msleep(100);
        Result result =  work.m_work * 2, m_workerIndex ;
        emit WorkDone(result);
    

signals:
    void WorkReceived(Work work);
    void WorkDone(Result result);

private:
    int m_workerIndex;
;

class Master : public QObject

    Q_OBJECT

public:
    Master(int workerCount) : m_activeWorker(0), m_workerCount(workerCount)
    
        printf("[%d]: creating master thread\n", reinterpret_cast<int>(QThread::currentThreadId()));
    
    ~Master()
    
        std::for_each(m_workers.begin(), m_workers.end(), [](std::unique_ptr<Worker>& worker)
        
            worker->quit();
            worker->wait();
        );
    


public slots:
    void Initialize()
    
        printf("[%d]: initializing master thread\n", reinterpret_cast<int>(QThread::currentThreadId()));
        for (int workerIndex = 0; workerIndex < m_workerCount; ++workerIndex)
        
            auto worker = new Worker(workerIndex);
            m_workers.push_back(std::move(std::unique_ptr<Worker>(worker)));
            connect(worker, SIGNAL(WorkDone(Result)), SLOT(WorkDone(Result)));
            worker->start();
        
        m_timer = new QTimer();
        m_timer->setInterval(500);
        connect(m_timer, SIGNAL(timeout()), SLOT(GenerateWork()));
        m_timer->start();
    
    void GenerateWork()
    
        Work work =  m_activeWorker ;
        printf("[%d]: dispatching work %d to worker %d\n", reinterpret_cast<int>(QThread::currentThreadId()), work.m_work, m_activeWorker);
        m_workers[m_activeWorker]->DispatchWork(work);
        m_activeWorker = ++m_activeWorker % m_workers.size();
    
    void WorkDone(Result result)
    
        printf("[%d]: received result %d from worker %d\n", reinterpret_cast<int>(QThread::currentThreadId()), result.m_result, result.m_workerIndex);
    
    void Terminate()
    
        m_timer->stop();
        delete m_timer;
    

private:
    int m_workerCount;
    std::vector<std::unique_ptr<Worker>> m_workers;
    int m_activeWorker;
    QTimer* m_timer;
;

QtThreadExample.cpp:

#include "QtThreadExample.hpp"
#include <QTimer>

int main(int argc, char** argv)

    qRegisterMetaType<Work>("Work");
    qRegisterMetaType<Result>("Result");
    QApplication application(argc, argv);
    QThread masterThread;
    Master master(5);
    master.moveToThread(&masterThread);
    master.connect(&masterThread, SIGNAL(started()), SLOT(Initialize()));
    master.connect(&masterThread, SIGNAL(terminated()), SLOT(Terminate()));
    masterThread.start();
    // Set a timer to terminate the program after 10 seconds
    QTimer::singleShot(10 * 1000, &application, SLOT(quit()));
    application.exec();
    masterThread.quit();
    masterThread.wait();
    printf("[%d]: master thread has finished\n", reinterpret_cast<int>(QThread::currentThreadId()));
    return 0;

一般来说,解决方案实际上是不从主线程本身发出信号,而是从工作线程发出信号 - 这样您就可以为每个线程获得一个唯一的信号,并且可以在事件循环中异步处理工作,并且然后发出一个线程完成的信号。该示例可以并且应该根据您的需要进行重构,但总的来说,它使用索引和信号线程的想法在 Qt 中演示了生产者/消费者模式。我正在使用通用计时器在主线程(Master::m_timer)中生成工作 - 我猜在你的情况下,你将使用来自套接字、文件或其他东西的信号来生成工作。然后我在一个活动的工作线程上调用一个方法,该方法向工作线程的事件循环发出一个信号以开始执行工作,然后发出一个关于完成的信号。这是一般描述,请查看示例,尝试一下,如果您有任何后续问题,请告诉我。

如果您使用 Qt 对象,我想这会相当不错,但在传统意义上的消费者/生产者模式中,信号/插槽的东西实际上让生活变得更加艰难,一个标准的 C++11 管道,带有 @987654328 @ 和一个调用 condition_variable::notify_one() 的主线程和简单地等待条件变量的工作线程会更容易,但是 Qt 对所有 I/O 东西都有很好的包装器。因此,请尝试一下并做出决定。

下面是示例的示例输出,我猜线程日志表明达到了所需的效果:

还有一点需要注意,因为QApplication 本身运行一个事件循环,如果你没有 GUI,你实际上可以让你的所有 I/O 对象和主类都存在于主线程中并从那里发出信号,从而消除需要一个单独的主线程。当然,如果你有一个 GUI,你不会想用这些东西给它增加负担。

【讨论】:

感谢您的回复,并对这个令人困惑的问题感到抱歉,我编辑了我的帖子,希望我的问题现在更清楚一点。因为我不知道如何正确地将信号分配给特定的且只有一个工作线程。 @yonobi,正如我所提到的,其中一种可能性是仅连接您想要实际接收信号的那个/那些线程。然后根据需要执行断开/重新连接。 @yonobi,这里 (forum.qt.io/topic/12999/send-signals-to-a-specified-receiver/2) 是一个关于此的 qt 论坛主题,正如您所说的那样,您想要的是一种反对信号槽成语的方式。如果你想调用显式线程,为什么不简单地创建一个可以直接调用的方法,例如从等待中唤醒线程并做一些工作?你需要一个跨平台的解决方案吗? 以为你把我的问题弄错了,抱歉。起初我以为只能有一个线程以这种方式同时运行。但这只是我必须断开并重新连接到其中一个线程的处理信号,并让负责返回的那个线程在所有线程中都处于活动状态,对吗?再次感谢 @yonobi 是的,你是对的,所有线程都在运行,但只有一个线程在给定时刻接收信号:) 那么我的建议有帮助吗?不过,对我来说,这似乎是一个奇怪的场景,但话又说回来,每条管道都有它的用途。

以上是关于Qt 多线程 QThreads 保持 TCP 连接并被重用的主要内容,如果未能解决你的问题,请参考以下文章

使用 QThreads 未正确使用 CPU 内核

C++/Qt - QThread 与 QRunnable

QT 信号槽 异步事件驱动 单线程 多并发

Qt 保持GUI响应的几种方法

确认:我可以在保持 TCP 套接字打开的同时调试应用程序吗?

如何在不关闭 TCP 连接的情况下关闭处理 TCP 请求的线程?