关于 boost::asio::io_context::run 的困惑

Posted

技术标签:

【中文标题】关于 boost::asio::io_context::run 的困惑【英文标题】:Confusion about boost::asio::io_context::run 【发布时间】:2020-07-06 06:20:48 【问题描述】:

我目前正在做一个使用 MQTT 协议进行通信的项目。

在专用文件中有一个 Session 类,它基本上只是设置发布处理程序,即调用的回调,当此客户端接收到消息时(处理程序检查主题是否匹配“ZEUXX/var”,然后反序列化框架的二进制内容并随后取消订阅主题):

session.hpp:

class Session

public:
  Session()
  
    comobj = MQTT_NS::make_sync_client(ioc, "localhost", "1883", MQTT_NS::protocol_version::v5);
    using packet_id_t = typename std::remove_reference_t<decltype(*comobj)>::packet_id_t;

    // Setup client
    comobj->set_client_id(clientId);
    comobj->set_clean_session(true);

    /* If someone sends commands to this client */
    comobj->set_v5_publish_handler( // use v5 handler
        [&](MQTT_NS::optional<packet_id_t> /*packet_id*/,
            MQTT_NS::publish_options pubopts,
            MQTT_NS::buffer topic_name,
            MQTT_NS::buffer contents,
            MQTT_NS::v5::properties /*props*/) 
          std::cout << "[client] publish received. "
                    << " dup: " << pubopts.get_dup()
                    << " qos: " << pubopts.get_qos()
                    << " retain: " << pubopts.get_retain() << std::endl;
          std::string_view topic = std::string_view(topic_name.data(), topic_name.size());
          std::cout << "         -> topic: " << topic << std::endl;
          
          else if (topic.substr(0, 9) == "ZEUXX/var")
          
            std::cout << "[client] reading variable name: " << topic.substr(10, topic.size() - 9) << std::endl;
            auto result = 99; // dummy variable, normally an std::variant of float, int32_t uint8_t 
                              // obtained by deserialzing the binary content of the frame                             
            std::cout << comobj->unsubscribe(std::stringtopic);
          
          return true;
        );
  

  void readvar(const std::string &varname)
  
    comobj->publish(serialnumber + "/read", varname, MQTT_NS::qos::at_most_once);
    comobj->subscribe(serialnumber + "/var/" + varname, MQTT_NS::qos::at_most_once);
  

  void couple()
  
    comobj->connect();
    ioc.run();
  

  void decouple()
  
    comobj->disconnect();
    std::cout << "[client] disconnected..." << std::endl;
  

private:
  std::shared_ptr<
      MQTT_NS::callable_overlay<
          MQTT_NS::sync_client<MQTT_NS::tcp_endpoint<as::ip::tcp::socket, as::io_context::strand>>>>
      comobj;
  boost::asio::io_context ioc;
;

客户端基于boost::asio::io_context 对象,这恰好是我困惑的根源。在我的主文件中,我有以下代码。

main.cpp:

#include "session.hpp"

int main() 

    Session session; 
    session.couple();
    session.readvar("speedcpu");

本质上,这会创建 Session 类的一个实例,并且夫妻成员会调用 boost::asio::io_context::run 成员。这会运行 io_context 对象的事件处理循环并阻塞主线程,即永远不会到达主函数中的第三行。

我想发起一个连接 (session.couple),然后执行我的发布和订阅命令 (session.readvar)。我的问题是:我该如何正确地做到这一点?

从概念上讲,我的目标最好用以下 python 代码表达:

    client.connect("localhost", 1883)

    # client.loop_forever() that's what happens at the moment, the program 
    # doesn't continue from here

    # The process loop get's started, however it does not block the program and 
    # one can send publish command subsequently.
    client.loop_start()

    while True:
         client.publish("ZEUXX/read", "testread")
         time.sleep(20)

在单独的线程中运行 io_context 对象似乎不像我尝试的那样工作,关于如何解决这个问题的任何建议?我尝试的是以下内容:

session.hpp中的适配

// Adapt the couple function to run io_context in a separate thread
void couple()

   comobj->connect();
   std::thread t(boost::bind(&boost::asio::io_context::run, &ioc));  
   t.detach();

