Boost::asio - 如何中断阻塞的 tcp 服务器线程?

Posted

技术标签:

【中文标题】Boost::asio - 如何中断阻塞的 tcp 服务器线程?【英文标题】:Boost::asio - how to interrupt a blocked tcp server thread? 【发布时间】:2012-06-26 19:24:11 【问题描述】:

我正在开发一个多线程应用程序,其中一个线程充当从客户端接收命令的 tcp 服务器。线程使用 Boost 套接字和接受器等待客户端连接,从客户端接收命令,将命令传递给应用程序的其余部分,然后再次等待。代码如下:

void ServerThreadFunc()

    using boost::asio::ip::tcp;
    boost::asio::io_service io_service;
    tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), port_no));

    for (;;)
    
        //  listen for command connection
        tcp::socket socket(io_service);
        acceptor.accept(socket);

        //  connected; receive command
        boost::array<char,256> msg_buf;
        socket.receive(boost::asio::buffer(msg_buf));

        //  do something with received bytes here
    

该线程大部分时间都被阻塞在对acceptor.accept() 的调用上。目前,线程仅在应用程序退出时终止。不幸的是,这会在 main() 返回后导致崩溃——我相信是因为线程在单例被销毁后尝试访问应用程序的日志记录单例。 (老实说,我来的时候就是这样。)

当应用程序退出时,我怎样才能干净地关闭这个线程?我读过原始套接字上的阻塞 accept() 调用可以通过从另一个线程关闭套接字来中断,但这似乎不适用于 Boost 套接字。我尝试使用 Boost asynchronous tcp echo server example 将服务器逻辑转换为异步 i/o,但这似乎只是将阻塞调用 acceptor::accept() 换成阻塞调用 io_service::run(),所以我遇到了同样的问题: 一个我无法打断的阻塞呼叫。有什么想法吗?

【问题讨论】:

总是有io_service::stop,可以从另一个线程调用。 拥有事件循环的整个想法是,一切都发生在事件循环中,即 run()。这意味着您首先无需担心第二个线程。 (尽管有时在事件循环之外做一些事情是有意义的,比如要求很高的计算) @AmbrozBizjak,我同意。使用带有 select() 的老式套接字处理,您可以在一个循环中完成所有必要的事情。当已完成的连接数非零时,select() 会将侦听的 TCP 套接字设置为准备好读取。因此,在这个主选择循环中,您可以在一个地方处理所有接受、读取和写入。 Richard Stevens 的书:“Unix NW 编程,第 1 卷”谈到了这一点。 【参考方案1】:

简而言之,有两种选择:

将代码更改为异步代码(acceptor::async_accept()async_read),通过io_service::run() 在事件循环中运行,并通过io_service::stop() 取消。 使用信号等较低级别的机制强制阻塞调用中断。

我会推荐第一个选项,因为它更便于携带且更易于维护。要理解的重要概念是,只要有待处理的工作,io_service::run() 就会阻塞。当io_service::stop()被调用时,它会尝试让所有阻塞在io_service::run()上的线程尽快返回;它不会中断同步操作,例如acceptor::accept()socket::receive(),即使在事件循环中调用了同步操作。需要注意的是,io_service::stop() 是一个非阻塞调用,因此与在io_service::run() 上阻塞的线程的同步必须使用另一种机制,例如thread::join()

这是一个运行 10 秒并监听 8080 端口的示例:

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <iostream>

void StartAccept( boost::asio::ip::tcp::acceptor& );

void ServerThreadFunc( boost::asio::io_service& io_service )

  using boost::asio::ip::tcp;
  tcp::acceptor acceptor( io_service, tcp::endpoint( tcp::v4(), 8080 ) );

  // Add a job to start accepting connections.
  StartAccept( acceptor );

  // Process event loop.
  io_service.run();

  std::cout << "Server thread exiting." << std::endl;


