同一气流 DAG 中的任务在前一个任务提交之前启动

Posted

技术标签:

【中文标题】同一气流 DAG 中的任务在前一个任务提交之前启动【英文标题】:Task in same airflow DAG starts before previous task is committed 【发布时间】:2020-02-04 11:28:04 【问题描述】:

我们有一个 DAG,它作为第一个任务将表 (A) 聚合到临时表 (B) 中。 之后有一个任务从 staging 表 (B) 读取,并写入另一个表 (C)。

但是,第二个任务在完全更新之前从聚合表 (B) 中读取,这会导致表 C 有旧数据,或者有时它是空的。 Airflow 仍将所有内容记录为成功。

Updating table B is done as (pseudo):
delete all rows;
insert into table b
select xxxx from table A;
Task Concurrency is set as 10 
pool size: 5
max_overflow: 10
Using local executor 

Redshift 似乎有一个提交队列。当提交实际上仍在队列中时,redshift 是否会告诉气流它已提交,因此在真正的提交发生之前读取下一个任务?

我们尝试将表 B 的更新包装为(伪)事务:

begin
delete all rows;
insert into table b
select xxxx from table A;
commit;

但即使这样也行不通。由于某种原因,气流管理在第一个任务未完全提交之前启动第二个任务。

更新

原来是依赖项有错误。下游任务正在等待不正确的任务完成。

为了将来的参考,永远不要 100% 确定您已经检查了所有内容。检查并重新检查整个流程。

【问题讨论】:

【参考方案1】:

您可以通过将wait_for_downstream 设置为True 来实现此目标。

来自https://airflow.apache.org/docs/stable/_api/airflow/operators/index.html

当设置为 true 时,任务 X 的实例将等待任务 任务 X 的前一个实例的下游立即完成 在它运行之前成功。

您可以在 default_dag_args 级别或任务(操作员)级别设置此参数。

default_dag_args = 
   'wait_for_downstream': True,

【讨论】:

谢谢!我认为这并不能解决这个问题,因为只有一个“任务 X”实例。相反,下游任务不应该在任务 X 完全提交之前启动。 (任务 X 正在更新表 B)。 最后发现下游的任务被设置为等待错误的任务。我会将您的答案标记为解决方案,因为它是唯一的建议,它也使我们再次重新审视依赖关系。 谢谢你,很高兴你发现了问题!

以上是关于同一气流 DAG 中的任务在前一个任务提交之前启动的主要内容,如果未能解决你的问题,请参考以下文章

气流回填澄清

气流 - Python 文件不在同一个 DAG 文件夹中

测试气流:具有 DAG 的任务和任务上下文未在 pytest 中运行

气流将长时间运行的任务标记为失败

气流:任务调度间隔数秒,两次

如何在另一个任务气流中使用查询结果(bigquery 运算符)