如何将矩阵提升到具有多个线程的幂?

Posted

技术标签:

【中文标题】如何将矩阵提升到具有多个线程的幂?【英文标题】:How can I raise a matrix to a power with multiple threads? 【发布时间】:2015-04-10 11:25:56 【问题描述】:

我正在尝试将矩阵提升到具有多个线程的幂,但我不太擅长线程。另外我从键盘输入线程数,该数字在 [1,矩阵高度] 范围内,然后我执行以下操作:

unsigned period = ceil((double)A.getHeight() / threadNum);
unsigned prev = 0, next = period;
for (unsigned i(0); i < threadNum; ++i) 
        threads.emplace_back(&power<long long>, std::ref(result), std::ref(A), std::ref(B), prev, next, p);

        if (next + period > A.getHeight()) 
            prev = next;
            next = A.getHeight();
        
        else 
            prev = next;
            next += period;
        
    

我很容易用多个线程将一个矩阵乘以另一个矩阵,但这里的问题是,一旦完成了 1 步,例如我需要将 A 提高到 3 的幂,A^2 就是那个一步,在那一步之后,我必须等待所有线程完成,然后再继续执行 A^2*A.我怎样才能让我的线程等待呢?我正在使用 std::thread 的。

在发布第一个回复后,我意识到我忘了提及我只想创建一次这些线程,而不是为每个乘法步骤重新创建它们。

【问题讨论】:

顺便说一句,线性乘法将不必要地低效。尝试计算A^7 = A * A^2 * A^4,其中A^4 = A^2^2。这需要 4 次乘法而不是 7 天真的方式。一方面继续平方,然后根据指数的位将平方乘以结果,完成后停止平方。 我知道这一切,但是,这更像是一个线程问题,而不是矩阵,谢谢。 为什么只创建一次线程? @Hurkyl,因为我可以,哈哈。 @Hurkyl 创建线程并不便宜。 【参考方案1】:

我建议使用condition_variable。

算法是这样的:

    将矩阵拆分为 N 个部分用于 N 个线程。

    每个线程计算一次乘法所需的结果子矩阵。

    然后它使用fetch_add 递增一个原子threads_finished 计数器并等待共享条件变量。

    最后一个线程完成(fetch_add()+1 == 线程计数),通知所有线程,它们现在可以继续处理。

    利润。

编辑: 以下是如何停止线程的示例:

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <vector>
#include <algorithm>
#include <atomic>

void sync_threads(std::condition_variable & cv, std::mutex & mut, std::vector<int> & threads, const int idx) 
    std::unique_lock<std::mutex> lock(mut);
    threads[idx] = 1; 
    if(std::find(threads.begin(),threads.end(),0) == threads.end()) 
        for(auto & i: threads)
            i = 0;
        cv.notify_all();
     else 
        while(threads[idx])
            cv.wait(lock);
    


int main()

    std::vector<std::thread> threads;

    std::mutex mut;
    std::condition_variable cv;

    int max_threads = 10;
    std::vector<int> thread_wait(max_threads,0);

    for(int i = 0; i < max_threads; i++) 
        threads.emplace_back([&,i]()
                std::cout << "Thread "+ std::to_string(i)+" started\n";
                sync_threads(cv,mut,thread_wait,i);
                std::cout << "Continuing thread " + std::to_string(i) + "\n";
                sync_threads(cv,mut,thread_wait,i);
                std::cout << "Continuing thread for second time " + std::to_string(i) + "\n";

            );
    

    for(auto & i: threads)
        i.join();

有趣的部分在这里:

void sync_threads(std::condition_variable & cv, std::mutex & mut, std::vector<int> & threads, const int idx) 
    std::unique_lock<std::mutex> lock(mut); // Lock because we want to modify cv
    threads[idx] = 1; // Set my idx to 1, so we know we are sleeping
    if(std::find(threads.begin(),threads.end(),0) == threads.end()) 
        // I'm the last thread, wake up everyone
        for(auto & i: threads)
            i = 0;
        cv.notify_all();
     else  //I'm not the last thread - sleep until all are finished
        while(threads[idx]) // In loop so, if we wake up unexpectedly, we go back to sleep. (Thanks for pointing that out Yakk)
            cv.wait(lock);
    

【讨论】:

计数器不需要原子。 condition_variable 始终与保护相关条件的mutex 配对,在本例中为计数器。对条件的任何修改都必须在互斥锁被锁定时进行,否则您将在程序中出现竞争条件。 你能把这个写成代码吗?我不知道如何正确使用所有这些,第一步和第二步我理解,第三步和第四步我很困惑,尤其是关于原子计数器。不要写完整的东西,只是告诉我如何在每个乘法步骤之前使用原子计数器暂停线程。 但是这是在等待其他线程被创建,而不是等待其他线程完成它们的工作。 @paulpaul1076 你是什么意思 - 对我来说,它首先执行所有“启动线程”行,然后才执行“继续线程”行,然后才执行“第二次继续线程”。跨度> @UldisK 所以,等等,cv.wait(lock) 是仅适用于当前线程还是也适用于我完成后创建的线程?【参考方案2】:

这是mass_thread_pool

