单线程Boost死锁

Posted

技术标签:

【中文标题】单线程Boost死锁【英文标题】:Deadlock with one thread Boost 【发布时间】:2018-04-12 09:27:11 【问题描述】:

我目前正在努力解决死锁问题,但无法确定这是对 Boost:Asio 的错误使用,还是其他任何东西......

最大的问题是我无法重现它,因为它不会每次都发生,它更像是“一生一次”,所以它必须是某种引发条件。整个 gdb 回溯是这样的:

(gdb) thread apply all bt

Thread 1 (process 23619):
#0  0x00007f66a24ea42d in __lll_lock_wait () from /lib64/libpthread.so.0
#1  0x00007f66a24e5dcb in _L_lock_812 () from /lib64/libpthread.so.0
#2  0x00007f66a24e5c98 in pthread_mutex_lock () from /lib64/libpthread.so.0
#3  0x0000000000426968 in pthread_mutex_lock (m=0xb77288) at /usr/include/boost/thread/pthread/mutex.hpp:62
#4  lock (this=0xb77288) at /usr/include/boost/thread/pthread/mutex.hpp:116
#5  lock (this=0xb77288) at /usr/include/boost/thread/lockable_adapter.hpp:42
#6  lock_guard (m_=..., this=<synthetic pointer>) at /usr/include/boost/thread/lock_guard.hpp:38
#7  EndpointGroup::addEndpoint (this=0xb77288, endpoint=std::shared_ptr (count 1, weak 1) 0xbe4528) at /tmp/mediacontrol/src/EndpointGroup.cpp:80
#8  0x0000000000429b29 in EndpointManager::getQueuedOrNewEndpoint (this=this@entry=0x6abd60 <EndpointManager::getInstance()::instance>)
    at /tmp/mediacontrol/src/EndpointManager.cpp:54
#9  0x000000000042a620 in EndpointManager::fetchEndpoint (this=0x6abd60 <EndpointManager::getInstance()::instance>, endpointAddress="185.150.4.67")
    at /tmp/mediacontrol/src/EndpointManager.cpp:67
#10 0x000000000041404b in Client::processAlloc (this=this@entry=0xb76bb8, message=message@entry=0xbba290, response=response@entry=0xb8af60)
    at /tmp/mediacontrol/src/Client.cpp:279
#11 0x000000000041546e in Client::receiveMessage (this=0xb76bb8, message=0xbba290, response=response@entry=0xb8af60) at /tmp/mediacontrol/src/Client.cpp:46
#12 0x00000000004178e6 in operator() (__closure=<optimized out>, receivedBytes=<optimized out>, ec=...) at /tmp/mediacontrol/src/ClientConnection.cpp:119
#13 operator() (this=0x7ffc352d6da0) at /usr/include/boost/asio/detail/bind_handler.hpp:127
#14 asio_handler_invoke<boost::asio::detail::binder2<ClientConnection::doRead()::__lambda0, boost::system::error_code, long unsigned int> > (function=...)
    at /usr/include/boost/asio/handler_invoke_hook.hpp:69
#15 invoke<boost::asio::detail::binder2<ClientConnection::doRead()::__lambda0, boost::system::error_code, long unsigned int>, ClientConnection::doRead()::__lambda0> (
    context=..., function=...) at /usr/include/boost/asio/detail/handler_invoke_helpers.hpp:37
#16 boost::asio::detail::reactive_socket_recv_op<boost::asio::mutable_buffers_1, ClientConnection::doRead()::__lambda0>::do_complete(boost::asio::detail::io_service_impl *, boost::asio::detail::operation *, const boost::system::error_code &, std::size_t) (owner=<optimized out>, base=<optimized out>)
    at /usr/include/boost/asio/detail/reactive_socket_recv_op.hpp:110
#17 0x000000000042f7d0 in complete (bytes_transferred=<optimized out>, ec=..., owner=..., this=<optimized out>)
    at /usr/include/boost/asio/detail/task_io_service_operation.hpp:38
#18 do_run_one (ec=..., this_thread=..., lock=..., this=0xb43b50) at /usr/include/boost/asio/detail/impl/task_io_service.ipp:372
#19 boost::asio::detail::task_io_service::run (this=0xb43b50, ec=...) at /usr/include/boost/asio/detail/impl/task_io_service.ipp:149
#20 0x000000000042cc85 in run (this=0xb43ad0) at /usr/include/boost/asio/impl/io_service.ipp:59
#21 MediaControl::run (this=this@entry=0xb43ad0) at /tmp/mediacontrol/src/MediaControl.cpp:82
#22 0x0000000000410f6f in main (argc=<optimized out>, argv=<optimized out>) at /tmp/mediacontrol/src/main.cpp:106

