TPL DataFlow 一对一处理

Posted

技术标签:

【中文标题】TPL DataFlow 一对一处理【英文标题】:TPL DataFlow One by one processing 【发布时间】:2017-12-22 00:23:55 【问题描述】:

我有一个持续处理消息的系统。我想确保仅在处理先前的消息时才从外部队列请求消息。 让我们假设 GetMessages 方法从外部队列请求消息。

有事件1.会推送它 推1 有活动 2。将推动它 - 我的音乐会在这里。因为我们在处理之前的项目之前得到了项目 处理 1 已处理 1 已删除 1

代码:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace ConsoleApp1

    class Program
    
        static void Main(string[] args)
        
            EventProcessor a = new EventProcessor();
            Task task = Task.Run(async ()=> await a.Process());

            task.Wait();
        
    

    public class EventProcessor
    
        private readonly TransformBlock<int, string> _startBlock;
        private readonly ActionBlock<string> _deleteBlock;
        private readonly ActionBlock<int> _recieveBlock;

        public EventProcessor()
        
            var executionDataflowBlockOptions = new ExecutionDataflowBlockOptions 
                MaxDegreeOfParallelism = 1,
                BoundedCapacity = 1,
            ;

            this._startBlock = new TransformBlock<int, string>(
                async @event => await this.ProcessNotificationEvent(@event),
                executionDataflowBlockOptions
            );

            this._deleteBlock = new ActionBlock<string>(async @event => 
                await this.DeleteMessage(@event);
            , executionDataflowBlockOptions);
            var trashBin = DataflowBlock.NullTarget<string>();


            var dataflowLinkOptions = new DataflowLinkOptions 
                PropagateCompletion = true,
            ;

            this._startBlock.LinkTo(
                this._deleteBlock,
                dataflowLinkOptions,
                (result => result != "o")
            );

            this._startBlock.LinkTo(
                trashBin,
                dataflowLinkOptions,
                (result => result == "o")
            );
        

        private async Task<string> ProcessNotificationEvent(int @event)
        
            Console.WriteLine($"Processing @event");
            await Task.Delay(5000);
            Console.WriteLine($"Processed @event");
            return @event.ToString();
        

        public async Task Process()
        

            //while (this._cancellationTokenSource.IsCancellationRequested == false) 
            foreach (var notificationEvent in GetMessages()) 
                Console.WriteLine($"Got event notificationEvent. Will push it");
                if (await this._startBlock.SendAsync(notificationEvent) == false) 
                    Console.WriteLine($"Failed to push notificationEvent");
                    return;
                
                Console.WriteLine($"Pushed notificationEvent");
            
            //
            this._startBlock.Complete();
            this._deleteBlock.Completion.Wait();
        

        private static IEnumerable<int> GetMessages() 
            return Enumerable.Range(1, 5);
        

        private async Task DeleteMessage(string @event)
        
            Console.WriteLine($"Deleted @event");
        
    

输出将是

Got event 1. Will push it
Pushed 1
Got event 2. Will push it
Processing 1
Processed 1
Deleted 1
Processing 2
Pushed 2
Got event 3. Will push it
Processed 2
Processing 3
Deleted 2
Pushed 3
Got event 4. Will push it
Processed 3
Deleted 3
Processing 4
Pushed 4
Processed 4
Deleted 4
Press any key to continue . . .

我认为我可以为每条消息创建 TDL 数据流,但我认为这有点过头了。

【问题讨论】:

如果你让ProcessNotificationEvent 同步而不是异步,它会起作用吗? @mjwills 不。相同的行为 旁注:如果您的trashBinNullTarget,则不应为其过滤消息。并且不要做 `== false` 比较,使用! 表示布尔值。 您想施加超出单个块边界的并发限制。查看this 答案以获取可用选项。在您的情况下,只有 SemaphoreSlim 解决方案适用,因为您需要限制的操作是异步的。 【参考方案1】:

问题是您有一个缓冲区,因此您的生产者循环将始终在处理第一个项目时处理下一个项目。这是使用 TPL 数据流的自然结果。

如果您想一次处理一个,最简单的方法可能是删除 TPL 数据流:

public class EventProcessor

  private async Task<string> ProcessNotificationEvent(int @event)
  
    Console.WriteLine($"Processing @event");
    await Task.Delay(5000);
    Console.WriteLine($"Processed @event");
    return @event.ToString();
  

  public async Task Process()
  
    foreach (var notificationEvent in GetMessages()) 
      Console.WriteLine($"Got event notificationEvent. Will push it");
      var result = await this.ProcessNotificationEvent(notificationEvent);
      if (result != "o")
        await DeleteMessage(result);
    
  

  private static IEnumerable<int> GetMessages() => Enumerable.Range(1, 5);

  private async Task DeleteMessage(string @event) => Console.WriteLine($"Deleted @event");

【讨论】:

感谢您的回复。是的,我正在考虑它。这只是一个例子,当然我在那里有更多的数据流块。而且每一步实际上都会向缓冲区添加一个项目。据我所知,我只有两个选择:1)删除数据流 2)为每条消息创建流并等待最后一个块的完成。对吗? 如果这是在 Dataflow 网格的中间,那么只需将“下载”步骤和“处理”步骤组合成一个 TransformBlock。

以上是关于TPL DataFlow 一对一处理的主要内容,如果未能解决你的问题,请参考以下文章

TPL Dataflow 模块可从单个输入生成多个输出

TPL Dataflow 如何与“全局”数据同步

TPL Dataflow,数据块收到第一项时的通知

如何在 TPL/Dataflow 中发出笛卡尔积?

使用 TPL-Dataflow 进行聚合和连接(内、外、左……)?

TPL-Dataflow 是不是适用于高并发应用程序?