boost::asio::io_service 运行方法阻塞/解除阻塞时感到困惑

Posted

技术标签:

【中文标题】boost::asio::io_service 运行方法阻塞/解除阻塞时感到困惑【英文标题】:Confused when boost::asio::io_service run method blocks/unblocks 【发布时间】:2013-03-12 04:15:18 【问题描述】:

作为 Boost.Asio 的初学者,我对 io_service::run() 感到困惑。如果有人可以在此方法阻止/解除阻止时向我解释,我将不胜感激。文件指出:

run() 函数一直阻塞,直到所有工作完成并且没有更多的处理程序要分派,或者直到 io_service 停止。

多个线程可以调用run() 函数来建立一个线程池,io_service 可以从中执行处理程序。在池中等待的所有线程都是等效的,io_service 可以选择其中任何一个来调用处理程序。

run() 函数的正常退出意味着 io_service 对象已停止(stopped() 函数返回 true)。除非之前调用过reset(),否则对run()run_one()poll()poll_one() 的后续调用将立即返回。

以下语句是什么意思?

[...] 没有更多的处理程序要分派 [...]


在尝试了解io_service::run() 的行为时,我遇到了这个example(示例3a)。在其中,我观察到io_service->run() 阻塞并等待工作指令。

// WorkerThread invines io_service->run()
void WorkerThread(boost::shared_ptr<boost::asio::io_service> io_service);
void CalculateFib(size_t);

boost::shared_ptr<boost::asio::io_service> io_service(
    new boost::asio::io_service);
boost::shared_ptr<boost::asio::io_service::work> work(
   new boost::asio::io_service::work(*io_service));

// ...

boost::thread_group worker_threads;
for(int x = 0; x < 2; ++x)

  worker_threads.create_thread(boost::bind(&WorkerThread, io_service));


io_service->post( boost::bind(CalculateFib, 3));
io_service->post( boost::bind(CalculateFib, 4));
io_service->post( boost::bind(CalculateFib, 5));

work.reset();
worker_threads.join_all();

但是,在我正在处理的以下代码中,客户端使用 TCP/IP 进行连接,并且 run 方法会阻塞,直到异步接收到数据。

typedef boost::asio::ip::tcp tcp;
boost::shared_ptr<boost::asio::io_service> io_service(
    new boost::asio::io_service);
boost::shared_ptr<tcp::socket> socket(new tcp::socket(*io_service));

// Connect to 127.0.0.1:9100.
tcp::resolver resolver(*io_service);
tcp::resolver::query query("127.0.0.1", 
                           boost::lexical_cast< std::string >(9100));
tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
socket->connect(endpoint_iterator->endpoint());

// Just blocks here until a message is received.
socket->async_receive(boost::asio::buffer(buf_client, 3000), 0,
                      ClientReceiveEvent);
io_service->run();

// Write response.
boost::system::error_code ignored_error;
std::cout << "Sending message \n";
boost::asio::write(*socket, boost::asio::buffer("some data"), ignored_error);

如果对run() 的任何解释在以下两个示例中描述其行为,我们将不胜感激。

【问题讨论】:

【参考方案1】:

基础

让我们从一个简化的示例开始,并检查相关的 Boost.Asio 部分:

void handle_async_receive(...)  ... 
void print()  ... 

...  

boost::asio::io_service io_service;
boost::asio::ip::tcp::socket socket(io_service);

...

io_service.post(&print);                             // 1
socket.connect(endpoint);                            // 2
socket.async_receive(buffer, &handle_async_receive); // 3
io_service.post(&print);                             // 4
io_service.run();                                    // 5

什么是处理程序

handler 只不过是一个回调。在示例代码中,有 3 个处理程序:

print 处理程序 (1)。 handle_async_receive 处理程序 (3)。 print 处理程序 (4)。

即使相同的print() 函数被使用了两次,每次使用都被认为是创建自己的唯一可识别处理程序。处理程序可以有多种形状和大小,从上述基本函数到更复杂的构造,例如从boost::bind() 和 lambdas 生成的函子。不管复杂程度如何,处理程序仍然只是一个回调。

什么是工作

工作是 Boost.Asio 被要求代表应用程序代码执行的一些处理。有时 Boost.Asio 可能会在被告知后立即开始一些工作,而有时它可能会等待稍后的时间点完成工作。完成工作后,Boost.Asio 将通过调用提供的 handler 来通知应用程序。

