如何防止气流回填dag运行?

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何防止气流回填dag运行?相关的知识,希望对你有一定的参考价值。

假设你有一个对回填没有意义的气流DAG,这意味着,在它运行一次后,快速运行它将是完全没有意义的。

例如,如果您从一些仅每小时更新一次的数据库加载数据到数据库中,那么快速连续发生的回填只会一次又一次地导入相同的数据。

当您实例化一个新的每小时任务时,这尤其令人讨厌,并且它在它错过的每个小时运行N次数,执行冗余工作,然后在您指定的时间间隔内开始运行。

我能想到的唯一解决方案是他们在FAQ of the docs特别建议的

我们建议不要使用动态值作为start_date,尤其是datetime.now(),因为它可能非常混乱。

有没有办法禁用DAG的回填,或者我应该怎么做?

答案

升级到airflow版本1.8并在airflow.cfg中使用catchup_by_default = False或对每个dag应用catchup = False。

https://github.com/apache/incubator-airflow/blob/master/UPDATING.md#catchup_by_default

另一答案

这似乎是一个未解决的Airflow问题。我知道我真的希望拥有完全相同的功能。就我而言,这就是我的意思;它可能对其他人有用。

UI功能(至少在1.7.1.3中)可以帮助解决这个问题。如果您转到树视图并单击特定任务(方框),则会出现一个对话框按钮,其中包含“标记成功”按钮。单击“过去”,然后单击“标记成功”将在DAG中将该任务的所有实例标记为成功,并且不会运行它们。顶级DAG(顶部的圆圈)也可以以类似的方式标记为成功,但似乎没有标记多个DAG实例的方法。

我还没有深入研究它,但可以使用'trigger_dag'子命令来标记DAG的状态。看到这里:https://github.com/apache/incubator-airflow/pull/644/commits/4d30d4d79f1a18b071b585500474248e5f46d67d

用于标记DAG的CLI功能正在开发中:http://mail-archives.apache.org/mod_mbox/airflow-commits/201606.mbox/%3CJIRA.12973462.1464369259000.37918.1465189859133@Atlassian.JIRA%3E https://github.com/apache/incubator-airflow/pull/1590

更新(2016年9月28日):添加了一个新的运营商'LatestOnlyOperator'(https://github.com/apache/incubator-airflow/pull/1752),它只运行最新版本的下游任务。听起来非常有用,希望它能很快进入发布

更新2:从气流1.8开始,LatestOnlyOperator已被释放。

另一答案

在dag声明中设置catchup = False将提供这个确切的功能。

我没有评论的“声望”,但我想说,为了这个目的,我设计了catchup = False(由我设计)。另外,我可以在1.10.1中验证它在实例化中明确设置时是否正常工作。但是当我置于默认args中时,我看不到它的工作原理。我已经离开Airflow 18个月了,所以在我看看为什么默认的args不适用于追赶之前会有一点点。

dag = DAG('example_dag',
        max_active_runs=3,
        catchup=False,
        schedule_interval=timedelta(minutes=5),
        default_args=default_args)

以上是关于如何防止气流回填dag运行?的主要内容,如果未能解决你的问题,请参考以下文章

气流回填澄清

气流回填澄清

气流回填不起作用

是否可以同时进行气流回填和调度?

如果任何任务失败,气流回填将停止

让 Airflow 表现得像 Luigi:如果任务的输出只需要获得一次,如何防止任务在 DAG 的未来运行中重新运行?