main.cpp 中的修改

int main(int argc, char** argv) 

    Session session; 
    session.couple();
    std::cout << "successfully started io context in separate thread" << std::endl;
    session.readvar("speedcpu");

现在到达 std::cout 行,即程序不会被 io_context.run() 卡在类的情侣成员中。但是,在此行之后,我立即收到错误消息:“网络连接已被本地系统中止”。

有趣的是,当我使用t.join() 而不是t.detach() 时没有错误,但是我使用t.join() 的行为与我直接调用io_context.run() 时的行为相同,即阻塞程序.

【问题讨论】:

你说它不起作用是什么意思?如果您在运行 io_context::run 的线程时遇到错误,这通常可以通过在 asio 包含之前定义 BOOST_ASIO_NO_DEPRECATED 来解决。 澄清一下,boost::asio::io_context 没有涉及错误,但是一旦调用 run 方法,程序就会进入一个永远循环,这意味着我不能对 readvar 进行任何调用在进程循环开始之后。 在单独的线程中运行它有什么问题? 【参考方案1】:

鉴于您对现有答案的评论:

io_context.run() 永远不会返回,因为它永远不会耗尽工作(它从 MQTT 服务器保持活动状态)。结果,一旦我进入run() 方法,线程就会被阻塞,并且我无法再发送任何发布和订阅帧。那时我认为在单独的线程中运行 io_context 而不阻塞主线程会很聪明。但是,当我detach这个单独的线程时,连接会出错,如果我使用join,它工作正常但主线程再次被阻塞。

我假设您知道如何在单独的线程中成功运行它。您面临的“问题”是,由于io_context 不会耗尽工作,调用thread::join 也会阻塞,因为它会等待线程停止执行。最简单的解决方案是在thread::join 之前调用io_context::stop。来自the official docs:

这个函数不会阻塞,而是简单地通知io_context 停止。对其run()run_one() 成员函数的所有调用都应尽快返回。对 run()run_one()poll()poll_one() 的后续调用将立即返回,直到调用 restart()

也就是说,调用io_context::stop 将导致io_context::run 调用返回(“尽快”),从而使相关线程可加入。

您还需要在某处保存对thread 的引用(可能作为Session 类的一个属性),并且仅在您完成其余部分之后调用thread::join 作品(例如称为Session::readvar)而不是来自Session::couple

【讨论】:

【参考方案2】:

io_context 工作结束时,它会从run() 返回。

如果您不发布任何作品,run() 将始终立即返回。任何后续的run() 也会立即返回,即使发布了新作品。

要在完成后重新使用io_context,请使用io_context.reset()。在你的情况下,最好

    使用工作守卫 (https://www.boost.org/doc/libs/1_73_0/doc/html/boost_asio/reference/executor_work_guard.html),查看许多库示例 如果您已经在后台线程上运行了 ioc,甚至不要在 couple() 中“运行”它

如果您需要同步行为,请不要在后台线程上运行它。

另外请记住,您需要提供优雅的关闭,这对于分离线程来说更加困难 - 毕竟,现在您无法join() 它知道它何时退出。

【讨论】:

似乎恰恰相反:io_context.run() 永远不会返回,因为它永远不会耗尽工作(它从 MQTT 服务器保持活动状态)。结果,一旦我进入run() 方法,线程就会被阻塞,并且我无法再发送任何发布和订阅帧。那时我认为在单独的线程中运行io_context 而不阻塞主线程会很聪明。但是,当我detach这个单独的线程时,连接会出错,如果我使用join,它工作正常但主线程再次被阻塞。 只在程序退出时加入? (并从couple() 中删除运行,但根据您的评论猜测,这可能是疏忽)

以上是关于关于 boost::asio::io_context::run 的困惑的主要内容,如果未能解决你的问题,请参考以下文章

scrapy 关于 rule, 关于多页

JS学习笔记关于选项卡,关于this,关于innerHTML

关于唯典冰淇淋新闻页的布局

关于prototype.js的一些技术说明

主页链接发送到关于/关于页面无法回家

关于写作