提升 asio async_resolve 对象生命周期

Posted

技术标签:

【中文标题】提升 asio async_resolve 对象生命周期【英文标题】:Boost asio async_resolve object lifetime 【发布时间】:2017-11-05 13:55:45 【问题描述】:

下面的代码旨在执行以下操作:我有一个解析器对象,它包装了 boost asio。解析器对象包含 io 服务和一个 worker,因此 io 服务运行函数永远不会返回。只要解析器对象处于活动状态,就可以发出异步请求。当解析器对象超出范围并且队列中仍有请求时,我想完成所有操作并且解析器对象被销毁。

在这种情况下,根本没有调用任何处理程序,我不知道为什么。我认为共享指针和一些依赖循环可能存在问题。使用valgrind 运行报告“可能丢失内存”。

有什么想法可以使它工作,以便解析器对象在所有工作完成之前保持活动状态?

#include <boost/asio.hpp>
#include <memory>
#include <thread>
#include <functional>
#include <string>
#include <iostream>

struct Resolver : public std::enable_shared_from_this<Resolver> 
    boost::asio::io_service                        io_service;
    std::unique_ptr<boost::asio::io_service::work> work;
    std::unique_ptr<std::thread>                   iothread;

    struct Query : public std::enable_shared_from_this<Query>
        std::shared_ptr<Resolver>                                       service;
        boost::asio::ip::tcp::resolver                                  resolver;
        boost::asio::ip::tcp::resolver::query                           query;
        std::function<void(boost::asio::ip::tcp::resolver::iterator &)> handler;

        Query(std::shared_ptr<Resolver> res, std::function<void(boost::asio::ip::tcp::resolver::iterator &)> handler, const std::string &name) : resolver(res->io_service), query(name, ""), handler(handler) 
            service = res;

        

        void start() 
                auto self = shared_from_this();
                resolver.async_resolve(query, [self](const boost::system::error_code& ec, boost::asio::ip::tcp::resolver::iterator iterator)
                    self->handler(iterator);
                );     
        
    ;

    Resolver() 
        work.reset(new boost::asio::io_service::work(io_service));
        iothread.reset(new std::thread(std::bind(&Resolver::io, this)));
    

    ~Resolver() 
        std::cout << "Resolver destroyed" << std::endl;
        work.reset();
        iothread->join();
    

    void io() 
        io_service.run();
    

    void asyncResolve(const std::string &name, std::function<void(boost::asio::ip::tcp::resolver::iterator &)> fn) 
        auto query = std::make_shared<Query>(shared_from_this(), fn, name);
        query->start();
    
;

void test(boost::asio::ip::tcp::resolver::iterator it) 
    std::cout << "Test" << std::endl;
    std::cout << it->endpoint().address().to_string() << std::endl;


int main(int argc, const char **argv) 
    auto res = std::make_shared<Resolver>();
    res->asyncResolve("***.com", &test);
    res->asyncResolve("***.com", &test);
    res->asyncResolve("***.com", &test);
    res->asyncResolve("***.com", &test);
    res->asyncResolve("***.com", &test);

【问题讨论】:

哇。这是一个棘手的小家伙。 shared_ptr 总是复杂的秘诀。 【参考方案1】:

只需运行服务 (io_service::run()) 即可确保所有异步操作都已完成(请参阅 the documentation)。

您已经在工作线程上执行此操作,并且您加入了该线程,所以您应该没问题!

唯一的例外是如果处理程序抛出,所以为了更加精确,您应该处理来自 run():Should the exception thrown by boost::asio::io_service::run() be caught? 的异常

void io()  
    // http://www.boost.org/doc/libs/1_61_0/doc/html/boost_asio/reference/io_service.html#boost_asio.reference.io_service.effect_of_exceptions_thrown_from_handlers
    for (;;) 
        try 
            io_service.run();
            break; // exited normally
         catch (std::exception const &e) 
            std::cerr << "[Resolver] An unexpected error occurred: " << e.what();
         catch (...) 
            std::cerr << "[Resolver] An unexpected error occurred";
        
    

那么...问题出在哪里?

这个问题很挑剔,隐藏在线程和 shared_ptr 之间。

共享指针导致~Resolver 在工作线程上运行。这意味着您不能 join() 工作线程(因为线程永远不能加入自己)。好的实现会抛出异常,导致进程终止。

还有更多:如果您在工作线程处理异步任务时退出main(),完成处理程序可能会在std::cout 等全局变量被拆除后运行。所以要真正**看到* Resolver 完成工作并销毁,您需要确保 main 不会太快退出。

