用于从 BlockingCollection 消费的计时器 vs 线程 vs RegisteredWaitHandle

Posted

技术标签:

【中文标题】用于从 BlockingCollection 消费的计时器 vs 线程 vs RegisteredWaitHandle【英文标题】:Timer vs Thread vs RegisteredWaitHandle for consuming from BlockingCollection 【发布时间】:2020-11-18 19:32:24 【问题描述】:

我有一项服务,它使用 TCP 通过以太网与大约 50 台设备通信。这些设备以不同的速度(从 50 毫秒到 1500 毫秒)推送数据。因此,我使用 BlockingCollection(每个设备)对接收到的数据进行排队,然后在不同的线程中处理数据。

为了使用和处理 BlockingCollection 中的数据,我为每个设备使用了一个显式线程,如下所示

private void ThreadProc(object o)

    while (true)
    
        DeviceData data = m_blockingCollection.Take();
        ProcessData(data);
    

恕我直言,这是一个简单而优雅的解决方案,因为如果没有可用数据,这些线程将被阻塞并且不会消耗 CPU。

另一种方法(同事推荐)是间隔为 250 毫秒的计时器。但我有一种直觉,每 250 毫秒安排大约 50 次操作可能会更昂贵。对于较快的设备,这会减慢处理速度,而对于较慢的设备,这将导致不必要的定时器逻辑执行。

private void OnTimeOut(object o)

    if (m_blockingCollection.TryTake(out DeviceData data))
    
        ProcessData(data);
    

在对此进行研究时,我还发现了 ThreadPool.RegisterWaitForSingleObject 看起来很适合这个问题。因此我将代码修改如下

AutoResetEvent m_dataEvent = new AutoResetEvent(false);
RegisteredWaitHandle reg = ThreadPool.RegisterWaitForSingleObject
                         (m_dataEvent, ConsumeAndProcess, null, -1, false);

    private void OnDeviceDataRecevied(DeviceData data)
    
        sm_blockingCollection.Add(data);
        m_dataEvent.Set();
    

    private static void ConsumeAndProcess(object state, bool timedOut)
    
        if (sm_blockingCollection.TryTake(out int data))
        
            ProcessData(data);
        
    

我想对了吗?第三种方法比第一种和第二种方法更好吗?哪个在效率和资源使用方面会更好?

【问题讨论】:

个人 id 为此使用 TPL Dataflow 或 RX,这使您能够以串行或异步方式对数据进行管道传输、是否维护顺序、缓冲和以任何奇怪而美妙的线程/并行方式处理它你喜欢(或不喜欢) 您是否考虑过使用docs.microsoft.com/en-us/dotnet/api/…,而不是每个设备单独的线程? @TheGeneral 谢谢。如果您在相同的场景中使用 Rx,您会在 NewThreadScheduler 或 ThreadPoolScheduler 上观察到吗? 这不是更好,该代码有一个非常讨厌的线程竞争错误。当 OnDeviceDataRecevied() 运行两次时发生,在 ConsumeAndProcess() 可以响应第一个 Set() 之前。这么多设备很可能发生事故,很难调试。根本不要使用线程,BeginRead()(旧式)或 ReadAsync()。 @HansPassant 是的,你说得对,谢谢。我最初是在 EndRead 方法块中处理(处理大约需要 500 毫秒)数据。但是其中一些设备崩溃了,因此我决定将数据移动到阻塞集合并从不同的线程处理。 【参考方案1】:

BlockingCollection 变为Timer 对我来说似乎是一种技术倒退。在 AutoResetEvent 之上实施解决方案看起来像是重新实施 BlockingCollection 的尝试,构建更好解决方案的可能性很小,而构建错误解决方案的可能性很大。

大部分时间阻塞 50 个线程并没有那么糟糕(每个线程“只消耗”1MB 内存),但这是一个低可伸缩性设置。幸运的是,现在有可用的内置工具允许从阻塞集合升级到异步集合,并且具有BlockingCollection 的优点(响应能力,低 CPU 利用率)而没有缺点(阻塞线程)。例如,使用Channels,您可以从 50 台设备增加到 5,000 台设备,使用的线程数量可能比您当前使用的更少(共享 ThreadPool 线程而不是专用线程)。

private Channel<DeviceData> m_channel = Channel.CreateUnbounded<DeviceData>();

private async Task DeviceProcessorAsync()

    while (await m_channel.Reader.WaitToReadAsync())
    
        while (m_channel.Reader.TryRead(out DeviceData data))
        
            ProcessData(data);
        
    

通道是内置在 .NET Core 中的,可作为 .NET Framework 的 package 使用。

【讨论】:

谢谢。我可能会暂时采用第一种方法(基于线程),并且肯定会计划升级并尝试您的解决方案。

以上是关于用于从 BlockingCollection 消费的计时器 vs 线程 vs RegisteredWaitHandle的主要内容,如果未能解决你的问题,请参考以下文章

异步简析之BlockingCollection实现生产消费模式

csharp .NET 4 BlockingCollection的生产者/消费者实现

使用blockingcollection和tasks .net 4 TPL的经典生产者消费者模式

利用BlockingCollection实现生产者和消费者队列,实现写文本

如何加快大块 BlockingCollection 的实现

C#阻塞队列BlockingCollection