同一气流 DAG 中的任务在前一个任务提交之前启动
Posted
技术标签:
【中文标题】同一气流 DAG 中的任务在前一个任务提交之前启动【英文标题】:Task in same airflow DAG starts before previous task is committed 【发布时间】:2020-02-04 11:28:04 【问题描述】:我们有一个 DAG,它作为第一个任务将表 (A) 聚合到临时表 (B) 中。 之后有一个任务从 staging 表 (B) 读取,并写入另一个表 (C)。
但是,第二个任务在完全更新之前从聚合表 (B) 中读取,这会导致表 C 有旧数据,或者有时它是空的。 Airflow 仍将所有内容记录为成功。
Updating table B is done as (pseudo):
delete all rows;
insert into table b
select xxxx from table A;
Task Concurrency is set as 10
pool size: 5
max_overflow: 10
Using local executor
Redshift 似乎有一个提交队列。当提交实际上仍在队列中时,redshift 是否会告诉气流它已提交,因此在真正的提交发生之前读取下一个任务?
我们尝试将表 B 的更新包装为(伪)事务:
begin
delete all rows;
insert into table b
select xxxx from table A;
commit;
但即使这样也行不通。由于某种原因,气流管理在第一个任务未完全提交之前启动第二个任务。
更新
原来是依赖项有错误。下游任务正在等待不正确的任务完成。
为了将来的参考,永远不要 100% 确定您已经检查了所有内容。检查并重新检查整个流程。
【问题讨论】:
【参考方案1】:您可以通过将wait_for_downstream
设置为True
来实现此目标。
来自https://airflow.apache.org/docs/stable/_api/airflow/operators/index.html:
当设置为 true 时,任务 X 的实例将等待任务 任务 X 的前一个实例的下游立即完成 在它运行之前成功。
您可以在 default_dag_args 级别或任务(操作员)级别设置此参数。
default_dag_args =
'wait_for_downstream': True,
【讨论】:
谢谢!我认为这并不能解决这个问题,因为只有一个“任务 X”实例。相反,下游任务不应该在任务 X 完全提交之前启动。 (任务 X 正在更新表 B)。 最后发现下游的任务被设置为等待错误的任务。我会将您的答案标记为解决方案,因为它是唯一的建议,它也使我们再次重新审视依赖关系。 谢谢你,很高兴你发现了问题!以上是关于同一气流 DAG 中的任务在前一个任务提交之前启动的主要内容,如果未能解决你的问题,请参考以下文章