多线程之最大并行数

Posted 居明明

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了多线程之最大并行数相关的知识,希望对你有一定的参考价值。

4核8线程,采取超线程技术,将一个核心模拟成两个线程。

public class 最大并行数 
    public static void main(String[] args) 
        //向java虚拟机返回可用处理器的数目
        //getRuntime获取运行状态,获取可用线程数
        int count=Runtime.getRuntime().availableProcessors();
        System.out.println(count);
    

为啥我的多线程并行求和函数的向量受限于线程数?

【中文标题】为啥我的多线程并行求和函数的向量受限于线程数?【英文标题】:why my Multi threaded parallel sum function's vector is limited depended to the number of threads?为什么我的多线程并行求和函数的向量受限于线程数? 【发布时间】:2020-05-13 16:06:49 【问题描述】:

我编写了一个递归求和并行函数,它获取一个数字向量、一个线程池和向量大小,它应该返回向量的总和,但是当我想使用时,如下例所示,20 个单元格大小的向量,我必须使用至少 8 个线程,否则程序将被卡住并且无法完成(并且不会抛出错误)。

这是我正在使用的线程池代码:

#ifndef THREAD_POOL_H
#define THREAD_POOL_H

#include <vector>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>

class ThreadPool 
public:
    ThreadPool(size_t);
    template<class F, class... Args>
    auto enqueue(F&& f, Args&&... args) 
        -> std::future<typename std::result_of<F(Args...)>::type>;
    ~ThreadPool();
private:
    // need to keep track of threads so we can join them
    std::vector< std::thread > workers;
    // the task queue
    std::queue< std::function<void()> > tasks;

    // synchronization
    std::mutex queue_mutex;
    std::condition_variable condition;
    bool stop;
;

// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threads)
    :   stop(false)

    for(size_t i = 0;i<threads;++i)
        workers.emplace_back(
            [this]
            
                for(;;)
                
                    std::function<void()> task;

                    
                        std::unique_lock<std::mutex> lock(this->queue_mutex);
                        this->condition.wait(lock,
                            [this] return this->stop || !this->tasks.empty(); );
                        if(this->stop && this->tasks.empty())
                            return;
                        task = std::move(this->tasks.front());
                        this->tasks.pop();
                    

                    task();
                
            
        );


// add new work item to the pool
template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args) 
    -> std::future<typename std::result_of<F(Args...)>::type>

    using return_type = typename std::result_of<F(Args...)>::type;

    auto task = std::make_shared< std::packaged_task<return_type()> >(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );

    std::future<return_type> res = task->get_future();
    
        std::unique_lock<std::mutex> lock(queue_mutex);

        // don't allow enqueueing after stopping the pool
        if(stop)
            throw std::runtime_error("enqueue on stopped ThreadPool");

        tasks.emplace([task]() (*task)(); );
    
    condition.notify_one();
    return res;


// the destructor joins all threads
inline ThreadPool::~ThreadPool()

    
        std::unique_lock<std::mutex> lock(queue_mutex);
        stop = true;
    
    condition.notify_all();
    for(std::thread &worker: workers)
        worker.join();


#endif

这是我的求和并行函数:

int Sum_Parallelled(int *begin, ThreadPool *threadPool,int size) 
    if (size == 1) 
        return *begin;
     else 
        auto res = threadPool->enqueue(Sum_Parallelled, (begin), threadPool, size / 2);
        if (size % 2 == 0) 
            return Sum_Parallelled(begin + (size / 2), threadPool, size / 2) + res.get();
         else 
            return Sum_Parallelled(begin + (size / 2), threadPool, size / 2 + 1) + res.get();
        
    

这是主要的功能代码:

int main() 
    std::vector<int> vec;
    for(int i = 0; i < 20; i++) // creating a vector with x cells.
        vec.push_back(i);
    
    ThreadPool threadPool(8); // creating a threadpool with y threads.
    int size = vec.size();
    int sum = threadPool.enqueue(Sum_Parallelled,vec.data(),&threadPool,size).get();
    std::cout << "The sum in the parallel sum: " << sum << std::endl;
    return 0;

【问题讨论】:

请提取并提供minimal reproducible example 作为您问题的一部分。我看到了一些看起来有问题的东西,但是如果没有您的代码的更多部分,就无法判断。 我尝试调试它,但它停止工作,我不知道为什么,这就是我给出示例的原因 我相信你,但这不是重点。这里的每个人都应该能够获取您的代码,将其放入单个文件并进行编译。没有多个文件,没有修改代码,没有手动输入(这里不是问题)——代码应该可以用作您问题的示例。 【参考方案1】:

您的Sum_Parallelled 为其前半部分的新子任务排入队列并阻塞直到该任务完成

因此,如果未完成任务的数量可能超过线程数量,您将陷入死锁。

我们可以轻松写出将为给定输入创建的任务数量:

sp(20) -> sp(10) + sp(10)
sp(10) -> (sp(5) + sp(5))
sp(5)  -> (sp(2) + sp(3))

等等

所以用 size=20 调用你的函数会:

    创建两个 size=10 的任务并阻塞当前线程直到它们都返回

    状态:一个线程被阻塞等待,队列中有两个任务。

    每个 size=10 的任务将创建两个 size=5 的任务并阻塞直到它们都返回

    状态:三个线程阻塞等待,队列中有四个任务。

    每个 size=5 任务将创建一个 size=2 任务和一个 size=3 任务并阻塞直到它们都返回

    状态:七个线程阻塞等待,队列中有八个任务。

很明显,下一级将有所有可用线程等待永远无法执行的任务。

这显然也是低效的:即使我们有足够多的线程,它们大多只是被阻塞等待其他线程而不是做任何有用的事情。

幸运的是,很容易想出更好的组织方式。例如:

    只需为每个线程创建一个任务,每个任务对向量的 1/n 个线程求和,然后在所有完成后对结果求和。

    缺点:它将(部分)逻辑上移了一层,如果输入向量非常很大,单线程的最终累加可能会成为瓶颈

    优点:除非向量确实非常大,否则这个简单的方案会执行得更好。

    像现在一样在每个级别创建两个子任务,但不要等待它们的结果,只需返回一对期货。

【讨论】:

/谢谢!,你有什么推荐的方法吗?现在我有一个函数,它所做的只是对一个向量求和的循环,但我想让该函数更有效并在更短的时间内运行。而且我以前从未使用过期货,这很复杂吗? 你的线程池已经使用了期货,所以你有一个如何做的工作示例。 TBH 多线程向量求和比单线程更快的阈值相当高 - 您可以在启动线程或切换上下文所需的时间内对大量缓存友好的整数求和。【参考方案2】:

为了总结你的二十个数字,你首先启动一个任务(在一个线程中)递归地总结这二十个数字。此任务启动两个任务,每个任务合计 20 个数字的一​​半。然后它坐在那里,等待两个任务完成并最终返回结果。它一直阻塞池中的一个线程。如果你记下每个线程何时拿起一个任务并完成一个,你会看到有 20 个数字,你需要比线程更多的并行任务,因此挂起程序。

您的问题是由于使用了递归引起的,这是该工作的错误工具。考虑一棵树,它将每个任务与父任务相关联。在叶子节点,你的任务只有一个数字,在根节点,你有二十个数字的任务。现在,每个有孩子的任务都被阻止,直到两个孩子都完成。使用固定的线程池大小,您只能处理一定的树大小,因为树的大小决定了根到叶的距离,从而决定了阻塞等待子任务的线程数。如果高度太大,线程会被阻塞,等待永远不会执行的子任务,因为没有空闲线程。

【讨论】:

有什么方法可以使用,例如 - 一个向量上的 4 个线程,大小为 - 8092 个单元格? 当然,您为四个线程中的每一个分配了四分之一的范围。 解决方案建议:将所需的计算打包成一种数据结构(它只包含要合并的元素范围,以及一个输出指针)。然后总是将它们推送到一个全局的、线程安全的队列中,并使用你的线程池来处理所有线程的队列。

以上是关于多线程之最大并行数的主要内容,如果未能解决你的问题,请参考以下文章

Java多线程之线程池

JVM之SerialNew收集器

线程同步和并发

Java多线程之线程池

Java开发之多线程的基本概念与如何避坑

Python中级精华-并发之启动和停止线程