如何通过 boost asio 制作真正的异步客户端

Posted

技术标签:

【中文标题】如何通过 boost asio 制作真正的异步客户端【英文标题】:How to make real asynchronous client via boost asio 【发布时间】:2015-11-02 14:41:18 【问题描述】:

我需要写一个动态库来导出三个函数:

bool init_sender(const char* ip_addr, int port);
void cleanup_sender();
void send_command(const char* cmd, int len);

init_sender应该同步连接服务器,并根据连接成功与否返回true/false

cleanup_sender 应该等待所有命令完成然后返回。

send_command 应该将指定的命令异步发送到服务器并尽快返回。

于是我写了如下代码:

boost::asio::io_service                         g_io_service;
std::unique_ptr<boost::asio::io_service::work>  g_work;
boost::asio::ip::tcp::socket                    g_sock(g_io_service);
boost::thread                                   g_io_service_th;

void io_service_processor()

  g_io_service.run();


bool __stdcall init_sender(const char* ip_addr, int port)

  try
  
    g_work = std::make_unique<boost::asio::io_service::work>(g_io_service);
    boost::asio::ip::tcp::resolver resolver(g_io_service);
    boost::asio::connect(g_sock, resolver.resolve( ip_addr, std::to_string(port) ));
    g_io_service_th = boost::thread(io_service_processor);
    return true;
  
  catch (const std::exception& ex)
  
    return false;
  


void __stdcall cleanup_sender()

  g_work.reset();
  if (g_io_service_th.joinable())
  
    g_io_service_th.join();
  


void async_write_cb(
  const boost::system::error_code& error,
  std::size_t bytes_transferred)

  // TODO: implement


void __stdcall send_command(const char* cmd, int len)

  boost::asio::async_write(g_sock, boost::asio::buffer(cmd, len), async_write_cb);

据我从 boost asio 文档中了解到的,我通过 async_write 函数调用发布的所有命令都将从一个线程(包含 run 函数调用的线程——在我的例子中为 g_io_service_th)执行。我对吗?如果是这样,它似乎对我来说并不完全异步。我能做些什么来改变这种行为并从多个线程同时发送多个命令?我应该像这样创建boost::thread_group

for (int i = 0; i < pool_size; ++i)

  _thread_group.create_thread(boost::bind(&boost::asio::io_service::run, &_io_service));                             

或者还有其他方法吗?

【问题讨论】:

【参考方案1】:

你问了一些问题,还有很多东西要学。可能最重要的是要了解如何使用工作对象。

编辑:对 async_write 限制的引用: http://www.boost.org/doc/libs/1_59_0/doc/html/boost_asio/reference/async_write/overload1.html

引用文档:

此操作通过对流的 async_write_some 函数的零次或多次调用来实现,称为组合操作。 程序必须确保流不执行其他写入操作(例如 async_write、流的 async_write_some 函数或执行写入的任何其他组合操作),直到此操作完成。

您的 asio 线程代码应如下所示:

#include <iostream>
#include <vector>
#include <boost/asio.hpp>
#include <thread>


struct service_loop

    using io_service = boost::asio::io_service;

    io_service& get_io_service() 
        return _io_service;
    

    service_loop(size_t threads = 1)
    : _strand(_io_service)
    , _work(_io_service)
    , _socket(_io_service)
    
        for(size_t i = 0 ; i < threads ; ++i)
            add_thread();
    

    ~service_loop() 
        stop();
    

    // adding buffered sequential writes...

    void write(const char* data, size_t length)
    
        _strand.dispatch([this, v = std::vector<char>(data, data + length)] 
            _write_buffer.insert(std::end(_write_buffer), v.begin(), v.end());
            check_write();
        );

    
private:
    std::vector<char> _write_buffer;
    bool _writing;

    void check_write()
    
        if (!_writing and !_write_buffer.empty()) 
            auto pv = std::make_shared<std::vector<char>>(std::move(_write_buffer));
            _writing = true;
            _write_buffer.clear();
            boost::asio::async_write(_socket,
                                     boost::asio::buffer(*pv),
                                     [this, pv] (const boost::system::error_code& ec, size_t written) 
                                         _strand.dispatch(std::bind(&service_loop::handle_write,
                                                                    this,
                                                                    ec,
                                                                    written));
                                     );
        
    

    void handle_write(const boost::system::error_code& ec, size_t written)
    
        _writing = false;
        if (ec) 
            // handle error somehow
        
        else 
            check_write();
        
    

private:
    io_service _io_service;
    io_service::strand _strand;
    io_service::work _work;
    std::vector<std::thread> _threads;
    boost::asio::ip::tcp::socket _socket;

    void add_thread()
    
        _threads.emplace_back(std::bind(&service_loop::run_thread, this));
    

    void stop()
    
        _io_service.stop();
        for(auto& t : _threads) 
            if(t.joinable()) t.join();
        
    

    void run_thread()
    
        while(!_io_service.stopped())
        
            try 
                _io_service.run();
            
            catch(const std::exception& e) 
                // report exceptions here
            
        
    
;


using namespace std;

auto main() -> int

    service_loop sl;
    sl.write("hello", 5);
    sl.write(" world", 6);
    std::this_thread::sleep_for(std::chrono::seconds(10));

    return 0;

【讨论】:

但我说的对吗?我的代码有什么问题,所以您说“可能最重要的要了解的是如何使用工作对象”? 目前,如果任何回调函数抛出异常,您的 io_service 循环将停止。这可能不是你想要的。但是,作为一个简单的实现,您走在正确的轨道上。然而。请记住,每个套接字在任何时候都必须至少有一个异步写入正在进行。您将需要维护一个“写入队列”,您将希望在您的完成处理程序中耗尽它(记住用互斥锁保护它)。 @FrozenHeart 更新了示例以包括线程安全和对异步组合操作的保护。还包括文档的链接和引用。 @FrozenHeart 想一想。在 2 个线程上同时写入一个套接字是否有意义?当然不是。一次只写一个操作(同样适用于读取)是异步执行时对 all I/O 的限制。这就是 I/O 的本质。 ...因为 async_write 允许您一次写入许多个套接字,一次写入每个套接字...并且在您等待发送完成时,您的线程可以继续处理其他事情。

以上是关于如何通过 boost asio 制作真正的异步客户端的主要内容,如果未能解决你的问题,请参考以下文章

Boost.Asio 异步 TCP 客户端和多线程

boost.asio 和文件 i/o 有啥关系?

boost.asio - 如果在不同的异步处理程序之间共享数据库类型对象,我是不是需要使用锁?

使用 Boost Asio 在 TCP 套接字上执行异步写入操作

Boost asio - 从标准输入异步读取已建立的字符数

C++ Boost.Asio - tcp 套接字异步写入