boost::async 读写在一次读取和两次写入后卡住

Posted

技术标签:

【中文标题】boost::async 读写在一次读取和两次写入后卡住【英文标题】:boost::async read write stuck after one read and two writes 【发布时间】:2021-02-18 18:26:10 【问题描述】:

我试图构建一个服务器-客户端应用程序,服务器端是 c++,带有 boost::asio 异步,但是当我运行应用程序时,它在 1 次读取和两次写入后卡住了。 这是我删除 do_write 函数调用时的相关代码,我收到了我正在发送的所有 5 条消息

编辑 当我使用调试器运行时,我能够接收所有五条消息并发送所有响应

void start()
  
    std::thread t1([this]()do_write(););
    t1.detach();
    std::thread t2([this]()do_read(););
    t2.detach();
  

void do_write()
  
    auto self(shared_from_this());
    Message msg;
    while(order_response_queue_.empty())
    
    auto order_response = order_response_queue_.front();
    order_response_queue_.pop();
    try 
      auto m = order_response.SerializeToString();
      msg.body_length(std::strlen(m.c_str()));
      std::memcpy(msg.body(), m.c_str(), std::strlen(m.c_str()));
      msg.encode_header();
      std::memcpy(data_,msg.data(), msg.length());
      //std::memcpy(static_cast<void *>(msg.data()), data_, msg.length());
     catch (std::exception& e )
    
      std::cout << e.what();
    
    std::cout <<"write: " << msg.body() << "\n";
    boost::asio::async_write(socket_, boost::asio::buffer(data_,msg.length()),
                   [this, self](boost::system::error_code ec, std::size_t /*length*/)
                   
                      if (ec)
                       std::cerr << "write error:" << ec.value() << " message: " << ec.message() << "\n";
                      
                      do_write();
                    );
 


void do_read()
  
    auto self(shared_from_this());
    Message msg;
    socket_.async_read_some(boost::asio::buffer(res.data(), res.header_length),
                            [this, self](boost::system::error_code ec, std::size_t length) 
                            if (!ec && res.decode_header()) 

                            std::string st(res.body());
                            boost::asio::async_read(socket_, boost::asio::buffer(res.body(),                                                                                      res.body_length()), [this](boost::system::error_code ec, std::size_t length) 
                       if (!ec) 
                           std::cout << "read " << res.body() << "\n";
                           req_.DeserializeFromChar(res.body());
                           order_request_queue_.push(req_);
                         else 
                           if (ec) 
                               std::cerr << "read error:" << ec.value() << " message: "
                                                                        << ec.message() << "\n";
                           
                           socket_.close();
                         
                  );
        
        do_read();
    );
 

这里是io_service

class Server

 public:
 
  Server(boost::asio::io_service& io_service, short port,std::queue<OrderRequest> &order_request_queue,
         std::queue<OrderResponse> &order_response_queue)
      : acceptor_(io_service, tcp::endpoint(tcp::v4(), port)),
        socket_(io_service) , order_response_queue_(order_response_queue),  order_request_queue_(order_request_queue)
  
    do_accept();
  

 private:
  void do_accept()
    acceptor_.async_accept(socket_,
                           [this](boost::system::error_code ec) 
                             if (!ec) 
                               std::cout << "accept connection\n";
                               std::make_shared<Session>(std::move(socket_),order_request_queue_, order_response_queue_)->start();
                             

                             do_accept();
                           );
  


  tcp::acceptor acceptor_;
  tcp::socket socket_;
  std::queue<OrderResponse> &order_response_queue_;
  std::queue<OrderRequest> &order_request_queue_;
;

【问题讨论】:

我们能看到更多代码吗?处理由async_write 调用的对do_write() 的后续调用的线程的代码在哪里? @DavidSchwartz 如果我理解正确的话,start() 函数 start 函数创建一个调用do_write 然后终止的线程和一个调用do_read 然后终止的线程。处理异步操作完成的线程在哪里?!你没有? @DavidSchwartz async_readasync_write 在 lemda 函数中调用自己作为处理程序 对。您将处理完成运行的线程的处理程序传递给他们。但是该线程的代码在哪里?它是在哪里创建的?当没有要处理的完成时它会做什么?您的代码创建了两个线程,这两个线程都在调用完成处理程序之前终止(并且无论如何都不调用任何完成处理程序)。 executor/io_service 代码在哪里? 【参考方案1】:

您应该使示例独立。

Message 是什么, data_ 来自哪里, res 来自哪里 为什么data_msg.length() 一起使用, 是否包括标头长度, req_res之间有什么关系 它们的寿命是多少。 为什么要使用线程异步 正在运行的执行上下文在哪里等等。

这是您给我们的评论:

您在线程中使用order_response_queue_,没有任何锁定。这真的不行,因为 do_write 线程上没有任何东西会推送到该队列上,所以其他线程必须这样做。

说实话,我认为最好的起点是删除线程。您似乎使用它们的主要原因是:

“保持循环运行”,这已经是异步执行上下文的一个特性。但是,您已经链接到完成处理程序(例如再次调用 do_write),所以没关系

能够“等待”队列中的消息:

 while (order_response_queue_.empty()) 
 

通常的方法是在推送第一条消息时有条件地启动循环,并在队列为空时让它“停止”。无论如何,这避免了忙碌的旋转。 只需将start 替换为类似

 void start() 
     post(strand_, [this, self = shared_from_this()] 
         if (!order_response_queue_.empty()) 
             do_write();
         

         do_read();
     );
 

还有类似的东西

 void enqueue_response(Message stuff) 
     post(strand_, [this, stuff, self = shared_from_this()] 
         order_response_queue_.push(std::move(stuff));
         if (1 == order_response_queue_.size())  // not idle/already writing
             do_write();
         
     );
 

请注意,使用共享队列的每个操作都在逻辑链上同步,以防您使用多线程执行上下文。

原始草图

我不太明白 Serialize/Deserialize 背后与 Message 相关的想法,所以这可能行不通 - 尽管我觉得它会给你很多关于如何简化事情的线索:

Live On Compiler Explorer*

#include <thread>
#include <boost/asio.hpp>
#include <iostream>
#include <memory>
#include <queue>
using boost::asio::ip::tcp;

struct Message 
    struct header_t 
        // TODO add magic bytes
        std::uint32_t body_len = 0; // network byte order
    ;
    enum  header_length = sizeof(header_t) ;
    std::string _contentsheader_length;

    size_t body_length() const 
        return length() - header_length;
    

    void body_length(size_t n) 
        _contents.resize(n+header_length);
        encode_header();
    

    bool decode_header() 
        assert(_contents.length() >= header_length);

        auto &header = *reinterpret_cast<header_t *>(_contents.data());
        body_length(ntohl(header.body_len));

        // TODO verify magic bytes
        return true;
    

    void encode_header() 
        assert(_contents.length() >= header_length);

        auto &header = *reinterpret_cast<header_t *>(_contents.data());
        header.body_len = htonl(_contents.length() - header_length);
    

    std::string SerializeToString() const  return _contents; 
    void DeserializeFromChar(char const*)   // TODO IMPLEMENTATION

    char *data()              return _contents.data();        
    char *body()              return data() + header_length;  
    char const *data() const  return _contents.data();        
    char const *body() const  return data() + header_length;  
    size_t length() const     return _contents.size();        

    bool operator<(Message const &rhs) const  return _contents < rhs._contents; 
;

