TPL Dataflow block to produce multiple outputs from a single input



Posted: 2021-05-20 06:12:47

Question:

I have started looking into TPL Dataflow as a solution for a processing workflow.

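The gist of the workflow is to read input messages from multiple tables, create four corresponding objects from each of them, and persist those objects to four other tables, so each input message should result in four new messages.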

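I can't identify any of the predefined blocks that would help with creating the four objects. At first TransformManyBlock seemed to be what I was looking for, but it returns multiple objects of the same type, whereas I will have four different types.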
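TransformManyBlock<TInput, TOutput> does emit a sequence of a single declared TOutput, but that type can be a common base type such as object, and the predicate overload of LinkTo can then route each item to a consumer by its runtime type. This is a different technique from the BroadcastBlock used in the answer below. A minimal sketch, assuming hypothetical trimmed-down record types SourceEmployee, NamePart and AgePart and a hypothetical TransformManySketch.RunAsync helper in place of the real entities and database code:

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical stand-ins for the question's entities (not the real EF classes).
public record SourceEmployee(int Id, string Name, int Age);
public record NamePart(int LegacyId, string Name);
public record AgePart(int LegacyId, int Age);

public static class TransformManySketch
{
    public static async Task<(List<NamePart> Names, List<AgePart> Ages)> RunAsync(
        IEnumerable<SourceEmployee> employees)
    {
        var names = new ConcurrentQueue<NamePart>();
        var ages = new ConcurrentQueue<AgePart>();

        // One input produces several outputs; declaring TOutput as object
        // lets the block emit objects of different types.
        var split = new TransformManyBlock<SourceEmployee, object>(e =>
            new object[] { new NamePart(e.Id, e.Name), new AgePart(e.Id, e.Age) });

        // One consumer per output type (these would be the database writers).
        var saveName = new ActionBlock<object>(o => names.Enqueue((NamePart)o));
        var saveAge = new ActionBlock<object>(o => ages.Enqueue((AgePart)o));

        var opts = new DataflowLinkOptions { PropagateCompletion = true };
        // Predicates route each item to the consumer for its type.
        split.LinkTo(saveName, opts, o => o is NamePart);
        split.LinkTo(saveAge, opts, o => o is AgePart);
        // Items matching no predicate would stay in the output buffer and keep
        // the block from completing, so discard anything unexpected.
        split.LinkTo(DataflowBlock.NullTarget<object>());

        foreach (var e in employees) split.Post(e);
        split.Complete();
        await Task.WhenAll(saveName.Completion, saveAge.Completion);

        return (new List<NamePart>(names), new List<AgePart>(ages));
    }
}
```

The trade-off is the casts in the consumers: the type safety of separate typed blocks is replaced by a runtime type check in the link predicates.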

Example of the problem

We have two tables containing employee details from two legacy systems. Their entities look like this:

public partial class EmployeeTblA
{
    public int Id { get; set; }
    public int System { get; set; }
    public string Forename { get; set; }
    public string Surname { get; set; }
    public int Age { get; set; }
    public int Number { get; set; }
    public string Street { get; set; }
    public string PostCode { get; set; }

    public virtual EmployeeSystem SystemNavigation { get; set; }
}

public partial class EmployeeTblB
{
    public int Id { get; set; }
    public int System { get; set; }
    public string Name { get; set; }
    public int Age { get; set; }
    public string Address { get; set; }
    public string Postcode { get; set; }

    public virtual EmployeeSystem SystemNavigation { get; set; }
}
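We want to take the data from both systems and put it into our shiny new system. To do this, we need to convert the entities from the old systems into the entities used by the new system. First, we convert the entities from the old systems into a base class that looks like this: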

public class BaseEmployee
{
    public int Id { get; set; }
    public int System { get; set; }
    public string Name { get; set; }
    public int Age { get; set; }
    public string Address { get; set; }
    public string Postcode { get; set; }
}

Then we want to create three new objects from the base class, representing the entities of the new system:

public partial class EmployeeName
{
    public int Id { get; set; }
    public int System { get; set; }
    public int LegacyId { get; set; }
    public string Name { get; set; }

    public virtual EmployeeSystem SystemNavigation { get; set; }
}

public partial class EmployeeAge
{
    public int Id { get; set; }
    public int System { get; set; }
    public int LegacyId { get; set; }
    public int Age { get; set; }

    public virtual EmployeeSystem SystemNavigation { get; set; }
}

public partial class EmployeeAddress
{
    public int Id { get; set; }
    public int System { get; set; }
    public int LegacyId { get; set; }
    public string Address { get; set; }
    public string Postcode { get; set; }

    public virtual EmployeeSystem SystemNavigation { get; set; }
}
The rough flow of my TPL Dataflow pipeline for the example above:

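1. Read the data from the DB tables into a TransformBlock that converts it into a common object. This happens twice, once for each legacy system.

2. Each TransformBlock is linked to a BatchBlock to group all the input streams.

3. The BatchBlock is linked to a block that takes the input and creates two new objects from the input data, EmployeeName and EmployeeAge.

4. That block is then linked to ActionBlocks, and the ActionBlocks save the objects to their respective tables in the database.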

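I know I can create a custom block, but I don't know how to use one to feed the output to four separately linked ActionBlocks using Dataflow. Can someone point me in the right direction?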

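Comments:

TBH I'm a bit confused by the way this question is formulated. It might help if you could describe the whole process in terms of entities rather than dataflow blocks. Could you construct a minimal example of a specific entity that is fetched from the database, gets transformed, and is finally saved back to the database?

@TheodorZoulias I have added an example. Please let me know whether it describes the problem in enough detail.

Does each of the two legacy systems hold a different set of Employees, or does every Employee exist in both systems, so that you have to join the data of the two systems to get unified information for each Employee?

@TheodorZoulias Different sets of employees.

@TheodorZoulias You were right in your assumption that the BatchBlock adds extra complexity. I thought I needed it to aggregate the different data streams, when in fact Dataflow allows me to link multiple sources to a single target, which gave me the extra functionality I was looking for.

Answer 1: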

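A BroadcastBlock is the component I ended up using. I used it to broadcast each BaseEmployee object to the other output streams, which split out the corresponding objects I needed to create.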
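BroadcastBlock<T> offers every message to every linked target. With a null cloning function all targets receive the same instance, which is safe when the downstream transforms only read from it. Delivery to every target holds as long as the targets are unbounded (the default); a target with a BoundedCapacity can miss messages, because a BroadcastBlock only keeps the latest one. The same fan-out can also be wrapped in a class that exposes one input and one typed output per entity, which is roughly the "custom block" the question asks about. A minimal sketch, assuming hypothetical trimmed-down types BaseEmp, NameOut and AgeOut and a hypothetical EmployeeSplitter class:

```csharp
using System.Threading.Tasks.Dataflow;

// Hypothetical stand-ins for BaseEmployee, EmployeeName and EmployeeAge.
public record BaseEmp(int Id, string Name, int Age);
public record NameOut(int LegacyId, string Name);
public record AgeOut(int LegacyId, int Age);

// Hypothetical wrapper that behaves like one block with one input and several typed outputs.
public sealed class EmployeeSplitter
{
    // null cloning function: every target is offered the same BaseEmp instance.
    private readonly BroadcastBlock<BaseEmp> _broadcast = new BroadcastBlock<BaseEmp>(null);

    private readonly TransformBlock<BaseEmp, NameOut> _names =
        new TransformBlock<BaseEmp, NameOut>(e => new NameOut(e.Id, e.Name));

    private readonly TransformBlock<BaseEmp, AgeOut> _ages =
        new TransformBlock<BaseEmp, AgeOut>(e => new AgeOut(e.Id, e.Age));

    public EmployeeSplitter()
    {
        // Completing Input flows through to both output transforms.
        var opts = new DataflowLinkOptions { PropagateCompletion = true };
        _broadcast.LinkTo(_names, opts);
        _broadcast.LinkTo(_ages, opts);
    }

    public ITargetBlock<BaseEmp> Input => _broadcast;
    public ISourceBlock<NameOut> Names => _names;
    public ISourceBlock<AgeOut> Ages => _ages;
}
```

A consumer would link `Names` and `Ages` to its own ActionBlocks with PropagateCompletion, post to `Input`, call `Input.Complete()`, and then await the ActionBlocks' `Completion` tasks.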

The complete pipeline:

        // BaseMsg has the same shape as BaseEmployee in the question.
        _transEmployeeA = new TransformBlock<EmployeeTblA, BaseMsg>(input =>
        {
            return new BaseMsg
            {
                Id = input.Id,
                System = input.System,
                Name = string.Concat(input.Forename, " ", input.Surname),
                Age = input.Age,
                Address = string.Concat(input.Number, " ", input.Street),
                Postcode = input.PostCode
            };
        });

        _transEmployeeB = new TransformBlock<EmployeeTblB, BaseMsg>(input =>
        {
            return new BaseMsg
            {
                Id = input.Id,
                System = input.System,
                Name = input.Name,
                Age = input.Age,
                Address = input.Address,
                Postcode = input.Postcode
            };
        });

        // null cloning function: every target receives the same BaseMsg instance
        _broadcastBaseMsg = new BroadcastBlock<BaseMsg>(null);

        _transEmployeeName = new TransformBlock<BaseMsg, EmployeeName>(baseMsg =>
        {
            return new EmployeeName
            {
                System = baseMsg.System,
                LegacyId = baseMsg.Id,
                Name = baseMsg.Name
            };
        });

        _transEmployeeAge = new TransformBlock<BaseMsg, EmployeeAge>(baseMsg =>
        {
            return new EmployeeAge
            {
                System = baseMsg.System,
                LegacyId = baseMsg.Id,
                Age = baseMsg.Age
            };
        });

        _transEmployeeAddress = new TransformBlock<BaseMsg, EmployeeAddress>(baseMsg =>
        {
            return new EmployeeAddress
            {
                System = baseMsg.System,
                LegacyId = baseMsg.Id,
                Address = baseMsg.Address,
                Postcode = baseMsg.Postcode
            };
        });

        _bufferEmployeeName = new BufferBlock<EmployeeName>();
        _bufferEmployeeAge = new BufferBlock<EmployeeAge>();
        _bufferEmployeeAddress = new BufferBlock<EmployeeAddress>();

        _actionEmployeeName = new ActionBlock<EmployeeName>(output =>
        {
            using (var cxt = new SandboxContext())
            {
                cxt.EmployeeNames.Add(output);
                cxt.SaveChanges();
            }
        });

        _actionEmployeeAge = new ActionBlock<EmployeeAge>(output =>
        {
            using (var cxt = new SandboxContext())
            {
                cxt.EmployeeAges.Add(output);
                cxt.SaveChanges();
            }
        });

        _actionEmployeeAddress = new ActionBlock<EmployeeAddress>(output =>
        {
            using (var cxt = new SandboxContext())
            {
                cxt.EmployeeAddresses.Add(output);
                cxt.SaveChanges();
            }
        });

        var linkOpts = new DataflowLinkOptions
        {
            PropagateCompletion = true
        };

        // Transform Employees and pass to Broadcast. Completion is NOT propagated on
        // these two links: otherwise the first source to finish would complete the
        // broadcast block while the other source may still be producing messages.
        _transEmployeeA.LinkTo(_broadcastBaseMsg);
        _transEmployeeB.LinkTo(_broadcastBaseMsg);
        Task.WhenAll(_transEmployeeA.Completion, _transEmployeeB.Completion)
            .ContinueWith(t => _broadcastBaseMsg.Complete(), TaskScheduler.Default);

        // Broadcast every BaseMsg to each of the entity transforms
        _broadcastBaseMsg.LinkTo(_transEmployeeName, linkOpts);
        _broadcastBaseMsg.LinkTo(_transEmployeeAge, linkOpts);
        _broadcastBaseMsg.LinkTo(_transEmployeeAddress, linkOpts);

        // Add outputs to Buffer
        _transEmployeeName.LinkTo(_bufferEmployeeName, linkOpts);
        _transEmployeeAge.LinkTo(_bufferEmployeeAge, linkOpts);
        _transEmployeeAddress.LinkTo(_bufferEmployeeAddress, linkOpts);

        // Persist outputs to DB
        _bufferEmployeeName.LinkTo(_actionEmployeeName, linkOpts);
        _bufferEmployeeAge.LinkTo(_actionEmployeeAge, linkOpts);
        _bufferEmployeeAddress.LinkTo(_actionEmployeeAddress, linkOpts);

The comments from @TheodorZoulias also helped me simplify how I use TPL Dataflow for this particular data flow.
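In this pipeline two sources feed one BroadcastBlock. If both links propagated completion, whichever source finished first would also complete the shared target, and anything the other source emitted afterwards would be declined. One common remedy is to link the sources without PropagateCompletion and complete the target from Task.WhenAll over the sources' Completion tasks. A stripped-down sketch of that fan-in, which also shows how the pipeline is fed and awaited, assuming a hypothetical FanInSketch.RunAsync helper with string payloads standing in for BaseMsg and an in-memory list standing in for the database:

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class FanInSketch
{
    // fromA/fromB stand in for _transEmployeeA/_transEmployeeB; sink for the shared target.
    public static async Task<List<string>> RunAsync(int[] systemA, string[] systemB)
    {
        var fromA = new TransformBlock<int, string>(id => $"A:{id}");
        var fromB = new TransformBlock<string, string>(name => $"B:{name}");

        var received = new List<string>();
        // MaxDegreeOfParallelism defaults to 1, so the list is never accessed concurrently.
        var sink = new ActionBlock<string>(s => received.Add(s));

        // Link without PropagateCompletion: the first source to finish must not complete the sink.
        fromA.LinkTo(sink);
        fromB.LinkTo(sink);

        // Complete the sink only when both sources are done. A fuller version would
        // forward a fault via IDataflowBlock.Fault instead of always calling Complete.
        _ = Task.WhenAll(fromA.Completion, fromB.Completion)
            .ContinueWith(t => sink.Complete(), TaskScheduler.Default);

        foreach (var id in systemA) fromA.Post(id);
        foreach (var name in systemB) fromB.Post(name);
        fromA.Complete();
        fromB.Complete();

        // The sink completes only after every accepted item has been processed.
        await sink.Completion;
        return received;
    }
}
```

Items from the two sources can arrive interleaved in any order; only the per-source order is preserved.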

