指定线程的 std::async 模拟

Posted

技术标签:

【中文标题】指定线程的 std::async 模拟【英文标题】:std::async analogue for specified thread 【发布时间】:2018-03-27 08:12:02 【问题描述】:

我需要处理多个对象,每个操作可能需要很长时间。

无法将处理放置在我启动它的 GUI(主)线程中。

我需要在我的主框架 (Qt 5) 中使用std::futureQtConcurrent::run()std::async 进行所有通信,使用@ 987654326@等,但不提供线程选择。我需要始终只在一个附加线程中处理选定的对象(对象 == 设备),

因为:

    我需要制定一个通用的解决方案,并且不想让每个类都是线程安全的 例如,即使如果为 QSerialPort 创建一个线程安全的容器,Serial port in Qt 也不能在多个线程中访问:

注意:串口总是以独占方式打开(即没有其他进程或线程可以访问已经打开的串口)。

    通常与设备的通信包括发送命令和接收应答。我想在发送请求的地方准确处理每个答案,并且不想使用仅事件驱动的逻辑。

所以,我的问题。

该功能如何实现?

MyFuture<T> fut = myAsyncStart(func, &specificLiveThread);

一个活线程必须可以多次传递。

【问题讨论】:

标准库从 c++11 开始就有未来,找一些标准库评论书看看如何使用。 在我们处理这个问题之前,还有其他问题需要回答。例如,您是否有任何方法可以让特定线程完成工作?未来的课程?你有什么,你需要实现什么? @AndrewKashpur 这就是我的问题所在。 std::async 做不到 好吧,如果我需要实现future/asynStart,future 将是具有值和状态的结构,并且asyncStart 将创建并返回future 对象,该对象将在完成传递给asyncStart 的仿函数时进行修改.它将任务添加到线程,任务将包括仿函数和未来对象的修改。当然,这一切也应该同步。这就是我有限的学科知识所能带来的。我很确定有更好的方法可以做到这一点,但我不知道。 也许active object pattern 可以提供帮助?另请参阅实现示例here 【参考方案1】:

让我在不参考 Qt 库的情况下回答,因为我不知道它的线程 API。

在 C++11 标准库中,没有直接的方法可以重用创建的线程。线程执行单个功能,只能加入或分离。但是,您可以使用生产者-消费者模式来实现它。消费者线程需要执行由生产者线程放入队列的任务(例如表示为std::function 对象)。所以如果我是正确的,你需要一个单线程线程池。

我可以推荐我的 C++14 线程池实现作为任务队列。它不常用(还没有!),但它包含单元测试并多次使用线程清理程序检查。文档很少,但请随时在 github 问题中提出任何问题!

库:https://github.com/Ravirael/concurrentpp

还有你的用例:

#include <task_queues.hpp>

int main() 
    // The single threaded task queue object - creates one additional thread.
    concurrent::n_threaded_fifo_task_queue queue(1);

    // Add tasks to queue, task is executed in created thread.
    std::future<int> future_result = queue.push_with_result([]  return 4; );

    // Blocks until task is completed.
    int result = future_result.get();

    // Executes task on the same thread as before.
    std::future<int> second_future_result = queue.push_with_result([]  return 4; );

【讨论】:

【参考方案2】:

如果您想遵循 Active Object 方法,这里有一个使用模板的示例:

WorkPackage 及其接口仅用于将不同返回类型的函数存储在向量中(请参阅后面的ActiveObject::async 成员函数):

class IWorkPackage 
    public:
        virtual void execute() = 0;

        virtual ~IWorkPackage() 

        
;

template <typename R>
class WorkPackage : public IWorkPackage
    private:
        std::packaged_task<R()> task;
    public:
        WorkPackage(std::packaged_task<R()> t) : task(std::move(t)) 

        

        void execute() final 
            task();
        

        std::future<R> get_future() 
            return task.get_future();
        
;

这是 ActiveObject 类,它希望您的设备作为模板。此外,它有一个向量来存储设备的方法请求和一个线程来依次执行这些方法。最后使用 async 函数向设备请求方法调用:

