运营商之间的气流和数据传输

Posted

技术标签:

【中文标题】运营商之间的气流和数据传输【英文标题】:Airflow and data transfer between operators 【发布时间】:2017-03-13 11:06:26 【问题描述】:

我是气流新手,对 Airflow 及其处理器有疑问。 当一个处理器产生一个输出时,这个输出如何在输入中移动到下一个处理器? 有一个名为 nifi 的软件将中间输出存储到流文件中,afaik 在气流中没有这样的东西。 那么这是怎么发生的呢?

提前致谢。

【问题讨论】:

【参考方案1】:

Airflow 使用Xcoms 在算子之间传递数据。

如果流程是操作员 A -> 操作员 B,那么操作员 A 必须将一个值“推”到 xcom,如果操作员 B 想要读取该值,则必须从 A 中“拉取”这个值。

A 下游的任何操作员都可以通过以下方式访问 A 推送到 Xcom 的任何值:

value = context['task_instance'].xcom_pull(task_ids='operator_a', key='key_name') 

操作员 A 会像这样推送这个值:

context['task_instance'].xcom_push(key_name,value,context['execution_date'])

【讨论】:

另一个(简单的?)问题:xcom 在哪里存储数据? 在气流 DB xcom 表中 是否可以将查询结果共享给其他任务,如果可以,我该怎么做?在哪里可以找到更多这样的例子,我觉得框架有点乱 @AlbertoBonsanto 考虑将此问题的更详细版本作为新问题发布。我认为答案是“是”,但需要更多信息才能提供一个好的答案。 有没有办法知道当前任务所依赖的task_ids。例如 [a,b] >> c。如果我在做 data1 = context['task_instance'].xcom_pull(task_ids='a', key='key_name') 和 data2 = context['task_instance'].xcom_pull(task_ids='b', key='key_name ') 在c中,那么我需要了解依赖关系。有什么方法可以了解 c 中的 a,b 吗?【参考方案2】:

也许您指的是 GenericTransfer 运算符,它有助于在数据源之间移动数据?

https://github.com/apache/incubator-airflow/blob/master/airflow/operators/generic_transfer.py

【讨论】:

我认为他们在问如何将数据从一个操作员传递到另一个操作员,而不是如何通过操作员从一个位置获取数据到另一个位置。

以上是关于运营商之间的气流和数据传输的主要内容,如果未能解决你的问题,请参考以下文章

如何与客户运营商验证气流 DAG?

气流.providers 和气流.contrib 之间的差异

气流:每日刷新后如何在 s3 存储桶中公开对象

从 Airflow(使用气流 Livy 运算符)向 Livy(在 EMR 中)提交 Spark 作业

春云数据流和气流

是否可以同时进行气流回填和调度?