并行验证谓词,线程池中的线程返回 true 时立即返回

Posted

技术标签:

【中文标题】并行验证谓词,线程池中的线程返回 true 时立即返回【英文标题】:Verify a predicate in parallel, return as soon as a thread in a thread pool returns true 【发布时间】:2015-05-08 08:18:56 【问题描述】:

我想写一个函数anyElementSatisfiesPredicate,它接受一个谓词函数p的输入(它接受一个给定类型T的对象并返回一个布尔值)和一个std::vector v T 类型的对象,当且仅当它在 v s.t. 中存在一个元素时返回 true。 p(v) == true。

这可以通过 for 循环轻松完成:

bool anyElementSatisfiesPredicate(std::function<bool(T&)> p, std::vector<T> v) 
  for (auto it=v.begin(); it!=v.end(); ++it)
    if (p(*it))
      return true;
  return false;

这工作正常(给定正确定义的类型 T),但我想并行化此代码,同时在向量 v 的不同元素上测试谓词函数 p。我的想法是将工作分配给固定(取决于机器)数量的内核。每个线程都应该在原始向量的不同部分评估谓词,并在发现谓词 p 在其部分中的元素上成立时立即返回 true。一旦任何给定线程返回 true,核心函数 anyElementSatisfiesPredicate 应该杀死剩余的线程并返回 true,如果所有线程最终返回 false,它应该返回 false。

由于此代码将在具有不同内核数量的不同机器上运行,我不想引入任何定义要使用的内核数量的常量,我宁愿让系统为我选择最佳值。

效率是我最关心的问题。有没有简单的方法来完成这个(也许使用 boost 线程库)?

【问题讨论】:

那么,就像std::any_of 一样,但是是并行的?也许可以用例如完成std::async. “应该杀死剩余的线程” - std::thread 不支持异步线程取消(您的底层线程实现可能,但只有在非常小心和受限的使用情况下才安全)),但是您可以设置一个标志,让其他线程每隔一段时间检查一次(例如,每次检查进一步的 n 个元素时),如果他们看到是否设置,则提前返回。除此之外,只需找到您的核心数量,并在线程返回时加入它们。 如果你使用 C++11,也许你可以选择 std::future 您可以使用任务队列之类的东西 - 每个线程将从队列中弹出最后一个元素,计算谓词并将结果放入另一个队列。主线程监视结果队列并清除任务队列以停止计算。我不确定它是否被 std lib 直接支持... 抱歉 - 刚刚注意到 boost 标签 - 该库确实为协作中断提供了一些便利功能:请参阅here - 与上面建议的标志相同。 【参考方案1】:

不是世界上最优雅的解决方案,但这样的解决方案应该可以工作:(没有提升,但需要 c++ 11)

#include <thread>
#include <atomic>

template <typename T>
struct pred_evaluator

    static void any_element_satisfies(const std::function<bool(const T&)> & pred, const typename std::vector<T>::iterator & begin, const typename std::vector<T>::iterator & end, std::atomic<bool> & result) 
    
        for (const auto & it=begin; it!=end; ++it)
        
            if (result || pred(*it))
            
                result= true;
                return;
            
       
    

    static bool is_predicate_true_parallel(const std::function<bool(const T&)> & pred, const std::vector<T> & input, size_t num_threads)
    
        size_t chunk_size = input.size() / 4;
        std::atomic<bool> result(false);
        std::vector<std::thread> threads;
        for (size_t i = 0; i < num_threads; ++i)
        
            const auto & begin = input.begin() + i *chunk_size;
            const auto & end = input.begin() + std::min((i+1) * chunk_size, input.size());
            threads.emplace_back(any_element_satisfies,pred,begin,end,result);
        

        for (auto & thread : threads)
            thread.join();

        return result;
    
;

然后,您将调用pred_evaluator&lt;T&gt;::is_predicate_true_parallel,并将您的谓词、向量和线程数作为输入。

【讨论】:

感谢您的回答,似乎正是我想要的。我只看到两个问题,但我可能错了。 (1) is_predicate_true_parallel 不会在任何线程返回true时立即返回,它会等待所有线程终止; (2) 您在 any_element_satisfies 函数中使用的 any_predicate_done 标志从未声明过,我猜您的意图是改用 result 。我说的对吗? 1) 不,该函数不会立即返回。更好的方法可能是在完成后用条件变量向主线程发出信号(但是,请注意条件变量可能具有虚假唤醒,可能会给您带来误报)。 2) DERP,是的,_predicate_done 本来是result 另外,应该说明的是,如果_predicate_true_parallel 被多次调用,最好改为管理此函数范围之外的线程,以便它们可以在迭代之间重用。您可以将线程封装在 task \ task pool 对象中,这样您就可以测试连续的谓词,而不会产生过多的线程创建/破坏。当然,这个对象的这个任务设置方法是需要互斥保护的。

以上是关于并行验证谓词,线程池中的线程返回 true 时立即返回的主要内容,如果未能解决你的问题,请参考以下文章

处理线程池中的优先级

python多线程并行计算通过向线程池ThreadPoolExecutor提交任务的实现方法

关于线程池

Spring使用@Async注解,多线程

线程池判断是不是执行完毕

多线程