程序(简化):

    接收分配消息 创建并运行 asio::io_service 将 asio::io_service 传递给“EndpointGroup”以将其与 asio::high_resolution_timer 一起使用 启动计时器 在计时器到期后将项目添加到函数中使用的列表 (MUTEX) 定时器到期并使用列表(MUTEX)

我还必须注意,在“EndpointGroup”中,这两个 lock_guard 是此类中唯一的。功能似乎不同,但“RtpEndpoint”不再启动“EndpointGroup”,它将在创建时启动。 相关功能:

// create a service which is used for async operations
io_service_ptr ThreadPoolManager::createNewService()

    io_service_ptr io_service = std::make_shared<asio::io_service>();
    work_ptr work = std::make_shared<asio::io_service::work>(*io_service);
    io_services_endpoint.push_back(io_service);
    work_endpoint.push_back(work);
    threads_endpoint.create_thread(bind(&asio::io_service::run, io_service));
    return io_service;


// simply start this function over and over again every 10ms
void EndpointGroup::invokeSendingOnEndpoints(size_t offset)

    pTimer.expires_from_now(std::chrono::milliseconds(PTIME_INTERVAL - offset));
    auto self(shared_from_this());
    pTimer.async_wait([this, self](system::error_code ec)
    
        if (!ec)
        
            vector<rtp_endpoint_ptr> iterationEndpoints;
            
                boost::lock_guard<EndpointGroup> guard(*this);
                iterationEndpoints = endpoints;
            
            for (rtp_endpoint_ptr endpoint : iterationEndpoints)
            
                // do fancy stuff
            
            ++pTimerIterations;
            // check how many milliseconds passed since start of the function
            invokeSendingOnEndpoints(std::chrono::duration_cast<std::chrono::milliseconds>((std::chrono::high_resolution_clock::now() - start)).count() / (pTimerIterations * PTIME_INTERVAL));
        
        else
        
            // just write error happend....    
        
    );


bool EndpointGroup::addEndpoint(const rtp_endpoint_ptr& endpoint)

    boost::lock_guard<EndpointGroup> guard(*this);
    endpoints.push_back(endpoint);
    return true;

感谢有关如何调试此问题的任何建议,并希望能解决此问题。

更新 1

由于有些人要求提供更多信息等,所以他们来了。 程序的目标,想想FreeSwitch/Asterisk,但更小。这仍然是一种天真的方法。该程序是一个服务器,它接收分配新 RTP 端点的请求,因为可能有数百个,这应该是多线程的(我理解的是多个 io_services)。并且由于每个端点使用一个线程是不好的,它们将被分组到 EndpointGroup 中。所以使用的定时器会每隔 10ms 调用一次 RTP-Endpoint 来开始编码和发送 RTP。

我还重新考虑了您使用互斥锁成员变量而不是从 basic_lockable_adapter 继承的建议。 @sehe 的答案与我使用的几乎相同,除了添加端点的客户端(不同的 io_service/线程)。

【问题讨论】:

如果您有多个互斥锁 - 始终以相同的顺序锁定它们。 @UKMonkey 我没有多个互斥锁,只有一个用于“EndpointGroup”,仅用于这两个功能。 请提供minimal reproducible example 你到底为什么要动态创建 io_services?一个通常足以完成整个程序。 @Hestalon asio 支持一个 io_service 由多个线程提供服务的想法。它还具有对互斥的内置支持(参见 asio::io_service::strand) 【参考方案1】:

我花了大约 20 分钟将您的示例代码变成独立的东西。当然,它只是工作,但那是因为你没有显示锁定的代码,无论如何。

也许我推断和填空的方式会帮助你发现你的不同之处:

Live On Coliru

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>

static const auto PTIME_INTERVAL = 10;
static std::atomic_size_t pTimerIterations  0 ;

namespace asio = boost::asio;
using io_service_ptr = std::shared_ptr<asio::io_service>;
using work_ptr = std::shared_ptr<asio::io_service::work>;

