带有循环链接的 TPL 块?

Posted

技术标签:

【中文标题】带有循环链接的 TPL 块?【英文标题】:TPL Block with round robin link? 【发布时间】:2018-03-23 11:53:06 【问题描述】:

我经常使用 TPL,并且拥有大型数据流管道结构。 作为管道网络的一部分,我想将一些数据写入 azure blob 存储。我们有很多数据,因此我们有 4 存储帐户,我们希望在它们之间平均分配数据。

想继续使用数据流管道模式,因此我想实现一个SourceBlock,如果我将它链接到几个目标块,它将通过循环向它们发送消息。 BufferBlock 不够好,因为他正在将消息发送到接受它的第一个块,并假设所有目标块都有很大的有界容量 - 所有消息都将发送到第一个目标块。 BroadcastBlock 也不好,因为我不想重复。

有什么建议吗?使用轮询行为实现ISourceBlock 接口似乎并不那么简单,我想知道是否有更简单的解决方案?还是我不熟悉的 TPL 扩展?

【问题讨论】:

【参考方案1】:

您知道link the blocks with a predicate 的可能性吗?这是一个非常简单且未经过良好测试的示例解决方案:

var buffer = new BufferBlock<int>();
var consumer1 = new ActionBlock<int>(i => Console.WriteLine($"First: i"));
var consumer2 = new ActionBlock<int>(i => Console.WriteLine($"Second: i"));
var consumer3 = new ActionBlock<int>(i => Console.WriteLine($"Third: i"));
var consumer4 = new ActionBlock<int>(i => Console.WriteLine($"Forth: i"));

buffer.LinkTo(consumer1, i => Predicate(0));
buffer.LinkTo(consumer2, i => Predicate(1));
buffer.LinkTo(consumer3, i => Predicate(2));
buffer.LinkTo(consumer4, i => Predicate(3));
buffer.LinkTo(DataflowBlock.NullTarget<int>());

for (var i = 0; i < 10; ++i)

    buffer.Post(i);

buffer.Completion.Wait();

输出之一:

Third: 2
First: 0
Forth: 3
Second: 1
Second: 5
Second: 9
Third: 6
Forth: 7
First: 4
First: 8

这里发生的情况是您要保持操作次数,如果电流适合消费者,我们只是增加它。请注意,您仍然应该至少链接一次没有任何谓词的块,以避免内存问题(此外,最好用块来测试循环,它会监控丢失的消息)。

【讨论】:

感谢 VMAtm。我以前从未尝试过通过谓词,我从您的示例中学到了很多东西。仍然不确定它是否适用于我的情况。但也许我错过了一些东西,因为没有完全理解它。所以一个问题 - 你写了“请注意,你仍然应该至少一次链接没有任何谓词的块,以避免内存问题”。你能解释一下你的意思是什么内存问题吗? 正如我所说,这段代码没有经过适当的测试,而且,也许有一种情况,所有谓词都返回 false。之后,消息将保留在BufferBlock 中,直到程序结束。为“坏”消息创建一个块以离开分隔符是一种常见的做法。 NullTarget 只是忽略该消息,但您可以使用您喜欢的任何块并记录所有谓词未命中。

以上是关于带有循环链接的 TPL 块?的主要内容,如果未能解决你的问题,请参考以下文章

带有块和保留循环的 ARC

iOS:在while循环中带有块回调的异步方法

接收集合并为每个元素调用其链接块的 TPL 数据流

带有已定义寻呼机链接的 jQuery 循环

带有 for 循环的 MATLAB add_line 给出了无效的 Simulink 对象名称错误

C#:带有循环链接的节点的 xml 序列化