从向量中删除已完成的线程
Posted
技术标签:
【中文标题】从向量中删除已完成的线程【英文标题】:Remove finished threads from vector 【发布时间】:2018-08-03 23:28:25 【问题描述】:我有许多作业,我想并行运行其中的一部分。例如。我有 100 个作业要运行,我想一次运行 10 个线程。这是我当前解决此问题的代码:
#include <thread>
#include <vector>
#include <iostream>
#include <atomic>
#include <random>
#include <mutex>
int main()
constexpr std::size_t NUMBER_OF_THREADS(10);
std::atomic<std::size_t> numberOfRunningJobs(0);
std::vector<std::thread> threads;
std::mutex maxThreadsMutex;
std::mutex writeMutex;
std::default_random_engine generator;
std::uniform_int_distribution<int> distribution(0, 2);
for (std::size_t id(0); id < 100; ++id)
if (numberOfRunningJobs >= NUMBER_OF_THREADS - 1)
maxThreadsMutex.lock();
++numberOfRunningJobs;
threads.emplace_back([id, &numberOfRunningJobs, &maxThreadsMutex, &writeMutex, &distribution, &generator]()
auto waitSeconds(distribution(generator));
std::this_thread::sleep_for(std::chrono::seconds(waitSeconds));
writeMutex.lock();
std::cout << id << " " << waitSeconds << std::endl;
writeMutex.unlock();
--numberOfRunningJobs;
maxThreadsMutex.unlock();
);
for (auto &thread : threads)
thread.join();
return 0;
在 for 循环中,我检查有多少作业正在运行,如果一个插槽空闲,我将一个新线程添加到向量中。在每个线程结束时,我会减少正在运行的作业数量并解锁互斥锁以启动一个新线程。这解决了我的任务,但有一点我不喜欢。我需要一个大小为 100 的向量来存储所有线程,并且我需要在最后加入所有 100 个线程。我想在完成后从向量中删除每个线程,以便向量包含最多 10 个线程,并且我必须在最后加入 10 个线程。我考虑通过引用 lambda 来传递向量和迭代器,以便我可以在最后删除元素,但我不知道如何。如何优化我的代码以在向量中使用最多 10 个元素?
【问题讨论】:
考虑建立一个线程池并重用现有线程。 请注意,在锁定maxThreadsMutex
时解锁会产生未定义的行为
为什么要保留所有线程?由于您不重复使用,我认为您可以简单地将其分离并等待一些完成信号。
或者你可以简单地等待最后 10 个线程。
@LWimsey:你的意思是在未锁定的情况下解锁?
【参考方案1】:
由于您似乎不需要非常细粒度的线程控制,我建议您使用 OpenMP 来解决这个问题。 OpenMP 是一种基于行业标准指令的方法,用于并行化 C、C++ 和 FORTRAN 代码。这些语言的每个主要编译器都实现了它。
使用它可以显着降低代码的复杂性:
#include <iostream>
#include <random>
int main()
constexpr std::size_t NUMBER_OF_THREADS(10);
std::default_random_engine generator;
std::uniform_int_distribution<int> distribution(0, 2);
//Distribute the loop between threads ensuring that only
//a specific number of threads are ever active at once.
#pragma omp parallel for num_threads(NUMBER_OF_THREADS)
for (std::size_t id(0); id < 100; ++id)
#pragma omp critical //Serialize access to generator
auto waitSeconds(distribution(generator));
std::this_thread::sleep_for(std::chrono::seconds(waitSeconds));
#pragma omp critical //Serialize access to cout
std::cout << id << " " << waitSeconds << std::endl;
return 0;
要使用您编译的 OpenMP:
g++ main.cpp -fopenmp
有时需要生成和直接协调线程,但大量旨在简化并行性的新语言和库说明了使用更简单的并行路径就足够了的用例数量。
【讨论】:
【参考方案2】:关键字“线程池”对我帮助很大。我尝试了 boost::asio::thread_pool ,它以与我的第一种方法相同的方式完成我想要的操作。我解决了我的问题
#include <thread>
#include <iostream>
#include <atomic>
#include <random>
#include <mutex>
#include <boost/asio/thread_pool.hpp>
#include <boost/asio/post.hpp>
int main()
boost::asio::thread_pool threadPool(10);
std::mutex writeMutex;
std::default_random_engine generator;
std::uniform_int_distribution<int> distribution(0, 2);
std::atomic<std::size_t> currentlyRunning(0);
for (std::size_t id(0); id < 100; ++id)
boost::asio::post(threadPool, [id, &writeMutex, &distribution, &generator, ¤tlyRunning]()
++currentlyRunning;
auto waitSeconds(distribution(generator));
writeMutex.lock();
std::cout << "Start: " << id << " " << currentlyRunning << std::endl;
writeMutex.unlock();
std::this_thread::sleep_for(std::chrono::seconds(waitSeconds));
writeMutex.lock();
std::cout << "Stop: " << id << " " << waitSeconds << std::endl;
writeMutex.unlock();
--currentlyRunning;
);
threadPool.join();
return 0;
【讨论】:
以上是关于从向量中删除已完成的线程的主要内容,如果未能解决你的问题,请参考以下文章
在第一个线程的 wait_until(20 秒) 之后,在多个线程中完成的向量 push_backs(20 秒内)不显示正确的向量大小