boost asio 异步等待条件变量

Posted

技术标签:

【中文标题】boost asio 异步等待条件变量【英文标题】:boost asio asynchronously waiting on a condition variable 【发布时间】:2011-10-10 04:36:00 【问题描述】:

是否可以对 boost::asio 中的条件变量执行异步等待(读取:非阻塞)?如果不直接支持任何有关实施的提示,将不胜感激。

我可以实现一个计时器并每隔几毫秒触发一次唤醒,但这种方法非常低劣,我很难相信条件变量同步没有实现/记录。

【问题讨论】:

你想做什么??? - 也许是您正在寻找的async_read_until 的最后一个版本?非阻塞等待通常是boost thread 的任务...... boost 线程与 boost asio 结合应该可以工作...... 我想到了一个替代实现,我已经在另一个问题中概述了它。 ***.com/questions/6776779/…这可能会让您更深入地了解我想要实现的目标。 【参考方案1】:

如果我正确理解了意图,你想在 asio 线程池的上下文中启动一个事件处理程序,当某个条件变量发出信号时?我认为在处理程序的开头等待条件变量就足够了,而io_service::post() 本身最终又回到了池中,类似这样的东西:

#include <iostream>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
boost::asio::io_service io;
boost::mutex mx;
boost::condition_variable cv;
void handler()

    boost::unique_lock<boost::mutex> lk(mx);
         cv.wait(lk);
    std::cout << "handler awakened\n";
    io.post(handler);

void buzzer()

    for(;;)
    
        boost::this_thread::sleep(boost::posix_time::seconds(1));
        boost::lock_guard<boost::mutex> lk(mx);
            cv.notify_all();
    

int main()

    io.post(handler);
    boost::thread bt(buzzer);
    io.run();

【讨论】:

但是等待的线程会被阻塞,难道没有办法不阻塞线程,而是注册一个完成处理程序吗?我目前正在这里考虑一种替代机制***.com/questions/6776779/… @Hassan Syed :条件变量是一个涉及阻塞线程的概念。也许您正在寻找异步信号? boost.asio 刚刚在 1.47.0 中添加了对信号处理程序的支持:boost.org/doc/libs/1_47_0/doc/html/boost_asio/history.html 据我所知,这些是操作系统发出的信号。显示您可以注册这些信号,但发出它们的是操作系统。 你的答案是正确的,我是在假设io_service::run() 是被调用者的阻塞调用下进行的,并且 asio 以某种方式处理同步。我很高兴这个假设不是真的。 答案中的io_service::post 链接已损坏。 io_service::post 被删除了吗?它没有出现在当前 asio 文档的参考部分中。【参考方案2】:

我可以建议基于 boost::asio::deadline_timer 的解决方案,这对我来说很好用。这是 boost::asio 环境中的一种异步事件。 一件非常重要的事情是,'handler' 必须通过与 'cancel' 相同的 'strand_' 序列化,因为在多个线程中使用 'boost::asio::deadline_timer' 不是线程安全的。

class async_event

public:
    async_event(
        boost::asio::io_service& io_service,
        boost::asio::strand<boost::asio::io_context::executor_type>& strand)
            : strand_(strand)
            , deadline_timer_(io_service, boost::posix_time::ptime(boost::posix_time::pos_infin))
    

    // 'handler' must be serialised through the same 'strand_' as 'cancel' or 'cancel_one'
    //  because using 'boost::asio::deadline_timer' from multiple threads is not thread safe
    template<class WaitHandler>
    void async_wait(WaitHandler&& handler) 
        deadline_timer_.async_wait(handler);
    
    void async_notify_one() 
        boost::asio::post(strand_, boost::bind(&async_event::async_notify_one_serialized, this));
    
    void async_notify_all() 
        boost::asio::post(strand_, boost::bind(&async_event::async_notify_all_serialized, this));
    
private:
    void async_notify_one_serialized() 
        deadline_timer_.cancel_one();
    
    void async_notify_all_serialized() 
        deadline_timer_.cancel();
    
    boost::asio::strand<boost::asio::io_context::executor_type>& strand_;
    boost::asio::deadline_timer deadline_timer_;