简化:

现在,下面是一个简化的示例,它确实表明异步操作确实完成了:(仍然存在问题):

#define BOOST_ASIO_ENABLE_HANDLER_TRACKING 1
#include <boost/asio.hpp>
#include <boost/optional.hpp>
#include <thread>
#include <iostream>

class Resolver : public std::enable_shared_from_this<Resolver> 
    using tcp = boost::asio::ip::tcp;
    using io_service = boost::asio::io_service;

    io_service _svc;
    tcp::resolver resolver  _svc ;

    boost::optional<io_service::work> work  _svc ;
    std::thread _worker  [this]  event_loop();  ;

    void event_loop()  
        // http://www.boost.org/doc/libs/1_61_0/doc/html/boost_asio/reference/io_service.html#boost_asio.reference.io_service.effect_of_exceptions_thrown_from_handlers
        for (;;) 
            std::cout << __PRETTY_FUNCTION__ << "\n";
            try 
                _svc.run();
                break; // exited normally
             catch (std::exception const &e) 
                std::cerr << "[Resolver] An unexpected error occurred: " << e.what() << "\n";
             catch (...) 
                std::cerr << "[Resolver] An unexpected error occurred\n";
            
        
        std::cout << "EXIT " << __PRETTY_FUNCTION__ << "\n";
    

  public:
    ~Resolver() 
        std::cout << __PRETTY_FUNCTION__ << "\n";
        work.reset();
    

    using Endpoint = tcp::endpoint;
    using Callback = std::function<void(Endpoint)>;

    void asyncResolve(std::string const& name, Callback fn) 
        auto self = shared_from_this();
        resolver.async_resolve(name, "", [self,fn](boost::system::error_code ec, tcp::resolver::iterator it) 
                if (!ec) fn(it->endpoint());
            );
    
;

void test_handler(Resolver::Endpoint ep) 
    std::cout << "Test: " <<  ep << "\n";


int main() 
    
        auto res = std::make_shared<Resolver>();
        for (auto fqdn : "***.com", "google.com", "localhost")
            res->asyncResolve(fqdn, test_handler);
    
    std::cout << "Released shared resolver\n";

    std::this_thread::sleep_for(std::chrono::seconds(1));
    std::cout << "Main exit\n";

打印:

void Resolver::event_loop()
Released shared resolver
Test: 151.101.65.69:0
Test: 172.217.17.46:0
Test: 127.0.0.1:0
Resolver::~Resolver()
terminate called without an active exception

处理程序跟踪:

剩下的问题

最后一个问题是现在我们不加入线程。从 std::thread::~thread 析构函数抛出。这是一个棘手的问题:

我们不能join(),因为我们可能那个工作线程 我们不能detach(),因为这会造成数据竞争,在析构函数完成后工作线程仍在运行。

选项有:

    从析构函数调用_svc::run(),而不是join()-ing 线程。这可行,但如果服务用于更多异步任务,则不合适,因为作为副作用,排队操作可能会在导致析构函数运行的线程上运行。

    如果我们不是工作线程,则调用join(),如果是,则调用run()。这始终是安全的,因为 run() 可以被嵌套调用,并且操作仍按预期从工作线程运行

    只需 join 并使用 error_condition resource_deadlock_would_occur 捕获 system_error 异常

我会说第二个是最干净的。但是在您的简单示例中,第一个选项没有问题,因为(a)如果存在现有的解析操作,析构函数将始终从工作线程运行(b)如果没有,服务队列必须 一直是空的,所以run() 实际上什么都不做。

所以这里有一个修复:

~Resolver() 
    std::cout << __PRETTY_FUNCTION__ << "\n";
    work.reset();

    event_loop();
    if (_worker.joinable()) 
       if (_worker.get_id() == std::this_thread::get_id())
           _worker.detach();
       else
           _worker.join();
    

现在输出是

void Resolver::event_loop()
Released shared resolver
Test: 151.101.193.69:0
Test: 216.58.212.238:0
Test: 127.0.0.1:0
Resolver::~Resolver()
void Resolver::event_loop()
Main exit

【讨论】:

以上是关于提升 asio async_resolve 对象生命周期的主要内容,如果未能解决你的问题,请参考以下文章

提升 Asio 单线程性能

提升asio和shared_ptrs的扩散

C++ 提升 asio 多线程

提升asio超时[重复]

使用 asio 提升线程池:线程随机不执行

提升 asio 原始套接字