Qt 多线程 QThreads 保持 TCP 连接并被重用
Posted
技术标签:
【中文标题】Qt 多线程 QThreads 保持 TCP 连接并被重用【英文标题】:Qt multithreading QThreads that keep a TCP connection and are reused 【发布时间】:2015-04-30 19:03:42 【问题描述】:我真的不确定如何解决这个问题,所以我先解释一下。 我需要运行多个线程,每个线程都通过 TCPSocket 连接到某个应用程序,到目前为止没有问题。该应用程序非常耗时,这就是为什么我希望它在多个线程上并行运行,并且每个线程都与之通信。计算完成后,我想将结果发送到另一个收集结果的线程。 为此我写了一个 Worker 类:
class Worker : public QObject
Q_OBJECT
public:
Worker();
Worker(int port);
~Worker();
QTcpSocket* sock;
void insert();
public slots:
void connect();
void process(const int &id, const QString ¶m, const int &arity);
signals:
void ready();
void finished(const int &id, const int &consistent, const QString &result);
void error(QString err);
;
现在 superThread 应该处理一个巨大的文件,并且需要将它分散到线程中,然后接收并处理结果。到目前为止,我的方法是在 main() 中连接的另一个 superThread,如下所示:
QThread* superThread = new QThread();
supWorker* super = new supWorker();
for (int i = 0; i < nrWorkers; i++)
Worker* worker = new Worker(portRange+i);
QThread* workerThread = new QThread();
QThread::connect(workerThread, SIGNAL(started()), worker, SLOT(connect()));
worker->moveToThread(workerThread);
workerThread->start();
QThread::connect(super, SIGNAL(process(int, QString, int)), worker, SLOT(process(int,QString,int)));
QThread::connect(worker, SIGNAL(finished(int, int, QString)), super, SLOT(handleResult(int, int, QString)));
这种方式的问题显然是我只能将信号发送到所有连接的线程。我想要 superThread 做的是只向其中一个线程发送参数。我不知道如何处理连接,以便只有一个工作线程接收它?
非常感谢任何帮助或架构想法,在此先感谢。
【问题讨论】:
对不起,但我认为这对 SO 来说是一种广泛的方式。 抱歉,编辑希望现在更具体一点。 【参考方案1】:不确定我的想法是否 100% 正确,但为什么不将工作线程数组传递给超级线程,保留一个表示当前活动线程索引的索引,仅将信号连接到那个,调度需要时发出信号,等待完成,断开信号,推进索引并重复?如果将序列化信号分派给线程是您真正想要的,这可能会起作用。
编辑
好吧,我真的拼命制作了一个基于 Qt 的示例,该示例实现了所需的工作流程并将其放在 github 上。
#pragma once
#include <QThread>
#include <QApplication>
#include <QMetaType>
#include <QTimer>
#include <vector>
#include <memory>
#include <cstdio>
#include <algorithm>
struct Work
int m_work;
;
struct Result
int m_result;
int m_workerIndex;
;
Q_DECLARE_METATYPE(Work);
Q_DECLARE_METATYPE(Result);
class Worker : public QThread
Q_OBJECT
public:
Worker(int workerIndex) : m_workerIndex(workerIndex)
moveToThread(this);
connect(this, SIGNAL(WorkReceived(Work)), SLOT(DoWork(Work)));
printf("[%d]: worker %d initialized\n", reinterpret_cast<int>(currentThreadId()), workerIndex);
void DispatchWork(Work work)
emit WorkReceived(work);
public slots:
void DoWork(Work work)
printf("[%d]: worker %d received work %d\n", reinterpret_cast<int>(currentThreadId()), m_workerIndex, work.m_work);
msleep(100);
Result result = work.m_work * 2, m_workerIndex ;
emit WorkDone(result);
signals:
void WorkReceived(Work work);
void WorkDone(Result result);
private:
int m_workerIndex;
;
class Master : public QObject
Q_OBJECT
public:
Master(int workerCount) : m_activeWorker(0), m_workerCount(workerCount)
printf("[%d]: creating master thread\n", reinterpret_cast<int>(QThread::currentThreadId()));
~Master()
std::for_each(m_workers.begin(), m_workers.end(), [](std::unique_ptr<Worker>& worker)
worker->quit();
worker->wait();
);
public slots:
void Initialize()
printf("[%d]: initializing master thread\n", reinterpret_cast<int>(QThread::currentThreadId()));
for (int workerIndex = 0; workerIndex < m_workerCount; ++workerIndex)
auto worker = new Worker(workerIndex);
m_workers.push_back(std::move(std::unique_ptr<Worker>(worker)));
connect(worker, SIGNAL(WorkDone(Result)), SLOT(WorkDone(Result)));
worker->start();
m_timer = new QTimer();
m_timer->setInterval(500);
connect(m_timer, SIGNAL(timeout()), SLOT(GenerateWork()));
m_timer->start();
void GenerateWork()
Work work = m_activeWorker ;
printf("[%d]: dispatching work %d to worker %d\n", reinterpret_cast<int>(QThread::currentThreadId()), work.m_work, m_activeWorker);
m_workers[m_activeWorker]->DispatchWork(work);
m_activeWorker = ++m_activeWorker % m_workers.size();
void WorkDone(Result result)
printf("[%d]: received result %d from worker %d\n", reinterpret_cast<int>(QThread::currentThreadId()), result.m_result, result.m_workerIndex);
void Terminate()
m_timer->stop();
delete m_timer;
private:
int m_workerCount;
std::vector<std::unique_ptr<Worker>> m_workers;
int m_activeWorker;
QTimer* m_timer;
;
QtThreadExample.cpp:
#include "QtThreadExample.hpp"
#include <QTimer>
int main(int argc, char** argv)
qRegisterMetaType<Work>("Work");
qRegisterMetaType<Result>("Result");
QApplication application(argc, argv);
QThread masterThread;
Master master(5);
master.moveToThread(&masterThread);
master.connect(&masterThread, SIGNAL(started()), SLOT(Initialize()));
master.connect(&masterThread, SIGNAL(terminated()), SLOT(Terminate()));
masterThread.start();
// Set a timer to terminate the program after 10 seconds
QTimer::singleShot(10 * 1000, &application, SLOT(quit()));
application.exec();
masterThread.quit();
masterThread.wait();
printf("[%d]: master thread has finished\n", reinterpret_cast<int>(QThread::currentThreadId()));
return 0;
一般来说,解决方案实际上是不从主线程本身发出信号,而是从工作线程发出信号 - 这样您就可以为每个线程获得一个唯一的信号,并且可以在事件循环中异步处理工作,并且然后发出一个线程完成的信号。该示例可以并且应该根据您的需要进行重构,但总的来说,它使用索引和信号线程的想法在 Qt 中演示了生产者/消费者模式。我正在使用通用计时器在主线程(Master::m_timer
)中生成工作 - 我猜在你的情况下,你将使用来自套接字、文件或其他东西的信号来生成工作。然后我在一个活动的工作线程上调用一个方法,该方法向工作线程的事件循环发出一个信号以开始执行工作,然后发出一个关于完成的信号。这是一般描述,请查看示例,尝试一下,如果您有任何后续问题,请告诉我。
如果您使用 Qt 对象,我想这会相当不错,但在传统意义上的消费者/生产者模式中,信号/插槽的东西实际上让生活变得更加艰难,一个标准的 C++11 管道,带有 @987654328 @ 和一个调用 condition_variable::notify_one() 的主线程和简单地等待条件变量的工作线程会更容易,但是 Qt 对所有 I/O 东西都有很好的包装器。因此,请尝试一下并做出决定。
下面是示例的示例输出,我猜线程日志表明达到了所需的效果:
还有一点需要注意,因为QApplication
本身运行一个事件循环,如果你没有 GUI,你实际上可以让你的所有 I/O 对象和主类都存在于主线程中并从那里发出信号,从而消除需要一个单独的主线程。当然,如果你有一个 GUI,你不会想用这些东西给它增加负担。
【讨论】:
感谢您的回复,并对这个令人困惑的问题感到抱歉,我编辑了我的帖子,希望我的问题现在更清楚一点。因为我不知道如何正确地将信号分配给特定的且只有一个工作线程。 @yonobi,正如我所提到的,其中一种可能性是仅连接您想要实际接收信号的那个/那些线程。然后根据需要执行断开/重新连接。 @yonobi,这里 (forum.qt.io/topic/12999/send-signals-to-a-specified-receiver/2) 是一个关于此的 qt 论坛主题,正如您所说的那样,您想要的是一种反对信号槽成语的方式。如果你想调用显式线程,为什么不简单地创建一个可以直接调用的方法,例如从等待中唤醒线程并做一些工作?你需要一个跨平台的解决方案吗? 以为你把我的问题弄错了,抱歉。起初我以为只能有一个线程以这种方式同时运行。但这只是我必须断开并重新连接到其中一个线程的处理信号,并让负责返回的那个线程在所有线程中都处于活动状态,对吗?再次感谢 @yonobi 是的,你是对的,所有线程都在运行,但只有一个线程在给定时刻接收信号:) 那么我的建议有帮助吗?不过,对我来说,这似乎是一个奇怪的场景,但话又说回来,每条管道都有它的用途。以上是关于Qt 多线程 QThreads 保持 TCP 连接并被重用的主要内容,如果未能解决你的问题,请参考以下文章