TPL 数据流是并行的或有序的,但不是两者

Posted

技术标签:

【中文标题】TPL 数据流是并行的或有序的,但不是两者【英文标题】:TPL Dataflow is either Parallel or Ordered but not both 【发布时间】:2015-11-13 18:41:56 【问题描述】:

我最近已经发布了关于 Async、Await、TPL 和 TPL 数据流的各种问题。所有这些问题都得到了解答,让我明白了很多。

我开始研究异步编程,因为我想运行我的异步方法,但有一个问题。我想在任务异步并行运行时保持顺序。记录插入数据库时​​以及使用TextBox 控件在表单上打印时需要维护的顺序。 (这里我必须使用FromCurrentSynchronizationContext,因为我是从UI线程访问控件)

有人建议我使用 TPL 数据流,因为有人告诉我它提供了我需要的功能。经过一番努力并了解了 TPL 数据流的工作原理后,我设法创建了一个简单的应用程序,用于比较同步调用和 TPL 数据流代码之间的性能。

  private void btnStartSync_Click(object sender, EventArgs e)
    
        Stopwatch watch = new Stopwatch();
        watch.Start();

        try
        
            txtOutput.Clear();

            for (int i = 1; i <= 200; i++)
            
                bool x = InsertIntoDatabaseSync(i);

                if (x)
                    txtOutput.Text += "Value Inserted for Id: " + i + Environment.NewLine;
                else
                    txtOutput.Text += "Value Failed for Id: " + i + Environment.NewLine;

                txtOutput.Refresh();
            

            watch.Stop();
            lblSyncTime.Text = Convert.ToString(watch.ElapsedMilliseconds / 1000);
        
        catch
        

        
    

上述代码是对同步方法的同步调用,该方法将记录插入数据库。

 public async void btnTPLDataFlow_Click(object sender, EventArgs e)
    
        Stopwatch watch = new Stopwatch();
        watch.Start();

        txtOutput.Clear();

        ExecutionDataflowBlockOptions execOptions = new ExecutionDataflowBlockOptions();
        execOptions.MaxDegreeOfParallelism = 1;

        var transformBlock = new TransformBlock<string, OutputPropertyClass>(async v =>
        
            try
            
                bool x = await InsertIntoDatabaseAsync(Convert.ToInt32(v));

                OutputPropertyClass objResult = new OutputPropertyClass();
                objResult.Id = Convert.ToInt32(v);
                objResult.result = x;

                return objResult;
            
            catch
            
                throw new Exception("Exception occurred for: " + v);
            

        , execOptions);


        ActionBlock<OutputPropertyClass> actionBlock = new ActionBlock<OutputPropertyClass>(v =>
        
            try
            
                if (v.result)
                    txtOutput.Text += "Value Inserted for Id: " + v.Id  + Environment.NewLine;
                else
                    txtOutput.Text += "Value Failed for Id: " + v.Id + Environment.NewLine;
            
            catch
            
                throw new Exception("Exception occurred");
                            

        , new ExecutionDataflowBlockOptions  TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext() );


        for (int i = 1; i <= 200; i++)
        
            transformBlock.Post(i.ToString());
        

        transformBlock.LinkTo(actionBlock, new DataflowLinkOptions  PropagateCompletion = true );
        transformBlock.Complete();

        try
        
            await transformBlock.Completion;
        
        catch (AggregateException ex)
        
            MessageBox.Show(ex.InnerException.Message);
        
        catch (Exception ex)
        
            MessageBox.Show(ex.Message);
        


        watch.Stop();
        lblTPLDataFlow.Text = Convert.ToString(watch.ElapsedMilliseconds / 1000);
    

上面提到的代码是一个Async调用DataFlow块。 TransformBlock 正在调用 InsertIntoDatabaseAsync 方法,这是一个异步任务。除此之外InsertIntoDatabaseAsyncInsertIntoDatabaseSync 是相同的。

现在两个调用的完成时间完全相同。在我的系统上,它们都需要 9 秒才能完成。

为了保持TransformBlock 中数据的顺序,我必须将MaxDegreeOfParallelism 限制为1。这样可以保持顺序,但效率没有提高。

如果我尝试增加 MaxDegreeOfParallelism 让我们说 5,那么完成该过程所需的时间会增加,并且超过 25 秒。秩序也受到干扰。

我正在寻找一种同时具有顺序和并行性的方法。如果 TPL Dataflow 无法实现这一点,那么必须有其他方法。请帮助找到正确的方向。

【问题讨论】:

您可能应该await actionBlock.Completion 而不是await transformBlock.Completion,因为管道中的最后一个块是actionBlock 【参考方案1】:

你想要的没有意义。对于流程的每个部分,如果需要订购单个部分,则永远不能并行运行。当您限制所有内容按顺序运行时,它必须一次运行一个。这不是 TPL 数据流的问题,而是需求的问题。

TPL Dataflow 可以将流分成多个部分,每个部分并行运行,并允许每个部分与自身并行运行(使用MaxDegreeOfParallelism &gt; 1),同时保持流本身的顺序。

TPL 数据流保持块的输入和输出顺序,它不保持块内的顺序(将它们插入数据库的位置)。

因此,如果您希望您的所有流程既是并行的又是有序的,TPL 数据流无法帮助您,但没有其他方法可以帮助您。

【讨论】:

这只是我第一次为 TPL DataFlow 编写的演示代码。我知道这种情况不符合标准,但我正在根据我的理解测试性能。好吧,我放弃了在 Block 内保持秩序的要求,我不需要按顺序记录。话虽如此,我可以保持 TextBox 上打印的值的顺序吗? @AdnanYaseen 是的,您的代码已经这样做了,因为 TransfromBlock 以正确的顺序输出项目,而您的 ActionBlock 是单线程的(这就是我首先建议这样做的原因)。 谢谢。只是一件快事。 TPL 数据流是否存在资源锁定或竞争问题?我需要处理这个还是不是问题。如果不是相关问题,我很抱歉。 @AdnanYaseen 如果您的代码块内的代码并行运行,那么它需要像任何其他代码一样的保护和同步。除此之外,你不需要任何东西。您可以使用多个线程从块中写入和读取。【参考方案2】:

同时(并行)做很多事情的诀窍是它们不会以相同的顺序完成。要订购一套东西,这套东西需要是完整的,而不是分解成单独的任务。您可能需要在并行任务全部完成后订购该套装。

【讨论】:

感谢您的快速回复。因此,例如,我有单独的方法来执行单个任务。在这种情况下,我应该将它们作为一种方法吗?以及如何将 UI 文本更新与我的插入方法结合起来。 UI 控件位于单独的线程上。

以上是关于TPL 数据流是并行的或有序的,但不是两者的主要内容,如果未能解决你的问题,请参考以下文章

TPL 数据流:在保持秩序的同时进行并行设计

为网站抓取工具实施的 TPL 数据流

理解并行编程

TPL 数据流使用旧数据而不是最新数据

TPL 数据流(TDF)和响应式扩展有啥区别?

浅谈并发与并行