我应该如何在 io_service 中解决这个问题?

Posted

技术标签:

【中文标题】我应该如何在 io_service 中解决这个问题?【英文标题】:How should I solve this problem in io_service? 【发布时间】:2021-05-25 03:31:15 【问题描述】:
class Session 
public:
    Session()
    
        service_ = boost::make_shared<boost::asio::io_service>();
        ep_ = boost::make_shared<boost::asio::ip::tcp::endpoint>(ip::address::from_string("127.0.0.1"), 8001);
        acc_ = boost::make_shared<boost::asio::ip::tcp::acceptor>(*service_, *ep_);
    
    ~Session() 
    void handle_accept(socket_ptr sock_ptr, const boost::system::error_code& err)
    
        if (err)
            return;
        async_read(*sock_ptr, buffer(read_buffer_, 512),
            boost::bind(&Session::handle_msg, this, read_buffer_, sock_ptr, _1));
        memset(read_buffer_, 0, 512);
    

    void start_accept()
    
        boost::system::error_code ec;
        acc_->open(ep_->protocol(), ec);
        acc_->set_option(boost::asio::ip::tcp::acceptor::reuse_address(true), ec);
        acc_->bind(*ep_, ec);
        acc_->listen(boost::asio::socket_base::max_connections, ec);

        sock_ptr_ = boost::make_shared<ip::tcp::socket>(boost::ref(*service_));
        acc_->async_accept(*sock_ptr_, boost::bind(&Session::handle_accept, this, sock_ptr_, _1));
        boost::thread IOwork(boost::bind(&Session::io_work, this));
    

    void handle_msg(char* msg, socket_ptr sock_ptr, const boost::system::error_code& err)
    
        if (err) 
            std::cout << msg << " -----\n\n";
            std::cout << err.message() << "\n";
            return;
        
        else 
            std::cout << msg << " ++++++++++++++\n\n";
        
        boost::system::error_code ec;
        handle_accept(sock_ptr, ec);
    

    void io_work()
    
        boost::system::error_code err;
        service_->run(err);
    

private:
    char read_buffer_[512];
    boost::shared_ptr<boost::asio::io_service> service_ =  nullptr ;
    boost::shared_ptr<boost::asio::ip::tcp::endpoint> ep_ =  nullptr ;
    boost::shared_ptr<boost::asio::ip::tcp::acceptor> acc_ =  nullptr ;
    socket_ptr sock_ptr_ =  nullptr ;
;

Session session_;

int main()

    boost::thread tcp_ser1(boost::bind(&Session::start_accept, &session_));
    while (1);

我用io_service写了一个简单的服务器,但是收到数据后,报End of file的错误。这个问题应该怎么解决呢?采用异步接收和异步读取的方式。每次接收到客户端的数据都会调用函数async_read,handle_msg负责处理客户端接收到的消息。

【问题讨论】:

【参考方案1】:

async_read 具有无限缓冲区将总是到达 EOF。

在您的情况下,缓冲区是有界的,但是任何短于 512 字节的流也会从另一端关闭连接,但仍会导致 EOF。

只要处理这个条件。例如

    if (err && !(bytes_transferred > 0 && err != boost::asio::error::eof)) 
        std::cout << msg << " -----\n\n";
        std::cout << err.message() << "\n";
        return;
    
    else 
        std::cout << msg << " ++++++++++++++\n\n";
    

旁注

一般来说,如果您希望能够忽略error_codes,只需不要传递参数。因为这样你就会得到异常处理。否则,如果以违反先决条件的方式使用 API,您会得到未定义的行为或只是未指定的行为。

不要用共享指针乱扔代码,而是让 Session 本身可共享

