如何在 C++ 中并行化一个 for 循环,只创建一次线程池

Posted

技术标签:

【中文标题】如何在 C++ 中并行化一个 for 循环,只创建一次线程池【英文标题】:How to parallelize a for loop in C++ creating the thread pool only once 【发布时间】:2020-03-25 16:17:18 【问题描述】:

我有一个必须在 Windows 中运行的 C++ 程序。

我有一个游戏,在函数WinMain 中有一个主循环,它在每次迭代中调用Update 函数。我想将多线程应用到更新函数的循环中。

int __stdcall WinMain()

    // Windows initializations

    // Main loop
    
        Game game = new Game();
        bool bExit = false;
        while (!bExit)
        
            MSG msg;
            while (PeekMessage(&msg, NULL, 0, 0, PM_REMOVE))
            
                if (msg.message == WM_QUIT) bExit = true;
                TranslateMessage(&msg);
                DispatchMessage(&msg);
            

            game->Update();
        
        delete game;
        game = nullptr;
    

    // Windows destructions

    return 0;

// Game.cpp

void Game::Update() 
    // Loop that I want to be parallelized but I don't want to create the thread pool here because it's called on every frame
    for (size_t i = 0; i < entities.size(); i++) 
        entities[i]->Update();
    

我尝试使用 OpenMP,但我无法将 #pragma omp parallel 放在 WinMain 函数中(应用程序会崩溃),如果我在循环之前将 #pragma omp parallel for 放在 Game::Update 中,它实际上会降低性能,因为它在每一帧中创建线程循环。

我正在寻找一种基于库的解决方案,或者最好是本机解决方案,它可以让我轻松地并行化这个循环。

唯一的要求是它可以在 Windows 中运行,我宁愿不使用库(比如 boost,虽然 OpenMP 很好)。

编辑:我让它与 PPL 的 concurrency::parallel_for 一起工作。它不会降低性能,但也不会提高性能...我对此的问题是:此函数是否在每次调用时创建和销毁其线程池?

【问题讨论】:

你看过原生的 Windows 线程池吗? @ZuodianHu 线程管理的抽象是首选,因为我没有Windows线程API的经验,我今天需要做这个项目。 我知道你说 boost 不是首选,但它在 Boost.Asio 中有一个非常简单的仅标头线程池。 Boost.Asio 还允许您使用 io_context 轻松编写自己的池。除此之外,我不知道任何不需要您编写自己的线程安全队列的快速非库解决方案 @ZuodianHu 只是因为Boost需要安装和学习库。如果你能详细说明一个简单的例子,那么我可能会尝试一下。 @ZuodianHu 谢谢你的例子。我尝试了一个简单的concurrency::parallel_for,它成功了。但是我仍然不知道为它创建的线程池在哪里。 【参考方案1】:

根据我们在 cmets 中的讨论提供一个 Boost.Asio 示例:

#include <thread>

#include <boost/asio.hpp>

namespace asio = boost::asio;

int main( int argc, int* argv[] ) 
    asio::io_context context;

    asio::post( context, []()  /* any arbitrary job */  );

    // this donates the new_thread to the thread pool
    std::thread new_thread [&]() context.run();  ;

    // this donates the current thread to the thread pool
    context.run();

这里需要注意的主要事项是:

    asio::post 允许您向io_context 提交任意作业 从线程调用 io_context::run 会将线程捐赠给运行上下文的线程池

如果你想要一个预建的线程池,你可以使用boost::asio::thread_pool,并调用asio::post以同样的方式将作业放到线程池中。

您应该只需要下载 Boost,不需要安装它。如果您将BOOST_ERROR_CODE_HEADER_ONLY 添加到您的编译中,Boost.Asio 完全是仅标题。

编辑:

您仍然需要运行 Boost 的设置脚本,但您不需要构建任何库。

【讨论】:

【参考方案2】:

这里有一个基本线程库的解决方案。我模拟了您的 EntityGame 课程。 请注意,在此解决方案中,工作线程会在开始时创建并启动一次。他们将在每个Update 电话中被调用。当Update未被调用时,线程休眠...

我已尽力保留您程序的架构。 请注意,我们可以使用 std::thread、std::mutex 的另一种实现......但我只是想给你一个想法......

#define NB_ENTITIES 10

class CEntity

public:
    void Update();
    ~CEntity () 
;

