在异步 TCP 服务器的上下文中从 N 头访问数据时的线程安全

Posted

技术标签:

【中文标题】在异步 TCP 服务器的上下文中从 N 头访问数据时的线程安全【英文标题】:Thread-safety when accessing data from N-theads in context of an async TCP-server 【发布时间】:2018-05-12 08:45:29 【问题描述】:

正如标题所说,我有一个关于以下场景的问题(简化示例):

假设我下面有一个 Generator-Class 的对象,它不断更新其 dataChunk 成员(在主线程中运行)。

class Generator

  void generateData();
  uint8_t dataChunk[999];

此外,我有一个异步。 1-N 个客户端可以连接到的 TCP 连接的接受器(在第二个线程中运行)。 接受器为每个新的客户端连接启动一个新线程,其中以下 Connection 类的对象接收来自客户端的请求消息并提供一部分 dataChunk(属于生成器)作为答案。然后等待一个新的请求等等......

class Connection


  void setDataChunk(uint8_t* dataChunk);
  void handleRequest();
  uint8_t* dataChunk;

最后是实际问题:期望的行为是 Generator 对象生成一个新的 dataChunk 并等待,直到所有 1-N Connection 对象都处理完它们的客户端请求,直到它生成一个新的 dataChunk。

当 Connection 对象处理它们的请求时,我如何锁定 dataChunk 以便对 Generator 对象进行写访问,但是它们各自线程中的所有 Connection 对象都应该在其请求处理阶段同时具有读取访问权限.

另一方面,Connection 对象应该在处理完各自的请求后等待新的 dataChunk,而不会丢弃新的客户端请求。

--> 我认为单个互斥体无法解决问题。

我的第一个想法是在对象之间共享一个结构,其中一个信号量用于生成器,一个信号量向量用于连接。有了这些,每个对象都可以“理解”整个系统的状态并相应地工作。

你们怎么看,像这种情况下的最佳实践是什么?

提前致谢!

【问题讨论】:

“我认为单个互斥体不会在这里解决问题。” - 通常认为互斥是异步代码中的反模式 据我了解,您可以使用一个简单的原子计数器(保存可用数据量),当数据发送到客户端时通过连接减少它,如果它变为零则等待它.顺便说一句,在尝试之前放弃互斥锁不是一个好主意。很多程序都成功地使用了锁并且可以很好地扩展。 @MichaelNastenko 在异步操作中使用互斥锁是个坏主意,因为它不起作用。跨操作锁定它意味着您可能会尝试在另一个线程上解锁而不锁定它会使其无用 - 一场竞赛。 @sehe 不,不是。仅当您有很多争用时,这将是一个坏主意,但您可以在测试之前说。在这种情况下,这应该不是问题,因为锁定时间应该太长了。 @SterndesSuedens 你可以尝试使用读/写锁或无锁队列 【参考方案1】:

有几种方法可以解决。

您可以使用std::shared_mutex

void Connection::handleRequest()

    while(true)
    
        std::shared_lock<std::shared_mutex> lock(GeneratorObj.shared_mutex);
        if(GeneratorObj.DataIsAvailable()) // we need to know that data is available
        
            // Send to client
            break;
        
    


void Generator::generateData()

    std::unique_lock<std::shared_mutex> lock(GeneratorObj.shared_mutex);

    // Generate data

或者您可以使用boost::lockfree::queue,但数据结构会有所不同。

【讨论】:

在向 n 个客户端发送数据时如何持有锁?发送是异步发生的。您的示例代码暗示了客户端的拉模型。这个问题暗示了一个推送模型(n路广播) 函数名Connection::handleRequest 暗示拉模型 - 客户提出的请求。为什么你认为它是推模型?即便如此——无需持有锁,数据可以通过Connection缓存进行传输。 我知道它可以复制到任务中。但是,该问题询问有关锁定生成器的问题,这表明他们不希望这样做。此外,该问题明确描述了推送模型,因为所有 N 个客户端在生成下一个之前接收相同的块很重要(“所需的行为是生成器对象生成一个新的 dataChunk 并等到所有 1-N连接对象在生成新的数据块之前已经处理了它们的客户端请求”。唯一的余地是“它们的客户端请求”的描述非常松散) 公平地说,我开始想知道 OP 到底是什么意思。所以,是的,这可能是一个更公平的建议(尽管我建议复制而不是锁定。没有共享总是比共享更好,即使不是为了性能,而是为了简单) @sehe 没有分享总是比分享更好,即使不是为了性能,而是为了简单)完全同意这一点。【参考方案2】:

