异步比顺序执行花费更长的时间

Posted

技术标签:

【中文标题】异步比顺序执行花费更长的时间【英文标题】:Async takes longer than sequential execution 【发布时间】:2019-07-24 02:06:21 【问题描述】:

我正在尝试使用多线程在 C++ 中执行一项非常基本的任务,我注意到使用较少数量的线程运行此代码实际上比使用多线程更快。

我已经尝试切换到线程(...),添加和删除互斥锁,但它仍然是相同的或更慢。

struct threadResults

    int64_t threadSize;
    int64_t sum =0;
    int32_t min_val = std::numeric_limits<int32_t>::max();
    int32_t max_val = std::numeric_limits<int32_t>::min();
;
void doCalculation(int thread_num, std::vector<int> rand_values, int32_t numPerThread, threadResults* myThreadResult, int remainder)

    if(remainder!=0&&thread_num==0)
    
        for(int i=0;i<numPerThread+remainder;++i)
        
            myThreadResult->sum += rand_values[i];
            if(myThreadResult->min_val>rand_values[i])
            
                myThreadResult->min_val = rand_values[i];
            
            if(myThreadResult->max_val<rand_values[i])
            
                myThreadResult->max_val = rand_values[i];
            
        
    
    else if(remainder!=0&&thread_num!=0)
    
        int start = (numPerThread*thread_num)+remainder;
        for(int i=start;i<numPerThread+start;++i) 
            myThreadResult->sum += rand_values[i];
            if (myThreadResult->min_val > rand_values[i]) 
                myThreadResult->min_val = rand_values[i];
            
            if (myThreadResult->max_val < rand_values[i]) 
                myThreadResult->max_val = rand_values[i];
            
        
    
    else if(remainder==0)
    
        int start = (numPerThread*thread_num);
        for(int i=start;i<numPerThread+start;++i) 
            myThreadResult->sum += rand_values[i];
            if (myThreadResult->min_val > rand_values[i]) 
                myThreadResult->min_val = rand_values[i];
            
            if (myThreadResult->max_val < rand_values[i]) 
                myThreadResult->max_val = rand_values[i];
            
        
    


    std::future<void> myThreads[num_threads];


    auto start = sc::high_resolution_clock::now();

    //TODO:  Implement multithreaded code to populate the above stat values
    for(int i=0;i<num_threads;i++) 
         myThreads[i] = async(std::launch::async, doCalculation, i, rand_values, numPerThread, &myThreadResults[i], remainder);
    
    for(int i=0;i<num_threads;i++) 
        myThreads[i].get();
    

    for(threadResults t : myThreadResults)
    
        sum += t.sum;
        if(t.min_val<min_val)
        
            min_val = t.min_val;
        
        if(t.max_val>max_val)
        
            max_val = t.max_val;
        
    

    auto end = sc::high_resolution_clock::now();

我希望使用 1 个线程运行此代码比 2+ 个线程花费更多时间,但事实并非如此。

$ ./sumit 10000000 1 0
Elapsed Time: 101228us
Sum: 101555736
Min: -100000
Max: 100000
Avg: 10.1556
$ ./sumit 10000000 2 0
Elapsed Time: 142738us
Sum: 101555736
Min: -100000
Max: 100000
Avg: 10.1556

【问题讨论】:

创建一个minimal reproducible example。 是的,它大得令人讨厌,很难拆开。 @Omnifarious 所说的。此外,完全在(-O3 或发布构建设置)上使用零售优化进行编译。 叫我疯了,但if(remainder!=0&amp;&amp;thread_num==0) 块在功能上不等同于随后的else if(remainder!=0&amp;&amp;thread_num!=0) 块。就此而言,这三个代码块不是同一组计算吗? @selbie - 我放弃了其中奇怪的扭曲逻辑,并编写了一个我认为是 OP 想要完成的程序。 【参考方案1】:

改变这一行:

void doCalculation(int thread_num, std::vector<int> rand_values, int32_t numPerThread, threadResults* myThreadResult, int remainder)

到这里:

void doCalculation(int thread_num, std::vector<int> const &rand_values, int32_t numPerThread, threadResults* myThreadResult, int remainder)

我敢打赌事情会变得更快。

我写了一个程序来完成你所做的事情。我会将整个内容粘贴到这里,如果您需要更具建设性的帮助,这确实是您应该做的:

