TPL Dataflow block to produce multiple outputs from a single input
【Posted】: 2021-05-20 06:12:47
【Question】: I have started looking into TPL Dataflow as a solution for a processing workflow.
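The gist of the workflow is to read input messages from multiple tables, create four objects that reflect them, and persist those objects to four other tables, so each input message should result in four new messages being created.
I cannot identify a predefined block that would help create the four objects. At first TransformManyBlock seemed to be what I was looking for, but it returns multiple objects of the same type, whereas I will have four different types.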
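To illustrate the limitation described above, here is a minimal sketch of TransformManyBlock. The class name `TransformManySketch` and the sample input are hypothetical and not part of the question; the point is only that every output of the block has the single type `TOutput`.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class TransformManySketch
{
    public static async Task<List<string>> RunAsync()
    {
        // One input string produces several outputs, but all of them are strings.
        var split = new TransformManyBlock<string, string>(line => line.Split(' '));

        var words = new List<string>();
        // ActionBlock processes one message at a time by default, so a plain List is safe here.
        var collect = new ActionBlock<string>(w => words.Add(w));

        split.LinkTo(collect, new DataflowLinkOptions { PropagateCompletion = true });

        split.Post("one input many outputs");
        split.Complete();
        await collect.Completion;
        return words;
    }
}
```

Producing several differently typed outputs from one input therefore needs more than one block, or a common wrapper type.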
Example of the problem
We have two tables containing employee details from two legacy systems. Their entities look like this:
public partial class EmployeeTblA
{
    public int Id { get; set; }
    public int System { get; set; }
    public string Forename { get; set; }
    public string Surname { get; set; }
    public int Age { get; set; }
    public int Number { get; set; }
    public string Street { get; set; }
    public string PostCode { get; set; }
    public virtual EmployeeSystem SystemNavigation { get; set; }
}

public partial class EmployeeTblB
{
    public int Id { get; set; }
    public int System { get; set; }
    public string Name { get; set; }
    public int Age { get; set; }
    public string Address { get; set; }
    public string Postcode { get; set; }
    public virtual EmployeeSystem SystemNavigation { get; set; }
}
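We want to take the data from both systems and put it into our shiny new system. To do this we need to convert the entities from the old systems into the entities used in the new system. First we convert the entities from the old systems into a base class that looks like this: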
public class BaseEmployee
{
    public int Id { get; set; }
    public int System { get; set; }
    public string Name { get; set; }
    public int Age { get; set; }
    public string Address { get; set; }
    public string Postcode { get; set; }
}
We then want to create three new objects from the base class. They represent the entities of the new system and look like this:
public partial class EmployeeName
{
    public int Id { get; set; }
    public int System { get; set; }
    public int LegacyId { get; set; }
    public string Name { get; set; }
    public virtual EmployeeSystem SystemNavigation { get; set; }
}

public partial class EmployeeAge
{
    public int Id { get; set; }
    public int System { get; set; }
    public int LegacyId { get; set; }
    public int Age { get; set; }
    public virtual EmployeeSystem SystemNavigation { get; set; }
}

public partial class EmployeeAddress
{
    public int Id { get; set; }
    public int System { get; set; }
    public int LegacyId { get; set; }
    public string Address { get; set; }
    public string Postcode { get; set; }
    public virtual EmployeeSystem SystemNavigation { get; set; }
}
Rough flow of my TPL pipeline for the example above:
Read data from the tables in the DB into a TransformBlock
Each TransformBlock
BatchBlock is linked to a Block
Block
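I understand that I can create a custom block, but I cannot figure out how to use it to feed its output to four separately linked ActionBlocks using Dataflow. Can someone point me in the right direction?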
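【Comments】:
TBH I am a bit confused by how the question is formulated. It might help if you described the whole process in terms of entities rather than dataflow blocks. Could you construct a minimal example of a specific entity that is taken from the database, transformed, and finally saved back to the database?
@TheodorZoulias I have added an example. Please let me know if it describes the problem in enough detail.
Does each of the two legacy systems have a different set of Employees, or does each Employee exist on both systems, so that you have to join the data of the two systems to get unified information for each Employee?
@TheodorZoulias Different sets of employees.
@TheodorZoulias Your assumption that the BatchBlock added extra complexity was correct. I thought I needed it to aggregate the different data flows, when in fact Dataflow allows me to link multiple sources to a single target, which gives me the extra functionality I was looking for.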
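The point made in the last comment, that several sources can be linked to one target without a BatchBlock, can be sketched as follows. This is an illustration under assumptions, not code from the thread: the class `FanInSketch`, the block names, and the string messages are hypothetical. The links here deliberately do not propagate completion; the target is completed only after both sources have finished, so neither source can shut it down early.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class FanInSketch
{
    public static async Task<List<string>> RunAsync()
    {
        // Two independent sources, standing in for the two legacy systems.
        var fromSystemA = new TransformBlock<int, string>(id => $"A-{id}");
        var fromSystemB = new TransformBlock<int, string>(id => $"B-{id}");

        var received = new List<string>();
        // Default MaxDegreeOfParallelism is 1, so the list is only touched by one delegate at a time.
        var target = new ActionBlock<string>(msg => received.Add(msg));

        // Both sources feed the same target; no BatchBlock is involved.
        fromSystemA.LinkTo(target);
        fromSystemB.LinkTo(target);

        fromSystemA.Post(1);
        fromSystemB.Post(2);
        fromSystemA.Complete();
        fromSystemB.Complete();

        // Complete the shared target only once every source has drained.
        await Task.WhenAll(fromSystemA.Completion, fromSystemB.Completion);
        target.Complete();
        await target.Completion;
        return received;
    }
}
```

The arrival order of messages from different sources is not guaranteed, so callers should not rely on it.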
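【Answer 1】:
The BroadcastBlock is the component I ended up using. I use it to broadcast the BaseEmployee object to the other output streams, which splits out the objects I need to create.
The full pipeline is below: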
// Convert each legacy entity into the common base message.
_transEmployeeA = new TransformBlock<EmployeeTblA, BaseMsg>((input) =>
{
    return new BaseMsg()
    {
        Id = input.Id,
        System = input.System,
        Name = string.Concat(input.Forename, " ", input.Surname),
        Age = input.Age,
        Address = string.Concat(input.Number, " ", input.Street),
        Postcode = input.PostCode
    };
});
_transEmployeeB = new TransformBlock<EmployeeTblB, BaseMsg>((input) =>
{
    return new BaseMsg()
    {
        Id = input.Id,
        System = input.System,
        Name = input.Name,
        Age = input.Age,
        Address = input.Address,
        Postcode = input.Postcode
    };
});
_broadcastBaseMsg = new BroadcastBlock<BaseMsg>(null);
// Project the base message into each of the new system's entities.
_transEmployeeName = new TransformBlock<BaseMsg, EmployeeName>((baseMsg) =>
{
    return new EmployeeName()
    {
        System = baseMsg.System,
        LegacyId = baseMsg.Id,
        Name = baseMsg.Name
    };
});
_transEmployeeAge = new TransformBlock<BaseMsg, EmployeeAge>((baseMsg) =>
{
    return new EmployeeAge()
    {
        System = baseMsg.System,
        LegacyId = baseMsg.Id,
        Age = baseMsg.Age
    };
});
_transEmployeeAddress = new TransformBlock<BaseMsg, EmployeeAddress>((baseMsg) =>
{
    return new EmployeeAddress()
    {
        System = baseMsg.System,
        LegacyId = baseMsg.Id,
        Address = baseMsg.Address,
        Postcode = baseMsg.Postcode
    };
});
_bufferEmployeeName = new BufferBlock<EmployeeName>();
_bufferEmployeeAge = new BufferBlock<EmployeeAge>();
_bufferEmployeeAddress = new BufferBlock<EmployeeAddress>();
// Persist each entity with a short-lived context.
_actionEmployeeName = new ActionBlock<EmployeeName>((output) =>
{
    using (var cxt = new SandboxContext())
    {
        cxt.EmployeeNames.Add(output);
        cxt.SaveChanges();
    }
});
_actionEmployeeAge = new ActionBlock<EmployeeAge>((output) =>
{
    using (var cxt = new SandboxContext())
    {
        cxt.EmployeeAges.Add(output);
        cxt.SaveChanges();
    }
});
_actionEmployeeAddress = new ActionBlock<EmployeeAddress>((output) =>
{
    using (var cxt = new SandboxContext())
    {
        cxt.EmployeeAddresses.Add(output);
        cxt.SaveChanges();
    }
});
var linkOpts = new DataflowLinkOptions()
{
    PropagateCompletion = true
};
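// Transform Employees and pass to Broadcast
// Note: with PropagateCompletion = true on both links, the broadcast block
// is completed as soon as either source completes.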
_transEmployeeA.LinkTo(_broadcastBaseMsg, linkOpts);
_transEmployeeB.LinkTo(_broadcastBaseMsg, linkOpts);
// Transform Broadcast to respective outputs
_broadcastBaseMsg.LinkTo(_transEmployeeName, linkOpts);
_broadcastBaseMsg.LinkTo(_transEmployeeAge, linkOpts);
_broadcastBaseMsg.LinkTo(_transEmployeeAddress, linkOpts);
// Add outputs to Buffer
_transEmployeeName.LinkTo(_bufferEmployeeName, linkOpts);
_transEmployeeAge.LinkTo(_bufferEmployeeAge, linkOpts);
_transEmployeeAddress.LinkTo(_bufferEmployeeAddress, linkOpts);
// Persist outputs to DB
_bufferEmployeeName.LinkTo(_actionEmployeeName, linkOpts);
_bufferEmployeeAge.LinkTo(_actionEmployeeAge, linkOpts);
_bufferEmployeeAddress.LinkTo(_actionEmployeeAddress, linkOpts);
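The pipeline above only wires the blocks together. As a rough, self-contained sketch of the same fan-out idea being driven end to end (post, complete, await), the following reduces it to two typed branches. The `Person` class, `BroadcastSketch`, and the in-memory queues that stand in for the database are all hypothetical and not part of the answer.

```csharp
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BroadcastSketch
{
    // Hypothetical stand-in for the base message.
    public sealed class Person
    {
        public Person(string name, int age) { Name = name; Age = age; }
        public string Name { get; }
        public int Age { get; }
    }

    public static async Task<(string[] Names, int[] Ages)> RunAsync()
    {
        // Person is immutable, so handing the same instance to every target is safe.
        var broadcast = new BroadcastBlock<Person>(p => p);

        // Each branch turns the shared message into a differently typed output.
        var toName = new TransformBlock<Person, string>(p => p.Name);
        var toAge = new TransformBlock<Person, int>(p => p.Age);

        // In-memory queues replace the database writes of the original pipeline.
        var names = new ConcurrentQueue<string>();
        var ages = new ConcurrentQueue<int>();
        var saveName = new ActionBlock<string>(n => names.Enqueue(n));
        var saveAge = new ActionBlock<int>(a => ages.Enqueue(a));

        var opts = new DataflowLinkOptions { PropagateCompletion = true };
        broadcast.LinkTo(toName, opts);
        broadcast.LinkTo(toAge, opts);
        toName.LinkTo(saveName, opts);
        toAge.LinkTo(saveAge, opts);

        broadcast.Post(new Person("Ada", 36));
        broadcast.Post(new Person("Linus", 28));
        broadcast.Complete();

        // Wait for every terminal block, not just the head of the pipeline.
        await Task.WhenAll(saveName.Completion, saveAge.Completion);
        return (names.ToArray(), ages.ToArray());
    }
}
```

The targets here use the default unbounded capacity, so they accept every broadcast message. A BroadcastBlock does not retry a message that a target declines, so bounded targets would need extra care.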
In addition, the comments from @TheodorZoulias helped me simplify my use of TPL Dataflow for this particular data flow.