C++中的线程池/队列系统

Posted

技术标签:

【中文标题】C++中的线程池/队列系统【英文标题】:Threadpool / Queueing system in C++ 【发布时间】:2013-08-31 21:16:21 【问题描述】:

我有一种情况,我需要做一些繁重的计算。我发现细分我的数据,然后将其重新合并在一起是最快的(随着大小的增加,时间增加得更快,所以拆分是合乎逻辑的)。

它应该能够为应用程序提供数据大小,例如一百万个双精度值。

我现在所做的是将基于此大小创建的数据发送到某个函数,在计算后将其返回,然后循环返回以将此数据卸载到主向量中。

我想发送 200 个部分,其中一个“最后”部分。例如,给定 size = 1000005 最初会执行此函数 5000 次,然后是最后一个大小为 5 的数据。

int size = 1000000;
int times = size / 200; // 5000
int leftover = size % 200; // 0, this not performed

QVector<double> x(size);
QVector<double> y(size);

x = createData(size);
x = createData(size);

for (int i = 0; i < times; i++)

    holder = createData(200);
    QVector<double> tempx = x.mid(i*200, 200);
    QVector<double> tempy = y.mid(i*200, 200);
    holder = myfunction(tempx, tempy, 200);  // let it now just return `tempy`
    for (int j = 0; j < 200; j++)
    
        y[i*200 + j] = holder[j];
    

// leftover function here, really similar to this part before.

// plotting function here

最后,x 将保持初始化状态,y 将进行计算。

由于这些代码部分可以彼此分开运行并且速度至关重要,因此我想使用多个内核。

以下进一步描述了这种情况:

这些函数调用是相互独立的,只有当向量完成时,我才想绘制结果。 每次通话的完成时间会有很大差异。 times 的数量应该是可变的。

我读过一些关于建议最大线程数是核心数量(至少作为起点)的信息,因为使用太多线程可能会减慢进程。考虑到这种情况,排队系统/线程池似乎是有意义的,因为当一个线程有一些简单的工作而其他线程通过更难的工作减慢一切时,不会浪费时间。

虽然在几十个教程中使用一些(通常是 2 个)线程打印一些消息似乎很容易,但任何人都可以提供更详细的帮助,说明如何返回向量并将这些线程安全地卸载到主函数中,以及如何创建一个线程池,所以时间不会被浪费?

使用 Ubuntu 13.04、Qt 和 C++11x,虽然这不重要。

【问题讨论】:

为什么不用intel线程积木,它有“parallel_for”和优秀的线程池(也可以让你微调池调整),而不是手动拆分计算?使用 intel tbb,您可以指定 main 中的线程数,库将尝试调整池以便为每个线程提供相同的工作负载 @ViniciusMiranda 我不确定这是什么?是否有任何文档或您可以尝试给出答案? 【参考方案1】:

首先,写一个胎面池是很难的。如果你真的想学习如何编写,Antony Williams 所著的 C++ Concurrency in Action 一书会教你如何实现。

但是,您的情况似乎是一个简单的 parallel_for 非常适合的情况。所以我建议使用 Intel Threading Building Blocks library 。该库的优势在于它有一个非常好的线程池,并且与 C++11 特性配合得非常好。

示例代码:

#include "tbb/task_scheduler_init.h"
#include "tbb/blocked_range.h"
#include "tbb/parallel_for.h"
#include "tbb/tbb_thread.h"
#include <vector>

int main() 
  tbb::task_scheduler_init init(tbb::tbb_thread::hardware_concurrency());
  std::vector<double> a(1000);
  std::vector<double> c(1000);
  std::vector<double> b(1000);

  std::fill(b.begin(), b.end(), 1);
  std::fill(c.begin(), c.end(), 1);

  auto f = [&](const tbb::blocked_range<size_t>& r) 
    for(size_t j=r.begin(); j!=r.end(); ++j) a[j] = b[j] + c[j];    
  ;
  size_t hint_number_iterations_per_thread = 100;
  tbb::parallel_for(tbb::blocked_range<size_t>(0, 1000, hint_number_iterations_per_thread), f);
  return 0;

完成!英特尔 TBB 有一个非常好的线程池,它会尝试调整每个线程的工作负载。只要hint_number_iterations_per_thread不是一个疯狂的数字,就会非常接近最优解

顺便说一句:intel TBB 是一个开源库,可以与大多数编译器一起使用!

【讨论】:

@Dualinity 现在我包含了一个完整的示例。更容易适应您的代码 非常感谢。这似乎很有趣。我还无法安装 tbb(尽管我确实“制作”了源代码),Qt 还没有识别它。此外,它看起来确实像一些复杂的代码。你能补充一下“blocked_range”和“硬件并发”是什么吗? 我使用简单的sudo apt-get install libtbb-dev安装了它 请告诉我,这与 Qt 有什么关系?您应该提供使用 Qt API 的答案。 @KubaOber - 首先,英特尔 TBB 可以与 QT 一起使用。这是一个有趣的选择,特别是考虑到 TBB 线程池是多么复杂/优化和简单(为什么 QT 人不能使用外部库?)。其次,更重要的是,他提到他使用 QT,但他通常询问 C++ 线程池。他的示例非常适合需要parallel_for 的C++ 情况。第三,是的,有 qthreapool,但在其他答案中远不清楚如何编写带有自动池的 parallel_for,这是他真正需要的。【参考方案2】:

你不需要创建任何东西。如果你使用Qt,你的问题已经解决了。您可以从QRunnable 派生一个类,然后将其传递给QThreadPool 以执行。

您可以指示QThreadPool 应该同时运行多少线程(任何额外的线程都只是在队列中等待直到插槽打开)但这不应该是必需的,因为QThreadPool 根据您的架构设置限制,这些限制通常是足够好。

QThreadPool

QRunnable

【讨论】:

我明白了,我认为这真的可能是要走的路。您知道如何在这种情况下实现它吗?【参考方案3】:

甚至比创建 QThreadPool 和扩展 QRunabble 更简单,您可以使用 QtConcurrent 库。具体使用 QtConcurrent::mapped 函数,它接受一个开始迭代器和一个结束迭代器,以及一个函数(可以是 lambda),并在内部为您处理线程池的创建和执行。

有两种变体:“mapped”向结果返回一个QFuture但不阻塞当前线程,而“blockingMapped”直接返回一个结果列表。

要对一个大的整数向量求平方,您可以执行以下操作:

std::vector<int> myInts = ....

QVector<int> result = QtConcurrent::blockingMapped(myInts.begin(), myInts.end(), [](int x)  return x*x; );

【讨论】:

以上是关于C++中的线程池/队列系统的主要内容,如果未能解决你的问题,请参考以下文章

C++线程池

线程池工作原理

c++缓冲池循环队列实现

C++线程池

4种线程池和7种并发队列

JAVA线程池