#include <fmt/core.h>
#include <future>
#include <cstdint>
#include <limits>
#include <vector>
#include <stdexcept>
#include <random>
#include <string>
#include <algorithm>
#include <chrono>

using ::std::int64_t;
using ::std::uint32_t;
using ::std::future;

struct threadResults

    int64_t sum;
    int32_t min_val;
    int32_t max_val;
;

using randvec_t = ::std::vector<int>;

threadResults doCalculation(randvec_t::iterator begin, randvec_t::iterator end)

    if (begin == end) 
        throw ::std::range_error("Range to iterate over must not be empty!");
    
    threadResults results0, *begin, *begin;
    for (auto i = begin; i != end; ++i) 
        auto const cur = *i;
        results.sum += cur;
        if (results.min_val > cur) 
            results.min_val = cur;
         else if (results.max_val < cur) 
            results.max_val = cur;
        
    
    return results;


int main(int argc, char const * const argv[])

    if (argc != 3) 
        fmt::print(stderr, "Usage:  <num_to_sum> <threads_to_use>\n", argv[0]);
        return 1;
    
    int const size = ::std::stoi(argv[1]);
    int const nthreads = ::std::stoi(argv[2]);

    if (size <= nthreads || nthreads < 0) 
        fmt::print(stderr, "You must have more elements than threads and neither may be negative.");
    

    randvec_t elements;
    elements.reserve(size);

    
        //std::random_device rd;
        std::mt19937 gen(5);
        std::uniform_int_distribution<> dis(0, ::std::numeric_limits<randvec_t::value_type>::max());

        for (int i = 0; i < size; ++i) 
            elements.push_back(dis(gen));
        
    

    namespace sc = ::std::chrono;

    auto start = sc::high_resolution_clock::now();
    ::std::vector<::std::future<threadResults>> futures;
    futures.reserve(nthreads);
    for (unsigned int start = 0; start < elements.size();) 
        unsigned int const end = start + (elements.size() - start) / (nthreads - futures.size());
        futures.push_back(::std::async(::std::launch::async, doCalculation, elements.begin() + start, elements.begin() + end));
        start = end;
    
    threadResults aggregate0, 0, 0;
    bool first = true;
    for (auto &future: futures) 
        if (first) 
            aggregate = future.get();
            first = false;
         else 
            auto this_result = future.get();
            aggregate.sum += this_result.sum;
            aggregate.min_val = ::std::min(this_result.min_val, aggregate.min_val);
            aggregate.max_val = ::std::max(this_result.max_val, aggregate.max_val);
        
    
    auto end = sc::high_resolution_clock::now();

    auto duration = end - start;
    fmt::print("Duration: us\n", duration / sc::microseconds(1));
    fmt::print("     sum: \n", aggregate.sum);
    fmt::print("     min: \n", aggregate.min_val);
    fmt::print("     max: \n", aggregate.max_val);
    return 0;

这是在我漂亮的 16 个硬件线程 Ryzen 7 上运行该程序的结果(我使用 takoshell,因此我的提示很奇怪):

?? /tmp/so_problem
$ ./a.out 100000000 1
Duration: 76676us
     sum: 107373414219986326
     min: 11
     max: 2147483633
?? /tmp/so_problem
$ ./a.out 100000000 4
Duration: 20534us
     sum: 107373414219986326
     min: 11
     max: 2147483633
?? /tmp/so_problem
$ ./a.out 100000000 8
Duration: 17177us
     sum: 107373414219986326
     min: 11
     max: 2147483633

我不确定为什么结果会在 4 到 8 个线程之间触底。现在... 将是一个有趣的问题。我强烈怀疑它与内存带宽有关。

请注意我的程序是如何缺少您的程序所具有的许多奇怪的无意义的绒毛。

【讨论】:

以上是关于异步比顺序执行花费更长的时间的主要内容,如果未能解决你的问题,请参考以下文章

为啥偶数 N 比奇数 N 花费更长的时间?

为啥我使用 openMP atomic 的并行代码比串行代码花费更长的时间?

带有 SQL 注入保护的简单查询比没有的要花费更长的时间

没有附加列的子查询比有列的子查询花费更长的时间

NHibernate 执行简单查询需要更长的时间

Nodejs TCP服务器按顺序处理数据包