template <typename Device>
class ActiveObject 
    private:
        Device servant;
        std::thread worker;
        std::vector<std::unique_ptr<IWorkPackage>> work_queue;
        std::atomic<bool> done;
        std::mutex queue_mutex;
        std::condition_variable cv;
        void worker_thread() 
            while(done.load() == false) 
                std::unique_ptr<IWorkPackage> wp;
                
                    std::unique_lock<std::mutex> lck queue_mutex;

                    cv.wait(lck, [this] return !work_queue.empty() || done.load() == true;);
                    if(done.load() == true) continue;

                    wp = std::move(work_queue.back());
                    work_queue.pop_back();
                

                if(wp) wp->execute();
            
        
    public:

        ActiveObject(): done(false) 
            worker = std::thread &ActiveObject::worker_thread, this;
        

        ~ActiveObject() 
            
                std::unique_lock<std::mutex> lckqueue_mutex;
                done.store(true);
            
            cv.notify_one();
            worker.join();
        

        template<typename R, typename ...Args, typename ...Params>
        std::future<R> async(R (Device::*function)(Params...), Args... args) 
            std::unique_ptr<WorkPackage<R>> wp new WorkPackage<R> std::packaged_task<R()>  std::bind(function, &servant, args...) ;
            std::future<R> fut = wp->get_future();
            
                std::unique_lock<std::mutex> lckqueue_mutex;
                work_queue.push_back(std::move(wp));
            
            cv.notify_one();

            return fut;
        

        // In case you want to call some functions directly on the device
        Device* operator->() 
            return &servant;
        

;

你可以按如下方式使用它:

ActiveObject<QSerialPort> ao_serial_port;
// direct call:
ao_serial_port->setReadBufferSize(size);
//async call:
std::future<void> buf_future = ao_serial_port.async(&QSerialPort::setReadBufferSize, size);

std::future<Parity> parity_future = ao_serial_port.async(&QSerialPort::parity);

// Maybe do some other work here

buf_future.get(); // wait until calculations are ready
Parity p = parity_future.get(); // blocks if result not ready yet, i.e. if method has not finished execution yet

编辑回答 cmets 中的问题:AO 主要是多个读取器/写入器的并发模式。与往常一样,它的使用取决于具体情况。因此这种模式通常用于分布式系统/网络应用程序中,例如当多个客户端从服务器请求服务时。客户端从 AO 模式中受益,因为它们在等待服务器应答时不会被阻塞。 这种模式在网络应用程序以外的领域不经常使用的一个原因可能是线程开销。为每个活动对象创建线程时,如果 CPU 数量较少且同时使用许多活动对象,则会导致大量线程,从而导致线程争用。 我只能猜测为什么人们认为这是一个奇怪的问题:正如您已经发现的那样,它确实需要一些额外的编程。也许这就是原因,但我不确定。 但我认为该模式对于其他原因和用途也非常有用。至于您的示例,主线程(以及其他后台线程)需要来自单例的服务,例如某些设备或硬件接口,它们的可用数量很少,计算速度慢并且需要并发访问,而不是阻塞等待结果。

【讨论】:

您能否补充一点说明为什么这种模式和解决方案没有被广泛使用,很多人认为这是一个奇怪的问题? 您的工作线程在循环中做着毫无意义的工作。它正在检查队列状态,更糟糕的是,它毫无意义地锁定和释放互斥锁。在worker_thread 中主动等待的更好替代方法是使用std::condition_variable 通知工作线程有关队列中的新项目,而不是无休止地检查其状态。 @Rames 你说得对,我加了条件变量 这太好了,非常感谢!您能否添加一个示例,在异步调用中使用具有非 void 返回类型的函数时如何获取结果? @FourtyTwo 我编辑了我的答案并添加了QSerialPort::parity-函数的示例,该函数返回Parity 检查模式。通过async-函数返回的future获取结果(也适用于void类型,如果要检查方法请求是否已经执行完毕)【参考方案3】:

这是 Qt。它的信号槽机制是线程感知的。在您的辅助(非 GUI)线程上,创建一个带有 execute 插槽的 QObject 派生类。连接到此插槽的信号会将事件编组到该线程。

请注意,此QObject 不能是 GUI 对象的子对象,因为子对象需要存在于其父线程中,而此对象明确不存在于 GUI 线程中。

您可以使用现有的std::promise 逻辑来处理结果,就像std::future 一样。

【讨论】:

这是我目前所做的,但是方法很不舒服,需要大量的样板代码

以上是关于指定线程的 std::async 模拟的主要内容,如果未能解决你的问题,请参考以下文章

C++11 多线程std:: async与std::thread的区别

为啥 std::async 使用同一个线程运行函数

使用多线程加速(std::async、std::thread 还是?)

使用 std::async 创建的线程进行 MPI 发送的线程安全

C++ std::async 不会产生新线程

当未存储返回值时,std::async 不会产生新线程