使用 BlockingCollection<T>() 缩放连接

Posted

技术标签:

【中文标题】使用 BlockingCollection<T>() 缩放连接【英文标题】:Scaling Connections with BlockingCollection<T>() 【发布时间】:2019-04-22 16:04:53 【问题描述】:

我有一台通过 TCP LAN 与 50 台或更多设备通信的服务器。每个套接字读取消息循环都有一个 Task.Run。

我将每条消息缓冲到一个阻塞队列中,其中每个阻塞队列都有一个使用 BlockingCollection.Take() 的 Task.Run。

类似(半伪代码):

套接字读取任务

Task.Run(() =>

    while (notCancelled)
    
        element = ReadXml();
        switch (element)
        
            case messageheader:
                MessageBlockingQueue.Add(deserialze<messageType>());
            ...
        
    
);

消息缓冲区任务

Task.Run(() =>

    while (notCancelled)
    
        Process(MessageQueue.Take());
    
);

这样会使 50 多个读取任务和 50 多个任务阻塞在自己的缓冲区中。

我这样做是为了避免阻塞读取循环并允许程序更公平地分配消息的处理时间,或者我相信。

这是一种低效的处理方式吗?有什么更好的方法?

【问题讨论】:

你如何处理这些消息 @TheGeneral 数据库存储、记录并将数据发送给客户端,但有些数据确实需要额外处理。 【参考方案1】:

您可能对“频道”工作感兴趣,尤其是:System.Threading.Channels。这样做的目的是提供 异步 生产者/消费者队列,涵盖单个和多个生产者和消费者场景、上限等。通过使用异步 API,您不会占用大量线程只是等着做点什么。

您的读取循环将变为:

while (notCancelled) 
    var next = await queue.Reader.ReadAsync(optionalCancellationToken);
    Process(next);

和制作人:

switch (element)

    case messageheader:
        queue.Writer.TryWrite(deserialze<messageType>());
        ...

所以:最小的变化


另外 - 或组合 - 您可以查看诸如“管道”(https://www.nuget.org/packages/System.IO.Pipelines/)之类的东西 - 因为您正在处理 TCP 数据,这将是一个理想的选择,并且是我为Stack Overflow 上的自定义 web-socket 服务器(处理大量个连接)。由于 API 始终是异步的,因此它在平衡工作方面做得很好——并且管道 API 的设计考虑了典型的 TCP 场景,例如在检测帧边界时部分消耗传入的数据流。我已经写了很多关于这种用法的文章,代码示例大多是here。请注意,“管道”不包含直接 TCP 层,但“kestrel”服务器包含一个,或者第三方库 https://www.nuget.org/packages/Pipelines.Sockets.Unofficial/ 包含(披露:我写的)。

【讨论】:

很有趣,我会看看所有这些。我正在查看 TPL 数据流,但似乎这是更好的选择。【参考方案2】:

我实际上在另一个项目中做了类似的事情。我学到或会做的不同如下:

    首先,最好为读/写循环使用专用线程(使用new Thread(ParameterizedThreadStart)),因为Task.Run 使用池线程,并且当您在(几乎)无限循环中使用它时,线程实际上永远不会返回去游泳池。

    var thread = new Thread(ReaderLoop)  Name = nameof(ReaderLoop) ; // priority, etc if needed
    thread.Start(cancellationToken);
    

    您的Process 可以是一个事件,您可以异步调用它,以便您的阅读器循环可以立即返回以尽快处理新传入的包:

    private void ReaderLoop(object state)
    
        var token = (CancellationToken)state;
        while (!token.IsCancellationRequested)
        
            try
            
                var message = MessageQueue.Take(token);
                OnMessageReceived(new MessageReceivedEventArgs(message));
            
            catch (OperationCanceledException)
            
                if (!disposed && IsRunning)
                    Stop();
                break;
            
        
    
    

请注意,如果一个委托有多个目标,那么它的异步调用并不是微不足道的。我创建了这个扩展方法来调用池线程上的委托:

public static void InvokeAsync<TEventArgs>(this EventHandler<TEventArgs> eventHandler, object sender, TEventArgs args)

    void Callback(IAsyncResult ar)
    
        var method = (EventHandler<TEventArgs>)ar.AsyncState;
        try
        
            method.EndInvoke(ar);
        
        catch (Exception e)
        
            HandleError(e, method);
        
    

    foreach (EventHandler<TEventArgs> handler in eventHandler.GetInvocationList())
        handler.BeginInvoke(sender, args, Callback, handler);

所以OnMessageReceived 的实现可以是:

protected virtual void OnMessageReceived(MessageReceivedEventArgs e)
    => messageReceivedHandler.InvokeAsync(this, e);

    最后,BlockingCollection&lt;T&gt; 存在一些性能问题是一个重要的教训。它内部使用SpinWait,如果长时间没有传入数据,其SpinOnce方法等待的时间越来越长。这是一个棘手的问题,因为即使您记录了处理的每一步,您也不会注意到一切都被延迟了,除非您也可以模拟服务器端。 Here 你可以找到一个快速的BlockingCollection 实现,使用AutoResetEvent 来触发传入数据。我添加了一个Take(CancellationToken) 重载,如下所示:

    /// <summary>
    /// Takes an item from the <see cref="FastBlockingCollectionT"/>
    /// </summary>
    public T Take(CancellationToken token)
    
        T item;
        while (!queue.TryDequeue(out item))
        
            waitHandle.WaitOne(cancellationCheckTimeout); // can be 10-100 ms
            token.ThrowIfCancellationRequested();
        
    
        return item;
    
    

基本上就是这样。也许并非所有内容都适用于您的情况,例如。如果几乎立即的响应并不重要,那么常规的BlockingCollection 也会这样做。

【讨论】:

【参考方案3】:

是的,这有点低效,因为你阻塞了 ThreadPool 线程。 我已经讨论过这个问题Using Task.Yield to overcome ThreadPool starvation while implementing producer/consumer pattern

您还可以在此处查看测试生产者-消费者模式的示例: https://github.com/BBGONE/TestThreadAffinity

您可以在循环中使用 await Task.Yield 来让其他任务访问该线程。

您也可以通过使用专用线程或更好的使用自己的线程池的自定义 ThreadScheduler 来解决它。但是创建 50+ 纯线程是无效的。最好调整一下任务,这样会更配合。

如果您使用 BlockingCollection(因为它可以在等待写入(如果有界)或读取或没有要读取的项目时长时间阻塞线程),那么最好使用 System.Threading.Tasks.Channels https://github.com/stephentoub/corefxlab/blob/master/src/System.Threading.Tasks.Channels/README.md

它们不会在等待集合可以写入或读取时阻塞线程。有一个例子如何使用https://github.com/BBGONE/TestThreadAffinity/tree/master/ThreadingChannelsCoreFX/ChannelsTest

【讨论】:

以上是关于使用 BlockingCollection<T>() 缩放连接的主要内容,如果未能解决你的问题,请参考以下文章

在 BlockingCollection 中搜索特定元素

BlockingCollection实现单体程序内队列

C#阻塞队列BlockingCollection

.NET(C#):线程安全集合的阻塞BlockingCollection的使用

用于从 BlockingCollection 消费的计时器 vs 线程 vs RegisteredWaitHandle

带有 BlockingCollection.GetConsumableEnumerable 的 Parallel.ForEach 循环