TPL Dataflow 如何与“全局”数据同步

Posted

技术标签:

【中文标题】TPL Dataflow 如何与“全局”数据同步【英文标题】:TPL Dataflow how to synchronize with "global" data 【发布时间】:2017-11-06 23:31:59 【问题描述】:

我实际上是在学习 TPL 数据流。每当我读到一些关于它的东西时,我觉得这听起来不错,但我经常问自己:“好吧,如果我有一个经理,处理不同的会话怎么办。这些会话可以通过某些消息更新。如果复杂的 TPL 数据流网格我必须内置同步机制来访问管理器,这会减慢或阻塞网格。”

对 TPL 数据流使用管理对象感觉有点不对劲。

谁能给我一些“正确”方向的提示(链接、书籍、示例...),如何解决上述示例。

【问题讨论】:

【参考方案1】:

没有一些代码的非常广泛的问题。但通常你要么使用Tuples 或一些DTO 传递一些“状态”对象,我个人认为这是泄漏设计,或者你可以将一些块注入到你的管道中。

例如,您可以创建一个WriteOnceBlock,并为其提供一个会话值,每个管道都是唯一的,并且只需在执行期间发生的新事件通知它的值。如果您为不同的会话创建管道,这可能是您的一个选择,但如果您有一个大管道,则需要另一种方式。

例如,您有一个BufferBlock、一个执行会话更新的ActionBlock 和一个简单地继续正常管道执行的TransformBlock。在这种情况下,您可以做的是引入BroadcastBlock,将其与动作块和变换块链接。现在您将缓冲区链接到使用谓词的广播块,然后将缓冲区直接链接到转换块。

这里的想法是,如果您的消息被过滤(因此需要会话更新),它将进入广播块,然后进入会话更新操作和您的正常执行管道。如果它与谓词不匹配,它会简单地检查您的正常工作流程。

PS:如果说起来太复杂,我可以提供一些示例代码。

var linkOptions = new DataflowLinkOptions  PropagateCompletion = true ;

var buffer = new BufferBlock<int>();
// broadcast do copy according lambda from constructor
// if you provide same reference for message, make sure that your access is thread-safe
var broadcast = new BroadcastBlock<int>(i => i);
buffer.LinkTo(broadcast, linkOptions);

// session update block
var sessionUpdate = new ActionBlock<int>(i =>
    
        Thread.Sleep(new Random().Next(1000));
        Console.WriteLine($"Session update:i");
    , new ExecutionDataflowBlockOptions  MaxDegreeOfParallelism = 4 );
// normal execution
var transform = new TransformBlock<int, int>(i =>
    
        Thread.Sleep(new Random().Next(1000));
        Console.WriteLine($"Normal execution:i");
        return i;
    , new ExecutionDataflowBlockOptions  MaxDegreeOfParallelism = 4 );
// do not complete the standalone session block
// message will be accepted only for multipliers of 3
broadcast.LinkTo(sessionUpdate, i => i % 3 == 0);
// normal pipeline with completion propagation
broadcast.LinkTo(transform, linkOptions);

for (var i = 0; i < 10; ++i)

    // async message
    await buffer.SendAsync(i);

buffer.Complete();
await transform.Completion;

【讨论】:

感谢您的详细回答。如果不是太多工作,一些示例代码会很棒。

以上是关于TPL Dataflow 如何与“全局”数据同步的主要内容,如果未能解决你的问题,请参考以下文章

使用 TPL-Dataflow 进行聚合和连接(内、外、左……)?

如何在 TPL/Dataflow 中发出笛卡尔积?

如何在简单的 TPL DataFlow 管道中优化性能?

TPL Dataflow LinkTo TransformBlock 非常慢

TPL Dataflow ,完成一个 Block ,重新创建一个 BLock

TPL Dataflow 管道中的图像刷新问题