void HandleAccept( const boost::system::error_code& error,
                   boost::shared_ptr< boost::asio::ip::tcp::socket > socket,
                   boost::asio::ip::tcp::acceptor& acceptor )

  // If there was an error, then do not add any more jobs to the service.
  if ( error )
  
    std::cout << "Error accepting connection: " << error.message() 
              << std::endl;
    return;
  

  // Otherwise, the socket is good to use.
  std::cout << "Doing things with socket..." << std::endl;

  // Perform async operations on the socket.

  // Done using the socket, so start accepting another connection.  This
  // will add a job to the service, preventing io_service::run() from
  // returning.
  std::cout << "Done using socket, ready for another connection." 
            << std::endl;
  StartAccept( acceptor );
;

void StartAccept( boost::asio::ip::tcp::acceptor& acceptor )

  using boost::asio::ip::tcp;
  boost::shared_ptr< tcp::socket > socket(
                                new tcp::socket( acceptor.get_io_service() ) );

  // Add an accept call to the service.  This will prevent io_service::run()
  // from returning.
  std::cout << "Waiting on connection" << std::endl;
  acceptor.async_accept( *socket,
    boost::bind( HandleAccept,
      boost::asio::placeholders::error,
      socket,
      boost::ref( acceptor ) ) );


int main()

  using boost::asio::ip::tcp;

  // Create io service.
  boost::asio::io_service io_service;

  // Create server thread that will start accepting connections.
  boost::thread server_thread( ServerThreadFunc, boost::ref( io_service ) );

  // Sleep for 10 seconds, then shutdown the server.
  std::cout << "Stopping service in 10 seconds..." << std::endl;
  boost::this_thread::sleep( boost::posix_time::seconds( 10 ) );
  std::cout << "Stopping service now!" << std::endl;

  // Stopping the io_service is a non-blocking call.  The threads that are
  // blocked on io_service::run() will try to return as soon as possible, but
  // they may still be in the middle of a handler.  Thus, perform a join on 
  // the server thread to guarantee a block occurs.
  io_service.stop();

  std::cout << "Waiting on server thread..." << std::endl;
  server_thread.join();
  std::cout << "Done waiting on server thread." << std::endl;

  return 0;

在运行时,我打开了两个连接。这是输出:

10 秒后停止服务...
等待连接
用socket做事...
使用套接字完成,准备进行另一个连接。
等待连接
用socket做事...
使用套接字完成,准备进行另一个连接。
等待连接
立即停止服务!
正在等待服务器线程...
服务器线程退出。
在服务器线程上等待完成。

【讨论】:

太棒了——正是我需要的代码和解释。谢谢! 你应该得到 +100 代表 :-) 我会添加从服务器内部连接到服务器以完成接受的选项:an example using this approach in a completely synchronous Asio server【参考方案2】:

当您收到需要退出的事件时,您可以调用acceptor.cancel(),这将取消挂起的接受(错误代码为operation_canceled)。在某些系统上,为了安全起见,您可能还必须close() 接受者。

【讨论】:

acceptor::cancel()acceptor::close() 都不会中断 同步 操作。这些调用只会导致未完成的异步操作准备好在服务事件循环中运行,错误为operation_canceled【参考方案3】:

如果涉及到它,您可以在 localhost 上打开一个到它的临时客户端连接 - 这会唤醒它。您甚至可以向它发送一条特殊消息,以便您可以从 pub 关闭您的服务器 - 应该有一个应用程序:)

【讨论】:

这实际上是我可以退出在acceptor.accept()上阻塞的服务器线程的唯一方法;让服务器向自己发送一条消息,这使得 accept() 返回【参考方案4】:

只需使用本机句柄和 SHUT_RD 选项调用 shutdown,即可取消现有的接收(接受)操作。

【讨论】:

【参考方案5】:

接受的答案并不完全正确。 事实上@JohnYu 回答正确

使用 ASIO 的阻塞 API 很像使用 ASIO 库封装在其类中的 BSD 套接字 API。