struct X : std::enable_shared_from_this<X> 
    std::queue<Message> order_request_queue_, order_response_queue_;

    void start() 
        post(strand_, [this, self = shared_from_this()] 
            if (!order_response_queue_.empty()) 
                do_write();
            

            do_read();
        );
    

    void enqueue_response(Message stuff) 
        post(strand_, [this, stuff, self = shared_from_this()] 
            order_response_queue_.push(std::move(stuff));
            if (1 == order_response_queue_.size())  // not idle/already writing
                do_write();
            
        );
    

    void do_write() 
        auto self(shared_from_this());
        auto order_response = std::move(order_response_queue_.front());
        order_response_queue_.pop();

        Message msg;
        try 
            auto m = order_response.SerializeToString();
            msg.body_length(std::strlen(m.c_str()));
            std::memcpy(msg.body(), m.c_str(), std::strlen(m.c_str()));
            msg.encode_header();

            assert(msg.length() <= data_.size());
            std::memcpy(data_.data(), msg.data(), msg.length());
            // std::memcpy(static_cast<void *>(msg.data()), data_,
            // msg.length());
         catch (std::exception &e) 
            std::cout << e.what();
        

        std::cout << "write: " << msg.body() << "\n";

        boost::asio::async_write(
            socket_, boost::asio::buffer(data_, msg.length()),
            [this, self](boost::system::error_code ec, std::size_t /*length*/) 
                if (ec) 
                    std::cerr << "write error:" << ec.value()
                              << " message: " << ec.message() << "\n";
                 else if (!order_response_queue_.empty()) 
                    do_write();
                
            );
    

    Message res, req_;
    void do_read() 
        auto self(shared_from_this());
        Message msg;
        socket_.async_read_some(
            boost::asio::buffer(res.data(), res.header_length),
            [this, self](boost::system::error_code ec, std::size_t /*length*/) 
                if (!ec && res.decode_header()) 

                    std::string st(res.body());
                    boost::asio::async_read(
                        socket_,
                        boost::asio::buffer(res.body(), res.body_length()),
                        [this](boost::system::error_code ec,
                               std::size_t /*length*/) 
                            if (!ec) 
                                std::cout << "read " << res.body() << "\n";
                                req_.DeserializeFromChar(res.body());
                                order_request_queue_.push(req_);
                             else 
                                if (ec) 
                                    std::cerr << "read error:" << ec.value()
                                              << " message: " << ec.message()
                                              << "\n";
                                
                                socket_.close();
                            
                        );
                
                do_read();
            );
    

    using Executor = boost::asio::system_executor;
    boost::asio::strand<Executor> strand_  Executor ;
    tcp::socket socket_  strand_ ;
    std::array<char, 1024> data_;

    // quick & dirty connection
    X()  socket_.connect(, 8989); 
;

int main() 
    auto x = std::make_shared<X>();
    x->start();

    
        Message demo;
        std::string_view greeting"Hello world!";
        demo.body_length(greeting.length());
        std::copy(greeting.begin(), greeting.end(), demo.body());

        x->enqueue_response();
    

    boost::asio::query(boost::asio::system_executor(),
                       boost::asio::execution::context)
        .join();

【讨论】:

感谢您的详细回答,启动函数上的线程的原因是同时在写入和读取上工作,但根据我从您的回答中了解到的情况,这是不合格的。并且我的 IDE 无法识别 system_execute (boost 1.58) 是的,只需使用 io_service。不过那已经很旧了。 (如果你展示你的代码会有所帮助,所以我不必编造东西)。此外,您不需要 any 线程来同时读取和写入。这就是异步 IO 的本质。我有很多答案,我展示了使用 Asio 的单线程多连接全双工 TCP 服务器。具体来说,我建议您查看一些典型的全双工示例(在图书馆聊天示例中或on this site) 非常感谢,我想我解决了这个问题。

以上是关于boost::async 读写在一次读取和两次写入后卡住的主要内容,如果未能解决你的问题,请参考以下文章

有关文件读取写入 和两种文件打开模式的理解

java中怎么对同一个文件读取两次或以上,用io

boost::async_write 大文件和内存消耗

boost::async_write 导致数据损坏

linuxfile两次读

boost async_read_some 用法