boost::asio::io_service 运行方法阻塞/解除阻塞时感到困惑
Posted
技术标签:
【中文标题】boost::asio::io_service 运行方法阻塞/解除阻塞时感到困惑【英文标题】:Confused when boost::asio::io_service run method blocks/unblocks 【发布时间】:2013-03-12 04:15:18 【问题描述】:作为 Boost.Asio 的初学者,我对 io_service::run()
感到困惑。如果有人可以在此方法阻止/解除阻止时向我解释,我将不胜感激。文件指出:
run()
函数一直阻塞,直到所有工作完成并且没有更多的处理程序要分派,或者直到io_service
停止。多个线程可以调用
run()
函数来建立一个线程池,io_service
可以从中执行处理程序。在池中等待的所有线程都是等效的,io_service
可以选择其中任何一个来调用处理程序。
run()
函数的正常退出意味着io_service
对象已停止(stopped()
函数返回 true)。除非之前调用过reset()
,否则对run()
、run_one()
、poll()
或poll_one()
的后续调用将立即返回。
以下语句是什么意思?
[...] 没有更多的处理程序要分派 [...]
在尝试了解io_service::run()
的行为时,我遇到了这个example(示例3a)。在其中,我观察到io_service->run()
阻塞并等待工作指令。
// WorkerThread invines io_service->run()
void WorkerThread(boost::shared_ptr<boost::asio::io_service> io_service);
void CalculateFib(size_t);
boost::shared_ptr<boost::asio::io_service> io_service(
new boost::asio::io_service);
boost::shared_ptr<boost::asio::io_service::work> work(
new boost::asio::io_service::work(*io_service));
// ...
boost::thread_group worker_threads;
for(int x = 0; x < 2; ++x)
worker_threads.create_thread(boost::bind(&WorkerThread, io_service));
io_service->post( boost::bind(CalculateFib, 3));
io_service->post( boost::bind(CalculateFib, 4));
io_service->post( boost::bind(CalculateFib, 5));
work.reset();
worker_threads.join_all();
但是,在我正在处理的以下代码中,客户端使用 TCP/IP 进行连接,并且 run 方法会阻塞,直到异步接收到数据。
typedef boost::asio::ip::tcp tcp;
boost::shared_ptr<boost::asio::io_service> io_service(
new boost::asio::io_service);
boost::shared_ptr<tcp::socket> socket(new tcp::socket(*io_service));
// Connect to 127.0.0.1:9100.
tcp::resolver resolver(*io_service);
tcp::resolver::query query("127.0.0.1",
boost::lexical_cast< std::string >(9100));
tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
socket->connect(endpoint_iterator->endpoint());
// Just blocks here until a message is received.
socket->async_receive(boost::asio::buffer(buf_client, 3000), 0,
ClientReceiveEvent);
io_service->run();
// Write response.
boost::system::error_code ignored_error;
std::cout << "Sending message \n";
boost::asio::write(*socket, boost::asio::buffer("some data"), ignored_error);
如果对run()
的任何解释在以下两个示例中描述其行为,我们将不胜感激。
【问题讨论】:
【参考方案1】:基础
让我们从一个简化的示例开始,并检查相关的 Boost.Asio 部分:
void handle_async_receive(...) ...
void print() ...
...
boost::asio::io_service io_service;
boost::asio::ip::tcp::socket socket(io_service);
...
io_service.post(&print); // 1
socket.connect(endpoint); // 2
socket.async_receive(buffer, &handle_async_receive); // 3
io_service.post(&print); // 4
io_service.run(); // 5
什么是处理程序?
handler 只不过是一个回调。在示例代码中,有 3 个处理程序:
print
处理程序 (1)。
handle_async_receive
处理程序 (3)。
print
处理程序 (4)。
即使相同的print()
函数被使用了两次,每次使用都被认为是创建自己的唯一可识别处理程序。处理程序可以有多种形状和大小,从上述基本函数到更复杂的构造,例如从boost::bind()
和 lambdas 生成的函子。不管复杂程度如何,处理程序仍然只是一个回调。
什么是工作?
工作是 Boost.Asio 被要求代表应用程序代码执行的一些处理。有时 Boost.Asio 可能会在被告知后立即开始一些工作,而有时它可能会等待稍后的时间点完成工作。完成工作后,Boost.Asio 将通过调用提供的 handler 来通知应用程序。
Boost.Asio 保证 handlers 只会在当前调用 run()
、run_one()
、poll()
或 poll_one()
的线程中运行。这些线程将工作并调用handlers。因此,在上面的示例中,print()
在发布到io_service
(1) 时不会被调用。相反,它被添加到io_service
中,并将在稍后的时间点调用。在这种情况下,它在io_service.run()
(5) 内。
什么是异步操作?
asynchronous operation 创建工作,Boost.Asio 将调用 处理程序 以在工作完成时通知应用程序。异步操作是通过调用一个名称带有前缀async_
的函数来创建的。这些函数也称为启动函数。
异步操作可以分解为三个独特的步骤:
需要启动或通知相关联的io_service
。 async_receive
操作 (3) 通知 io_service
它需要从套接字异步读取数据,然后 async_receive
立即返回。
做实际工作。在这种情况下,当socket
接收到数据时,会读取字节并将其复制到buffer
。实际工作将在以下任一方面完成:
启动函数 (3),如果 Boost.Asio 可以确定它不会阻塞。
当应用程序显式运行io_service
(5)。
调用handle_async_receive
ReadHandler。再一次,处理程序 仅在运行io_service
的线程中调用。因此,无论何时完成工作(3 或 5),都可以保证 handle_async_receive()
只会在 io_service.run()
(5) 内被调用。
这三个步骤在时间和空间上的分离称为控制流倒置。这是使异步编程变得困难的复杂性之一。但是,有一些技术可以帮助缓解这种情况,例如使用 coroutines。
io_service.run()
做什么?
当一个线程调用io_service.run()
,work 和handlers 将从这个线程中被调用。在上面的例子中,io_service.run()
(5) 将阻塞直到:
print
处理程序调用并返回,接收操作以成功或失败完成,其handle_async_receive
处理程序已被调用并返回。
io_service
已通过 io_service::stop()
显式停止。
从处理程序中引发异常。
一种潜在的伪流可以描述如下:
创建 io_service 创建套接字 将打印处理程序添加到 io_service (1) 等待套接字连接 (2) 向io_service添加异步读工作请求(3) 将打印处理程序添加到 io_service (4) 运行 io_service (5) 有工作或处理程序吗? 是的,有 1 个工作和 2 个处理程序 socket有数据吗?不,什么都不做 运行打印处理程序 (1) 有工作或处理程序吗? 是的,有 1 个工作和 1 个处理程序 socket有数据吗?不,什么都不做 运行打印处理程序 (4) 有工作或处理程序吗? 是的,有 1 件作品 socket有数据吗?不,继续等待 -- 套接字接收数据 -- 套接字有数据,将其读入缓冲区 将 handle_async_receive 处理程序添加到 io_service 有工作或处理程序吗? 是的,有 1 个处理程序 运行 handle_async_receive 处理程序 (3) 有工作或处理程序吗? 不,将 io_service 设置为已停止并返回注意当读取完成时,它向io_service
添加了另一个处理程序。这个微妙的细节是异步编程的一个重要特征。它允许将处理程序链接在一起。例如,如果handle_async_receive
没有得到它预期的所有数据,那么它的实现可能会发布另一个异步读取操作,导致io_service
有更多的工作,因此不会从io_service.run()
返回。
请注意,当io_service
已用完时,应用程序必须在再次运行之前reset()
io_service
。
示例问题和示例 3a 代码
现在,让我们检查问题中引用的两段代码。
问题代码
socket->async_receive
将工作添加到io_service
。因此,io_service->run()
将阻塞,直到读取操作以成功或错误完成,并且ClientReceiveEvent
已完成运行或引发异常。
Example 3a代码
为了更容易理解,这里有一个较小的注释示例 3a:
void CalculateFib(std::size_t n);
int main()
boost::asio::io_service io_service;
boost::optional<boost::asio::io_service::work> work = // '. 1
boost::in_place(boost::ref(io_service)); // .'
boost::thread_group worker_threads; // -.
for(int x = 0; x < 2; ++x) // :
// '.
worker_threads.create_thread( // :- 2
boost::bind(&boost::asio::io_service::run, &io_service) // .'
); // :
// -'
io_service.post(boost::bind(CalculateFib, 3)); // '.
io_service.post(boost::bind(CalculateFib, 4)); // :- 3
io_service.post(boost::bind(CalculateFib, 5)); // .'
work = boost::none; // 4
worker_threads.join_all(); // 5
在高层次上,程序将创建 2 个线程来处理 io_service
的事件循环 (2)。这会产生一个简单的线程池来计算斐波那契数 (3)。
问题代码与此代码之间的一个主要区别是此代码调用io_service::run()
(2) 在 实际工作和处理程序添加到io_service
(3)。为了防止 io_service::run()
立即返回,创建了一个 io_service::work
对象 (1)。这个对象可以防止io_service
用完工作;因此,io_service::run()
不会因为没有工作而返回。
整体流程如下:
-
创建
io_service::work
对象并将其添加到io_service
。
创建的线程池调用io_service::run()
。由于io_service::work
对象,这些工作线程不会从io_service
返回。
将 3 个计算斐波那契数的处理程序添加到 io_service
,并立即返回。工作线程,而不是主线程,可能会立即开始运行这些处理程序。
删除io_service::work
对象。
等待工作线程完成运行。这只会在所有 3 个处理程序都完成执行后才会发生,因为 io_service
既没有处理程序也没有工作。
代码可以以与原始代码相同的方式编写不同的代码,其中将处理程序添加到io_service
,然后处理io_service
事件循环。这消除了使用io_service::work
的需要,并产生以下代码:
int main()
boost::asio::io_service io_service;
io_service.post(boost::bind(CalculateFib, 3)); // '.
io_service.post(boost::bind(CalculateFib, 4)); // :- 3
io_service.post(boost::bind(CalculateFib, 5)); // .'
boost::thread_group worker_threads; // -.
for(int x = 0; x < 2; ++x) // :
// '.
worker_threads.create_thread( // :- 2
boost::bind(&boost::asio::io_service::run, &io_service) // .'
); // :
// -'
worker_threads.join_all(); // 5
同步与异步
虽然问题中的代码使用的是异步操作,但它实际上是同步运行的,因为它正在等待异步操作完成:
socket.async_receive(buffer, handler)
io_service.run();
相当于:
boost::asio::error_code error;
std::size_t bytes_transferred = socket.receive(buffer, 0, error);
handler(error, bytes_transferred);
作为一般经验法则,请尽量避免混合同步和异步操作。很多时候,它可以把一个复杂的系统变成一个复杂的系统。这个answer 突出了异步编程的优点,其中一些也包含在Boost.Asio documentation 中。
【讨论】:
很棒的帖子。我只想添加一件事,因为我觉得它没有得到足够的关注:在 run() 返回后,您需要在 io_service 上调用 reset() ,然后才能再次 run() 它。否则无论是否有 async_ 操作等待,它都可能立即返回。 缓冲区从何而来?这是什么? 我还是一头雾水。如果混音是同步的,不推荐异步的,那么纯异步模式是什么?你能举个例子显示没有 io_service.run(); 的代码吗? @Splash One 可以使用io_service.poll()
来处理事件循环,而不会阻塞未完成的操作。避免混合同步和异步操作的主要建议是避免增加不必要的复杂性,并防止在处理程序需要很长时间才能完成时响应不佳。在某些情况下它是安全的,例如当您知道同步操作不会阻塞时。
中的“当前”是什么意思?Boost.Asio 保证处理程序只会在当前调用run()
的线程中运行... ."?如果有 N 个线程(调用了run()
),那么哪一个是“当前”线程?可以有很多吗?或者你的意思是已经完成执行async_*()
(比如async_read
)的线程,是否也保证调用它的处理程序?【参考方案2】:
为了简化run
的工作方式,将其视为必须处理一堆文件的员工;它需要一张纸,按照纸上说的做,把纸扔掉,然后拿下一张;当他的床单用完时,它就离开了办公室。每张纸上都可以有任何类型的说明,甚至可以在纸堆中添加一张新纸。
回到 asio:您可以通过两种方式为 io_service
工作,本质上是:通过在您链接的示例中使用 post
,或者通过在 io_service
上使用内部调用 post
的其他对象,比如socket
和它的async_*
方法。
【讨论】:
以上是关于boost::asio::io_service 运行方法阻塞/解除阻塞时感到困惑的主要内容,如果未能解决你的问题,请参考以下文章
boost::asio::io_service 运行方法阻塞/解除阻塞时感到困惑
C++ boost::asio::io_service创建线程池thread_group简单实例
C++ boost::asio::io_service创建线程池thread_group简单实例
从作为分离线程运行的 boost::asio::io_service::work 捕获异常