可以重用线程来运行可变参数函数吗?

Posted

技术标签:

【中文标题】可以重用线程来运行可变参数函数吗?【英文标题】:Can a thread be reused to run variadic functions? 【发布时间】:2013-03-06 01:37:02 【问题描述】:

我正在尝试在 C++ 中创建一个 boost 线程,该线程可以重复用于运行具有不同数量和类型 args 的各种函数。

这可以用 C++11x 可变参数来完成吗?

在我的用例中,我不需要队列(如果线程很忙,那么方法就会失败),但如果需要实现这个“统一”功能,我会很不情愿地这样做。

我不明白如何使用 bind 或 lambda 处理统一,以便一个线程可以调用不同的函数,每个函数都有自己的不同数量和类型的 args。

我的想法大致如下:

class WorkThread

public:
   WorkThread()
       
       // create thread and bind runner to thread
       

   ~WorkThread()
       
       // tell runner to exit and wait and reap the thread
       

   template<typename F,typename ... Arguments>
   void doWork(F func, Arguments... args)
       
       if already busy
           return false;
       // set indication to runner that there is new work
       // here: how to pass f and args to runner?
       

private:
   void runner()
        
        while ( ! time to quit )
            
            wait for work
            // here: how to get f and args from doWork? do I really need a queue? could wait on a variadic signal maybe?
            f(args);
            
        

   boost::thread* m_thread;
;

class ThreadPool

public:
    template<typename F, typename ... Arguments>
    bool   doWork(F func,Arguments... args)
          
          const int i = findAvailableWorkThread();
          m_thread[i].doWork(f,args);
          
private:
   // a pool of work threads m_thread;
;

【问题讨论】:

【参考方案1】:

应该有很多现有的问题来说明如何做到这一点。

表示任意函数对象的规范 C++11 方法是 std::function&lt;void()&gt;,因此您需要该类型的共享对象,在下面的代码中称为 m_job,它应该受互斥体保护,并且您在尚未设置时为其分配新工作:

template<typename F,typename ... Arguments>
bool doWork(F func, Arguments&&... args)

    std::lock_guard<std::mutex> l(m_job_mutex);
    if (m_job)
        return false;
    m_job = std::bind(func, std::forward<Arguments>(args)...);
    m_job_cond.notify_one();  
    return true;   

这使用std::bind 将函数对象及其参数转换为不带参数的函数对象。 std::bind 返回的可调用对象存储了func 和每个参数的副本,并在调用时调用func(args...)

然后工人只是这样做:

void runner()

    while ( ! time to quit )
    
        std::function<void()> job;
        
            std::unique_lock<std::mutex> l(m_job_mutex);
            while (!m_job)
                m_job_cond.wait(l);
            swap(job, m_job);
        
        job();
    

此代码不是线程安全的:

template<typename F, typename ... Arguments>
    bool   doWork(F func,Arguments... args)
    
        const int i = findAvailableWorkThread();
        m_thread[i].doWork(f,args);
    

findAvailableWorkThread 返回后,该线程可能会变得忙碌,因此下一行将失败。您应该检查可用性并在单个操作中传递新作业,例如

template<typename F, typename ... Arguments>
    bool   doWork(F func,Arguments... args)
    
        for (auto& t : m_thread)
            if (t.doWork(f,args))
                return true;
        return false;
    

【讨论】:

【参考方案2】:

您可以使用boost::bind 或C++11 lambda 将所有函数统一为一个签名。因此,您将拥有一个带有循环函数的线程,并且在该循环内您将提取要执行的函数。 例如,您可以使用boost::lockfree::queue&lt;boost::function&lt;void()&gt;&gt; 来实现它。

作为这个想法的一个例子,您可以使用以下内容:

class TaskLoop

public:
    typedef std::function<void()> Task_t;
public:
    TaskLoop():
        m_IsDone(false)
    
        m_spThread.reset(new std::thread(&TaskLoop::_run, this));
    
    ~TaskLoop()
    
        Task_t task = [this]()m_IsDone = true;;
        postTask(task);
        m_spThread->join();
    
    void postTask(const Task_t& Msg)
    
        std::lock_guard<std::mutex> lock(m_Mutex);
        m_Tasks.push(Msg);
    
    void wait() 
    
        while(!m_Tasks.empty());
    
