异步线程在 TPL 等待时死亡

Posted

技术标签:

【中文标题】异步线程在 TPL 等待时死亡【英文标题】:Async thread dies on TPL await 【发布时间】:2021-12-26 01:46:09 【问题描述】:

我正在编写一个简单的生产者/消费者应用程序,但我注意到一个非常奇怪的行为..这是代码:

private Thread _timelineThread = null;
private BufferBlock<RtpPacket> _queue = null;
private AutoResetEvent _stopping = new AutoResetEvent(false);

static void Main(string[] args)
  
  // Start consumer thread
  Consume();
  
  // Produce
  var t = new Thread(() =>
  
    while (true)
    
      var packet = RtpPacket.GetNext();
      _queue.Post(packet);
      Thread.Sleep(70);
    
  
  t.Join();


static void Consume()

  _timelineThread = new Thread(async () =>
  
    while (_stopping.WaitOne(0) == false)
    
      // Start consuming...
      while (await _queue.OutputAvailableAsync())
      
        var packet = await _queue.ReceiveAsync();
        // Some processing...
      
    
  );
  _timelineThread.Start();   

这是一个无限循环(直到我路由 _stopping 信号)。但是,当 _timelineThread 遇到第一个 await _queue.OutputAvailableAsync() 时,线程将状态更改为“已停止”。有什么我没有考虑的问题吗?

如果我将 Consume() 函数更改为:

static void Consume()

  _timelineThread = new Thread(() =>
  
    while (_stopping.WaitOne(0) == false)
    
      // Start consuming...
      while (_queue.OutputAvailableAsync().GetAwaiter().GetResult())
      
        var packet = _queue.ReceiveAsync().GetAwaiter().GetResult();
        // Some processing...
      
    
  );
  _timelineThread.Start();   

线程运行没有任何问题..但代码几乎与前一个相同..

编辑:一小时后,这个“黑客”似乎也不起作用..线程正在“运行”,但我没有从队列中收到任何数据..

【问题讨论】:

直接使用Threads 有什么特别的原因吗? 你能解释一下你的意思吗?谢谢 我的意思是目前推荐的 C# 多线程方法是使用来自TPL 的Task 【参考方案1】:

Thread 构造函数不理解 async 委托。你可以在这里阅读:

Is it OK to use "async" with a ThreadStart method? Async thread body loop, It just works, but how?

我的建议是使用同步的BlockingCollection&lt;RtpPacket&gt; 而不是BufferBlock&lt;RtpPacket&gt;,并通过枚举GetConsumingEnumerable 方法来使用它:

var _queue = new BlockingCollection<RtpPacket>();

var producer = new Thread(() =>

    while (true)
    
        var packet = RtpPacket.GetNext();
        if (packet == null)  _queue.CompleteAdding(); break; 
        _queue.Add(packet);
        Thread.Sleep(70);
    
);

var consumer = new Thread(() =>

    foreach (var packet in _queue.GetConsumingEnumerable())
    
        // Some processing...
    
);

producer.Start();
consumer.Start();

producer.Join();
consumer.Join();

【讨论】:

谢谢,现在我的线程没有死,但一小时后它挂了..所以问题仍然存在。实际上生产者线程有效,我看到我的_queue 加班加点。但是消费者线程不处理我的数据包。在debug mode 中,我可以看到线程仍然存在,但_queue.GetConsumingEnumerable() 永远不会返回。 @MarioVaccaro 好吧,这取决于您对数据包进行的处理,如果您正在清理与处理每个数据包相关的所有资源等。仅供参考,BlockingCollection&lt;T&gt; 也支持有界模式,其中存储在集合中的项目数量是有限的。当达到限制时,生产者将在调用Add 时被阻塞,直到再次有可用空间。 看来你是对的。注释掉处理方法,3小时后消费者线程仍在运行。处理方法只考虑 RTP 标头剥离,MP3 有效负载转换为 PCM,最后写入文件。处理在 try..catch 块内,所以异常处理得很好。如果我能找到解决方案,我会更新主线程。 感谢您的回复。可能使用多个消费者将是处理的最佳方法,但这些数据包应该会聚(排序)到一个文件中,所以我需要至少另一个队列供作者使用。同时我也发现了问题。似乎SharpMediaFoundationReader(MP3 到 PCM)某处有内存泄漏... 哇,这看起来很有趣,我会深入研究这个问题。非常感谢!!

以上是关于异步线程在 TPL 等待时死亡的主要内容,如果未能解决你的问题,请参考以下文章

TPL异步并行编程之任务超时

TPL异步并行编程之回调

使用 TPL 在多个不同线程上创建单例对象

如何在JAVA中让一个线程死亡或结束

多线程和异步编程示例和实践-踩过的坑

Java ExecutorService - 处于等待状态的线程