为啥 CPU 上的线程浮点计算会使它们花费更长的时间?

Posted

技术标签:

【中文标题】为啥 CPU 上的线程浮点计算会使它们花费更长的时间?【英文标题】:Why does threading floating point computations on the CPU make them take significantly longer?为什么 CPU 上的线程浮点计算会使它们花费更长的时间? 【发布时间】:2019-03-10 01:53:25 【问题描述】:

我目前正在研究一个科学模拟(Gravitational nbody)。我首先用一个简单的单线程算法编写了它,这对于少数粒子来说是可以接受的。然后我对这个算法进行了多线程处理(它是令人尴尬的并行),程序花费了大约 3 倍的时间。下面是一个最小的、完整的、可验证的简单算法示例,它具有相似的属性并输出到 /tmp 中的文件(它被设计为在 Linux 上运行,但 C++ 也是标准的)。请注意,如果您决定运行此代码,它将生成一个 152.62MB 的文件。输出数据是为了防止编译器优化程序外的计算。

#include <iostream>
#include <functional>
#include <thread>
#include <vector>
#include <atomic>
#include <random>
#include <fstream>
#include <chrono>

constexpr unsigned ITERATION_COUNT = 2000;
constexpr unsigned NUMBER_COUNT = 10000;

void runThreaded(unsigned count, unsigned batchSize, std::function<void(unsigned)> callback)
    unsigned threadCount = std::thread::hardware_concurrency();
    std::vector<std::thread> threads;
    threads.reserve(threadCount);

    std::atomic<unsigned> currentIndex(0);

    for(unsigned i=0;i<threadCount;++i)
        threads.emplace_back([&currentIndex, batchSize, count, callback]
            unsigned startAt = currentIndex.fetch_add(batchSize);

            if(startAt >= count)
                return;
            else
                for(unsigned i=0;i<count;++i)
                    unsigned index = startAt+i;
                    if(index >= count)
                        return;
                    
                    callback(index);
                
            
        );
    

    for(std::thread &thread : threads)
        thread.join();
    


void threadedTest()
    std::mt19937_64 rnd(0);
    std::vector<double> numbers;

    numbers.reserve(NUMBER_COUNT);
    for(unsigned i=0;i<NUMBER_COUNT;++i)
        numbers.push_back(rnd());
    

    std::vector<double> newNumbers = numbers;

    std::ofstream fout("/tmp/test-data.bin");

    for(unsigned i=0;i<ITERATION_COUNT;++i) 
        std::cout << "Iteration: " << i << "/" << ITERATION_COUNT << std::endl;
        runThreaded(NUMBER_COUNT, 100, [&numbers, &newNumbers](unsigned x)
            double total = 0;
            for(unsigned y=0;y<NUMBER_COUNT;++y)
                total += numbers[y]*(y-x)*(y-x);
            
            newNumbers[x] = total;
        );
        fout.write(reinterpret_cast<char*>(newNumbers.data()), newNumbers.size()*sizeof(double));
        std::swap(numbers, newNumbers);
    


void unThreadedTest()
    std::mt19937_64 rnd(0);
    std::vector<double> numbers;

    numbers.reserve(NUMBER_COUNT);
    for(unsigned i=0;i<NUMBER_COUNT;++i)
        numbers.push_back(rnd());
    

    std::vector<double> newNumbers = numbers;

    std::ofstream fout("/tmp/test-data.bin");

    for(unsigned i=0;i<ITERATION_COUNT;++i)
        std::cout << "Iteration: " << i << "/" << ITERATION_COUNT << std::endl;
        for(unsigned x=0;x<NUMBER_COUNT;++x)
            double total = 0;
            for(unsigned y=0;y<NUMBER_COUNT;++y)
                total += numbers[y]*(y-x)*(y-x);
            
            newNumbers[x] = total;
        
        fout.write(reinterpret_cast<char*>(newNumbers.data()), newNumbers.size()*sizeof(double));
        std::swap(numbers, newNumbers);
    


int main(int argc, char *argv[]) 
    if(argv[1][0] == 't')
        threadedTest();
    else
        unThreadedTest();
    
    return 0;

当我运行它(在 Linux 上使用 clang 7.0.1 编译)时,我从 Linux time 命令得到以下时间。这些之间的区别与我在真实程序中看到的相似。标记为“真实”的条目与此问题相关,因为这是程序运行所需的时钟时间。

单线程:

real    6m27.261s
user    6m27.081s
sys     0m0.051s

多线程:

real    14m32.856s
user    216m58.063s
sys     0m4.492s

因此,当我预计它会显着加速(大约是 8 倍,因为我有一个 8 核 16 线程 CPU)时,我会问是什么导致了这种大幅减速。我没有在 GPU 上实现这一点,因为下一步是对算法进行一些更改,以将其从 O(n²) 变为 O(nlogn),但这对 GPU 也不友好。与包含的示例相比,更改后的算法与我当前实现的 O(n²) 算法的差异较小。最后,我想观察到运行每次迭代的主观时间(根据出现的迭代行之间的时间来判断)在线程和非线程运行中都发生了显着变化。

【问题讨论】:

如果您的线程使用共享数据和原子变量或锁,那么所有同步成本很可能完全抵消了并行化带来的任何好处。为了获得最佳性能,不共享任何内容,或者尽可能少共享。你能用 map/reduce 风格的策略来解决这个问题吗? @tadman 正如我所见,原子变量每个线程只能访问一次。如果有 8 个线程,它只会被访问 8 次,这是非常少的。 这正是我所期望的,但肯定还有其他阻碍性能的因素。你为什么使用constexpr 来表示简单的数字?为什么不只是const?我无法重现您的问题,因为运行此代码时出现段错误。 c++ - What is the performance overhead of std::function? - Stack Overflow ? 你确定你正确地分配了工作吗?我可以看到偏移调整,但它们似乎都在迭代 size 条目,因此由于重复工作,在您的线程版本中它应该会慢很多。 【参考方案1】:

遵循此代码有点困难,但我认为您正在大规模重复工作,因为每个线程几乎都完成了所有工作,只是在开始时跳过了一小部分。

我假设runThreaded 的内部循环应该是:

unsigned startAt = currentIndex.fetch_add(batchSize);

while (startAt < count) 
  if (startAt >= count) 
    return;
   else 
    for(unsigned i=0;i<batchSize;++i)
      unsigned index = startAt+i;

      if(index >= count)
        return;
      

      callback(index);
    
  

  startAt = currentIndex.fetch_add(batchSize);

i &lt; batchSize 是这里的关键。您应该只做批次规定的工作量,而不是count 次,这是整个列表减去初始偏移量。

通过此次更新,代码的运行速度明显更快。我不确定它是否完成了所有必需的工作,因为很难判断这是否真的发生了,输出非常少。

【讨论】:

这将需要一个额外的循环,因此每个线程将获得另一批,否则您将不会处理每个索引。 @1201ProgramAlarm 可能就是这种情况,因为线程池类型的结构在这里会有所帮助。我已将代码调整为循环,直到它耗尽池。 谢谢!就是这样 :D 我已经盯着代码看了好几个小时了,但我没有看到它。【参考方案2】:

为了在多个 CPU 上轻松并行化,我建议使用 tbb::parallel_for。它使用正确数量的 CPU 并为您划分范围,完全消除了错误实施的风险。或者,有一个parallel for_each in C++17。换句话说,这个问题有很多好的解决方案。

矢量化代码是一个难题,clang++-6g++-8 都不会自动矢量化基线代码。因此,下面的 SIMD 版本我使用了出色的 Vc: portable, zero-overhead C++ types for explicitly data-parallel programming 库。

以下是比较的工作基准:

基准版本。 SIMD 版本。 SIMD + 多线程版本。

#include <Vc/Vc>
#include <tbb/parallel_for.h>

#include <algorithm>
#include <chrono>
#include <iomanip>
#include <iostream>
#include <random>
#include <vector>

constexpr int ITERATION_COUNT = 20;
constexpr int NUMBER_COUNT = 20000;

double baseline() 
    double result = 0;

    std::vector<double> newNumbers(NUMBER_COUNT);
    std::vector<double> numbers(NUMBER_COUNT);
    std::mt19937 rnd(0);
    for(auto& n : numbers)
        n = rnd();

    for(int i = 0; i < ITERATION_COUNT; ++i) 
        for(int x = 0; x < NUMBER_COUNT; ++x) 
            double total = 0;
            for(int y = 0; y < NUMBER_COUNT; ++y) 
                auto d = (y - x);
                total += numbers[y] * (d * d);
            
            newNumbers[x] = total;
        
        result += std::accumulate(newNumbers.begin(), newNumbers.end(), 0.);
        swap(numbers, newNumbers);
    

    return result;


