Is TPL-Dataflow applicable for highly-concurrent applications?

Posted: 2021-07-10 21:17:26

[Question]:

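I'm investigating whether TPL-Dataflow can spare us from writing boilerplate code with locks and monitors for our highly-concurrent application.

So I'm simulating a simple scenario with one producer and multiple consumers, where each consumer should get all of the produced messages. And if some consumers are slower than others, that should not stall the system.

Here is the code: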

using NLog;
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

namespace ConsoleApp10
{
    internal sealed class Program
    {
        private static readonly Logger m_logger = LogManager.GetCurrentClassLogger();

        static void Main(string[] args)
        {
            BroadcastBlock<int> root = new BroadcastBlock<int>(d => d);

            ExecutionDataflowBlockOptions consumerOptions = new ExecutionDataflowBlockOptions() {
                BoundedCapacity = 3
            };

            for (int consumerIndex = 0; consumerIndex < 5; ++consumerIndex)
            {
                int c = consumerIndex;
                ActionBlock<int> consumer = new ActionBlock<int>(
                    (int d) => {
                        m_logger.Trace($"[#{c}] Starting consuming {d}");
                        Thread.Sleep(c * 100);
                        m_logger.Trace($"[#{c}] Ended consuming {d}");
                    },
                    consumerOptions
                );
                root.LinkTo(consumer);
            }

            Producer(10, root);

            Console.ReadLine();
        }

        private static void Producer(int n, ITargetBlock<int> target)
        {
            for (int i = 0; i < n; ++i)
            {
                m_logger.Trace($"Starting producing {i}");
                if (!target.Post(i))
                {
                    throw new Exception($"Failed to post message #{i}");
                }
                m_logger.Trace($"Ending producing {i}");
                Thread.Sleep(50);
            }
        }
    }
}

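As you can see, I limit the consumer buffer size to 3 (to prevent the buffers of slow consumers from growing without bound).

Each consumer is slower than the previous one. Consumer #0 is the fastest and has no delay. The producer also has a small delay between messages.

I expected that at least consumer #0 would consume all the messages, while consumer #4 would miss some of them because its buffer would overflow.

Here is the result: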

2021-04-15 22:44:15.4905 [T1] Starting producing 0 
2021-04-15 22:44:15.5049 [T1] Ending producing 0 
2021-04-15 22:44:15.5166 [T4] [#4] Starting consuming 0 
2021-04-15 22:44:15.5285 [T7] [#0] Starting consuming 0 
2021-04-15 22:44:15.5285 [T7] [#0] Ended consuming 0 
2021-04-15 22:44:15.5285 [T7] [#1] Starting consuming 0 
2021-04-15 22:44:15.5573 [T1] Starting producing 1 
2021-04-15 22:44:15.5573 [T1] Ending producing 1 
2021-04-15 22:44:15.5573 [T5] [#0] Starting consuming 1 
2021-04-15 22:44:15.5573 [T5] [#0] Ended consuming 1 
2021-04-15 22:44:15.5573 [T5] [#2] Starting consuming 0 
2021-04-15 22:44:15.5573 [T6] [#3] Starting consuming 0 
2021-04-15 22:44:15.6081 [T1] Starting producing 2 
2021-04-15 22:44:15.6081 [T1] Ending producing 2 
2021-04-15 22:44:15.6352 [T7] [#1] Ended consuming 0 
2021-04-15 22:44:15.6352 [T7] [#1] Starting consuming 1 
2021-04-15 22:44:15.6592 [T1] Starting producing 3 
2021-04-15 22:44:15.6592 [T1] Ending producing 3 
2021-04-15 22:44:15.7102 [T1] Starting producing 4 
2021-04-15 22:44:15.7102 [T1] Ending producing 4 
2021-04-15 22:44:15.7353 [T7] [#1] Ended consuming 1 
2021-04-15 22:44:15.7353 [T7] [#1] Starting consuming 2 
2021-04-15 22:44:15.7612 [T5] [#2] Ended consuming 0 
2021-04-15 22:44:15.7612 [T5] [#2] Starting consuming 1 
2021-04-15 22:44:15.7612 [T1] Starting producing 5 
2021-04-15 22:44:15.7612 [T1] Ending producing 5 
2021-04-15 22:44:15.8132 [T1] Starting producing 6 
2021-04-15 22:44:15.8132 [T1] Ending producing 6 
2021-04-15 22:44:15.8420 [T7] [#1] Ended consuming 2 
2021-04-15 22:44:15.8420 [T7] [#1] Starting consuming 3 
2021-04-15 22:44:15.8603 [T6] [#3] Ended consuming 0 
2021-04-15 22:44:15.8603 [T6] [#3] Starting consuming 1 
2021-04-15 22:44:15.8764 [T1] Starting producing 7 
2021-04-15 22:44:15.8764 [T1] Ending producing 7 
2021-04-15 22:44:15.9174 [T4] [#4] Ended consuming 0 
2021-04-15 22:44:15.9174 [T4] [#4] Starting consuming 1 
2021-04-15 22:44:15.9369 [T1] Starting producing 8 
2021-04-15 22:44:15.9369 [T1] Ending producing 8 
2021-04-15 22:44:15.9509 [T7] [#1] Ended consuming 3 
2021-04-15 22:44:15.9509 [T7] [#1] Starting consuming 4 
2021-04-15 22:44:15.9639 [T5] [#2] Ended consuming 1 
2021-04-15 22:44:15.9639 [T5] [#2] Starting consuming 2 
2021-04-15 22:44:15.9874 [T1] Starting producing 9 
2021-04-15 22:44:15.9874 [T1] Ending producing 9 
2021-04-15 22:44:16.0515 [T7] [#1] Ended consuming 4 
2021-04-15 22:44:16.0515 [T7] [#0] Starting consuming 2 
2021-04-15 22:44:16.0515 [T7] [#0] Ended consuming 2 
2021-04-15 22:44:16.0515 [T7] [#0] Starting consuming 3 
2021-04-15 22:44:16.0515 [T7] [#0] Ended consuming 3 
2021-04-15 22:44:16.0515 [T7] [#0] Starting consuming 4 
2021-04-15 22:44:16.0515 [T7] [#0] Ended consuming 4 
2021-04-15 22:44:16.0515 [T7] [#1] Starting consuming 5 
2021-04-15 22:44:16.1525 [T7] [#1] Ended consuming 5 
2021-04-15 22:44:16.1525 [T7] [#1] Starting consuming 6 
2021-04-15 22:44:16.1525 [T6] [#3] Ended consuming 1 
2021-04-15 22:44:16.1525 [T6] [#3] Starting consuming 2 
2021-04-15 22:44:16.1645 [T5] [#2] Ended consuming 2 
2021-04-15 22:44:16.1645 [T5] [#2] Starting consuming 4 
2021-04-15 22:44:16.2526 [T7] [#1] Ended consuming 6 
2021-04-15 22:44:16.2526 [T7] [#1] Starting consuming 7 
2021-04-15 22:44:16.3177 [T4] [#4] Ended consuming 1 
2021-04-15 22:44:16.3177 [T4] [#4] Starting consuming 2 
2021-04-15 22:44:16.3537 [T7] [#1] Ended consuming 7 
2021-04-15 22:44:16.3537 [T7] [#1] Starting consuming 9 
2021-04-15 22:44:16.3537 [T5] [#2] Ended consuming 4 
2021-04-15 22:44:16.3537 [T5] [#2] Starting consuming 5 
2021-04-15 22:44:16.4547 [T7] [#1] Ended consuming 9 
2021-04-15 22:44:16.4547 [T9] [#0] Starting consuming 5 
2021-04-15 22:44:16.4547 [T9] [#0] Ended consuming 5 
2021-04-15 22:44:16.4547 [T9] [#0] Starting consuming 6 
2021-04-15 22:44:16.4547 [T9] [#0] Ended consuming 6 
2021-04-15 22:44:16.4547 [T9] [#0] Starting consuming 7 
2021-04-15 22:44:16.4547 [T9] [#0] Ended consuming 7 
2021-04-15 22:44:16.4547 [T9] [#0] Starting consuming 9 
2021-04-15 22:44:16.4547 [T9] [#0] Ended consuming 9 
2021-04-15 22:44:16.4607 [T6] [#3] Ended consuming 2 
2021-04-15 22:44:16.4607 [T6] [#3] Starting consuming 4 
2021-04-15 22:44:16.5648 [T5] [#2] Ended consuming 5 
2021-04-15 22:44:16.5648 [T5] [#2] Starting consuming 9 
2021-04-15 22:44:16.7179 [T4] [#4] Ended consuming 2 
2021-04-15 22:44:16.7179 [T4] [#4] Starting consuming 4 
2021-04-15 22:44:16.7610 [T6] [#3] Ended consuming 4 
2021-04-15 22:44:16.7610 [T6] [#3] Starting consuming 9 
2021-04-15 22:44:16.7610 [T5] [#2] Ended consuming 9 
2021-04-15 22:44:17.0611 [T6] [#3] Ended consuming 9 
2021-04-15 22:44:17.1182 [T4] [#4] Ended consuming 4 
2021-04-15 22:44:17.1182 [T4] [#4] Starting consuming 9 
2021-04-15 22:44:17.5185 [T4] [#4] Ended consuming 9 

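What puzzles me is that consumer #0 never received message 8. In fact, no other consumer got that message either. Why is that? Is this the expected behavior of Dataflow?

In case you want to check, my NLog.config looks like this (I use the AsyncWrapper target to keep file access from affecting the results of my experiment):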

<?xml version="1.0" encoding="utf-8" ?>
<nlog xmlns="http://www.nlog-project.org/schemas/NLog.xsd"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://www.nlog-project.org/schemas/NLog.xsd NLog.xsd"
        internalLogFile="nlog.log"
        internalLogLevel="Warn"
        throwExceptions="false" 
        parseMessageTemplates="false"
    >

  <variable name="varExceptionMsg" value="${exception:format=Message}"/>
  <variable name="varMessageWithException" value="${message}${onexception:inner= ${varExceptionMsg}}"/>
  <variable name="msg4File" value="${longdate} [T${threadid}${threadname}] ${varMessageWithException} ${onexception:inner=${newline}${exception:format=tostring:maxInnerExceptionLevel=2:innerFormat=tostring}}" />

  <targets>

    <target name="file" xsi:type="AsyncWrapper" queueLimit="5000" overflowAction="Discard">
      <target xsi:type="File"
              layout="${msg4File}"
              fileName="${basedir}/logs/${processname}.${shortdate}.log"
              keepFileOpen="true"
              encoding="utf-8"
            />
    </target>
  </targets>

  <rules>
    <logger name="*" minlevel="Trace" writeTo="file" />
  </rules>
</nlog>

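[Comments]:

Could you try adding this line at the start of the program: ThreadPool.SetMinThreads(100, 100);, and see if it makes any difference? I'm not suggesting this as a fix, but as a way to diagnose the problem you are observing.

Exit the program by pressing Return, and then check the logger file.

@TheodorZoulias it does seem to help, yes. So you think the root of the problem is that I'm facing thread pool worker starvation?

@jdweng, not sure what you mean, but yes, I do exit by hitting Return, and the file does get flushed. As you can see, the log is complete: at the very least there is evidence that worker #0 did consume message 9, which was produced after message 8, and no reordering happens.

So the issue isn't a flushing problem? Wrapping the code in a using block will flush automatically.

[Answer 1]: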

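You need to call .Complete() on root, and then Main() needs to wait for all the consumers to finish chewing their food before Main ends.

At the top of Main:

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

In your for loop:

Add each consumer's Completion task to a list (I called it "comps", short for Completions).
Include linkOptions in your LinkTo call.

Before your ReadLine(), add this: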

root.Complete();
Task.WaitAll(comps.ToArray());
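
Put together, the steps above might look like the following minimal sketch. The class name CompletionSketch, the Run method, its parameters, and the seen lists are hypothetical names added for illustration; logging is left out so the block stands on its own. Note that this only makes the program wait until every consumer has finished. It is not expected, by itself, to stop BroadcastBlock from skipping messages for consumers that are busy.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper, not part of the original question or answer.
public static class CompletionSketch
{
    // Returns, for each consumer, the values it actually processed.
    public static List<int>[] Run(int messageCount, int consumerCount, int producerDelayMs)
    {
        var root = new BroadcastBlock<int>(d => d);
        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
        var consumerOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = 3 };

        var comps = new List<Task>();            // completion tasks of all consumers
        var seen = new List<int>[consumerCount]; // illustration only: what each consumer got

        for (int c = 0; c < consumerCount; ++c)
        {
            var received = seen[c] = new List<int>();
            int delayMs = c * 100; // each consumer is slower than the previous one

            // An ActionBlock runs one item at a time by default,
            // so the list is never touched concurrently.
            var consumer = new ActionBlock<int>(d =>
            {
                Thread.Sleep(delayMs);
                received.Add(d);
            }, consumerOptions);

            root.LinkTo(consumer, linkOptions); // completion flows from root to consumer
            comps.Add(consumer.Completion);
        }

        for (int i = 0; i < messageCount; ++i)
        {
            root.Post(i);
            Thread.Sleep(producerDelayMs);
        }

        root.Complete();               // no more input
        Task.WaitAll(comps.ToArray()); // block until every consumer has drained
        return seen;
    }
}
```

Calling CompletionSketch.Run(10, 5, 50) mirrors the numbers in the question. Which values each consumer receives can vary from run to run, so no particular output should be assumed.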

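[Comments]:

Even with these updates, I still get some odd effects: consumer #4 consumes fewer items overall, and the value 6 is never picked up by any consumer, because the BroadcastBlock (counterintuitively) carries on its merry way even when the consumers are all slowpokes. I don't know of many good use cases for it. This answer may have a workaround: ***.com/questions/22127660/…

Will the members of your real consumer group behave differently from one another? Maybe a brigade of consumers would work better than trying to force BroadcastBlock to do something it isn't suited for.

Memory permitting, another option is to link the broadcast block to five side-by-side buffer blocks, and have the consumers read from the buffers.

Thanks for the answer. This is the right way to wait for all the jobs to complete, rather than Console.ReadLine(), yes. It still doesn't address the original question, which is why some items go missing even for the fastest consumer, and how to prevent that reliably.

I tried it. One buffer per consumer works. All 5 consumers got all 10 inputs.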
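
The buffer-per-consumer idea from the comments could be sketched roughly as follows. This is an illustration under assumptions, not the commenter's actual code: the class name BufferedFanOutSketch, the Run method, its parameters, and the seen lists are all hypothetical. Each BufferBlock is left unbounded, so it accepts everything the BroadcastBlock offers, at the cost of memory growing when a consumer falls behind.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper illustrating one unbounded buffer per consumer.
public static class BufferedFanOutSketch
{
    // Returns, for each consumer, the values it processed, in order.
    public static List<int>[] Run(int messageCount, int consumerCount, int stepDelayMs)
    {
        var root = new BroadcastBlock<int>(d => d);
        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
        var consumerOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = 3 };

        var completions = new List<Task>();
        var seen = new List<int>[consumerCount];

        for (int c = 0; c < consumerCount; ++c)
        {
            var received = seen[c] = new List<int>();
            int delayMs = c * stepDelayMs; // consumer c is slower than consumer c - 1

            // Unbounded by default: never declines a message offered by root.
            var buffer = new BufferBlock<int>();

            // The bounded consumer now pulls from its own buffer at its own pace.
            var consumer = new ActionBlock<int>(d =>
            {
                Thread.Sleep(delayMs);
                received.Add(d);
            }, consumerOptions);

            root.LinkTo(buffer, linkOptions);
            buffer.LinkTo(consumer, linkOptions);
            completions.Add(consumer.Completion);
        }

        for (int i = 0; i < messageCount; ++i)
        {
            root.Post(i);
        }

        root.Complete();                     // completion propagates root -> buffer -> consumer
        Task.WaitAll(completions.ToArray()); // wait until every consumer has drained its buffer
        return seen;
    }
}
```

With this layout a slow consumer delays only itself, which matches the report above that all 5 consumers got all 10 inputs. If memory is a concern, the trade-off is that a bounded buffer would bring back the original dropping behavior for whichever consumer fills it.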
