异步线程在 TPL 等待时死亡
Posted
技术标签:
【中文标题】异步线程在 TPL 等待时死亡【英文标题】:Async thread dies on TPL await 【发布时间】:2021-12-26 01:46:09 【问题描述】:我正在编写一个简单的生产者/消费者应用程序,但我注意到一个非常奇怪的行为..这是代码:
private Thread _timelineThread = null;
private BufferBlock<RtpPacket> _queue = null;
private AutoResetEvent _stopping = new AutoResetEvent(false);
static void Main(string[] args)
// Start consumer thread
Consume();
// Produce
var t = new Thread(() =>
while (true)
var packet = RtpPacket.GetNext();
_queue.Post(packet);
Thread.Sleep(70);
t.Join();
static void Consume()
_timelineThread = new Thread(async () =>
while (_stopping.WaitOne(0) == false)
// Start consuming...
while (await _queue.OutputAvailableAsync())
var packet = await _queue.ReceiveAsync();
// Some processing...
);
_timelineThread.Start();
这是一个无限循环(直到我路由 _stopping 信号)。但是,当 _timelineThread 遇到第一个 await _queue.OutputAvailableAsync() 时,线程将状态更改为“已停止”。有什么我没有考虑的问题吗?
如果我将 Consume() 函数更改为:
static void Consume()
_timelineThread = new Thread(() =>
while (_stopping.WaitOne(0) == false)
// Start consuming...
while (_queue.OutputAvailableAsync().GetAwaiter().GetResult())
var packet = _queue.ReceiveAsync().GetAwaiter().GetResult();
// Some processing...
);
_timelineThread.Start();
线程运行没有任何问题..但代码几乎与前一个相同..
编辑:一小时后,这个“黑客”似乎也不起作用..线程正在“运行”,但我没有从队列中收到任何数据..
【问题讨论】:
直接使用Thread
s 有什么特别的原因吗?
你能解释一下你的意思吗?谢谢
我的意思是目前推荐的 C# 多线程方法是使用来自TPL 的Task
【参考方案1】:
Thread
构造函数不理解 async
委托。你可以在这里阅读:
我的建议是使用同步的BlockingCollection<RtpPacket>
而不是BufferBlock<RtpPacket>
,并通过枚举GetConsumingEnumerable
方法来使用它:
var _queue = new BlockingCollection<RtpPacket>();
var producer = new Thread(() =>
while (true)
var packet = RtpPacket.GetNext();
if (packet == null) _queue.CompleteAdding(); break;
_queue.Add(packet);
Thread.Sleep(70);
);
var consumer = new Thread(() =>
foreach (var packet in _queue.GetConsumingEnumerable())
// Some processing...
);
producer.Start();
consumer.Start();
producer.Join();
consumer.Join();
【讨论】:
谢谢,现在我的线程没有死,但一小时后它挂了..所以问题仍然存在。实际上生产者线程有效,我看到我的_queue
加班加点。但是消费者线程不处理我的数据包。在debug mode
中,我可以看到线程仍然存在,但_queue.GetConsumingEnumerable()
永远不会返回。
@MarioVaccaro 好吧,这取决于您对数据包进行的处理,如果您正在清理与处理每个数据包相关的所有资源等。仅供参考,BlockingCollection<T>
也支持有界模式,其中存储在集合中的项目数量是有限的。当达到限制时,生产者将在调用Add
时被阻塞,直到再次有可用空间。
看来你是对的。注释掉处理方法,3小时后消费者线程仍在运行。处理方法只考虑 RTP 标头剥离,MP3 有效负载转换为 PCM,最后写入文件。处理在 try..catch 块内,所以异常处理得很好。如果我能找到解决方案,我会更新主线程。
感谢您的回复。可能使用多个消费者将是处理的最佳方法,但这些数据包应该会聚(排序)到一个文件中,所以我需要至少另一个队列供作者使用。同时我也发现了问题。似乎SharpMediaFoundationReader
(MP3 到 PCM)某处有内存泄漏...
哇,这看起来很有趣,我会深入研究这个问题。非常感谢!!以上是关于异步线程在 TPL 等待时死亡的主要内容,如果未能解决你的问题,请参考以下文章