如何在 Connection 对象处理其请求时锁定 dataChunk 以对 Generator 对象进行写访问,但在其各自线程中的所有 Connection 对象都应该在其请求处理阶段同时具有读取访问权限.

我会创建一个逻辑操作链,包括生成。

这是一个示例:

完全是单线程的 接受无限制的连接并处理掉线的连接 它使用deadline_timer 对象在等待向(许多)连接发送块时发出屏障信号。这样可以方便地将 generateData 调用放入异步调用链中。

Live On Coliru

#include <boost/asio.hpp>
#include <list>
#include <iostream>

namespace ba = boost::asio;
using ba::ip::tcp;
using boost::system::error_code;

using Clock = std::chrono::high_resolution_clock;
using Duration = Clock::duration;
using namespace std::chrono_literals;

struct Generator 
    void generateData();
    uint8_t dataChunk[999];
;

struct Server 
    Server(unsigned short port) : _port(port) 
        _barrier.expires_at(boost::posix_time::neg_infin);

        _acc.set_option(tcp::acceptor::reuse_address());
        accept_loop();
    

    void generate_loop() 
        assert(n_sending == 0);

        garbage_collect(); // remove dead connections, don't interfere with sending

        if (_socks.empty()) 
            std::clog << "No more connections; pausing Generator\n";
         else 
            _gen.generateData();
            _barrier.expires_at(boost::posix_time::pos_infin);

            for (auto& s : _socks) 
                ++n_sending;
                ba::async_write(s, ba::buffer(_gen.dataChunk), [this,&s](error_code ec, size_t written) 
                    assert(n_sending);
                    --n_sending; // even if failed, decreases pending operation
                    if (ec) 
                        std::cerr << "Write: " << ec.message() << "\n";
                        s.close();
                    
                    std::clog << "Written: " << written << ", " << n_sending << " to go\n";

                    if (!n_sending) 
                        // green light to generate next chunk
                        _barrier.expires_at(boost::posix_time::neg_infin);
                    
                );
            

            _barrier.async_wait([this](error_code ec) 
                if (ec && ec != ba::error::operation_aborted)
                    std::cerr << "Client activity: " << ec.message() << "\n";
                else generate_loop();
            );
        
    

    void accept_loop() 
        _acc.async_accept(_accepting, [this](error_code ec) 
                if (ec) 
                    std::cerr << "Accept fail: " << ec.message() << "\n";
                 else 
                    std::clog << "Accepted: " << _accepting.remote_endpoint() << "\n";
                    _socks.push_back(std::move(_accepting));

                    if (_socks.size() == 1) // first connection?
                        generate_loop();    // start generator

                    accept_loop();
                
            );
    

    void run_for(Duration d) 
        _svc.run_for(d);
    

    void garbage_collect() 
        _socks.remove_if([](tcp::socket& s)  return !s.is_open(); );
    
  private:
    ba::io_service _svc;
    unsigned short _port;
    tcp::acceptor _acc  _svc,  , _port  ;
    tcp::socket _accepting _svc;

    std::list<tcp::socket> _socks;

    Generator _gen;
    size_t n_sending = 0;
    ba::deadline_timer _barrier _svc;
;

int main() 
    Server s(6767);
    s.run_for(3s); // COLIRU


#include <fstream>
// synchronously generate random data chunks
void Generator::generateData() 
    std::ifstream ifs("/dev/urandom", std::ios::binary);
    ifs.read(reinterpret_cast<char*>(dataChunk), sizeof(dataChunk));
    std::clog << "Generated chunk: " << ifs.gcount() << "\n";

打印(仅适用于 1 个客户):

Accepted: 127.0.0.1:60870
Generated chunk: 999
Written: 999, 0 to go
Generated chunk: 999
   [... snip ~4000 lines ...]
Written: 999, 0 to go
Generated chunk: 999
Write: Broken pipe
Written: 0, 0 to go
No more connections; pausing Generator

【讨论】:

以上是关于在异步 TCP 服务器的上下文中从 N 头访问数据时的线程安全的主要内容,如果未能解决你的问题,请参考以下文章

在Angular 2中从服务到组件获取异步数据

Feign远程调用丢失请求头问题以及异步丢失上下文问题解决方法

Python模拟浏览器实现网页访问

如何在 NextJS 应用程序中从 Apollo 链接上下文访问 Express HTTP 请求?

8.swoole学习笔记--异步tcp客户端

ESP32/ESP8266TCP异步通讯点灯控制示例程序