private:
    bool m_IsDone;
    std::unique_ptr<std::thread> m_spThread; 
    std::mutex m_Mutex;
    std::queue<Task_t> m_Tasks;
private:
    void _run()
    
        while(!m_IsDone)
        
            Task_t task;
            m_Mutex.lock();
            if(!m_Tasks.empty())
            
                task = m_Tasks.front();
                m_Tasks.pop();
            
            m_Mutex.unlock();
            if(task)
                task();
        
    
;

void foo(const std::string& first, int second)

    std::cout << first << second << "\n";


int main(int argc, char **argv)

    TaskLoop loop;
    loop.postTask([]foo("task", 0););
    loop.wait();
    return 0;

示例不使用并发队列并且非常简单,因此您需要替换队列并使其适应您的要求

【讨论】:

不是真的:boost::function 不符合boost::lockfree::queue 的要求 如何将函数、它的返回类型和 arg 打包到队列中?我已经看到提到为此使用元组,那么建议队列将 arg 包作为元组保存吗?我不知道该怎么做。可以显示吗?【参考方案3】:

由于您已经在使用 boost,请参阅 http://think-async.com/Asio/Recipes 的线程池类,它使用 Boost.Asio 将工作函数排队,而无需锁定。

【讨论】:

【参考方案4】:

// 这是我目前所拥有的似乎可行的方法......

#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>

#include <queue>
#include <atomic>

class ThreadWorker
    
    public:
        typedef boost::function<void()> Task_t;
        typedef std::unique_ptr<boost::thread>  UniqueThreadPtr_t;

        ThreadWorker()
            : m_timeToQuit(false)
            
            m_spThread = UniqueThreadPtr_t( new boost::thread(std::bind(&ThreadWorker::runner, this)));
            

        virtual ~ThreadWorker()
            
            quit();
            

        void quit()
            
            Task_t task = [this]()  m_timeToQuit = true; ;
            enqueue(task);
            m_spThread->join();
            

        template<typename F,typename ...Args>
        void enqueue(F&& f, Args&&... args)
            
            boost::mutex::scoped_lock lock(m_jobMutex);
            m_Tasks.push(std::bind(std::forward<F>(f),std::forward<Args>(args)...));
            m_newJob.notify_one();
            

    private:
        std::atomic<bool>       m_timeToQuit;
        UniqueThreadPtr_t       m_spThread;
        mutable boost::mutex    m_jobMutex;
        std::queue<Task_t>      m_Tasks;
        boost::condition        m_newJob;

    private:
        void runner()
            
            while ( ! m_timeToQuit )
                
                Task_t task;
                    
                    boost::mutex::scoped_lock lock(m_jobMutex);
                    while ( ! task )
                        
                        if ( m_timeToQuit )
                            
                            break;
                            
                        if ( m_Tasks.empty() )
                            
                            m_newJob.wait(lock);
                            
                        task = m_Tasks.front();
                        m_Tasks.pop();
                        
                    
                if (task)
                    
                    task();
                    
                
            
    ;

【讨论】:

如何添加等待结果的功能?我需要一个带有模板返回类型的新版本的 enqueue? 可能类似于:code template::type> auto enqueue( F&& f, Args&&... args) -> std::future... 然后如何定义任务队列 有人可以展示如何改变队列和入队来处理期货吗?我想不通。

以上是关于可以重用线程来运行可变参数函数吗?的主要内容,如果未能解决你的问题,请参考以下文章

Kotlin:你可以为可变参数使用命名参数吗?

c语言如何封装一个带有可变参数的方法?

java中参数变量具体是啥,可以干啥,有啥作用,

如何在 C# 中运行可变数量的并发参数化无限循环类型的线程?

C语言奇淫技巧之函数的可变参数

可变参数模板类:是不是可以为每个可变参数模板参数实现一个唯一的成员函数?