如何在 C++ 中并行化一个 for 循环,只创建一次线程池
Posted
技术标签:
【中文标题】如何在 C++ 中并行化一个 for 循环,只创建一次线程池【英文标题】:How to parallelize a for loop in C++ creating the thread pool only once 【发布时间】:2020-03-25 16:17:18 【问题描述】:我有一个必须在 Windows 中运行的 C++ 程序。
我有一个游戏,在函数WinMain
中有一个主循环,它在每次迭代中调用Update
函数。我想将多线程应用到更新函数的循环中。
int __stdcall WinMain()
// Windows initializations
// Main loop
Game game = new Game();
bool bExit = false;
while (!bExit)
MSG msg;
while (PeekMessage(&msg, NULL, 0, 0, PM_REMOVE))
if (msg.message == WM_QUIT) bExit = true;
TranslateMessage(&msg);
DispatchMessage(&msg);
game->Update();
delete game;
game = nullptr;
// Windows destructions
return 0;
// Game.cpp
void Game::Update()
// Loop that I want to be parallelized but I don't want to create the thread pool here because it's called on every frame
for (size_t i = 0; i < entities.size(); i++)
entities[i]->Update();
我尝试使用 OpenMP,但我无法将 #pragma omp parallel
放在 WinMain
函数中(应用程序会崩溃),如果我在循环之前将 #pragma omp parallel for
放在 Game::Update
中,它实际上会降低性能,因为它在每一帧中创建线程循环。
我正在寻找一种基于库的解决方案,或者最好是本机解决方案,它可以让我轻松地并行化这个循环。
唯一的要求是它可以在 Windows 中运行,我宁愿不使用库(比如 boost,虽然 OpenMP 很好)。
编辑:我让它与 PPL 的 concurrency::parallel_for
一起工作。它不会降低性能,但也不会提高性能...我对此的问题是:此函数是否在每次调用时创建和销毁其线程池?
【问题讨论】:
你看过原生的 Windows 线程池吗? @ZuodianHu 线程管理的抽象是首选,因为我没有Windows线程API的经验,我今天需要做这个项目。 我知道你说 boost 不是首选,但它在 Boost.Asio 中有一个非常简单的仅标头线程池。 Boost.Asio 还允许您使用 io_context 轻松编写自己的池。除此之外,我不知道任何不需要您编写自己的线程安全队列的快速非库解决方案 @ZuodianHu 只是因为Boost需要安装和学习库。如果你能详细说明一个简单的例子,那么我可能会尝试一下。 @ZuodianHu 谢谢你的例子。我尝试了一个简单的concurrency::parallel_for
,它成功了。但是我仍然不知道为它创建的线程池在哪里。
【参考方案1】:
根据我们在 cmets 中的讨论提供一个 Boost.Asio 示例:
#include <thread>
#include <boost/asio.hpp>
namespace asio = boost::asio;
int main( int argc, int* argv[] )
asio::io_context context;
asio::post( context, []() /* any arbitrary job */ );
// this donates the new_thread to the thread pool
std::thread new_thread [&]() context.run(); ;
// this donates the current thread to the thread pool
context.run();
这里需要注意的主要事项是:
asio::post
允许您向io_context
提交任意作业
从线程调用 io_context::run
会将线程捐赠给运行上下文的线程池
如果你想要一个预建的线程池,你可以使用boost::asio::thread_pool
,并调用asio::post
以同样的方式将作业放到线程池中。
您应该只需要下载 Boost,不需要安装它。如果您将BOOST_ERROR_CODE_HEADER_ONLY
添加到您的编译中,Boost.Asio 完全是仅标题。
编辑:
您仍然需要运行 Boost 的设置脚本,但您不需要构建任何库。
【讨论】:
【参考方案2】:这里有一个基本线程库的解决方案。我模拟了您的 Entity
和 Game
课程。
请注意,在此解决方案中,工作线程会在开始时创建并启动一次。他们将在每个Update
电话中被调用。当Update
未被调用时,线程休眠...
我已尽力保留您程序的架构。 请注意,我们可以使用 std::thread、std::mutex 的另一种实现......但我只是想给你一个想法......
#define NB_ENTITIES 10
class CEntity
public:
void Update();
~CEntity ()
;
typedef struct ThreadData
HANDLE hMutex;
HANDLE hMutexDestructor;
CEntity *pCEntity;
DWORD *dwStat;
ThreadData;
//------------------------------------
// This function will call "Upadate"
//------------------------------------
DWORD WINAPI MyThreadFunction( LPVOID lpParam )
ThreadData *ThreadDatpa = (ThreadData*) lpParam;
CEntity *pEntity = ThreadDatpa->pCEntity;
HANDLE hMutex = ThreadDatpa->hMutex;
HANDLE hMutexDestructor = ThreadDatpa->hMutexDestructor;
DWORD *dwStat = ThreadDatpa->dwStat;
while (true)
// When no update, thread sleep ... 0% CPU ...
WaitForSingleObject(hMutex, INFINITE);
if ( 0 == *dwStat ) break; // here thread stat for stopping
if ( nullptr != pEntity )
pEntity->Update(); // Call your unpdate function ...
// Each worker thread must release it semaphore.
// Destructor must get ALL released semaphore before deleting memory
ReleaseSemaphore(hMutexDestructor, 1, NULL );
return 0;
class Game
public :
vector<ThreadData*> entities; // Vector of entities pointers
vector<HANDLE> thread_group; // vector of threads handle
//This function must called ONE time at the beginning (at init)
void StartThreads ()
DWORD dwRet = 0;
HANDLE hTemp = NULL;
for (size_t i = 0; i <NB_ENTITIES; i++)
CEntity *pCEntity = new CEntity (); // just to simulate entity
// This semaphore is used to release thread when update is called
HANDLE ghMutex= CreateSemaphore( NULL,0, 1, NULL);
// This semaphore is used when destruction to check if all threads is terminated
HANDLE ghMutexDestructor= CreateSemaphore( NULL,0, 1, NULL);
// create a new CEntity data ...
ThreadData *pThreadData = new ThreadData ();
pThreadData->pCEntity = pCEntity;
pThreadData->hMutex = ghMutex;
pThreadData->hMutexDestructor = ghMutexDestructor;
pThreadData->dwStat = new DWORD (1); // default status = 1
entities.push_back ( pThreadData );
// Here we start ONE time Threads worker.
// Threads are stopped untile update was called
for (size_t i = 0; i < entities.size(); i++)
// Each thread has it own entity
hTemp = CreateThread( NULL,0, MyThreadFunction, entities.at(i), 0, &dwRet);
if ( NULL != hTemp )
thread_group.push_back (hTemp);
// Your function update juste wakeup threads
void Game::Update()
for (size_t i = 0; i < entities.size(); i++)
HANDLE hMutex = entities.at(i)->hMutex;
if ( NULL != hMutex )
ReleaseSemaphore(hMutex, 1, NULL );
~Game()
// Modifie stat before releasing threads
for (size_t i = 0; i < entities.size(); i++)
*(entities.at(i)->dwStat) = 0;
// Release threads (status =0 so break ...)
Update();
// This can be replaced by waitformultipleobjects ...
for (size_t i = 0; i < entities.size(); i++)
WaitForSingleObject ( entities.at(i)->hMutexDestructor, INFINITE);
// Now i'm sur that all threads are terminated
for (size_t i = 0; i < entities.size(); i++)
delete entities.at(i)->pCEntity;
delete entities.at(i)->dwStat;
CloseHandle (entities.at(i)->hMutex);
CloseHandle (entities.at(i)->hMutexDestructor);
delete entities.at(i);
;
【讨论】:
以上是关于如何在 C++ 中并行化一个 for 循环,只创建一次线程池的主要内容,如果未能解决你的问题,请参考以下文章
使用 OpenMP 在 C、C++ 中并行化嵌套 for 循环的几种方法之间的区别