TPL 数据流 - 与事件流一起使用

Posted

技术标签:

【中文标题】TPL 数据流 - 与事件流一起使用【英文标题】:TPL Data flow - usage with events stream 【发布时间】:2019-10-30 22:36:56 【问题描述】:

在我的应用程序中,几乎每次都有大量来自第三方的事件,我必须处理它们并通过网络发送(发布)到组织的 RabbitMQ。我的疑问是关于 TPL Dataflow 中的用法。想象一下代码如下:

 private TransformBlock<QuoteEvent, Quote> _quotesProcessingBlock;

    private ActionBlock<Quote> _deliveryBlock;
    public TplDataFlow()
    
        _quotesProcessingBlock = new TransformBlock<QuoteEvent, Quote>(
            x => ProcessQuoteEvent(x));
        _deliveryBlock = new ActionBlock<Quote>(quote => Publish(quote));

        _quotesProcessingBlock.LinkTo(
            _deliveryBlock,
             new DataflowLinkOptions  PropagateCompletion = true 
            ); 
    

    //This callback method registered at the 3rd party events producer.
//It runs single threaded, so I need to process it quickly  
    private void ProcessEvent(QuoteEvent quoteEvent)
    
        _quotesProcessingBlock.Post(quoteEvent);

        //What will be the trigger for those lines?? 
        _quotesProcessingBlock.Complete();
        _deliveryBlock.Completion.ConfigureAwait(false).GetAwaiter().GetResult();
    

我不确定何时使用_quotesProcessingBlock.Complete();_deliveryBlock.Completion。在我看来,为每个引用事件(每秒数百次)都这样做是不合理的,

如果是,我应该删除它还是应该将它移到另一个级别?

或者,

数据流在这里不是正确的解决方案?

如果有,有其他解决方案吗?

【问题讨论】:

【参考方案1】:

当您完成该数据流块后,您将调用Complete。在这种情况下,当没有更多事件要处理时。当您的应用程序关闭时,您可能只执行一次此操作,或者根本不执行此操作。

你应该在某个时候awaitCompletion 属性,即使你从不调用Complete;如果数据流网格失败,它将通知您的代码。

【讨论】:

据我了解您的回答,我必须删除 CompleteCompletion 亚麻布,所以它看起来像 public void ProcessEvent(QuoteEvent quoteEvent) _quotesProcessingBlock.Post(quoteEvent); 在我的情况下,当代码是同步的时,正确的位置在哪里到await 或通过GetResult() 阻止Completion

以上是关于TPL 数据流 - 与事件流一起使用的主要内容,如果未能解决你的问题,请参考以下文章

有没有办法将任务并行库(TPL)与 SQLDataReader 一起使用?

FLink的窗口机制与流处理Join的方案

TPL 数据流(TDF)和响应式扩展有啥区别?

在 TPL 数据流中动态订阅/取消订阅

是否可以将 azure 流分析与 mod 总线一起使用?

用 TPL 数据流中的最新值替换缓冲值