文件异步到达的相关数据处理管道
Posted
技术标签:
【中文标题】文件异步到达的相关数据处理管道【英文标题】:Dependent data processing pipelines where files arrive asynchronously 【发布时间】:2020-08-12 14:51:15 【问题描述】:我有几个依赖于数据的任务/管道,其中一些依赖于另一个的完成。更难的是数据可以异步到达,这意味着某些任务需要等待直到上一步中的所有文件或任务都处理完。 p>
这是一个例子:
假设我们有一个带有索引的原始文件x[i,j]
,其中i
代表主类别j
中的一个特定子类别。
我需要运行以下管道:
-
管道1:清理原始文件
x[i,j]
并将其存储为x_clean[i,j]
管道2:一旦管道1对j
内的所有i
完成,聚合来自x_clean[i,j]
的结果并将其存储为y_clean[j]
管道3:清理原始文件z[j]
并将其存储为z_clean[j]
管道4:一旦管道2和管道3完成,将z_clean[j]
和y_clean[j]
合并存储为w_clean[j]
。
我可以应用什么样的模型来处理这种数据流方法?这种数据处理任务背后有什么方法吗? GCP 是否针对此类问题构建了一些东西?
【问题讨论】:
【参考方案1】:在生产过程中...
步骤取决于其他步骤的完成情况。
材料可以异步到达,这意味着后续步骤将等待产品到达进行处理。但是,请注意,这并不意味着无限制的材料会失控,只有特定制造订单要消耗的材料。如果您的场景允许无限数据流流入,那么您必须对其进行预处理,以避免混合不同的产品组件。不要破坏流程的结构来尝试处理某个缓冲区或其他什么中的异步到达数据,因为制造数据产品涉及关系数据而不是原材料。
子组件可能在加入分支中完成,这意味着组装步骤在开始组装之前等待相关组件的协调集到达。
我是 POWER 的创建者,这是迄今为止唯一的协作(制造)架构。关于这个主题有很多要学习的,但是你可以在网上找到我的文章和代码:http://www.powersemantics.com/
这是您的流程在制造业工作模型中的样子:
class MyProduct
public object[i,j] x_clean get; set;
public object[j] y_clean get; set;
public object[j] z_clean get; set;
// final product
public object[j] w_clean get; set;
class MyProcess : Producer<MyProduct>, IProcess, IMachine, IOrganize
// process inputs
public object[i,j] x get; set; // raw file
public object[j] z get; set; // raw file
// machines
public CleanerA Cleaner1 get; set;
public Aggregator Aggregator1 get; set
public CleanerB Cleaner2 get; set;
public Assembler Assembler1 get; set;
public void D() // instantiates properties and machines
public void O()
// bind machines to work on the same data points
// allows maintenance to later remove cleaners if it becomes possible
// for the process to receive data in the correct form
Cleaner1.x = x;
Cleaner1.Product.x_clean = Product.x_clean;
Aggregator1.x_clean = Product.x_clean;
Aggregator1.Product.y_clean = Product.y_clean;
Cleaner2.z = z;
Cleaner2.Product.z_clean = Product.z_clean;
Assembler1.z_clean = Product.z_clean;
Assembler1.y_clean = Product.y_clean;
Assembler1.Product.w_clean = Product.w_clean;
// hardcoded synchronous controller
public void M()
Cleaner1.M();
Aggregator1.M();
Cleaner2.M();
Assembler1.M();
// these class pairs are Custom Machines, very specific work organized
// by user requirements rather than in terms of domain-specific operations
class CleanerAProduct
public object[i,j] x_clean get; set;
class CleanerA: Producer<CleanerAProduct>, IMachine
public object[i,j] x get; set; // raw file
public void M()
// clean the raw file x[i,j] and store it as x_clean[i,j]
class AggregatorProduct
public object[j] y_clean get; set;
class Aggregator: Producer<AggregatorProduct>, IMachine
public object[i,j] x_clean get; set;
public void M()
// aggregate the results from x_clean[i,j] and store it as y_clean[j]
class CleanerBProduct
public object[j] z_clean get; set;
class CleanerB : Producer<CleanerBProduct>, IMachine
public object[j] z get; set;
public void M()
// clean a raw file z[j] and store it as z_clean[j]
class AssemblerProduct
public object[j] w_clean get; set;
class Assembler : Producer<AssemblerProduct>, IMachine
public object[j] y_clean get; set;
public object[j] z_clean get; set;
public void M()
// combine z_clean[j] and y_clean[j] and store it as w_clean[j]
生产过程类的正常用法:
-
实例化。调用 D() 来实例化机器和产品。
为流程分配任何输入。
调用 O() 让进程将这些输入分配给机器,并绑定机器以在最终产品上运行。这是您在生产前覆盖这些分配的最后机会。
调用 M() 来执行进程。
大多数源代码在同一个函数体中将生产者和消费者焊接在一起,因此以后维护变得很麻烦,然后函数将数据通过电子邮件发送给彼此,就像没有保留电子邮件跟踪的无用办公室工作人员一样.当您稍后想要做出垂直集成决策(例如更换机器或扩展流程)时,这会导致问题,所有这些我都用源记录。 POWER 是唯一避免集中化等复杂性的架构。我在二月份发布了它。
有 ETL 工具和其他解决方案,例如 TPL Dataflow,但生产流程不会为程序员自行组织或管理。所有程序员都需要学习 POWER 以正确处理浪费、集成、控制和仪表的职责。当我们编写自动化代码然后无法立即停止实时执行时,雇主会觉得我们很有趣,但我们的教育只是让我们准备创建流程,而不是像制造那样构建流程。
【讨论】:
如果您的实际需要是自己协调交付以便可以通过每个相关输入集完成处理,这是生产的规范,那么单独设计和构建该系统。以上是关于文件异步到达的相关数据处理管道的主要内容,如果未能解决你的问题,请参考以下文章
如何检查数据是不是已完成加载以使用异步管道 Angular 8