typedef struct ThreadData

    HANDLE  hMutex;
    HANDLE  hMutexDestructor;
    CEntity *pCEntity;  
    DWORD   *dwStat;

 ThreadData;

//------------------------------------
// This function will call "Upadate"
//------------------------------------
DWORD WINAPI MyThreadFunction( LPVOID lpParam )

    ThreadData *ThreadDatpa      = (ThreadData*) lpParam;
    CEntity    *pEntity          = ThreadDatpa->pCEntity;
    HANDLE     hMutex            = ThreadDatpa->hMutex;
    HANDLE     hMutexDestructor  = ThreadDatpa->hMutexDestructor;
    DWORD      *dwStat           = ThreadDatpa->dwStat;

    while (true)
    
        // When no update, thread sleep ... 0% CPU ...
        WaitForSingleObject(hMutex, INFINITE);
        if ( 0 == *dwStat ) break; // here thread stat for stopping 

        if ( nullptr != pEntity ) 
            pEntity->Update(); // Call your unpdate function ...
    

    // Each worker thread must release it semaphore.
    // Destructor must get ALL released semaphore before deleting memory
    ReleaseSemaphore(hMutexDestructor, 1, NULL );

    return 0;


class Game 

public :
    vector<ThreadData*>  entities; // Vector of entities pointers
    vector<HANDLE>   thread_group; // vector of threads handle


    //This function must called ONE time at the beginning (at init)
    void StartThreads ()
    
        DWORD  dwRet = 0;
        HANDLE hTemp = NULL;

        for (size_t i = 0; i <NB_ENTITIES; i++) 
        
            CEntity *pCEntity = new CEntity (); // just to simulate entity

            // This semaphore is used to release thread when update is called
            HANDLE ghMutex= CreateSemaphore( NULL,0, 1, NULL); 

            // This semaphore is used when destruction to check if all threads is terminated
            HANDLE ghMutexDestructor= CreateSemaphore( NULL,0, 1, NULL);

            // create a new CEntity data ...
            ThreadData *pThreadData = new ThreadData ();

            pThreadData->pCEntity = pCEntity;
            pThreadData->hMutex   = ghMutex;
            pThreadData->hMutexDestructor = ghMutexDestructor;
            pThreadData->dwStat   = new DWORD (1); // default status = 1 

            entities.push_back ( pThreadData );         
        

        // Here we start ONE time Threads worker.
        // Threads are stopped untile update was called
        for (size_t i = 0; i < entities.size(); i++) 
        
            // Each thread has it own entity
            hTemp = CreateThread( NULL,0, MyThreadFunction, entities.at(i), 0, &dwRet);  
            if ( NULL != hTemp )
                thread_group.push_back (hTemp);
        

    

    // Your function update juste wakeup threads
    void Game::Update() 
        for (size_t i = 0; i < entities.size(); i++) 
        
            HANDLE hMutex = entities.at(i)->hMutex;
            if ( NULL != hMutex )
                ReleaseSemaphore(hMutex, 1, NULL );
        
    


    ~Game()
    
        // Modifie stat before releasing threads
        for (size_t i = 0; i < entities.size(); i++) 
            *(entities.at(i)->dwStat) = 0;

        // Release threads (status =0 so break ...)
        Update();

        // This can be replaced by waitformultipleobjects ...
        for (size_t i = 0; i < entities.size(); i++)
            WaitForSingleObject ( entities.at(i)->hMutexDestructor, INFINITE);

        // Now i'm sur that all threads are terminated
        for (size_t i = 0; i < entities.size(); i++)
        
            delete entities.at(i)->pCEntity;
            delete entities.at(i)->dwStat;
            CloseHandle (entities.at(i)->hMutex);
            CloseHandle (entities.at(i)->hMutexDestructor);
            delete entities.at(i);
        

    
;

【讨论】:

以上是关于如何在 C++ 中并行化一个 for 循环,只创建一次线程池的主要内容,如果未能解决你的问题,请参考以下文章

使用 OpenMP 在 C、C++ 中并行化嵌套 for 循环的几种方法之间的区别

非for循环的OpenMP并行化

如何使用CUDA并行化嵌套for循环以在2D数组上执行计算

如何在工作线程中重用主线程创建的OMP线程池?

如何在异步函数中并行化 for 循环并跟踪 for 循环执行状态?

在 Python 中通过线程/核心/节点并行化 for 循环