struct ThreadPoolManager 
    std::vector<io_service_ptr> io_services_endpoint;
    std::vector<work_ptr> work_endpoint;
    boost::thread_group threads_endpoint;

    io_service_ptr createNewService() 
        io_service_ptr io_service = std::make_shared<asio::io_service>();
        work_ptr work = std::make_shared<asio::io_service::work>(*io_service);
        io_services_endpoint.push_back(io_service);
        work_endpoint.push_back(work);
        threads_endpoint.create_thread(boost::bind(&asio::io_service::run, io_service));

        return io_service;
    

    ~ThreadPoolManager() 
        for(auto& w : work_endpoint)
            w.reset();

        threads_endpoint.join_all();
    
;

struct RtpEndpoint 
;

using rtp_endpoint_ptr = std::shared_ptr<RtpEndpoint>;

struct EndpointGroup : std::enable_shared_from_this<EndpointGroup>, boost::mutex 
    io_service_ptr _io;
    asio::high_resolution_timer pTimer;
    std::vector<rtp_endpoint_ptr> endpoints;
    std::chrono::high_resolution_clock::time_point start = std::chrono::high_resolution_clock::now();

    EndpointGroup(io_service_ptr io) : _io(io), pTimer(*_io) 

    void stop() 
        auto self(shared_from_this());
        _io->post([self,this]  pTimer.cancel(); );
    

    // simply start this function over and over again every 10ms
    void invokeSendingOnEndpoints(size_t offset) 
        pTimer.expires_from_now(std::chrono::milliseconds(PTIME_INTERVAL - offset));

        auto self(shared_from_this());
        pTimer.async_wait([this, self](boost::system::error_code ec) 
            if (!ec) 
                std::vector<rtp_endpoint_ptr> iterationEndpoints;
                
                    boost::lock_guard<EndpointGroup> guard(*this);
                    iterationEndpoints = endpoints;
                
                for (rtp_endpoint_ptr endpoint : iterationEndpoints) 
                    // do fancy stuff
                
                ++pTimerIterations;
                // check how many milliseconds passed since start of the function
                invokeSendingOnEndpoints(std::chrono::duration_cast<std::chrono::milliseconds>(
                                             (std::chrono::high_resolution_clock::now() - start))
                                             .count() /
                                         (pTimerIterations * PTIME_INTERVAL));
             else 
                // just write error happend....
            
        );
    

    bool addEndpoint(const rtp_endpoint_ptr &endpoint) 
        boost::lock_guard<EndpointGroup> guard(*this);
        endpoints.push_back(endpoint);
        return true;
    
;

using group_ptr = std::shared_ptr<EndpointGroup>;

#include <iostream>

int main() 
    std::cout << "starting" << std::endl;
    
        ThreadPoolManager tpm;

        std::vector<group_ptr> groups;

        for (int i = 0; i < 5; ++i) 
            std::cout << "Group " << i << std::endl;
            auto epg = std::make_shared<EndpointGroup>(tpm.createNewService());
            epg->invokeSendingOnEndpoints(i*2);

            for (int j = 0; j < rand()%10; ++j) 
                epg->addEndpoint(std::make_shared<RtpEndpoint>());
                std::cout << " - RtpEndpoint " << i << "." << j << std::endl;
            

            groups.push_back(epg);
        

        std::cout << "waiting..." << std::endl;
        std::this_thread::sleep_for(std::chrono::seconds(4));
        std::cout << "shutting down" << std::endl;

        for(auto& g : groups)
            g->stop();
    
    std::cout << "done, " << pTimerIterations << " iterations" << std::endl;

打印

starting
Group 0
 - RtpEndpoint 0.0
 - RtpEndpoint 0.1
 - RtpEndpoint 0.2
 - RtpEndpoint 0.3
Group 1
 - RtpEndpoint 1.0
 - RtpEndpoint 1.1
Group 2
 - RtpEndpoint 2.0
Group 3
 - RtpEndpoint 3.0
 - RtpEndpoint 3.1
Group 4
 - RtpEndpoint 4.0
 - RtpEndpoint 4.1
 - RtpEndpoint 4.2
waiting...
shutting down
done, 1963 iterations

然而