问题是 boost::asio::ip::tcp::acceptor 类不提供 shutdown() 功能,因此您必须使用“旧”套接字 API 来完成。

附加说明:确保在所有使用它的线程退出之前,acceptorsocketio_service 未被删除。在以下代码中,std::shared_ptr 用于保持共享资源处于活动状态,因此ApplicationContext 类的用户可以删除ApplicationContext 对象并避免SEGFAULT 崩溃

补充说明:注意 boost 文档,有引发异常的重载方法和返回错误代码的方法。原始海报的代码使用 acceptor-&gt;accept(socket); 没有 try/catch 会导致程序退出而不是正常的线程例程退出和清理。

解决方案说明如下:

#include <unistd.h> // include ::shutdown() function
// other includes ...

using boost::asio::ip::tcp;
using boost::asio::io_service;

class ApplicationContext 

    // Use shared pointer to extend life of resources afer ApplicationContext is deleted
    // and running threads can still keep using shared resources
    std::shared_ptr<tcp::acceptor> acceptor;
    std::shared_ptr<io_service> ioservice;

    // called `ServerThreadFunc` in question code example
    void AcceptLoopThreadRoutine(int port_no) 
        ioservice = std::make_shared<io_service>();
        acceptor = std::make_shared<tcp::acceptor>(*ioservice, tcp::endpoint(tcp::v4(), port_no));

        try 
            for (;;) 
                // listen for client connection
                tcp::socket socket(*ioservice);
                // Note boost::system::system_error is raised when using this overload
                acceptor->accept(socket);

                // connected receive some data ...
                // // boost::array<char,256> msg_buf;
                // // socket.receive(boost::asio::buffer(msg_buf));
                //  do something with received bytes here
            
         catch(std::exception const & exception) 
            // boost::system::system_error here indicates clean exit ;)
        
    

    void StopAcceptThread() 
        if(acceptor) 
            // boost::asio::ip::tcp::acceptor does not have shutdown() functionality
            // exposed, so we need to do it with this low-level approach
            int shutdown_status = shutdown(acceptor->native_handle(), SHUT_RDWR);
        
    

;

另请注意,使用信号解除阻塞接受线程是非常讨厌的实现,本地主机上的临时客户端连接解除阻塞接受线程非常尴尬。

ASIO 可帮助您通过回调在单线程中完成所有工作。如果您正在混合线程和 ASIO,那么您的设计可能很糟糕。

补充说明:不要混淆shutdown()close()。某些系统可能允许您在接受套接字上使用close() 来解除阻塞接受循环,但这不是可移植的。

【讨论】:

ASIO 中的 AS 用于asynchronous。它不仅限于回调。事实上,回调只是延续的可用方式之一:使用不同的CompletionToken 可以获得std::future 或使用协程进行异步编程。可调用对象本身可以绑定到可能在不同线程中同时调用提供的回调的执行程序。所以,你对 bad 设计的看法是非常错误的。 @SergeyKolesnik 您在谈论与问题无关的抽象。我的回答仅限于问题的范围。该问题具有需要答案中提供的特定解决方案的具体代码。在这种情况下,建议是合理且正确的。 首先,your 是关于使用 asio 进行设计的基于意见的概括确实具有误导性,并且不需要与问题相关。其次,关于您的代码:使用shared_ptr 是完全不合理的,并且您没有说明您将如何以及在何种情况下共享资源的所有权。根据文档,共享 acceptor 不是线程安全的。

以上是关于Boost::asio - 如何中断阻塞的 tcp 服务器线程?的主要内容,如果未能解决你的问题,请参考以下文章

如何在 boost asio 中设置阻塞套接字的超时时间?

如何接受boost :: asio :: ssl :: stream 作为boost :: asio :: ip :: tcp :: socket类型的参数

boost::asio 扩展 TCP 套接字

Boost.Asio、tcp::iostream 和多线程

如何在 Boost.ASIO 中分配已连接的本机套接字类型 (TCP)

boost::asio::read() 永远阻塞