boost::asio io_service 和 std::containers 的线程安全

Posted

技术标签:

【中文标题】boost::asio io_service 和 std::containers 的线程安全【英文标题】:Thread safety of boost::asio io_service and std::containers 【发布时间】:2016-04-20 07:41:57 【问题描述】:

我正在使用boost::asio 构建网络服务,但我不确定线程​​安全性。

io_service.run() 只在 io_service 工作专用线程中调用一次

另一方面,send_message() 可以由后面提到的第二个 io_service 处理程序中的代码调用,也可以在用户交互时由 mainThread 调用。这就是我变得紧张的原因。

std::deque<message> out_queue;

// send_message will be called by two different threads
void send_message(MsgPtr msg)
    while (out_queue->size() >= 20)    
        Sleep(50);
    
    io_service_.post([this, msg]()  deliver(msg);  );


// from my understanding, deliver will only be called by the thread which called io_service.run()
void deliver(const MsgPtr)
    bool write_in_progress = !out_queue.empty();
    out_queue.push_back(msg);
    if (!write_in_progress)
    
        write();
    


void write()

    auto self(shared_from_this());

    asio::async_write(socket_,
        asio::buffer(out_queue.front().header(),
        message::header_length),    [this, self](asio::error_code ec, std::size_t/)
    

        if (!ec)
        
            asio::async_write(socket_,
                asio::buffer(out_queue.front().data(),
                out_queue.front().paddedPayload_size()),
                [this, self](asio::error_code ec, std::size_t /*length*/)
            
                if (!ec)
                
                    out_queue.pop_front();
                    if (!out_queue.empty())
                    
                        write();
                    
                

            );
        

    );


这种情况安全吗?

类似的第二种情况:当网络线程收到一条消息时,它会将它们发布到另一个asio::io_service,该asio::io_service 也由它自己的专用线程运行。此 io_service 使用 std::unordered_map 来存储回调函数等。

std::unordered_map<int, eventSink> eventSinkMap_;

//...

// called by the main thread (GUI), writes a callback function object to the map
int IOReactor::registerEventSink(std::function<void(int, std::shared_ptr<message>)> fn, QObject* window, std::string endpointId)
    util::ScopedLock lock(&sync_);

    eventSink es;
    es.id = generateRandomId();
    // ....
    std::pair<int, eventSink> eventSinkPair(es.id, es);

    eventSinkMap_.insert(eventSinkPair);

    return es.id;


// called by the second thread, the network service thread when a message was received
void IOReactor::onMessageReceived(std::shared_ptr<message> msg, ConPtr con)
    
    reactor_io_service_.post([=]() handleReceive(msg, con); );


// should be called only by the one thread running the reactor_io_service.run()
// read and write access to the map
void IOReactor::handleReceive(std::shared_ptr<message> msg, ConPtr con)
   util::ScopedLock lock(&sync_);
   auto es = eventSinkMap_.find(msg.requestId);
    if (es != eventSinkMap_.end())
    
    auto fn = es->second.handler;
    auto ctx = es->second.context;
    QMetaObject::invokeMethod(ctx, "runInMainThread", Qt::QueuedConnection, Q_ARG(std::function<void(int, std::shared_ptr<msg::IMessage>)>, fn), Q_ARG(int, CallBackResult::SUCCESS), Q_ARG(std::shared_ptr<msg::IMessage>, msg));

    eventSinkMap_.erase(es);       

首先:我什至需要在这里使用锁吗?

Ofc 两种方法都访问地图,但它们访问的不是相同的元素(receiveHandler 无法尝试访问或读取尚未注册/插入到地图中的元素)。那是线程安全的吗?

【问题讨论】:

【参考方案1】:

首先,缺少很多上下文(onMessageReceived 调用在哪里,ConPtr 是什么?你有太多问题。我会给你一些具体的指针,但会帮助你。

    你应该在这里紧张:

    void send_message(MsgPtr msg)
        while (out_queue->size() >= 20)    
            Sleep(50);
        
        io_service_.post([this, msg]()  deliver(msg);  );
    
    

    检查out_queue-&gt;size() &gt;= 20 需要同步,除非 out_queue 是线程安全的。

    io_service_.post 的调用是安全的,因为io_service 是线程安全的。由于您有一个专用 IO 线程,这意味着 deliver() 将在该线程上运行。现在,你也需要同步。

    我强烈建议在那里使用适当的线程安全队列。

    问。首先:我还需要在这里使用锁吗?

    是的,您需要锁定才能进行地图查找(否则您会遇到与主线程插入接收器的数据竞争)。

    您确实不需要在调用期间需要锁定(事实上,这似乎是一个非常不明智的想法,可能会导致性能问题或锁定)。由于Iterator invalidation rules,引用仍然有效。

    删除当然需要再次加锁。我将修改代码以立即进行删除和删除,并且仅在释放锁后才调用接收器。 注意您必须在这里考虑异常(在您的代码中,当调用期间出现异常时,接收器不会被删除(永远?)。这对您来说可能很重要。

    Live Demo

    void handleReceive(std::shared_ptr<message> msg, ConPtr con)
        util::ScopedLock lock(&sync_);
        auto es = eventSinkMap_.find(msg->requestId);
        if (es != eventSinkMap_.end())
        
            auto fn  = es->second.handler;
            auto ctx = es->second.context;
            eventSinkMap_.erase(es); // invalidates es
    
            lock.unlock();
            // invoke in whatever way you require
            fn(static_cast<int>(CallBackResult::SUCCESS), std::static_pointer_cast<msg::IMessage>(msg));
        
    
    

【讨论】:

谢谢,这很有帮助!顺便说一句:抱歉,在读取和解析消息后,在连接 asio::async_read() 处理程序中调用了“onMessageReceived”。

以上是关于boost::asio io_service 和 std::containers 的线程安全的主要内容,如果未能解决你的问题,请参考以下文章

boost::asio io_service 和 std::containers 的线程安全

多线程和Boost::Asio

boost asio中io_service类的几种使用

在 boost::asio::io_service 上调用 run 时崩溃

asio::io_service 和 thread_group 生命周期问题

boost::asio io_service 停止特定线程