就像其他人提到的那样,这是非常非正统的代码。

    你可以只用 1 个 io_service 拥有并行容器是一种代码味道(有一个 struct thread, service, work 向量,而不是三个包含服务、线程和工作对象的向量。 从不继承std::mutex。自己实现BasicLockable 也没有什么可贵的理由。相反,如果必须,请公开unique_lock。它将是异常安全的,并具有明确定义的延迟/采用语义。 计时业务看起来很像您正在尝试实现我在此答案中展示的内容:boost::asio::deadline_timer 1ms lags after some time

在这种情况下,我认为整个池最多需要 1 个 io_service + 1 个工作对象,这是一个简化的例子:

Live On Coliru

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>

static const auto PTIME_INTERVAL = 10; // boost::posix_time::milliseconds(10);
static std::atomic_size_t pTimerIterations  0 ;

namespace asio = boost::asio;

struct ThreadPoolManager 
    ~ThreadPoolManager() 
        work.reset();
        threads_endpoint.join_all();
    

    boost::asio::io_service& get_service()  return io; 

    void launch() 
        threads_endpoint.create_thread([this] io.run(); );
    

  private:
    asio::io_service io;
    boost::optional<asio::io_service::work> work io;
    boost::thread_group threads_endpoint;
;

struct RtpEndpoint 
;

using rtp_endpoint_ptr = std::shared_ptr<RtpEndpoint>;

struct EndpointGroup : std::enable_shared_from_this<EndpointGroup> 
    std::mutex _mx;
    asio::io_service& _io;
    asio::high_resolution_timer pTimer;
    std::vector<rtp_endpoint_ptr> endpoints;
    std::chrono::high_resolution_clock::time_point start = std::chrono::high_resolution_clock::now();

    EndpointGroup(asio::io_service& io) : _io(io), pTimer(_io) 

    void stop() 
        auto self(shared_from_this());
        _io.post([self,this]  pTimer.cancel(); );
    

    // simply start this function over and over again every 10ms
    void invokeSendingOnEndpoints(size_t offset) 
        pTimer.expires_from_now(std::chrono::milliseconds(PTIME_INTERVAL - offset));

        auto self(shared_from_this());
        pTimer.async_wait([this, self](boost::system::error_code ec) 
            if (!ec) 
                std::vector<rtp_endpoint_ptr> iterationEndpoints;
                
                    boost::lock_guard<std::mutex> guard(_mx);
                    iterationEndpoints = endpoints;
                
                for (rtp_endpoint_ptr endpoint : iterationEndpoints) 
                    // do fancy stuff
                
                ++pTimerIterations;
                // check how many milliseconds passed since start of the function
                invokeSendingOnEndpoints(std::chrono::duration_cast<std::chrono::milliseconds>(
                                             (std::chrono::high_resolution_clock::now() - start))
                                             .count() /
                                         (pTimerIterations * PTIME_INTERVAL));
             else 
                // just write error happend....
            
        );
    

    bool addEndpoint(const rtp_endpoint_ptr &endpoint) 
        boost::lock_guard<std::mutex> guard(_mx);
        endpoints.push_back(endpoint);
        return true;
    
;

using group_ptr = std::shared_ptr<EndpointGroup>;

#include <iostream>

int main() 
    std::cout << "starting" << std::endl;
    
        ThreadPoolManager tpm;
        for (unsigned i = 0; i < std::thread::hardware_concurrency(); ++i)
            tpm.launch();

        std::vector<group_ptr> groups;

        for (int i = 0; i < 5; ++i) 
            std::cout << "Group " << i << std::endl;
            auto epg = std::make_shared<EndpointGroup>(tpm.get_service());
            epg->invokeSendingOnEndpoints(i*2);

            for (int j = 0; j < rand()%10; ++j) 
                epg->addEndpoint(std::make_shared<RtpEndpoint>());
                std::cout << " - RtpEndpoint " << i << "." << j << std::endl;
            

            groups.push_back(epg);
        

        std::cout << "waiting..." << std::endl;
        std::this_thread::sleep_for(std::chrono::seconds(4));
        std::cout << "shutting down" << std::endl;

        for(auto& g : groups)
            g->stop();
    
    std::cout << "done, " << pTimerIterations << " iterations" << std::endl;

具有相同的输出。

【讨论】:

我更新了原始问题以获取更多信息。是的,您的考虑也是正确的(另请参见上面的更新)。但是只有一个 io_service 永远无法满足整个程序的要求。我希望可以像你建议的那样将它们结合起来,但我不得不(不是我的决定)。在从 basic_lockable_adapter 继承之前,我像大家建议的那样使用成员变量更改了互斥锁的东西。你的另一个答案和我在这里的不同。不过还是非常感谢。

以上是关于单线程Boost死锁的主要内容,如果未能解决你的问题,请参考以下文章

由于 CPU 类型,C++ Boost 多线程比单线程慢?

死锁Lock锁等待唤醒机制线程组线程池定时器单例设计模式_DAY24

使用 libevent 或 boost::asio 的单线程中的多个 tcp 连接。这是可能的?

JavaScript是单线程还是多线程(转)

单线程与与线程优缺点

单线程与与线程优缺点