如何在 TPL 中实现连续运行的数据流块?

Posted

技术标签:

【中文标题】如何在 TPL 中实现连续运行的数据流块?【英文标题】:How to implement continuously running dataflow blocks in TPL? 【发布时间】:2013-12-18 14:30:24 【问题描述】:

我使用 BufferBlock 和 ActionBlock 设置了生产者/消费者数据流块,它在控制台应用程序中运行良好;

将所有项目添加到 BurfferBlock 并将 BufferBlock 与其他 Action Items 链接后;它运行良好。

现在我想使用该内部服务,该数据流块管道将始终处于启动状态,并且当消息将通过外部事件可用时,它将进入缓冲区块并开始处理。我怎样才能做到这一点?

到目前为止,我已经完成了以下工作:

public void SetupPipeline()

    FirstBlock = new ActionBlock<WorkItem>(new Action<WorkItem>(ProcessIncomingMessage),
    new ExecutionDataflowBlockOptions
    
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
    );

    BufferBlock = new BufferBlock<WorkItem>();

    GroupingDataflowBlockOptions GroupingDataflowBlockOptions = new GroupingDataflowBlockOptions();
    GroupingDataflowBlockOptions.Greedy = true;
    GroupingDataflowBlockOptions.BoundedCapacity = GroupingDataflowBlockOptions.Unbounded;
    CancellationTokenSource = new CancellationTokenSource();
    CancellationToken = CancellationTokenSource.Token;
    GroupingDataflowBlockOptions.CancellationToken = CancellationToken;
    BatchBlock = new BatchBlock<WorkItem>(BoundingCapacity, GroupingDataflowBlockOptions);

    ProcessItems = new ActionBlock<WorkItem[]>(WorkItems =>
        ProcessWorkItems(WorkItems.ToList<WorkItem>()),
        new ExecutionDataflowBlockOptions
      
          CancellationToken = CancellationToken
      );

    Timer = new Timer(_ =>
            BatchBlock.TriggerBatch()
        );

    TimingBlock = new TransformBlock<WorkItem, WorkItem>(WorkItem =>
    
        Timer.Change(TimerInterval, Timeout.Infinite);
        logger.Debug("Inside TimingBlock : " + WorkItem.ToString());
        return WorkItem;
    , new ExecutionDataflowBlockOptions
    
        CancellationToken = CancellationToken
    );

    BatchBlock.LinkTo(ProcessItems);
    TimingBlock.LinkTo(BatchBlock);
    BufferBlock.LinkTo(TimingBlock);

【问题讨论】:

你为什么不这样做呢?您尝试了哪些方法,又是如何失败的? @svick 我已经添加了到目前为止我已经实现的内容 那么,有什么问题呢?该代码是否按您的预期工作?是什么阻止您将任何事件发布到该管道中? 我可以发帖了;如果没有 Timer,我怎么能做到这一点。我不想调用 Pipeline.Complete() 否则我将不得不再次重新初始化 Pipeline;我不想要的(因为我试图保持这个管道始终打开) 相关:How to call TriggerBatch automagically after a timeout if the number of queued items is less than the BatchSize? 【参考方案1】:

批处理大小由批处理块构造函数中的变量“BoundingCapacity”定义。批次将在以下时间发布:

已收到与批量大小相等的帖子数(在构造函数中指定) 批处理块被标记为完成 调用triggerbatch方法

您似乎希望在满足浴池大小或发生超时时发布一批。如果是这种情况,并且批量大小并不重要,我真的会为您拥有的计时器添加一个循环间隔,并使批处理块下游的对象忽略空帖子。

您可能真正想要的,也是最符合数据流编程理念的,是在您开始发布一系列项目时创建一个新的批处理块,然后在完成或发生超时时完成它。如果新帖子尚不存在,则新帖子将创建一个新的批处理块。

尝试在仅基于第一个触发器触发的批处理块周围实现超时计时器的问题是,您要么需要计数并验证发送到缓冲区块的帖子,要么需要查看来自缓冲区块的帖子。这两种情况都会造成很多丑陋和/或违反块封装。

【讨论】:

感谢您的建议;我想避免每次都创建批处理块。如您所见,我的程序基本上使用缓冲区将闲聊消息转换为块。 Buffer 与 BoundingCapacity 配合得很好;就我而言,我已将其设置为 100。但我不想等到所有 100 条消息都到达。我想要对 BufferBlock 进行双重控制;例如如果有 100 条消息或 5 秒(两者都可以配置)。解决方案可以根据我的需要工作,但我想看看其他人是否有更好的解决方案。他们的关键是我希望 BufferBlock 以双模式运行;关于命中 BoundinCapacity 和超时。 一旦你将一个项目传递给一个数据流块,你需要对那个项目一无所知。数据流块应该是数据驱动的。外部控制是不受欢迎的。老实说,我会修改您所拥有的内容,以便将“超时触发器”和批处理块包含在单个 IPropagatingBlock 中。 @VeteCoffee - 这是有道理的;这将分隔块,我将来可以用其他东西代替......谢谢 投反对票很好,但请留下反馈以帮助我修改答案。【参考方案2】:

作为一个严重的过度简化,DataFlow 是一种使用一组方法处理一堆对象的方法。它不提供或期望创建这些对象的任何特定方式。

如果您希望管道保持活动状态,请不要终止应用程序。如果您不想使用控制台应用程序,请创建一个服务来构建管道并将对象发送给它,直到它关闭。

消息只是您将通过读取数据、响应事件(无论这意味着什么)或任何其他方式创建的对象。

至于外部事件,你是什么意思?有人会将数据发送到您的应用程序吗?发生这种情况的方式有很多种:

如果数据来自另一个控制台应用程序,您可以将一个应用程序的结果通过管道传输到另一个应用程序,解析来自命令行应用程序输入流的数据,创建消息并将它们传递到管道 如果您想要一个服务监听请求,您可以托管一个 .NET Pipe、WCF 或 Web API 服务来监听调用并将发布的数据传递给管道。 如果数据来自数据库,您可以轮询更改并将任何更改的数据发送到管道。

重点是,Dataflow 是关于处理数据的,而不是关于监听事件的。它不是一个成熟的分布式代理系统,如果这正是您所寻找的。​​p>

【讨论】:

外部事件,即我有来自不同渠道的通知更新;我在 BufferBlock 上发布的。我有 24/7 服务启动并运行,它监听这些传入的位置更新,并在此基础上进行一些下游处理......

以上是关于如何在 TPL 中实现连续运行的数据流块?的主要内容,如果未能解决你的问题,请参考以下文章

TPL 数据流块消耗所有可用内存

具有永久任务/线程的 TPL 数据流块

如何对连接的 TPL 数据流块进行错误处理?

使用 pyspark 在数据块中实现 FileNotFound 异常

TPL Dataflow,数据块收到第一项时的通知

TPL 数据流在运行时中断 LinkTo()