启动多个线程并重新启动它们

Posted

技术标签:

【中文标题】启动多个线程并重新启动它们【英文标题】:Launching multiple threads and restarting them 【发布时间】:2016-06-23 19:50:41 【问题描述】:

我正在尝试对创建 x 个工作线程的系统进行编程。这些线程将在不同的时间完成它们的工作。当他们中的任何一个完成他们的工作时,我将检查他们的输出并再次重新启动它们(将运行的线程数保持在 x 左右)。我将在多次随机迭代中执行此操作。因此,基本上一个控制器线程将启动 x 数量的线程,并在它们完成工作时重新启动它们,直到达到一定数量的迭代。

附加说明#1:当我说重新启动时,等到当前的退出/中止并被销毁,然后创建一个新的是完全可以的。它不必“重新启动”同一个线程。我最感兴趣的是以干净的异步方式执行此操作。

注意:我不是在寻找任何特定的代码,而是一些可能的伪代码和使用插槽和信号的设计模式。

我知道 qt 线程并且已经使用它们。我熟悉一些示例,您启动 x 数量的线程并等待所有线程完成使用 yield,然后等待。我正在寻找一种干净的方法来使用信号和插槽来实现我在第一段中描述的内容。

【问题讨论】:

与其重新启动线程,为什么不启动一个新线程呢? AFIAK 线程不能重复使用。 一起重新启动它们还是在它们完成时异步重新启动? 为什么不使用一些任务队列的循环? 不重复使用它们并创建一个新的也没关系。我想我想了解如何以异步方式做到这一点。 @Jarod42 - 您能否详细说明任务队列的含义?它是否像一个带有一些布尔标志的数组/向量,所以我知道它已经被一个线程更新为结果?我无限期地循环这个? 【参考方案1】:

这就是QtConcurrent::run()QThreadPool::start() 的用途。 Concurrent 框架在内部使用了一个线程池,所以它们是完全等价的:前者是后者的便利包装器。默认线程池最好留给短期运行的任务;要运行长任务,请使用您自己的线程池。您可以将其作为第一个参数传递给 QtConcurrent::run()

QThreadPool 维护一个工作项队列,将它们分派到线程,并动态创建和销毁工作线程。这是一门很棒的课程,您不必自己重新实现。

如果您没有太多的工作单元并且可以预先全部提供,只需使用QtConcurrent::run()QThreadPool::start() 提前将它们全部排队。它们可以从辅助对象发出信号,以在它们每个完成时通知您。

如果工作单元太昂贵而无法一次创建,则必须在线程池之上实现通知工作队列。

工作单元需要通知队列及其用户它已完成。这可以做到,例如通过重新实现QRunnable 作为WorkUnit 的基础,将工作转发给抽象方法,并在抽象方法完成时通知队列。同样的方法适用于QtConcurrent::run,除了你实现一个仿函数的operator()(),而不是重新实现QRunnable::run

队列将为每个完成的工作单元发出一个workUnitDone 信号。用户应在收到信号后用一项工作重新填充队列(如果没有更多工作,则不需要)。

为方便起见,队列可以通过发送workUnitDone(nullptr) 来请求一些初始工作项。如果每次前一个项目完成时您只补充一个项目,队列将保持工作项目的初始数量。

如果处理项目需要很短的时间,您应该有比线程数更多的可用时间,这样没有线程会在没有工作的情况下处于空闲状态。对于大部分需要很长时间(几十毫秒或更长时间)的项目,QThread::idealThreadCount 的 1.5-2 倍就足够了。

添加到队列中的工作单元可以是WorkUnit 的实例或函子。

// https://github.com/KubaO/***n/tree/master/questions/notified-workqueue-38000605
#include <QtCore>
#include <type_traits>

class WorkUnit;
class WorkQueue : public QObject 
   Q_OBJECT
   friend class WorkUnit;
   QThreadPool m_poolthis;
   union alignas(64)  // keep it in its own cache line
      QAtomicInt queuedUnits0;
      char filler[64];
    d;
   void isDone(WorkUnit * unit) 
      auto queued = d.queuedUnits.deref();
      emit workUnitDone(unit);
      if (!queued) emit finished();
   
