如何防止气流回填 dag 运行?

Posted

技术标签:

【中文标题】如何防止气流回填 dag 运行?【英文标题】:How to prevent airflow from backfilling dag runs? 【发布时间】:2016-12-09 15:55:55 【问题描述】:

假设您有一个气流 DAG,回填没有意义,这意味着,在它运行一次之后,再快速运行它之后的时间将完全没有意义。

例如,如果您从某个仅每小时更新一次的来源将数据加载到数据库中,那么快速连续发生的回填只会一次又一次地导入相同的数据。

当您实例化一个新的每小时任务时,这尤其令人讨厌,并且在它开始按您指定的时间间隔运行之前,它错过了每小时运行 N 次,做多余的工作。

我能想到的唯一解决方案是他们在FAQ of the docs 中特别建议反对的东西

我们建议不要将动态值用作 start_date,尤其是 datetime.now(),因为它可能会造成混淆。

有什么方法可以禁用 DAG 的回填,或者我应该执行上述操作吗?

【问题讨论】:

【参考方案1】:

升级到气流版本 1.8 并在气流.cfg 中使用 catchup_by_default=False 或将 catchup=False 应用于每个 dag。

https://github.com/apache/incubator-airflow/blob/master/UPDATING.md#catchup_by_default

【讨论】:

我已设置 catchup_by_default=False,但 Airflow 仍会回填作业。知道为什么吗?我正在运行 1.8 版 @Nick 我实际上也无法让默认设置正常工作,所以我最终只是将catchup=False 放在我所有的 DAG 上,例如 DAG('example', default_args=default_args, schedule_interval='0 5 * * *', catchup=False) @Nick 默认 args 对象由应用于在 DAG 下运行的 任务 而不是 DAG 本身的参数组成。我最初也对此感到困惑。 我正在使用 Airflow v1.10.0 并且仍然看到这个问题 此处相同,在 Airflow 1.10.1 上。我在所有 dag 上设置 catchup=False,但我仍然得到回填。【参考方案2】:

在你的 dag 声明中设置 catchup=False 将提供这个确切的功能。

我没有“声誉”可评论,但我想说 catchup=False 是(由我)为此目的而设计的。此外,我可以验证在 1.10.1 中它在实例化中明确设置时是否正常工作。但是,当放置在默认参数中时,我看不到它工作。不过,我已经离开 Airflow 18 个月了,所以我还需要一段时间才能了解为什么默认 args 不能用于追赶。

dag = DAG('example_dag',
        max_active_runs=3,
        catchup=False,
        schedule_interval=timedelta(minutes=5),
        default_args=default_args)

【讨论】:

我正在运行airflow 1.10.14,但这不起作用,至少在使用 DebugExecutor 时不起作用 正在运行气流 1.10.12,但仍然无法正常工作。 刚刚看到默认情况下catchup_by_defaultString 设置为True 而不是Boolean。不知道这是否是一个问题! airflow.apache.org/docs/apache-airflow/1.10.12/… 我们可以将此默认设置为 False,因为很多人不需要/有问题将其关闭。【参考方案3】:

这似乎是一个未解决的气流问题。我知道我真的很想拥有完全相同的功能。这是我所得到的;它可能对其他人有用。

有一些 UI 功能(至少在 1.7.1.3 中)可以帮助解决这个问题。如果您转到树视图并单击特定任务(方框),则会出现一个对话框按钮,其中包含一个“标记成功”按钮。单击“过去”,然后单击“标记成功”会将 DAG 中该任务的所有实例标记为成功,它们将不会运行。*** DAG(顶部的圆圈)也可以以类似的方式标记为成功,但似乎没有办法标记多个 DAG 实例。

我还没有深入研究它,但是可以使用“trigger_dag”子命令来标记 DAG 的状态。看这里:https://github.com/apache/incubator-airflow/pull/644/commits/4d30d4d79f1a18b071b585500474248e5f46d67d

用于标记 DAG 的 CLI 功能正在开发中: http://mail-archives.apache.org/mod_mbox/airflow-commits/201606.mbox/%3CJIRA.12973462.1464369259000.37918.1465189859133@Atlassian.JIRA%3E https://github.com/apache/incubator-airflow/pull/1590

更新(2016 年 9 月 28 日): 添加了一个新的运算符“LatestOnlyOperator”(https://github.com/apache/incubator-airflow/pull/1752),它将只运行最新版本的下游任务。听起来很有用,希望它会很快发布到版本中

更新 2:从气流 1.8 开始,LatestOnlyOperator 已发布。

【讨论】:

更新看起来很有希望!感谢您关注这个问题。 请注意,LatestOnlyOperator 将下游任务设置为“已跳过”状态。根据文档,跳过的状态会传播,因此所有直接上游的任务也会被跳过。当您希望(ew)希望上游作业使用过时的数据成功运行时,这使得该方法不适合。在这种情况下,最好的解决方案是在您的代码中添加一个 early 运算符,如果任务运行得特别晚,则该运算符会成功。 cli 的回填命令看起来现在可用,并且可能是目前执行此操作的最佳方法。 airflow.incubator.apache.org/cli.html 气流回填 -h [此处的主机名] -m=True -s [开始日期] -e $(date +"%Y-%m-%dT:%H:%M:%S") 我已经尝试过回填标记成功脚本技巧,它实际上并不能停止所有正在运行的任务/防止回填(至少在 1.8 中)。希望它将在未来的版本中工作。通过 UI 手动完成是可行的,但只有在处理少量回填任务时才真正可行。

以上是关于如何防止气流回填 dag 运行?的主要内容,如果未能解决你的问题,请参考以下文章

气流回填澄清

气流回填澄清

气流回填不起作用

是否可以同时进行气流回填和调度?

每月日期和时间的气流 DAG 调度

我在 AWS 中有一个现有的 EMR 集群。我想从气流运行 dag 到现有的 aws 集群