TPL Dataflow 如何与“全局”数据同步
Posted
技术标签:
【中文标题】TPL Dataflow 如何与“全局”数据同步【英文标题】:TPL Dataflow how to synchronize with "global" data 【发布时间】:2017-11-06 23:31:59 【问题描述】:我实际上是在学习 TPL 数据流。每当我读到一些关于它的东西时,我觉得这听起来不错,但我经常问自己:“好吧,如果我有一个经理,处理不同的会话怎么办。这些会话可以通过某些消息更新。如果复杂的 TPL 数据流网格我必须内置同步机制来访问管理器,这会减慢或阻塞网格。”
对 TPL 数据流使用管理对象感觉有点不对劲。
谁能给我一些“正确”方向的提示(链接、书籍、示例...),如何解决上述示例。
【问题讨论】:
【参考方案1】:没有一些代码的非常广泛的问题。但通常你要么使用Tuple
s 或一些DTO 传递一些“状态”对象,我个人认为这是泄漏设计,或者你可以将一些块注入到你的管道中。
例如,您可以创建一个WriteOnceBlock
,并为其提供一个会话值,每个管道都是唯一的,并且只需在执行期间发生的新事件通知它的值。如果您为不同的会话创建管道,这可能是您的一个选择,但如果您有一个大管道,则需要另一种方式。
例如,您有一个BufferBlock
、一个执行会话更新的ActionBlock
和一个简单地继续正常管道执行的TransformBlock
。在这种情况下,您可以做的是引入BroadcastBlock
,将其与动作块和变换块链接。现在您将缓冲区链接到使用谓词的广播块,然后将缓冲区直接链接到转换块。
这里的想法是,如果您的消息被过滤(因此需要会话更新),它将进入广播块,然后进入会话更新操作和您的正常执行管道。如果它与谓词不匹配,它会简单地检查您的正常工作流程。
PS:如果说起来太复杂,我可以提供一些示例代码。
var linkOptions = new DataflowLinkOptions PropagateCompletion = true ;
var buffer = new BufferBlock<int>();
// broadcast do copy according lambda from constructor
// if you provide same reference for message, make sure that your access is thread-safe
var broadcast = new BroadcastBlock<int>(i => i);
buffer.LinkTo(broadcast, linkOptions);
// session update block
var sessionUpdate = new ActionBlock<int>(i =>
Thread.Sleep(new Random().Next(1000));
Console.WriteLine($"Session update:i");
, new ExecutionDataflowBlockOptions MaxDegreeOfParallelism = 4 );
// normal execution
var transform = new TransformBlock<int, int>(i =>
Thread.Sleep(new Random().Next(1000));
Console.WriteLine($"Normal execution:i");
return i;
, new ExecutionDataflowBlockOptions MaxDegreeOfParallelism = 4 );
// do not complete the standalone session block
// message will be accepted only for multipliers of 3
broadcast.LinkTo(sessionUpdate, i => i % 3 == 0);
// normal pipeline with completion propagation
broadcast.LinkTo(transform, linkOptions);
for (var i = 0; i < 10; ++i)
// async message
await buffer.SendAsync(i);
buffer.Complete();
await transform.Completion;
【讨论】:
感谢您的详细回答。如果不是太多工作,一些示例代码会很棒。以上是关于TPL Dataflow 如何与“全局”数据同步的主要内容,如果未能解决你的问题,请参考以下文章
使用 TPL-Dataflow 进行聚合和连接(内、外、左……)?
TPL Dataflow LinkTo TransformBlock 非常慢