如何以抽象函数类型制作线程池任务?

Posted

技术标签:

【中文标题】如何以抽象函数类型制作线程池任务?【英文标题】:How to make Thread Pool Tasks in abstract function type? 【发布时间】:2020-06-25 05:50:11 【问题描述】:

我正在尝试实现非常简单的 C++ 线程池。 到目前为止,我已经检查过它是否有效。 但是,我想以抽象的形式完成任务。 我已经搜索了几十篇文章,它们似乎不是我想要的。 (也许我的关键字不合适......) 目前只有

void (*)()

form 可以被接受为任务函数。 简单的代码写在下面。

#include <iostream>
#include <queue>
#include <thread>
#include <mutex>
#include <Windows.h>

class CYSThreadPool

private:

    static std::thread** s_ppThreads;
    static long s_lThreadCount;
    static bool s_bJoin;
    static std::mutex* s_pMutexJoin;

    static std::queue<void (*)()> s_queueTasks;
    static std::mutex* s_pMutexTasks;

    CYSThreadPool()
    ~CYSThreadPool()

    static void ThreadFunction()
    
        while (true)
        
            if (!s_pMutexJoin->try_lock())
                continue;

            bool bJoin = s_bJoin;
            s_pMutexJoin->unlock();

            if (bJoin)
                break;

            if (!s_pMutexTasks->try_lock())
                continue;

            void (*fp)() = nullptr;
            if (s_queueTasks.size() > 0ull)
            
                fp = s_queueTasks.front();
                s_queueTasks.pop();
            

            s_pMutexTasks->unlock();

            if (fp != nullptr)
                fp();
        
    

public:

    enum class EResult : unsigned long
    
        Success = 0ul,
        Fail_Undefined = 1ul,
        Fail_InvalidThreadCount = 2ul,
        Fail_ArgumentNull = 3ul
    ;

    static const EResult Join()
    
        if (s_ppThreads == nullptr)
        
            if (s_lThreadCount == 0l)
                return EResult::Success;
            else
                return EResult::Fail_Undefined;
        
        else
        
            if (s_lThreadCount <= 0l)
                return EResult::Fail_Undefined;
            else
            
                s_pMutexJoin->lock();
                s_bJoin = true;
                s_pMutexJoin->unlock();

                for (long i = 0l; i < s_lThreadCount; ++i)
                
                    s_ppThreads[i]->join();
                    s_ppThreads[i] = nullptr;
                

                delete s_ppThreads;
                s_ppThreads = nullptr;
                s_lThreadCount = 0l;

                s_pMutexJoin->lock();
                s_bJoin = false;
                s_pMutexJoin->unlock();
            
        

        return EResult::Success;
    

    static const EResult CreateThreads(const long _lThreadCount)
    
        if (_lThreadCount < 0l)
            return EResult::Fail_InvalidThreadCount;

        if (Join() != EResult::Success)
            return EResult::Fail_Undefined;

        if (_lThreadCount == 0l)
            return EResult::Success;

        s_ppThreads = new std::thread*[_lThreadCount];

        for (long i = 0l; i < _lThreadCount; ++i)
            s_ppThreads[i] = new std::thread(ThreadFunction);

        s_lThreadCount = _lThreadCount;

        return EResult::Success;
    

    static const EResult AddTask(void (*_fp)())
    
        if (_fp == nullptr)
            return EResult::Fail_ArgumentNull;

        s_pMutexTasks->lock();
        s_queueTasks.push(_fp);
        s_pMutexTasks->unlock();

        return EResult::Success;
    
;

std::thread** CYSThreadPool::s_ppThreads = nullptr;
long CYSThreadPool::s_lThreadCount = 0l;
bool CYSThreadPool::s_bJoin = false;
std::mutex* CYSThreadPool::s_pMutexJoin = new std::mutex();
std::queue<void (*)()> CYSThreadPool::s_queueTasks;
std::mutex* CYSThreadPool::s_pMutexTasks = new std::mutex();

void Test0()

    for (long i = 0l; i < 100000l; ++i)
    
        std::cout << "A";
    


int main()

    CYSThreadPool::EResult eResult = CYSThreadPool::Join();
    eResult = CYSThreadPool::CreateThreads(-1l);
    eResult = CYSThreadPool::CreateThreads(1l);
    eResult = CYSThreadPool::CreateThreads(0l);
    eResult = CYSThreadPool::CreateThreads(2l);

    CYSThreadPool::AddTask(Test0);

    Sleep(1000ul);

    CYSThreadPool::Join();

    return 0;

真诚而详尽的回答将不胜感激!

编辑:

我所说的“抽象形式的函数”是指获取除 void ()() 之外的任何函数类型。 如void()(long)、void()(ID3D12Resource1、IDXGISwapChain)或HResult()(ID3D12Device6、ID3D12Resource1*、IDXGISwapChain)等开。

感谢关于使用 condition_variable 代替 try_lock 并将 fp() 包装到 try-catch 中的评论。对不起,但我不认为我明白你的意思。可以问详细点吗?

【问题讨论】:

什么是“抽象形式”?你能添加一个这个线程池应该能够处理什么任务的例子吗? 感谢 cmets、怪胎和 Timo! 使用 concurrencpp:github.com/David-Haim/concurrencppauto runtime = concurrencpp::make_runtime(); runtime-&gt;thread_pool_executor()-&gt;enqueue(...);。不要编写自己的线程池。 【参考方案1】:

所以这里有几件事可以改进你的代码:

    您可以使用std::function<void()> 代替void(*)()。这将允许您通过将它们包装在 lambda 中来调用任何可调用的,而不仅仅是函数指针,例如CYSThreadPool::AddTask([]() Add(3, 1); );。 更好的是,创建一个抽象基类,比如Task 和虚拟.Run() 方法并使用它来代替。如果您愿意,您可以从中派生出FunctionPointerTaskStdFunctionTask(显然可以更改名称)。您可以创建 ProcessResource1Task 并将必要的参数传递给构造函数,以便 .Run() 仅执行它。 您的ThreadFunction 工作人员效率非常低。你一直在try_lock 上旋转。别。请改用condition_variables。 更好的是,将排队的东西移到单独的线程安全队列类中。 您的代码不是异常安全的。不要使用手动锁定/try_lock,而是使用lock_guard 或unique_lock。因为它是你的代码很容易死锁。如果您必须使用手动锁定/try_lock,那么您必须使用 try-catch。 很多静态变量。别。将它们全部设为非静态成员,添加适当的构造函数和析构函数。您的线程池一旦加入就完全无法使用。另外,Join() 不是线程安全的(如果线程池不是完全静态的,那就没那么重要了)。 您可以通过创建bool s_bJoin atomic 来提高性能。没有必要在锁下读/写它。

可能还有其他一些问题,这就是我现在能看到的。

您还可以查看this code 以获得不错的线程池实现。它很短,并利用了上述大部分提示。

【讨论】:

哇,有很多东西要修复!感谢您的详细说明!我似乎需要一两天的时间来搜索并理解您所指出的内容。这真的很有帮助!

以上是关于如何以抽象函数类型制作线程池任务?的主要内容,如果未能解决你的问题,请参考以下文章

关于线程池的工作队列类型

c++线程池std::promise和函数类型错误

如何暂停线程池、运行函数和恢复?

如何制作一个以用户定义的表类型为参数并在sql中返回相同的函数?

java线程池如何合理的设置大小

java中ExecutorService的线程池如何暂停所有的任务和继续所有的任务? 有这样的函数吗?