文件异步到达的相关数据处理管道

Posted

技术标签:

【中文标题】文件异步到达的相关数据处理管道【英文标题】:Dependent data processing pipelines where files arrive asynchronously 【发布时间】:2020-08-12 14:51:15 【问题描述】:

我有几个依赖于数据的任务/管道,其中一些依赖于另一个的完成。更难的是数据可以异步到达,这意味着某些任务需要等待直到上一步中的所有文件或任务都处理完。 p>

这是一个例子:

假设我们有一个带有索引的原始文件x[i,j],其中i 代表主类别j 中的一个特定子类别。

我需要运行以下管道:

    管道1:清理原始文件x[i,j]并将其存储为x_clean[i,j] 管道2:一旦管道1对j内的所有i完成,聚合来自x_clean[i,j]的结果并将其存储为y_clean[j] 管道3:清理原始文件z[j]并将其存储为z_clean[j] 管道4:一旦管道2和管道3完成,将z_clean[j]y_clean[j]合并存储为w_clean[j]

我可以应用什么样的模型来处理这种数据流方法?这种数据处理任务背后有什么方法吗? GCP 是否针对此类问题构建了一些东西?

【问题讨论】:

【参考方案1】:

在生产过程中...

步骤取决于其他步骤的完成情况。

材料可以异步到达,这意味着后续步骤将等待产品到达进行处理。但是,请注意,这并不意味着无限制的材料会失控,只有特定制造订单要消耗的材料。如果您的场景允许无限数据流流入,那么您必须对其进行预处理,以避免混合不同的产品组件。不要破坏流程的结构来尝试处理某个缓冲区或其他什么中的异步到达数据,因为制造数据产品涉及关系数据而不是原材料。

子组件可能在加入分支中完成,这意味着组装步骤在开始组装之前等待相关组件的协调集到达。

我是 POWER 的创建者,这是迄今为止唯一的协作(制造)架构。关于这个主题有很多要学习的,但是你可以在网上找到我的文章和代码:http://www.powersemantics.com/

这是您的流程在制造业工作模型中的样子:

    class MyProduct
    
        public object[i,j] x_clean  get; set; 
        public object[j] y_clean  get; set; 
        public object[j] z_clean  get; set; 
        // final product
        public object[j] w_clean  get; set; 
    
    class MyProcess : Producer<MyProduct>, IProcess, IMachine, IOrganize
    
        // process inputs
        public object[i,j] x  get; set;   // raw file
        public object[j] z  get; set;  // raw file

        // machines
        public CleanerA Cleaner1  get; set; 
        public Aggregator Aggregator1  get; set 
        public CleanerB Cleaner2  get; set; 
        public Assembler Assembler1  get; set; 

        public void D()  // instantiates properties and machines 
        public void O()
        
            // bind machines to work on the same data points
            // allows maintenance to later remove cleaners if it becomes possible
            // for the process to receive data in the correct form
            Cleaner1.x = x;
            Cleaner1.Product.x_clean = Product.x_clean;

            Aggregator1.x_clean = Product.x_clean;
            Aggregator1.Product.y_clean = Product.y_clean;

            Cleaner2.z = z;
            Cleaner2.Product.z_clean = Product.z_clean;

            Assembler1.z_clean = Product.z_clean;
            Assembler1.y_clean = Product.y_clean;
            Assembler1.Product.w_clean = Product.w_clean;
        

        // hardcoded synchronous controller
        public void M()
        
            Cleaner1.M();
            Aggregator1.M();
            Cleaner2.M();
            Assembler1.M();
        
    

    // these class pairs are Custom Machines, very specific work organized
    // by user requirements rather than in terms of domain-specific operations
    class CleanerAProduct
    
        public object[i,j] x_clean  get; set; 
    
    class CleanerA: Producer<CleanerAProduct>, IMachine
    
        public object[i,j] x  get; set;   // raw file
        public void M()
        
            // clean the raw file x[i,j] and store it as x_clean[i,j]
        
    


    class AggregatorProduct
    
        public object[j] y_clean  get; set; 
    
    class Aggregator: Producer<AggregatorProduct>, IMachine
    
        public object[i,j] x_clean  get; set; 
        public void M()
        
            // aggregate the results from x_clean[i,j] and store it as y_clean[j]
        
    


    class CleanerBProduct
    
        public object[j] z_clean  get; set; 
    
    class CleanerB : Producer<CleanerBProduct>, IMachine
    
        public object[j] z  get; set; 
        public void M()
        
            // clean a raw file z[j] and store it as z_clean[j]
        
    


    class AssemblerProduct
    
        public object[j] w_clean  get; set; 
    
    class Assembler : Producer<AssemblerProduct>, IMachine
    
        public object[j] y_clean  get; set; 
        public object[j] z_clean  get; set; 
        public void M()
        
            // combine z_clean[j] and y_clean[j] and store it as w_clean[j]
        
    

生产过程类的正常用法:

    实例化。调用 D() 来实例化机器和产品。 为流程分配任何输入。 调用 O() 让进程将这些输入分配给机器,并绑定机器以在最终产品上运行。这是您在生产前覆盖这些分配的最后机会。 调用 M() 来执行进程。

大多数源代码在同一个函数体中将生产者和消费者焊接在一起,因此以后维护变得很麻烦,然后函数将数据通过电子邮件发送给彼此,就像没有保留电子邮件跟踪的无用办公室工作人员一样.当您稍后想要做出垂直集成决策(例如更换机器或扩展流程)时,这会导致问题,所有这些我都用源记录。 POWER 是唯一避免集中化等复杂性的架构。我在二月份发布了它。

有 ETL 工具和其他解决方案,例如 TPL Dataflow,但生产流程不会为程序员自行组织或管理。所有程序员都需要学习 POWER 以正确处理浪费、集成、控制和仪表的职责。当我们编写自动化代码然后无法立即停止实时执行时,雇主会觉得我们很有趣,但我们的教育只是让我们准备创建流程,而不是像制造那样构建流程。

【讨论】:

如果您的实际需要是自己协调交付以便可以通过每个相关输入集完成处理,这是生产的规范,那么单独设计和构建该系统。

以上是关于文件异步到达的相关数据处理管道的主要内容,如果未能解决你的问题,请参考以下文章

同步,异步,阻塞,非阻塞

如何检查数据是不是已完成加载以使用异步管道 Angular 8

我怎么知道最后一个 OutputDataReceived 何时到达?

Angular 5:使用异步管道搜索 - 显示加载指示器

如何使用异步管道值处理`ngIf`?

同步异步阻塞非阻塞与服务器