并发数组检查

Posted

技术标签:

【中文标题】并发数组检查【英文标题】:Concurrent Array Checking 【发布时间】:2017-08-11 10:24:38 【问题描述】:

我既不是 C++ 专家,也不是并发编程专家。但是,我正在实现一个简单的推理算法,它需要检查许多独立的模型。可能的模型数量很多,所以我想并行检查它们。

为了使其尽可能简单,我将原来的问题转化为一个非常简单的问题:如何确定一个数组是否包含非零值?一个简单的顺序解决方案将像这样:

bool containsNonZero (int* arr, int len) 
    for (int i = 0; i < len; ++i)
        if (arr[i]) return true;
    return false;

(注意:实际上,len 不能放入 int,但在我原来的问题中,没有数组,只有我生成的许多组合,但是不存储。)

但是,我需要一个并行(且高效)的实现。有 t = std::thread::hardware_concurrency() 线程来搜索数组(注意 t len。如果 len % t != 0 那么让最后一个线程处理剩余的值不会有问题)。所以第一个线程将搜索从 0len/t 的索引,第二个线程将搜索从 len/t 到 (2 *len)/t 等。最后一个线程将搜索从 ((t-1)*len)/t 到 len 的索引。如果一个线程找到一个非零值,所有线程将停止并返回 true。否则,它们将等待其他线程完成,如果所有线程都同意,则返回 false

这似乎很容易,但我在网上找不到任何答案。欢迎任何 C++ 版本,但我不想依赖任何第三方库。

【问题讨论】:

【参考方案1】:

我尝试扩展 Davide Spataro 的解决方案以解决 atomic&lt;bool&gt; 的同步问题,使用 atomic_flag '与 std::atomic 的所有特化不同,它保证是无锁的'http://en.cppreference.com/w/cpp/atomic/atomic_flag

编辑: 与前一个问题无关,但我已经对哪种方法更快进行了基准测试,令我惊讶的是atomic&lt;bool&gt;atomic_flag 快了大约 100。

基准测试结果:

num_threads:2
400000001 iterations flag
401386195 iterations flag
atomic_flag : it took 24.1202 seconds. Result: 1
400000001 iterations bool
375842699 iterations bool
atomic<bool>: it took 0.334785 seconds. Result: 1
num_threads:3
229922451 iterations flag
229712046 iterations flag
233333335 iterations flag
atomic_flag : it took 21.5974 seconds. Result: 1
219564626 iterations bool
233333335 iterations bool
196877803 iterations bool
atomic<bool>: it took 0.200942 seconds. Result: 1
num_threads:4
151745683 iterations flag
150000001 iterations flag
148849108 iterations flag
148933269 iterations flag
atomic_flag : it took 18.6651 seconds. Result: 1
150000001 iterations bool
112825220 iterations bool
151838008 iterations bool
112857688 iterations bool
atomic<bool>: it took 0.167048 seconds. Result: 1

基准代码:

#include <thread>
#include <atomic>
#include <vector>
#include <iostream>
#include <algorithm>



template<typename Iterator>
static void any_of_flag(Iterator & begin, Iterator& end, std::atomic_flag & result)

    int counter = 0;
    for (auto it = begin; it != end; ++it)
    
        counter++;
        if (!result.test_and_set() || (*it) != 0)
        
            result.clear();
            std::cout << counter << " iterations flag\n";
            return;
        
    

template<typename Iterator>
static void any_of_atomic(Iterator & begin, Iterator& end, std::atomic<bool> & result)

    int counter = 0;
    for (auto it = begin; it != end; ++it)
    
        counter++;
        if (result || (*it) != 0)
        
            result = true;
            std::cout << counter << " iterations bool\n";
            return;
        
    


void test_atomic_flag(std::vector<int>& input, int num_threads)


    using namespace std::chrono;

    high_resolution_clock::time_point t1 = high_resolution_clock::now();


    size_t chunk_size = input.size() / num_threads;
    std::atomic_flag result = ATOMIC_FLAG_INIT;
    result.test_and_set();

    std::vector<std::thread> threads;
    for (size_t i = 0; i < num_threads; ++i)
    
        auto & begin = input.begin() + i *chunk_size;
        auto & end = input.begin() + std::min((i + 1) * chunk_size, input.size());
        // had to use lambda in VS 2017
        threads.emplace_back([&begin, &end, &result] any_of_flag(begin, end, result); );

    

    for (auto & thread : threads)
        thread.join();

    bool hasNonZero = !result.test_and_set();


    high_resolution_clock::time_point t2 = high_resolution_clock::now();

    duration<double> time_span = duration_cast<duration<double>>(t2 - t1);

    std::cout << "atomic_flag : it took " << time_span.count() << " seconds. Result: " << hasNonZero << std::endl;




void test_atomic_bool(std::vector<int>& input, int num_threads)


    using namespace std::chrono;

    high_resolution_clock::time_point t1 = high_resolution_clock::now();


    size_t chunk_size = input.size() / num_threads;
    std::atomic<bool> result(false);

    std::vector<std::thread> threads;
    for (size_t i = 0; i < num_threads; ++i)
    
        auto & begin = input.begin() + i *chunk_size;
        auto & end = input.begin() + std::min((i + 1) * chunk_size, input.size());
        // had to use lambda in VS 2017
        threads.emplace_back([&begin, &end, &result] any_of_atomic(begin, end, result); );

    

    for (auto & thread : threads)
        thread.join();

    bool hasNonZero = result;


    high_resolution_clock::time_point t2 = high_resolution_clock::now();

    duration<double> time_span = duration_cast<duration<double>>(t2 - t1);

    std::cout << "atomic<bool>: it took " << time_span.count() << " seconds. Result: " << hasNonZero << std::endl;


