BufferBlock<T> -> ActionBlock<T> 未触发

Posted

技术标签:

【中文标题】BufferBlock<T> -> ActionBlock<T> 未触发【英文标题】:BufferBlock<T> -> ActionBlock<T> Not Firing 【发布时间】:2016-05-25 15:08:34 【问题描述】:

简而言之

我创建了一个 BatchBlock,它链接到一个 ActionBlock,它调用一个异步方法,但这个方法永远不会触发。

详情

这是我创建块的方式

        var instance = new ConnectionSaveStep(repo, progress, total);
        var batch = new BatchBlock<LandingPageConnection>(10000);
        saveAction = new ActionBlock<IEnumerable<LandingPageConnection>>(i => instance.Save(i));
        batch.LinkTo(saveAction);

保存方法如下:

    internal async Task Save(IEnumerable<LandingPageConnection> pages)
    
        Trace.WriteLine("Inserting " + pages.Count() + " items ...");
        await repo.InsertBatchAsync(pages);
    

元素(大约 1 mio)在 Parallel.ForEach 中排队。

 Parallel.ForEach(cities, city=>
           
             var pages= BuildLandingPage(city);
             batch.Post(pages);

但是保存方法永远不会被提升

当我查看批处理和 saveAction 的属性时,我可以看到首先批处理块的未输入队列中的项目数量正在增加,直到达到批处理块大小。之后,具有 10.000 个工作项的一项在 saveACtion 输入队列中排队。

但从不调用保存。

我做错了什么?

【问题讨论】:

不使用Parallel.ForEach是否有效? 好主意,我会试一试。但根据我的预期,这也应该适用于每个 您的代码中似乎没有任何问题。但也许Parallel.ForEach 完全占据了ThreadPool,并没有为ActionBlock 留出空间。 嗯是的,也许我也会尝试限制线程数 你是对的,但主要我想了解它为什么不起作用。原因是我发布的代码下面有一个小代码 sn-p,它引发了 nullrefence 异常 【参考方案1】:

解决方案在 cmets 中。原因是生产者创建了许多线程,这就是消费者永远没有机会触发的原因。

【讨论】:

以上是关于BufferBlock<T> -> ActionBlock<T> 未触发的主要内容,如果未能解决你的问题,请参考以下文章

通过 BufferBlock 的背压不起作用。 (C# TPL 数据流)

使用 TPL 数据流的请求/响应模式

如何在 TPL 中实现连续运行的数据流块?

我应该如何使用 DataflowBlockOptions.CancellationToken?

带有循环链接的 TPL 块?

具有有限容量的转换块中的 TPL 数据流异常