Boost.Asio 保证 handlers 只会在当前调用 run()run_one()poll()poll_one() 的线程中运行。这些线程将工作并调用handlers。因此,在上面的示例中,print() 在发布到io_service (1) 时不会被调用。相反,它被添加到io_service 中,并将在稍后的时间点调用。在这种情况下,它在io_service.run() (5) 内。

什么是异步操作?

asynchronous operation 创建工作,Boost.Asio 将调用 处理程序 以在工作完成时通知应用程序。异步操作是通过调用一个名称带有前缀async_ 的函数来创建的。这些函数也称为启动函数

异步操作可以分解为三个独特的步骤:

需要启动或通知相关联的io_serviceasync_receive 操作 (3) 通知 io_service 它需要从套接字异步读取数据,然后 async_receive 立即返回。 做实际工作。在这种情况下,当socket 接收到数据时,会读取字节并将其复制到buffer。实际工作将在以下任一方面完成: 启动函数 (3),如果 Boost.Asio 可以确定它不会阻塞。 当应用程序显式运行io_service (5)。 调用handle_async_receiveReadHandler。再一次,处理程序 仅在运行io_service 的线程中调用。因此,无论何时完成工作(3 或 5),都可以保证 handle_async_receive() 只会在 io_service.run() (5) 内被调用。

这三个步骤在时间和空间上的分离称为控制流倒置。这是使异步编程变得困难的复杂性之一。但是,有一些技术可以帮助缓解这种情况,例如使用 coroutines。

io_service.run() 做什么?

当一个线程调用io_service.run(),work 和handlers 将从这个线程中被调用。在上面的例子中,io_service.run() (5) 将阻塞直到:

它已从两个print 处理程序调用并返回,接收操作以成功或失败完成,其handle_async_receive 处理程序已被调用并返回。 io_service 已通过 io_service::stop() 显式停止。 从处理程序中引发异常。

一种潜在的伪流可以描述如下:

创建 io_service 创建套接字 将打印处理程序添加到 io_service (1) 等待套接字连接 (2) 向io_service添加异步读工作请求(3) 将打印处理程序添加到 io_service (4) 运行 io_service (5) 有工作或处理程序吗? 是的,有 1 个工作和 2 个处理程序 socket有数据吗?不,什么都不做 运行打印处理程序 (1) 有工作或处理程序吗? 是的,有 1 个工作和 1 个处理程序 socket有数据吗?不,什么都不做 运行打印处理程序 (4) 有工作或处理程序吗? 是的,有 1 件作品 socket有数据吗?不,继续等待 -- 套接字接收数据 -- 套接字有数据,将其读入缓冲区 将 handle_async_receive 处理程序添加到 io_service 有工作或处理程序吗? 是的,有 1 个处理程序 运行 handle_async_receive 处理程序 (3) 有工作或处理程序吗? 不,将 io_service 设置为已停止并返回

注意当读取完成时,它向io_service 添加了另一个处理程序。这个微妙的细节是异步编程的一个重要特征。它允许将处理程序链接在一起。例如,如果handle_async_receive 没有得到它预期的所有数据,那么它的实现可能会发布另一个异步读取操作,导致io_service 有更多的工作,因此不会从io_service.run() 返回。

请注意,当io_service 已用完时,应用程序必须在再次运行之前reset() io_service


示例问题和示例 3a 代码

现在,让我们检查问题中引用的两段代码。

问题代码

socket-&gt;async_receive 将工作添加到io_service。因此,io_service-&gt;run() 将阻塞,直到读取操作以成功或错误完成,并且ClientReceiveEvent 已完成运行或引发异常。

Example 3a代码

为了更容易理解,这里有一个较小的注释示例 3a:

void CalculateFib(std::size_t n);

int main()

  boost::asio::io_service io_service;
  boost::optional<boost::asio::io_service::work> work =       // '. 1
      boost::in_place(boost::ref(io_service));                // .'

  boost::thread_group worker_threads;                         // -.
  for(int x = 0; x < 2; ++x)                                  //   :
                                                             //   '.
    worker_threads.create_thread(                             //     :- 2
      boost::bind(&boost::asio::io_service::run, &io_service) //   .'
    );                                                        //   :
                                                             // -'

  io_service.post(boost::bind(CalculateFib, 3));              // '.
  io_service.post(boost::bind(CalculateFib, 4));              //   :- 3
  io_service.post(boost::bind(CalculateFib, 5));              // .'

  work = boost::none;                                         // 4
  worker_threads.join_all();                                  // 5