int main()

    std::vector<int> input(1e9, 0);
    input[1e9 - 1e8] = 1;
    for (int num_threads : 2, 3, 4)
    
        std::cout << "num_threads:" << num_threads << std::endl;
        test_atomic_flag(input, num_threads);
        test_atomic_bool(input, num_threads);
    

    int q;
    std::cin >> q;
    return 0;
;

旧帖子: 我在迭代器的常量性和放置线程方面遇到了一些问题,但是核心更改,即 atomic_flag 的使用似乎有效。它不会立即停止所有线程,但在最坏的情况下,每次迭代只会停止一个(因为每次迭代只有一个线程会知道由于清除标志它应该已经停止)。

#include <thread>
#include <atomic>
#include <vector>
#include <iostream>
#include <algorithm>

template<typename Iterator>
static void any_of(Iterator & begin, Iterator& end, std::atomic_flag & result)

    for (auto it = begin; it != end; ++it)
    
        if (!result.test_and_set() || (*it) != 0)
        
            result.clear();
            return;
        
    


int main()

    int num_threads = 3;
    std::vector<int> input =  0,0,0,0,0,0,0,0,0,0,0,0, 0,0,0,0,0,0,0,0,0,0,0,0, 1,0,0,0,0,0,0,0,0,0,0,0, 0,0,0,0,0,0,0,0,0,0,0,0,0;
    size_t chunk_size = input.size() / num_threads;
    std::atomic_flag result = ATOMIC_FLAG_INIT;
    result.test_and_set();

    std::vector<std::thread> threads;
    for (size_t i = 0; i < num_threads; ++i)
    
        auto & begin = input.begin() + i *chunk_size;
        auto & end = input.begin() + std::min((i + 1) * chunk_size, input.size());
        // had to use lambda in VS 2017
        threads.emplace_back([&begin, &end, &result] any_of(begin, end, result); );

    

    for (auto & thread : threads)
        thread.join();

    bool hasNonZero = !result.test_and_set();
    return 0;
;

【讨论】:

让我们比较一下时间。使用更大的输入,比如 10e9 元素。 constexpr size_t lim = 10e9; vector&lt;int&gt; input(lim, 0); 和不同数量的线程,1,2,3,4 如果你的 CPU 至少有 4 个内核。 @DavideSpataro 我已经做到了,令我惊讶的是,您的方法似乎要快得多。问题仍然存在:为什么? @R2RT atomic&lt;bool&gt;atomic_flag 快的原因可能是因为您只能通过调用test_and_set()atomic_flag 中获取值。这是一个原子读取-修改-写入,与常规负载相比非常昂贵。【参考方案2】:

类似下面的内容呢?

每个工作人员检查其范围内的元素是否非零或是否设置了 atomic 标志(意味着其他一些线程已经找到它)。

以下是每个线程执行的函数(每个线程都分配了不同的范围)

 template<typename Iterator>
static void any_of(Iterator & begin, Iterator& end, std::atomic<bool> & result) 
    
        for (const auto & it=begin; it!=end; ++it)
        
            if (result || (*it)!=0)
            
                result= true;
                return;
            
       

你可以这样称呼它

size_t chunk_size = input.size() / num_threads;
std::atomic<bool> result(false);
std::vector<std::thread> threads;
for (size_t i = 0; i < num_threads; ++i)

    const auto & begin = input.begin() + i *chunk_size;
    const auto & end = input.begin() + std::min((i+1) * chunk_size, input.size());
    threads.emplace_back(any_element_of,begin,end,result);


for (auto & thread : threads)
    thread.join();

在此之后,您可以安全地检查return 以检索您的结果。

请注意,通过将一元谓词函数传递给 worker 使其更通用,这种方法很容易扩展。

 template<typename Iterator, typename Predicate>
static void any_of(Iterator & begin, Iterator& end, Predicate pred, std::atomic<bool> & result) 
    
        for (const auto & it=begin; it!=end; ++it)
        
            if (result || pred(*it))
            
                result= true;
                return;
            
       

【讨论】:

请注意,如果您在每次迭代时检查标志,您就可以有效地再次序列化算法,因为访问原子确实会在此处产生一些同步开销(具体多少取决于平台)。最好只在每 n 次迭代中检查标志,然后为您的平台选择一个相当大的 n @Davide Spataro 这令人印象深刻!我试过函数fork()。但它是一个真正的麻烦制造者。范围的计算是否有问题?如果 input.size() == 1001 和 num_threads == 4 那么我猜最后一个值没有被检查。如果我错了,请原谅我。 @ComicSansMS 这是一个非常好的观点。为每个线程选择(或随机生成)不同的 n 以防止可能的冲突是个好主意吗? @Ersin101 我希望线程很快就会失去同步,所以我不会为每个线程选择一个随机的 n 。但好主意! @ComicSansMS 改用atomic_flag 怎么样? '与 std::atomic 的所有特化不同,它保证是无锁的'en.cppreference.com/w/cpp/atomic/atomic_flag

以上是关于并发数组检查的主要内容,如果未能解决你的问题,请参考以下文章

如何检查正在运行的 Google Cloud Run 容器的实例数?

jmeter做http测试怎么设置并发数

并发数并发以及高并发分别是什么意思?

日常巡检

并发数并发以及高并发分别是什么意思?

修改TCP/IP并发连接数