在 io_service.stop() 之后等待 boost asio 的未来将永远持续下去

Posted

技术标签:

【中文标题】在 io_service.stop() 之后等待 boost asio 的未来将永远持续下去【英文标题】:Waiting boost asio's future lasts forever after io_service.stop() 【发布时间】:2016-02-26 12:35:06 【问题描述】:

我尝试等待从任何boost::asio::async_ 函数返回的std::future 对象(使用use_future)。例如:

auto endpoint_future = resolver.async_resolve(query, boost::asio::use_future);
endpoint_future.wait(); // lasts forever after io_service.stop();

这是在一个线程中完成的。 我还有另一个线程在此async_resolve 调用之前启动:

runner_ = std::thread([this]() 
    boost::asio::io_service::work work(io_service_);
    io_service_.run();
);

一切正常,但后来我还添加了一个boost::asio::deadline_timer 以停止与io_service 的任何工作:

void deadlineTimeout() 
    deadline_.cancel();
    io_service_.stop();
    // closing socket here does not work too

但是,在deadlineTimeout() 中,当截止日期到达它的超时并且它执行io_service_.stop() 未来没有被释放所以endpointer_future.wait() 仍然阻塞。在这种情况下,我该如何停止等待未来?

【问题讨论】:

【参考方案1】:

我自己找到了一个解决方案:我们不需要stop()io_service而是reset()它,在此之前我们需要关闭套接字,因此正确的超时回调将是:

void deadlineTimeout() 
    deadline_.cancel();
    socket_.close(); // socket_ is a socket on io_service_
    io_service_.reset();

在此更改之后,所有期货都将被释放。

【讨论】:

io_service::stop() 只是设置了一个标志,要求服务“尽快”停止。只要您在套接字上有未完成的读取,服务就无法停止,因此socket_.close() 是您答案的重要部分。 你错了。关闭套接字并调用 io_service.stop() 后不起作用。这里重要的是调用 reset() 而不是 stop()。 在仍在运行的 io_service 上调用 reset() 是错误的。你应该彻底关闭它。如果您计划在服务停止后重用该服务,则应使用 reset 。 boost.org/doc/libs/1_60_0/doc/html/boost_asio/reference/… 您还需要在服务停止之前删除work。 (重置通过蛮力做到这一点(讨厌)) reset() 文档严格禁止在io_service 上调用reset(),而对poll()poll_one()run()run_one() 的调用未完成。【参考方案2】:

io_sevice.stop() 的调用将导致所有run()run_one() 的调用尽快返回。从处理程序中调用时,调用者将从run() 返回,而不调用任何其他处理程序。在您的情况下,async_resolve 的完成处理程序将设置与endpoint_future 关联的promise;但是,通过停止io_service,将不会调用完成处理程序。考虑:

cancel()future 关联的 I/O 对象,然后继续运行 io_service 直到完成,以便设置 promise 的值 销毁所有 I/O 对象,然后销毁 io_service,以便删除处理程序并通过 future 检测到损坏的承诺

循环对future 执行定时等待,如果future 准备好或io_service 已停止则退出循环。例如,以下函数返回一个带有未来值的boost::optional,如果不设置未来,则返回boost::none

template <typename T>
boost::optional<T> get(
  boost::asio::io_service& io_service,
  std::future<T>& future)

  for (;;)
  
    // If the future is ready, get the value.
    if (future.wait_for(std::chrono::seconds(1)) == std::future_status::ready)
    
      return future.get();
    
    // Otherwise, if the future is never going to be set, return none.
    if (io_service.stopped())
    
      return boost::none;
    
  


...
if (auto endpoint_iterator = get(io_service, endpoint_future))

  // use *endpoint_iterator...


下面是一个示例 demonstrating 如何在停止 io_service 的同时安全地等待未来:

#include <chrono>
#include <iostream>
#include <thread>
#include <boost/asio.hpp>
#include <boost/asio/use_future.hpp>
#include <boost/optional.hpp>

template <typename T>
boost::optional<T> get(
  boost::asio::io_service& io_service,
  std::future<T>& future)

  for (;;)
  
    // If the future is ready, get the value.
    if (future.wait_for(std::chrono::seconds(1)) == std::future_status::ready)
    
      return future.get();
    
    // Otherwise, if the future is never going to be set, return none.
    if (io_service.stopped())
    
      std::cout << "io_service stopped, future will not be set" << std::endl;
      return boost::none;
    
    std::cout << "timeout waiting for future" << std::endl;
  


int main()

  boost::asio::io_service io_service;

  // Create I/O objects.
  boost::asio::ip::udp::socket socket(io_service,
    boost::asio::ip::udp::v4());
  boost::asio::deadline_timer timer(io_service);

  // Process the io_service in the runner thread.
  auto runner = std::thread([&]() 
    boost::asio::io_service::work work(io_service);
    io_service.run();
  );

  // Start an operation that will not complete.
  auto bytes_transferred_future = socket.async_receive(
    boost::asio::null_buffers(), boost::asio::use_future);

  // Arm the timer.
  timer.expires_from_now(boost::posix_time::seconds(2));
  timer.async_wait([&](const boost::system::error_code&) 
    timer.cancel();
    socket.close();
    io_service.stop();
  );

  // bytes_transferred's promise will never be set as the io_service
  // is not running.
  auto bytes_transferred = get(io_service, bytes_transferred_future);
  assert(!bytes_transferred);

  runner.join();

【讨论】:

等待wait_for 并检查某些东西是我想避免的无用开销。我曾考虑过这个解决方案,但我试图找到更有效的方法,它可以释放我等待的所有未来,而无需手动工作。这是否可以正确执行(如果错误则不使用reset)? @VictorPolevoy 据我所知,只有 3 个选项具有明确的行为:运行处理程序;销毁处理程序;或定期等待处理程序。在可能永远不会完成的异步操作上无限期地同步阻塞似乎很尴尬。如果使用wait_for()wait_until(),并且提供的持续时间或时间略大于deadline_timer的过期时间,则大大减少了未来多次等待的机会。【参考方案3】:

使工作对截止时间超时可见,并允许截止时间超时将其删除。

boost::scoped_ptr<boost::asio::io_service::work work;

你仍然可以在这里创建工作,但堆分配它。

runner_ = std::thread([this]() 
    work = new boost::asio::io_service::work(io_service_);
    io_service_.run();
);

然后像这样关闭:

void deadlineTimeout() 
    deadline_.cancel();
    socket_.close(); // socket_ is a socket on io_service_
    work.reset();

    // The io_service::run() will exit when there are no active requests 
    // (i.e. no sockeet, no deadline, and no work)
    // so you do not need to call: io_service_.stop();

【讨论】:

stop() 的调用可能会产生不良影响。已取消的async_resolve 操作的内部完成处理程序可能永远不会被调用,从而导致std::promise 永远不会设置其值。

以上是关于在 io_service.stop() 之后等待 boost asio 的未来将永远持续下去的主要内容,如果未能解决你的问题,请参考以下文章

iOS 在调用完成块之前会在 viewDidDisappear 之后等待 CATransaction

如果在“等待”之后抛出,则从任务中抛出的异常被吞下

在 forEach 之后使用 Promise.all() 渲染 NodeJS 页面之前等待 Firebase 数据加载

JS中实现等待一段时间之后开始定时循环执行某个方法?

如何实现java主线程等待子线程执行完毕之后再执行

即使在等待之后,Python 中的 Selenium 也无法识别 DOM 中的变化