将你的听众称为会话有点不典型(会话通常被认为是为每个接受的连接开始

在构造函数初始化器列表中优先使用基/成员初始化器(或从 C++11 开始使用 NSDMI,因为您已经冗余地使用 nullptr 进行初始化)

考虑使用 std::bind、std::shared_ptr 等代替 boost 版本

不要传递原始缓冲区指针(丢失大小信息),也不要忽略bytes_transferred 参数。否则,简单地打印 msg 已经是一个安全问题 - 并且会运行到 UB,因为如果您读取 512 个字节,则没有 NUL 终止符

这是一场数据竞赛:

 async_read(*sock_ptr, buffer(read_buffer_, 512),
     std::bind(&Session::handle_msg, this, read_buffer_, sock_ptr, _1));
 memset(read_buffer_, 0, 512);

问题是async_read 立即返回,但您可能同时修改read_bufferasync_read。数据竞赛也是 UB

还有一个问题

  boost::thread IOwork(std::bind(&Session::io_work, this));

这让IOwork 线程对象超出范围,但线程没有分离。我认为这应该导致程序异常终止

不要忘记 IOWork 中的异常处理(请参阅Should the exception thrown by boost::asio::io_service::run() be caught?)

固定代码

修改您的代码:Live On Coliru

#include <boost/asio.hpp>
#include <functional>
#include <iomanip>
#include <iostream>
#include <string_view>
#include <thread>

using boost::asio::ip::tcp;
using boost::system::error_code;
using namespace std::placeholders;
using namespace std::chrono_literals;

class Session : public std::enable_shared_from_this<Session> 
  public:
    Session(tcp::socket&& sock) : socket_(std::move(sock))  

    void start()
    
        do_read();
    

  private:
    void do_read()
    
        read_buffer_ = 0;
        async_read(socket_, boost::asio::buffer(read_buffer_),
                   std::bind(&Session::handle_msg, shared_from_this(), _1, _2));
    

    void handle_msg(error_code err, size_t bytes_transferred)
    
        std::string_view msg(read_buffer_.data(), bytes_transferred);

        std::cout << std::quoted(msg) << " -----\n" << std::endl;

        if (err && !(bytes_transferred > 0 && err != boost::asio::error::eof)) 
            std::cerr << "Error: " << err.message() << std::endl;
            return;
        

        do_read();
    

  private:
    std::array<char, 512> read_buffer_0;
    tcp::socket           socket_;
;

class Server 
  public:
    void start()
    
        acc_.set_option(tcp::acceptor::reuse_address(true));
        acc_.listen();
        do_accept();
    

    void stop() 
        post(service_, [this] 
            acc_.cancel(); /*or acc.close();*/
        );
    

    ~Server() 
        work_.reset();
        //// optionally:
        //stop();

        worker_.join();
    

  private:
    void do_accept()
    
        acc_.async_accept(std::bind(&Server::handle_accept, this, _1, _2));
    

    void handle_accept(error_code err, tcp::socket&& s)
    
        if (err)
            return; // TODO log error?

        std::make_shared<Session>(std::move(s))->start();

        do_accept(); // continue accepting connections
    

    static void io_work(boost::asio::io_service& svc)
    
        // http://
        // www.boost.org/doc/libs/1_61_0/doc/html/boost_asio/reference/io_service.html#boost_asio.reference.io_service.effect_of_exceptions_thrown_from_handlers
        for (;;) 
            try 
                svc.run();
                break; // exited normally
             catch (std::exception const& e) 
                std::cerr << "[io_work] error: " << e.what() << std::endl;
             catch (...) 
                std::cerr << "[io_work] An unexpected error occurred"
                          << std::endl;
            
        
    

  private:
    boost::asio::io_service service_;
    tcp::acceptor           acc_service_, , 8001;

    boost::asio::executor_work_guard<boost::asio::io_service::executor_type>
                work_make_work_guard(service_);

    std::thread worker_std::bind(&Server::io_work, std::ref(service_));
;

int main()

    Server server;
    server.start();

    std::this_thread::sleep_for(5s); // for COLIRU demo

    server.stop();

    // destructor joins when all sessions exited

在线演示使用的缓冲区大小为 8 字节:

"#include" -----

" <boost/" -----

"asio.hpp" -----

">
#inclu" -----

"de <func" -----

"tional>
" -----

"#include" -----

" <iomani" -----

"p>
#incl" -----

"ude <ios" -----

"tream>
#" -----

(……剪掉……)

"p();

  " -----

"  // des" -----

"tructor " -----

"joins wh" -----

"en all s" -----

"essions " -----

"exited
" -----

"
" -----

Error: End of file

文件大小与 8 字节缓冲区大小对齐并不明显,因此最后一次读取实际上是在检测 EOF 时读取 9 字节。您可能想要删除 bytes_transferred&gt;0 条件。

【讨论】:

添加了一个大大简化/改进的代码Live On Coliru

以上是关于我应该如何在 io_service 中解决这个问题?的主要内容,如果未能解决你的问题,请参考以下文章

io_service,为啥以及如何使用它?

Boost::Asio : io_service.run() vs poll() 或者我如何在主循环中集成 boost::asio

多个 ASIO io_services 是好事吗?

如何将 boost::asio::tcp::io_stream 附加到我的 io_service?

非阻塞 io_service::run

函数在需要时触发。我应该如何解决这个问题?