用于从 BlockingCollection 消费的计时器 vs 线程 vs RegisteredWaitHandle
Posted
技术标签:
【中文标题】用于从 BlockingCollection 消费的计时器 vs 线程 vs RegisteredWaitHandle【英文标题】:Timer vs Thread vs RegisteredWaitHandle for consuming from BlockingCollection 【发布时间】:2020-11-18 19:32:24 【问题描述】:我有一项服务,它使用 TCP 通过以太网与大约 50 台设备通信。这些设备以不同的速度(从 50 毫秒到 1500 毫秒)推送数据。因此,我使用 BlockingCollection(每个设备)对接收到的数据进行排队,然后在不同的线程中处理数据。
为了使用和处理 BlockingCollection 中的数据,我为每个设备使用了一个显式线程,如下所示
private void ThreadProc(object o)
while (true)
DeviceData data = m_blockingCollection.Take();
ProcessData(data);
恕我直言,这是一个简单而优雅的解决方案,因为如果没有可用数据,这些线程将被阻塞并且不会消耗 CPU。
另一种方法(同事推荐)是间隔为 250 毫秒的计时器。但我有一种直觉,每 250 毫秒安排大约 50 次操作可能会更昂贵。对于较快的设备,这会减慢处理速度,而对于较慢的设备,这将导致不必要的定时器逻辑执行。
private void OnTimeOut(object o)
if (m_blockingCollection.TryTake(out DeviceData data))
ProcessData(data);
在对此进行研究时,我还发现了 ThreadPool.RegisterWaitForSingleObject 看起来很适合这个问题。因此我将代码修改如下
AutoResetEvent m_dataEvent = new AutoResetEvent(false);
RegisteredWaitHandle reg = ThreadPool.RegisterWaitForSingleObject
(m_dataEvent, ConsumeAndProcess, null, -1, false);
private void OnDeviceDataRecevied(DeviceData data)
sm_blockingCollection.Add(data);
m_dataEvent.Set();
private static void ConsumeAndProcess(object state, bool timedOut)
if (sm_blockingCollection.TryTake(out int data))
ProcessData(data);
我想对了吗?第三种方法比第一种和第二种方法更好吗?哪个在效率和资源使用方面会更好?
【问题讨论】:
个人 id 为此使用 TPL Dataflow 或 RX,这使您能够以串行或异步方式对数据进行管道传输、是否维护顺序、缓冲和以任何奇怪而美妙的线程/并行方式处理它你喜欢(或不喜欢) 您是否考虑过使用docs.microsoft.com/en-us/dotnet/api/…,而不是每个设备单独的线程? @TheGeneral 谢谢。如果您在相同的场景中使用 Rx,您会在 NewThreadScheduler 或 ThreadPoolScheduler 上观察到吗? 这不是更好,该代码有一个非常讨厌的线程竞争错误。当 OnDeviceDataRecevied() 运行两次时发生,在 ConsumeAndProcess() 可以响应第一个 Set() 之前。这么多设备很可能发生事故,很难调试。根本不要使用线程,BeginRead()(旧式)或 ReadAsync()。 @HansPassant 是的,你说得对,谢谢。我最初是在 EndRead 方法块中处理(处理大约需要 500 毫秒)数据。但是其中一些设备崩溃了,因此我决定将数据移动到阻塞集合并从不同的线程处理。 【参考方案1】:从BlockingCollection
变为Timer
对我来说似乎是一种技术倒退。在 AutoResetEvent
之上实施解决方案看起来像是重新实施 BlockingCollection
的尝试,构建更好解决方案的可能性很小,而构建错误解决方案的可能性很大。
大部分时间阻塞 50 个线程并没有那么糟糕(每个线程“只消耗”1MB 内存),但这是一个低可伸缩性设置。幸运的是,现在有可用的内置工具允许从阻塞集合升级到异步集合,并且具有BlockingCollection
的优点(响应能力,低 CPU 利用率)而没有缺点(阻塞线程)。例如,使用Channels
,您可以从 50 台设备增加到 5,000 台设备,使用的线程数量可能比您当前使用的更少(共享 ThreadPool
线程而不是专用线程)。
private Channel<DeviceData> m_channel = Channel.CreateUnbounded<DeviceData>();
private async Task DeviceProcessorAsync()
while (await m_channel.Reader.WaitToReadAsync())
while (m_channel.Reader.TryRead(out DeviceData data))
ProcessData(data);
通道是内置在 .NET Core 中的,可作为 .NET Framework 的 package 使用。
【讨论】:
谢谢。我可能会暂时采用第一种方法(基于线程),并且肯定会计划升级并尝试您的解决方案。以上是关于用于从 BlockingCollection 消费的计时器 vs 线程 vs RegisteredWaitHandle的主要内容,如果未能解决你的问题,请参考以下文章
异步简析之BlockingCollection实现生产消费模式
csharp .NET 4 BlockingCollection的生产者/消费者实现
使用blockingcollection和tasks .net 4 TPL的经典生产者消费者模式