public:
   explicit WorkQueue(int initialUnits = 0) 
      if (initialUnits)
         QTimer::singleShot(0, [=]
            for (int i = 0; i < initialUnits; ++i)
               emit workUnitDone(nullptr);
         );
   
   Q_SLOT void addWork(WorkUnit * unit);
   template <typename F> void addFunctor(F && functor);
   Q_SIGNAL void workUnitDone(WorkUnit *);
   Q_SIGNAL void finished();
;

class WorkUnit : public QRunnable 
   friend class WorkQueue;
   WorkQueue * m_queue  nullptr ;
   void run() override 
      work();
      m_queue->isDone(this);
   
protected:
   virtual void work() = 0;
;

template <typename F>
class FunctorUnit : public WorkUnit, private F 
   void work() override  (*this)(); 
public:
   FunctorUnit(F && f) : F(std::move(f)) 
;

void WorkQueue::addWork(WorkUnit *unit) 
   d.queuedUnits.ref();
   unit->m_queue = this;
   m_pool.start(unit);


template <typename F> void WorkQueue::addFunctor(F && functor) 
   addWork(new FunctorUnit<typename std::decay<F>::type>std::forward<F>(functor));

为了演示,让我们在 1us 和 1s 之间的随机时间内做 50 个单位的睡眠“工作”。我们将一半的单位作为 SleepyWork 实例传递,另一半作为 lambdas 传递。

#include <random>

struct SleepyWork : WorkUnit 
   int usecs;
   SleepyWork(int usecs) : usecs(usecs) 
   void work() override 
      QThread::usleep(usecs);
      qDebug() << "slept" << usecs;
   
;

int main(int argc, char ** argv) 
   QCoreApplication appargc, argv;
   std::random_device dev;
   std::default_random_engine engdev();
   std::uniform_int_distribution<int> dist1, 1000000;
   auto rand_usecs = [&] return dist(eng); ;

   int workUnits = 50;
   WorkQueue queue2*QThread::idealThreadCount();
   QObject::connect(&queue, &WorkQueue::workUnitDone, [&]
      if (workUnits) 
         if (workUnits % 2) 
            auto us = dist(eng);
            queue.addFunctor([us]
               QThread::usleep(us);
               qDebug() << "slept" << us;
            );
          else
            queue.addWork(new SleepyWorkrand_usecs());
         --workUnits;
      
   );
   QObject::connect(&queue, &WorkQueue::finished, [&]
      if (workUnits == 0) app.quit();
   );

   return app.exec();


#include "main.moc"

示例到此结束。

【讨论】:

这看起来很干净,而且要走的路。我将执行此操作并通知进度。 队列(WorkQueue)初始化时不存在竞态条件,执行workUnitDone的connect。在connect语句之后初始化不是更好吗?在这种情况下,使用初始化逻辑向 WorkQueue 添加一个 init 方法。 @dev_nut 不可能有竞争,因为在控制返回事件循环之前,工作队列不执行任何代码。队列的构造函数并没有做太多的事情,只是安排了从事件循环中完成的信号发射。将工作推迟到事件循环是一种常见的模式,它的存在正是为了排除任何竞争。这就是信号和槽在这种情况下可用的原因。【参考方案2】:

每个线程都有以下内容:

布尔退出 在启动线程之前,将其设置为 false 主线程应该只将它设置为真。命令设置标志为真,加入线程 工作线程只能将其设置为 false。订单正在退出循环,进行任何后处理,将其设置为 false,然后返回。 信号量(不需要计数器,只需发布​​/停止,就像在 pthreads 中一样) 主线程会发帖 工作线程将在while( !exit ) 中运行,循环中的第一个命令是等待信号量。 布尔线程工作 在启动线程之前,将其设置为 false 主线程应该只将它设置为真。顺序是:准备好数据,将标志设置为 true,发布信号量。 工作线程只能将其设置为 false。或者是:准备好答案,通知主线程,设置标志为false。

这样你可以重复使用线程。

【讨论】:

我认为我的问题不够清楚。我添加了一些额外的注释。

以上是关于启动多个线程并重新启动它们的主要内容,如果未能解决你的问题,请参考以下文章

重新启动默认 cosos2d 项目导致“线程 1:信号 SIGBRT 1”错误

Docker - 如果另一个重新启动,则重新启动特定容器

如何让主管重新启动挂起的工人?

我在 django 中的静态文件不会被重新加载,除非我每次重新启动电脑时都重命名它们

Supervisor使用

尝试重新启动线程时发生 ThreadStateException