运营商之间的气流和数据传输
Posted
技术标签:
【中文标题】运营商之间的气流和数据传输【英文标题】:Airflow and data transfer between operators 【发布时间】:2017-03-13 11:06:26 【问题描述】:我是气流新手,对 Airflow 及其处理器有疑问。 当一个处理器产生一个输出时,这个输出如何在输入中移动到下一个处理器? 有一个名为 nifi 的软件将中间输出存储到流文件中,afaik 在气流中没有这样的东西。 那么这是怎么发生的呢?
提前致谢。
【问题讨论】:
【参考方案1】:Airflow 使用Xcoms 在算子之间传递数据。
如果流程是操作员 A -> 操作员 B,那么操作员 A 必须将一个值“推”到 xcom,如果操作员 B 想要读取该值,则必须从 A 中“拉取”这个值。
A 下游的任何操作员都可以通过以下方式访问 A 推送到 Xcom 的任何值:
value = context['task_instance'].xcom_pull(task_ids='operator_a', key='key_name')
操作员 A 会像这样推送这个值:
context['task_instance'].xcom_push(key_name,value,context['execution_date'])
【讨论】:
另一个(简单的?)问题:xcom 在哪里存储数据? 在气流 DB xcom 表中 是否可以将查询结果共享给其他任务,如果可以,我该怎么做?在哪里可以找到更多这样的例子,我觉得框架有点乱 @AlbertoBonsanto 考虑将此问题的更详细版本作为新问题发布。我认为答案是“是”,但需要更多信息才能提供一个好的答案。 有没有办法知道当前任务所依赖的task_ids
。例如 [a,b] >> c。如果我在做 data1 = context['task_instance'].xcom_pull(task_ids='a', key='key_name') 和 data2 = context['task_instance'].xcom_pull(task_ids='b', key='key_name ') 在c中,那么我需要了解依赖关系。有什么方法可以了解 c 中的 a,b 吗?【参考方案2】:
也许您指的是 GenericTransfer 运算符,它有助于在数据源之间移动数据?
https://github.com/apache/incubator-airflow/blob/master/airflow/operators/generic_transfer.py
【讨论】:
我认为他们在问如何将数据从一个操作员传递到另一个操作员,而不是如何通过操作员从一个位置获取数据到另一个位置。以上是关于运营商之间的气流和数据传输的主要内容,如果未能解决你的问题,请参考以下文章
气流.providers 和气流.contrib 之间的差异