c++11 async<>,可用内核数量未知

Posted

技术标签:

【中文标题】c++11 async<>,可用内核数量未知【英文标题】:c++11 async<>, with unknown number of available cores 【发布时间】:2013-01-28 19:29:55 【问题描述】:

我的 C++ 代码在时间序列数据 (t2 >> t1) 上计算非常大的积分。积分是固定长度的,当前存储在双精度的 [m x 2] 列数组中。第 1 列是时间。第 2 列是被整合的信号。代码在四核或八核机器上运行。

对于具有 k 个内核的机器,我想:

分离出 k-1 个工作进程(每个剩余核心一个)以评估积分的部分(梯形积分)并将其结果返回到等待的主线程。 无需深度复制原始数组部分即可实现上述目标。 实现 C++11 异步模板以实现可移植性

如何在不硬编码可用内核数量的情况下实现上述目标?

我目前正在使用 VS 2012。

清晰度更新:

例如,这是粗略的伪代码

data is [100000,2] double

result = MyIntegrator(data[1:50000,1:2]) + MyIntegrator(data[50001:100000, 1:2]); 

我需要在单独的线程中评估 MyIntegrator() 函数。主线程等待这两个结果。

【问题讨论】:

单独的计算是否相互依赖? 我认为std::async 对于这个问题来说是一个过于抽象的层次。如果您想控制工作线程的数量,最好使用std::thread 手动生成它们。 @juanchopanza 我认为情况正好相反,他可能需要更多抽象,比如parallel_for。然而,这个问题目前的形式过于模糊,无法回答任何问题。另外,我认为std::async 总是比std::thread 更好,因为它提供了额外的异常安全性。 积分是单被积函数,可以分段计算。 【参考方案1】:

这是对问题进行多线程集成的源代码。

#include <vector>
#include <memory>
#include <future>
#include <iterator>
#include <iostream>

struct sample 
  double duration;
  double value;
;
typedef std::pair<sample*, sample*> data_range;
sample* begin( data_range const& r )  return r.first; 
sample* end( data_range const& r )  return r.second; 

typedef std::unique_ptr< std::future< double > > todo_item;

double integrate( data_range r ) 
  double total = 0.;
  for( auto&& s:r ) 
    total += s.duration * s.value;
  
  return total;


todo_item threaded_integration( data_range r ) 
  return todo_item( new std::future<double>( std::async( integrate, r )) );

double integrate_over_threads( data_range r, std::size_t threads ) 
  if (threads > std::size_t(r.second-r.first))
    threads = r.second-r.first;
  if (threads == 0)
    threads = 1;
  sample* begin = r.first;
  sample* end = r.second;

  std::vector< std::unique_ptr< std::future< double > > > todo_list;

  sample* highwater = begin;

  while (highwater != end) 
    sample* new_highwater = (end-highwater)/threads+highwater;
    --threads;
    todo_item item = threaded_integration( data_range(highwater, new_highwater) );
    todo_list.push_back( std::move(item) );
    highwater = new_highwater;
  
  double total = 0.;
  for (auto&& item: todo_list) 
    total += item->get();
  
  return total;


sample data[5] = 
  1., 1.,
  1., 2.,
  1., 3.,
  1., 4.,
  1., 5.,
;
int main() 
  using std::begin; using std::end;
  double result = integrate_over_threads( data_range( begin(data), end(data) ), 2 );
  std::cout << result << "\n";

它需要一些修改才能完全按照您指定的格式读取数据。

但是您可以使用std::thread::hardware_concurrency() 作为线程数来调用它,它应该可以工作。

(特别是为了简单起见,我有成对的 (duration, value) 而不是 (time, value),但这只是一个小细节)。

【讨论】:

这是我前进的方向。我将在今晚晚些时候更详细地回顾这一点。【参考方案2】:

std::thread::hardware_concurrency() 呢?

【讨论】:

-1 用于在不了解问题的情况下抛出随机代码。【参考方案3】:

获取运行的核心数,通常可以通过std::thread::hardware_concurrency()找到

返回实现支持的并发线程数。该值应仅被视为一个提示。

如果该值为零,那么您可以尝试运行基于操作系统的特定命令。 This 似乎是找出核心数量的好方法。

您仍然需要进行测试以确定多线程是否会给您带来实实在在的好处,记住不要过早优化:)

【讨论】:

-1 用于复制其他答案,-1 用于复制答案中提到的原因。 好的,我看看它的样子。我正在寻找更完整的答案。【参考方案4】:

您可以超期安排,看看是否会影响您的表现。将您的数组拆分为固定长度的小间隔(可在一个 quant 中计算,可能适合一个缓存页面),看看它在性能上与根据 CPU 数量进行拆分的比较。

使用 std::packaged_task 并将其传递给线程以确保您不会受到“启动”配置的伤害。

下一步是引入线程池,但这更复杂。

【讨论】:

【参考方案5】:

您可以接受工作线程数的命令行参数。

【讨论】:

以上是关于c++11 async<>,可用内核数量未知的主要内容,如果未能解决你的问题,请参考以下文章

内核网格大小是不是决定块数,而块大小决定线程数?

C++11 线程与异步性能(VS2013)

多线程(线程池:ThreadPool)-C#

C++11之std::future对象的基本用法

C++ std::async 不会产生新线程

Angular2 - 将 ASYNC 数据传递给子组件