我应该如何在 boost::asio 的客户端应用程序中同时使用 async_read_until 和 async_write?

Posted

技术标签:

【中文标题】我应该如何在 boost::asio 的客户端应用程序中同时使用 async_read_until 和 async_write?【英文标题】:How should I use async_read_until and async_write simultaneously in the client app in boost::asio? 【发布时间】:2021-01-30 15:11:24 【问题描述】:

我的动机。

我正在尝试构建一个简单的信使。目前我已经编写了支持“类似邮件功能”的客户端和服务器应用程序,即它们缺少您在每个即时消息中都有的聊天交互。

这是我使用的模型。

服务器:每个连接的客户端的服务器都有一个专用的Service 类来提供实际服务。 Service 类的实例有一个 id。

客户端:在特定时刻同时开始从关联的Service 实例读取消息和向其写入消息。

Tracker:通过将用户的登录信息和Service ids 保存在地图中来记录用户的当前会话。还通过保存键值对(聊天参与者 id 1,聊天参与者 id 2)记录打开的聊天。因为我有一个数据库,所以我可以互换使用用户的登录名和 ID。

这是一个典型的使用场景。

    用户正在尝试登录。服务器将 ID 为 1 的 Service 实例专用于该用户。然后将用户标识为 Bob。 Bob 开始与 Ann 聊天。 Tracker 记录 Bob 使用的 Service 1 以及 Bob 打开了与 Ann 的聊天。 用户正在尝试登录。服务器将 ID 为 2 的 Service 实例专用于该用户。然后将用户标识为 Ann。 Ann 开始与 Bob 聊天。 Tracker 记录了 Ann 使用的 Service 2 以及 Ann 打开了与 Bob 的聊天。 Ann 给 Bob 写了一条消息。 Service2 收到消息并要求Service1 如果 Bob 已打开与 Ann 的聊天,则将消息发送到 Bob 的聊天。为此,我使用Tracker。在我们的例子中,Bob 正在聊天,所以 Bob 的客户端应用程序应该从 Service 1 读取消息。否则 Service 2 只会将新消息存储在数据库中。

当用户打开与某人的聊天时,客户端应用程序同时开始向关联 Service 实例读取和写入消息。

问题

    Bob 开始与 Ann 聊天。 Ann 开始与 Bob 聊天。 Ann 发送消息。它们显示在 Bobs 聊天中。 Bob 发送消息。它不会显示在 Ann 的聊天中。此外,Ann 的其他消息不再显示在 Bob 的聊天中。

这是我的服务器代码的一部分。我添加了一些上下文,但您可能想查看Service::onMessageReceivedService::receive_messageService::send_to_chat

/// Struct to track active sessions of clients
struct Tracker 
  static std::mutex current_sessions_guard; ///< mutex to lock the map of current sessions between threads
  static std::map<long, long> current_sessions; 
  static std::map<long, int> client_to_service_id;
;

在客户服务模型中提供实际服务的类

class Service 
public:
  void send_to_chat(const std::string& new_message) 
    asio::async_write(*m_sock.get(), asio::buffer(new_message),
      [this]() 
        onAnotherPartyMessageSent(); 
      );
   

private:
  void onReceivedReady();
  void receive_message() 
    /// Server loop for reading messages from the client
    spdlog::info("[] in receive_message", service_id);

    asio::async_read_until(*m_sock.get(), *(m_message.get()), '\n',
      [this]() 
        onMessageReceived();
      );
   
  void onMessageReceived();

private:
  std::shared_ptr<asio::ip::tcp::socket> m_sock; ///< Pointer to an active socket that is used to communicate
                                                 ///< with the client
  int service_id;  
  long dialog_id = -1, client_id = -1, another_party_id = -1;
  std::shared_ptr<asio::streambuf> m_message;
;

方法的定义


void Service::onMessageReceived() 
  /// Updates the database with the new message and asks Service instance of another participant
  /// to send the message if they opened this chat.

  std::istream istrm(m_message.get());
  std::string new_message;
  std::getline(istrm, new_message);
  m_message.reset(new asio::streambuf);

  std::unique_lock<std::mutex> tracker_lock(Tracker::current_sessions_guard);

  if (Tracker::current_sessions.find(another_party_id) != Tracker::current_sessions.end()) 
    if (Tracker::current_sessions[another_party_id] == client_id) 
      int another_party_service_id = Tracker::client_to_service_id[another_party_id];
      std::string formatted_msg = _form_message_str(login, new_message);
      
      spdlog::info("[] sends to chat ''", another_party_service_id, new_message);

      Server::launched_services[another_party_service_id]->send_to_chat(formatted_msg);
    
  
  tracker_lock.unlock();
  receive_message();
 

这是我的客户端代码的一部分。我添加了一些上下文,但您可能想查看AsyncTCPClient::onSentReadyAsyncTCPClient::message_send_loopAsyncTCPClient::message_wait_loop

/// Struct that stores a session with the given server
struct Session 
  asio::ip::tcp::socket m_sock; //!< The socket for the client application to connect to the server
  asio::ip::tcp::endpoint m_ep; //!< The server's endpoint
  std::string current_chat;

  std::shared_ptr<asio::streambuf> m_chat_buf;
  std::shared_ptr<asio::streambuf> m_received_message;
;