// launches n threads all doing task F with an index:
template<class F>
struct mass_thread_pool 
  F f;
  std::vector< std::thread > threads;
  std::condition_variable cv;
  std::mutex m;
  size_t task_id = 0;
  size_t finished_count = 0;
  std::unique_ptr<std::promise<void>> task_done;
  std::atomic<bool> finished;

  void task( F f, size_t n, size_t cur_task ) 
    //std::cout << "Thread " << n << " launched" << std::endl;
    do 
      f(n);
      std::unique_lock<std::mutex> lock(m);

      if (finished)
        break;

      ++finished_count;
      if (finished_count == threads.size())
      
        //std::cout << "task set finished" << std::endl;
        task_done->set_value();
        finished_count = 0;
      
      cv.wait(lock,[&]if (finished) return true; if (cur_task == task_id) return false; cur_task=task_id; return true;);
     while(!finished);
    //std::cout << finished << std::endl;
    //std::cout << "Thread " << n << " finished" << std::endl;
  

  mass_thread_pool() = delete;
  mass_thread_pool(F fin):f(fin),finished(false) 
  mass_thread_pool(mass_thread_pool&&)=delete; // address is party of identity

  std::future<void> kick( size_t n ) 
    //std::cout << "kicking " << n << " threads off.  Prior count is " << threads.size() << std::endl;
    std::future<void> r;
    
      std::unique_lock<std::mutex> lock(m);
      ++task_id;
      task_done.reset( new std::promise<void>() );
      finished_count = 0;
      r = task_done->get_future();
      while (threads.size() < n) 
        size_t i = threads.size();
        threads.emplace_back( &mass_thread_pool::task, this, f, i, task_id );
      
      //std::cout << "count is now " << threads.size() << std::endl;
    
    cv.notify_all();
    return r;
  
  ~mass_thread_pool() 
    //std::cout << "destroying thread pool" << std::endl;
    finished = true;
    cv.notify_all();
    for (auto&& t:threads) 
      //std::cout << "joining thread" << std::endl;
      t.join();
    
    //std::cout << "destroyed thread pool" << std::endl;
  
;

你用一个任务构建它,然后你 kick(77) 启动该任务的 77 个副本(每个副本都有不同的索引)。

kick 返回一个std::future&lt;void&gt;。您必须等待所有任务完成。

然后你可以要么销毁线程池,要么再次调用kick(77)重新启动任务。

这个想法是,您传递给mass_thread_pool 的函数对象可以访问您的输入和输出数据(例如,您要相乘的矩阵或指向它们的指针)。每个kick 都会导致它为每个索引调用一次您的函数。您负责将索引转换为任何偏移量。

Live example 我用它来为另一个vector 中的条目加1。在迭代之间,我们交换向量。这会进行 2000 次迭代,并启动 10 个线程,并调用 lambda 20000 次。

注意auto&amp;&amp; pool = make_pool( lambda ) 位。需要使用auto&amp;&amp;——因为线程池有指向自身的指针,所以我在大量线程池上禁用了移动和复制构造。如果您确实需要传递它,请创建一个指向线程池的唯一指针。

我在std::promise 重置时遇到了一些问题,所以我将它包装在一个unique_ptr 中。这可能不是必需的。

我用来调试它的跟踪语句被注释掉了。

用不同的n 调用kick 可能会也可能不会。绝对用较小的n 调用它不会像您期望的那样工作(在这种情况下它将忽略n)。

在您致电kick 之前,不会进行任何处理。 kick 是“开球”的缩写。

...

对于您的问题,我要做的是创建一个拥有mass_thread_pool 的多重对象。

乘数有一个指向 3 个矩阵的指针(about)。 n 个子任务中的每一个都会生成out 的一些小节。

您将 2 个矩阵传递给乘法器,它将指向 out 的指针设置为本地矩阵,并将 ab 设置为传入的矩阵,执行 kick,然后等待,然后返回局部矩阵。

对于幂,您使用上面的乘数来构建一个二的幂的塔,同时根据指数的位进行乘法累加到您的结果中(再次使用上面的乘数)。

上述更高级的版本可以允许乘法和std::future&lt;Matrix&gt;s(以及未来矩阵的乘法)排队。

【讨论】:

【参考方案3】:

我将从一个简单的分解开始:

矩阵乘法获得多线程 矩阵指数多次调用乘法。

类似的东西:

Mat multithreaded_multiply(Mat const& left, Mat const& right) ...

Mat power(Mat const& M, int n)

    // Handle degenerate cases here (n = 0, 1)

    // Regular loop
    Mat intermediate = M;
    for (int i = 2; i <= n; ++i) 
    
        intermediate = multithreaded_multiply(M, intermediate);
    

等待std::thread,你有methodjoin()

【讨论】:

但这意味着我每次都必须在 multithreaded_multiply 中重新创建这些线程,是否可以在 main 中创建线程并以某种方式重用它们,直到我得到结果? 在这种情况下,让每个线程专门计算一条线(甚至是给定像素)的结果,并让每个线程计算幂结果。 是的,但话又说回来,我的线程怎么会知道它已经完成了一个乘法步骤并等待其他线程完成相同的一个乘法步骤?这是我的主要问题。 我已经编辑了代码,看看 for(;;) join();循环。 但这就是力量,你的列会被改变,如果不按顺序完成,你的结果就会出错。【参考方案4】:

不是编程而是数学答案:对于每个方阵,都有一组所谓的“特征值”和“特征向量”,因此 M * E_i = lambda_i * E_i。 M是矩阵,E_i是特征向量,lambda_i是特征值,只是一个复数。所以 M^n * E_i = lambda_i^n *E_i。所以你只需要复数的 n 次方而不是矩阵。特征向量是正交的,即任何向量 V = sum_i a_i * E_i。所以 M^n * V = sum_i a_i lambda^n E_i。 根据您的问题,这可能会显着加快速度。

【讨论】:

谢谢,但我不明白这与我的问题有什么关系。

以上是关于如何将矩阵提升到具有多个线程的幂?的主要内容,如果未能解决你的问题,请参考以下文章

scipy.sparse 矩阵的元素功率

如何计算一个矩阵的幂

如何将C / C ++库代码封装为可在具有多个实例的单独线程中运行?

矩阵的幂

矩阵的幂

具有多个输入矩阵的块处理