;

【讨论】:

【参考方案3】:

很遗憾,Boost ASIO 没有async_wait_for_condvar() 方法。

在大多数情况下,您也不需要它。以 ASIO 方式编程通常意味着您使用链而不是互斥锁或条件变量来保护共享资源。除了在启动和退出时通常关注正确的构造或销毁顺序的极少数情况外,您根本不需要互斥锁或条件变量。

修改共享资源时,经典的部分同步线程方式如下:

锁定保护资源的互斥锁 更新需要更新的内容 如果需要等待线程进一步处理,则向条件变量发出信号 解锁互斥锁

完全异步的 ASIO 方式是:

生成一条消息,其中包含更新资源所需的所有内容 将带有该消息的更新处理程序调用发布到资源链 如果需要进一步处理,让更新处理程序创建更多消息并将它们发布到适当的资源链。 如果作业可以在完全私有的数据上执行,则将它们直接发布到 io-context。

这是一个类some_shared_resource 的示例,它接收一个字符串state 并根据接收到的状态触发一些进一步的处理。请注意,私有方法 some_shared_resource::receive_state() 中的所有处理都是完全线程安全的,因为 strand 会序列化所有调用。

当然,这个例子是不完整的; some_other_resource 需要与some_shared_ressource::send_state() 类似的send_code_red() 方法。

#include <boost/asio>
#include <memory>

using asio_context = boost::asio::io_context;
using asio_executor_type = asio_context::executor_type;
using asio_strand = boost::asio::strand<asio_executor_type>;

class some_other_resource;
class some_shared_resource : public std::enable_shared_from_this<some_shared_resource> 
    asio_strand strand;
    std::shared_ptr<some_other_resource> other;
    std::string state;

    void receive_state(std::string&& new_state) 
        std::string oldstate = std::exchange(state, new_state);
        if(state == "red" && oldstate != "red") 
            // state transition to "red":
            other.send_code_red(true);
         else if(state != "red" && oldstate == "red") 
            // state transition from "red":
            other.send_code_red(false);
        
    

public:
    some_shared_resource(asio_context& ctx, const std::shared_ptr<some_other_resource>& other)
      : strand(ctx.get_executor()), other(other) 

    void send_state(std::string&& new_state) 
        boost::asio::post(strand, [me = weak_from_this(), new_state = std::move(new_state)]() mutable 
            if(auto self = me.lock(); self) 
                self->receive_state(std::move(new_state));
            
        );
    
;

如您所见,一开始总是向 ASIO 发布帖子可能有点乏味。但是您可以将大部分“为类配备链”代码移动到模板中。

消息传递的好处:由于您不使用互斥锁,因此即使在极端情况下,您也不会再陷入僵局。此外,使用消息传递,创建高级并行性通常比使用经典多线程更容易。不利的一面是,围绕所有这些消息对象移动和复制非常耗时,这会降低您的应用程序的速度。

最后一点:在send_state()形成的消息中使用弱指针有助于可靠地销毁some_shared_resource对象:否则,如果A调用B,B调用C,C调用A(可能仅在超时或类似),在消息中使用共享指针而不是弱指针会创建循环引用,从而防止对象破坏。如果您确定永远不会有循环,并且处理来自待删除对象的消息不会造成问题,那么您当然可以使用shared_from_this() 而不是weak_from_this()。如果您确定在 ASIO 停止之前不会删除对象(并且所有工作线程都已加入主线程),那么您也可以直接捕获 this 指针。

【讨论】:

以上是关于boost asio 异步等待条件变量的主要内容,如果未能解决你的问题,请参考以下文章

Boost条件变量condition_variable_any

boost::asio::async_read_until 与自定义匹配条件运算符重载混淆

等待条件的线程的有序通知(C++,boost)

在已锁定的互斥体上使用 boost::interprocess 条件变量

Boost::asio::async_read 不会在条件下停止

boost--asio