在高层次上,程序将创建 2 个线程来处理 io_service 的事件循环 (2)。这会产生一个简单的线程池来计算斐波那契数 (3)。

问题代码与此代码之间的一个主要区别是此代码调用io_service::run() (2) 实际工作和处理程序添加到io_service (3)。为了防止 io_service::run() 立即返回,创建了一个 io_service::work 对象 (1)。这个对象可以防止io_service 用完工作;因此,io_service::run() 不会因为没有工作而返回。

整体流程如下:

    创建io_service::work 对象并将其添加到io_service。 创建的线程池调用io_service::run()。由于io_service::work 对象,这些工作线程不会从io_service 返回。 将 3 个计算斐波那契数的处理程序添加到 io_service,并立即返回。工作线程,而不是主线程,可能会立即开始运行这些处理程序。 删除io_service::work 对象。 等待工作线程完成运行。这只会在所有 3 个处理程序都完成执行后才会发生,因为 io_service 既没有处理程序也没有工作。

代码可以以与原始代码相同的方式编写不同的代码,其中将处理程序添加到io_service,然后处理io_service 事件循环。这消除了使用io_service::work 的需要,并产生以下代码:

int main()

  boost::asio::io_service io_service;

  io_service.post(boost::bind(CalculateFib, 3));              // '.
  io_service.post(boost::bind(CalculateFib, 4));              //   :- 3
  io_service.post(boost::bind(CalculateFib, 5));              // .'

  boost::thread_group worker_threads;                         // -.
  for(int x = 0; x < 2; ++x)                                  //   :
                                                             //   '.
    worker_threads.create_thread(                             //     :- 2
      boost::bind(&boost::asio::io_service::run, &io_service) //   .'
    );                                                        //   :
                                                             // -'
  worker_threads.join_all();                                  // 5


同步与异步

虽然问题中的代码使用的是异步操作,但它实际上是同步运行的,因为它正在等待异步操作完成:

socket.async_receive(buffer, handler)
io_service.run();

相当于:

boost::asio::error_code error;
std::size_t bytes_transferred = socket.receive(buffer, 0, error);
handler(error, bytes_transferred);

作为一般经验法则,请尽量避免混合同步和异步操作。很多时候,它可以把一个复杂的系统变成一个复杂的系统。这个answer 突出了异步编程的优点,其中一些也包含在Boost.Asio documentation 中。

【讨论】:

很棒的帖子。我只想添加一件事,因为我觉得它没有得到足够的关注:在 run() 返回后,您需要在 io_service 上调用 reset() ,然后才能再次 run() 它。否则无论是否有 async_ 操作等待,它都可能立即返回。 缓冲区从何而来?这是什么? 我还是一头雾水。如果混音是同步的,不推荐异步的,那么纯异步模式是什么?你能举个例子显示没有 io_service.run(); 的代码吗? @Splash One 可以使用io_service.poll() 来处理事件循环,而不会阻塞未完成的操作。避免混合同步和异步操作的主要建议是避免增加不必要的复杂性,并防止在处理程序需要很长时间才能完成时响应不佳。在某些情况下它是安全的,例如当您知道同步操作不会阻塞时。 中的“当前”是什么意思?Boost.Asio 保证处理程序只会在当前调用run()的线程中运行... ."?如果有 N 个线程(调用了run()),那么哪一个是“当前”线程?可以有很多吗?或者你的意思是已经完成执行async_*()(比如async_read)的线程,是否也保证调用它的处理程序?【参考方案2】:

为了简化run 的工作方式,将其视为必须处理一堆文件的员工;它需要一张纸,按照纸上说的做,把纸扔掉,然后拿下一张;当他的床单用完时,它就离开了办公室。每张纸上都可以有任何类型的说明,甚至可以在纸堆中添加一张新纸。 回到 asio:您可以通过两种方式为 io_service 工作,本质上是:通过在您链接的示例中使用 post,或者通过在 io_service 上使用内部调用 post 的其他对象,比如socket 和它的async_* 方法。

【讨论】:

以上是关于boost::asio::io_service 运行方法阻塞/解除阻塞时感到困惑的主要内容,如果未能解决你的问题,请参考以下文章

boost::asio::io_service 运行方法阻塞/解除阻塞时感到困惑

C++ boost::asio::io_service创建线程池thread_group简单实例

C++ boost::asio::io_service创建线程池thread_group简单实例

从作为分离线程运行的 boost::asio::io_service::work 捕获异常

在 boost::asio::io_service 上调用 run 时崩溃

boost::asio io_service 和 std::containers 的线程安全