double simd() 
    double result = 0;

    constexpr int SIMD_NUMBER_COUNT = NUMBER_COUNT / Vc::double_v::Size;
    using vector_double_v = std::vector<Vc::double_v, Vc::Allocator<Vc::double_v>>;
    vector_double_v newNumbers(SIMD_NUMBER_COUNT);
    vector_double_v numbers(SIMD_NUMBER_COUNT);
    std::mt19937 rnd(0);
    for(auto& n : numbers) 
        alignas(Vc::VectorAlignment) double t[Vc::double_v::Size];
        for(double& v : t)
            v = rnd();
        n.load(t, Vc::Aligned);
    

    Vc::double_v const incv(Vc::double_v::Size);
    for(int i = 0; i < ITERATION_COUNT; ++i) 
        Vc::double_v x(Vc::IndexesFromZero);
        for(auto& new_n : newNumbers) 
            Vc::double_v totals;
            int y = 0;
            for(auto const& n : numbers) 
                for(unsigned j = 0; j < Vc::double_v::Size; ++j) 
                    auto d = y - x;
                    totals += n[j] * (d * d);
                    ++y;
                
            
            new_n = totals;
            x += incv;
        
        result += std::accumulate(newNumbers.begin(), newNumbers.end(), Vc::double_v).sum();
        swap(numbers, newNumbers);
    

    return result;


double simd_mt() 
    double result = 0;

    constexpr int SIMD_NUMBER_COUNT = NUMBER_COUNT / Vc::double_v::Size;
    using vector_double_v = std::vector<Vc::double_v, Vc::Allocator<Vc::double_v>>;
    vector_double_v newNumbers(SIMD_NUMBER_COUNT);
    vector_double_v numbers(SIMD_NUMBER_COUNT);
    std::mt19937 rnd(0);
    for(auto& n : numbers) 
        alignas(Vc::VectorAlignment) double t[Vc::double_v::Size];
        for(double& v : t)
            v = rnd();
        n.load(t, Vc::Aligned);
    

    Vc::double_v const v0123(Vc::IndexesFromZero);
    for(int i = 0; i < ITERATION_COUNT; ++i) 
        constexpr int SIMD_STEP = 4;
        tbb::parallel_for(0, SIMD_NUMBER_COUNT, SIMD_STEP, [&](int ix) 
            Vc::double_v xs[SIMD_STEP];
            for(int is = 0; is < SIMD_STEP; ++is)
                xs[is] = v0123 + (ix + is) * Vc::double_v::Size;
            Vc::double_v totals[SIMD_STEP];
            int y = 0;
            for(auto const& n : numbers) 
                for(unsigned j = 0; j < Vc::double_v::Size; ++j) 
                    for(int is = 0; is < SIMD_STEP; ++is) 
                        auto d = y - xs[is];
                        totals[is] += n[j] * (d * d);
                    
                    ++y;
                
            
            std::copy_n(totals, SIMD_STEP, &newNumbers[ix]);
        );
        result += std::accumulate(newNumbers.begin(), newNumbers.end(), Vc::double_v).sum();
        swap(numbers, newNumbers);
    

    return result;


struct Stopwatch 
    using Clock = std::chrono::high_resolution_clock;
    using Seconds = std::chrono::duration<double>;
    Clock::time_point start_ = Clock::now();

    Seconds elapsed() const 
        return std::chrono::duration_cast<Seconds>(Clock::now() - start_);
    
;


std::ostream& operator<<(std::ostream& s, Stopwatch::Seconds const& a) 
    auto precision = s.precision(9);
    s << std::fixed << a.count() << std::resetiosflags(std::ios_base::floatfield) << 's';
    s.precision(precision);
    return s;


void benchmark() 
    Stopwatch::Seconds baseline_time;
    
        Stopwatch s;
        double result = baseline();
        baseline_time = s.elapsed();
        std::cout << "baseline: " << result << ", " << baseline_time << '\n';
    

    
        Stopwatch s;
        double result = simd();
        auto time = s.elapsed();
        std::cout << "    simd: " << result << ", " << time << ", " << (baseline_time / time) << "x speedup\n";
    

    
        Stopwatch s;
        double result = simd_mt();
        auto time = s.elapsed();
        std::cout << " simd_mt: " << result << ", " << time << ", " << (baseline_time / time) << "x speedup\n";
    


