TPL Dataflow LinkTo TransformBlock 非常慢

Posted

技术标签:

【中文标题】TPL Dataflow LinkTo TransformBlock 非常慢【英文标题】:TPL Dataflow LinkTo TransformBlock is very slow 【发布时间】:2018-08-29 22:41:54 【问题描述】:

我有两个 TransformBlocks,它们排列成一个循环。他们将数据相互链接。 TransformBlock 1 是一个读取数据的 I/O 块,最多只能有 50 个任务。它读取数据和一些元数据。然后它们被传递到第二个块。如果消息再次进入第一个块,则第二个块决定元数据。因此,在元数据匹配标准并稍等片刻之后,数据应该再次返回到 I/O 块。第二个块 MaxDegreeOfParallelism 可以是无限的。

现在我注意到,当我向 I/O 块发送大量数据时,需要很长时间才能将消息链接到第二个块。链接数据大约需要 10 分钟,而且它们都是成批发送的。就像几秒钟内的 1000 个条目一样。 通常我会这样实现它:

public void Start()

    _ioBlock = new TransformBlock<Data,Tuple<Data, MetaData>>(async data =>
    
        var metaData = await ReadAsync(data).ConfigureAwait(false);

        return new Tuple<Data, MetaData>(data, metaData);

    , new ExecutionDataflowBlockOptions  MaxDegreeOfParallelism = 50 );

    _waitBlock = new TransformBlock<Tuple<Data, MetaData>,Data>(async dataMetaData =>
    
        var data = dataMetaData.Item1;
        var metaData = dataMetaData.Item2;

        if (!metaData.Repost)
        
            return null;
        

        await Task.Delay(TimeSpan.FromMinutes(1)).ConfigureAwait(false);

        return data;

    , new ExecutionDataflowBlockOptions  MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded );

    _ioBlock.LinkTo(_waitBlock);
    _waitBlock.LinkTo(_ioBlock, data => data != null);
    _waitBlock.LinkTo(DataflowBlock.NullTarget<Data>());

    foreach (var data in Enumerable.Range(0, 2000).Select(i => new Data(i)))
    
        _ioBlock.Post(data);
    

但由于所描述的问题,我必须像这样实现它:

public void Start()

    _ioBlock = new ActionBlock<Data>(async data =>
    
        var metaData = await ReadAsync(data).ConfigureAwait(false);

        var dataMetaData= new Tuple<Data, MetaData>(data, metaData);

        _waitBlock.Post(dataMetaData);

    , new ExecutionDataflowBlockOptions  MaxDegreeOfParallelism = 50 );

    _waitBlock = new ActionBlock<Tuple<Data, MetaData>>(async dataMetaData =>
    
        var data = dataMetaData.Item1;
        var metaData = dataMetaData.Item2;

        if (metaData.Repost)
        
            await Task.Delay(TimeSpan.FromMinutes(1)).ConfigureAwait(false);

            _ioBlock.Post(data);
        
    , new ExecutionDataflowBlockOptions  MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded );

    foreach (var data in Enumerable.Range(0, 2000).Select(i => new Data(i)))
    
        _ioBlock.Post(data);
    

当我使用第二种方法时,数据会更快地链接/发布(一个接一个)。但这对我来说更像是一种黑客行为。有人知道如何解决这个问题吗?一些朋友推荐我使用 TPL Pipeline,但对我来说似乎要复杂得多。

【问题讨论】:

代码中的 一分钟 等待是怎么回事?! 试试看。我怀疑它是批处理,你已经引入了每批可能 1 分钟的延迟 "..如果消息再次进入第一个块,则第二个块决定元数据.." - 你的第一个例子是这样做的吗?还是不明白你为什么是sleeping。它违背了线程/并发/TPL 数据流的全部目的,特别是当您的问题是 “TPL 数据流 LinkTo TransformBlock 是 very slow 时。 LinkTo 在运行时不做任何事情。它不会向下一个块传递任何东西,它会建立一个连接,以便一个块将结果发布到下一个块。如果有些东西很慢,那就是块中的代码 @BlackMatrix Task.Delay(TimeSpan.FromMinutes(1)) 是性能缓慢的一个明显原因,并且有足够的理由将这个问题关闭为不可重现。两个 sn-ps 完全不同——第二个没有延迟 【参考方案1】:

问题解决了。你需要设置

ExecutionDataflowBlockOptions.EnsureOrdered

将数据立即转发到下一个/等待块。

更多信息:

Why do blocks run in this order?

【讨论】:

以上是关于TPL Dataflow LinkTo TransformBlock 非常慢的主要内容,如果未能解决你的问题,请参考以下文章

TPL 数据流在运行时中断 LinkTo()

如何在 TPL/Dataflow 中发出笛卡尔积?

使用 TPL-Dataflow 进行聚合和连接(内、外、左……)?

TPL-Dataflow 是不是适用于高并发应用程序?

TPL Dataflow BufferBlock 线程安全吗?

TPL Dataflow 如何与“全局”数据同步