/// Class that implements an asynchronous TCP client to interact with Service class
class AsyncTCPClient: public asio::noncopyable 

  void onSentReady(std::shared_ptr<Session> session) 
  
    msg_wait_thread.reset(new std::thread([this, session] 
      asio::async_read_until(session->m_sock, *(session->m_received_message.get()), "\n", 
        [this, session] () 
          message_wait_loop(session);
        );
      ));
    msg_wait_thread->detach();

    msg_thread.reset(new std::thread([this, session] 
      message_send_loop(session);
      ));

    msg_thread->detach();
   


  void message_send_loop(std::shared_ptr<Session> session) 
    /// Starts loop in the current chat enabling the client to keep sending messages to another party
    logger->info("'' in message_send_loop", session->login);

    clear_console();
    m_console.write(session->current_chat);
    m_console.write("Write your message: ");

    std::string new_message;

    // We use a do/while loop to prevent empty messages either because of the client input
    // or \n's that were not read before

    do 
      new_message = m_console.read();
     while (new_message.empty());
    

    std::unique_lock<std::mutex> lock_std_out(std_out_guard);
    session->current_chat.append(_form_message_str(session->login, new_message));
    lock_std_out.unlock();

    asio::async_write(session->m_sock, asio::buffer(new_message + "\n"), 
      [this, session] () 
        message_send_loop(session);
      ); 
   

  void message_wait_loop(std::shared_ptr<Session> session) 
    /// Starts loop in the current chat enabling the client to keep reading messages from another party

    logger->info("'' in message_wait_loop", session->login);

    std::istream istrm(session->m_received_message.get());
    std::string received_message;
    std::getline(istrm, received_message);

    session->m_received_message.reset(new asio::streambuf);

    std::unique_lock<std::mutex> lock_std_out(std_out_wait_guard);
    session->current_chat.append(received_message + "\n");
    lock_std_out.unlock();

    clear_console();
    m_console.write(session->current_chat);
    m_console.write("Write your message: ");
    
    asio::async_read_until(session->m_sock, *(session->m_received_message.get()), "\n", 
      [this, session] (std::size_t) 
        message_wait_loop(session);
      );
  

private:
  asio::io_context m_ios;
;

所以,当我描述问题时,我在第 3 点上没有两个客户端的 "'' in message_wait_loop" 日志)。但我在第 2 点为 Bob 的客户提供了这些日志。

我也使用来自answer here 的控制台。它通过互斥体去除回声并控制标准输入/输出资源。但是它并没有解决我的问题。

任何帮助将不胜感激。

【问题讨论】:

【参考方案1】:

代码太多而太少。这个问题太多了,而实际上建议改进的太少了。我看到过度使用 shared_ptr,线程,特别是在他们自己的线程上运行异步操作非常奇怪。更别说分离了:

msg_wait_thread.reset(new std::thread([this, session] 
      asio::async_read_until(session->m_sock, *(session->m_received_message.get()), "\n", 
        [this, session] () 
          message_wait_loop(session);
        );
      ));
    msg_wait_thread->detach();

最好用完全等效的(但更安全)替换整个东西

  asio::async_read_until(session->m_sock, *(session->m_received_message.get()), "\n", 
    [this, session] () 
      message_wait_loop(session);
    );

我想读取循环在一个线程上,这样输入就不会阻塞。但是,如果您将主线程视为“UI 线程”(确实如此),并接受控制台 IO 在那里阻塞,而不是将结果请求发布到单个 IO 线程以进行所有非阻塞操作,则会变得容易得多。

如果您分享一个回购链接或其他内容,我很乐意查看更多内容。

更新

在 cmets 中,我查看了来自 github repo 的代码并发布了 PR:https://github.com/cepessh/mymsg/pull/1

这是一个非常原始的概念验证。我已经包含了许多更改 实际上与建议的并发修复无关,但它们 发生了:

让我跑步 在审核期间(您可能希望查看其中的一些更改并保留它们) main 分支中明显缺失的修复(例如,Message.read_by_recipient 数据库列的默认值)

您应该能够弄清楚所做的更改以及更改原因 commit messages.

只有最后两个提交真正关注in chat 讨论的想法。

【讨论】:

我已经尝试了几个小时来削减代码,以免它被视为垃圾邮件。这是我的回购github.com/cepessh/mymsg。如果你想构建它,那么你需要在 CMakeLists.txt 中调整 boost 路径(如果你没有它,boost::system 也需要单独构建,其他 boost 文件在我的项目中用作仅标题库) .如果要完全复制这种情况,则需要在 msgDB 中运行 sql 脚本来创建数据库并添加用户。然后调整 cfg_server.json 中的凭据 好的,我的预感完全正确:i.imgur.com/KmrqZNt.png 输入操作阻塞了 IO 服务。 我仍然不明白如何解决这个问题。如果输入操作阻塞了 IO 服务,为什么它只在问题的 3) 点阻塞?因此,据我了解,在我的情况下,IO 阻塞仅在对方实际键入内容时才会发生。此外,IO 的阻塞如何也干扰 message_wait_loop 中的 async_read_until?我的意思是 async_read_until 的处理程序在第 3 点没有被调用,尽管服务发送了消息。请问,你能详细说明我应该如何管理线程吗? 它总是阻塞,但有时时机可能是幸运的。例如。当双向发送另一条消息时,它总是对我不起作用(所以这只是第一次有时“起作用”) “它如何干扰”?真的,你应该问:怎么不。您在线程上运行 io_service。这意味着线程一次可以执行一个处理程序。您的一些处理程序会阻塞 std::cin IO,这意味着它们会阻止任何其他完成处理程序(例如来自 async_read_until)执行直到完成。

以上是关于我应该如何在 boost::asio 的客户端应用程序中同时使用 async_read_until 和 async_write?的主要内容,如果未能解决你的问题,请参考以下文章

Boost::asio - 如何中断阻塞的 tcp 服务器线程?

如何将此 Boost ASIO 示例应用于我的应用程序

如何通过 boost asio 制作真正的异步客户端

如何获取 boost::asio::ip::tcp::socket 的 IP 地址?

使用 Boost Asio 在 TCP 套接字上执行异步写入操作

boost::asio::async_read 不回调我的处理函数