int main() 
    benchmark();
    benchmark();
    benchmark();

时间安排:

baseline: 2.76582e+257, 6.399848397s
    simd: 2.76582e+257, 1.600373449s, 3.99897x speedup
 simd_mt: 2.76582e+257, 0.168638435s, 37.9501x speedup

注意事项:

我的机器支持 AVX,但不支持 AVX-512,因此使用 SIMD 时速度大约提高了 4 倍。 simd_mt 版本在我的机器上使用 8 个线程和更大的 SIMD 步骤。理论加速是 128 倍,实践中是 38 倍。 clang++-6 不能自动矢量化基线代码,g++-8 也不能。 g++-8 为 SIMD 版本生成的代码比 clang++-6 快得多。

【讨论】:

这虽然有用,但由于在某些情况下 clang 具有自动矢量化功能,所以用处不大。我真正的程序就是其中之一。 (source) @john01dav 为您添加了并行 + SIMD 版本。 @john01dav 为您添加了矢量化注释。【参考方案3】:

你的心肯定是在正确的地方减去一两个错误。

par_for 是一个复杂的问题,具体取决于循环的有效负载。有 没有一刀切的解决方案。有效载荷可以是任何来自 几个添加到几乎无限的互斥块 - 例如通过做内存 分配。

原子变量作为工作项模式一直对我很有效,但是 请记住,原子变量在 X86 上的成本很高(约 400 个周期),甚至 如果我发现它们在未执行的分支中,则会产生高昂的成本。

以下的一些排列通常是好的。选择正确的 chunks_per_thread(如在您的 batchSize 中)是至关重要的。如果你不相信你的 用户,您可以测试执行循环的几次迭代来猜测 最佳分块级别。

#include <atomic>
#include <future>
#include <thread>
#include <vector>
#include <stdio.h>

template<typename Func>
void par_for(int start, int end, int step, int chunks_per_thread, Func func) 
  using namespace std;
  using namespace chrono;

  atomic<int> work_itemstart;
  vector<future<void>> futures(std::thread::hardware_concurrency());

  for (auto &fut : futures) 
    fut = async(std::launch::async, [&work_item, end, step, chunks_per_thread, &func]() 
      for(;;) 
        int wi = work_item.fetch_add(step * chunks_per_thread);
        if (wi > end) break;
        int wi_max = std::min(end, wi+step * chunks_per_thread);
        while (wi < wi_max) 
          func(wi);
          wi += step;
        
      
    );
  

  for (auto &fut : futures) 
    fut.wait();
  


int main() 
  using namespace std;
  using namespace chrono;
  for (int k = 0; k != 2; ++k) 
    auto t0 = high_resolution_clock::now();
    constexpr int loops = 100000000;
    if (k == 0) 
      for (int i = 0; i != loops; ++i ) 
        if (i % 10000000 == 0) printf("%d\n", i);
      
     else 
      par_for(0, loops, 1, 100000, [](int i) 
        if (i % 10000000 == 0) printf("%d\n", i);
      );
    
    auto t1 = high_resolution_clock::now();
    duration<double, milli> ns = t1 - t0;
    printf("k=%d %fms total\n", k, ns.count());
  

结果

...
k=0 174.925903ms total
...
k=1 27.924738ms total

大约 6 倍加速。

我避免使用“令人尴尬的平行”一词,因为它几乎从来没有出现过。您在从 1 级缓存(ns 延迟)到全球跨集群(ms 延迟)的旅程中使用的资源越多,您支付的成本就会成倍增加。但我希望这段代码 sn-p 作为答案有用。

【讨论】:

我不清楚这对以前的答案有何改进;您可能想要添加一些突出显示与已接受答案和其他答案相比差异的内容,速度提高 37 倍。不相关,但有几类问题绝对是令人尴尬的平行——这个老比喻的存在是有原因的。 YMMV。

以上是关于为啥 CPU 上的线程浮点计算会使它们花费更长的时间?的主要内容,如果未能解决你的问题,请参考以下文章

为啥偶数 N 比奇数 N 花费更长的时间?

为啥我使用 openMP atomic 的并行代码比串行代码花费更长的时间?

异步比顺序执行花费更长的时间

为啥 c 中的幂函数需要比预期更长的时间

java - 多线程中的简单计算比单线程中需要更长的时间

为啥一个循环比另一个循环检测共享内存更新需要更长的时间?