气流回填澄清

Posted

技术标签:

【中文标题】气流回填澄清【英文标题】:Airflow backfill clarification 【发布时间】:2017-02-14 09:14:08 【问题描述】:

我刚刚开始使用 Airbnb 的 airflow,我仍然不清楚如何/何时完成回填。

具体来说,有两个用例让我感到困惑:

    如果我运行 airflow scheduler 几分钟,停止它一分钟,然后再次重新启动它,我的 DAG 似乎在前 30 秒左右运行了额外的任务,然后它继续正常运行(运行每 10 秒)。这些额外的任务是否“回填”了在早期运行中无法完成的任务?如果是这样,我将如何告诉气流不要回填这些任务?

    如果我运行airflow scheduler 几分钟,然后运行airflow clear MY_tutorial,然后重新启动airflow scheduler,它似乎运行了大量的额外任务。这些任务是否也以某种方式“回填”任务?还是我错过了什么。

目前,我有一个非常简单的 dag:

default_args = 
    'owner': 'me',
    'depends_on_past': False,
    'start_date': datetime(2016, 10, 4),
    'email': ['airflow@airflow.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),


dag = DAG(
    'MY_tutorial', default_args=default_args, schedule_interval=timedelta(seconds=10))

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

templated_command = """
    % for i in range(5) %
        echo " ds "
        echo " macros.ds_add(ds, 8)"
        echo " params.my_param "
    % endfor %
"""

t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params='my_param': 'Parameter I passed in',
    dag=dag)

second_template = """
    touch ~/airflow/logs/test
    echo $(date) >> ~/airflow/logs/test
"""

t4 = BashOperator(
    task_id='write_test',
    bash_command=second_template,
    dag=dag)

t1.set_upstream(t4)
t2.set_upstream(t1)
t3.set_upstream(t1)

我在气流配置中更改的唯一两件事是

    我从使用 sqlite db 更改为使用 postgres db 我使用的是CeleryExecutor 而不是SequentialExecutor

非常感谢您的帮助!

【问题讨论】:

我认为您遇到了 Airflow 回填旧 DAG 的倾向。它将尝试填写自 start_date 以来的所有 DAG。签出:***.com/questions/38751872/… 您应该使用参数“catchup=False”,例如 dag = DAG('MY_tutorial', default_args=default_args, schedule_interval=timedelta(seconds=10), catchup=False) 【参考方案1】:

当您将 DAG 的调度程序切换设置为“开启”时,调度程序将触发所有未记录状态的 dag 运行实例的回填,从您在“default_args”中指定的 start_date 开始。

例如:如果开始日期是“2017-01-21”,并且您在“2017-01-22T00:00:00”打开了调度开关,并且您的 dag 配置为每小时运行一次,那么调度程序将回填 24 天运行,然后按计划的时间间隔开始运行。

这本质上是您的两个问题中正在发生的事情。在 #1 中,它正在填充您关闭调度程序的 30 秒内缺少的 3 次运行。在 #2 中,它正在填充从 start_date 到“现在”的所有 DAG 运行。

有两种解决方法:

    将 start_date 设置为将来的某个日期,以便它仅在到达该日期后才开始调度 dag 运行。请注意,如果您更改 DAG 的 start_date,则由于开始日期存储在气流数据库中的方式,您还必须更改 DAG 的名称。 使用“-m”(--mark-success)标志手动运行回填from the command line,该标志告诉气流不要实际运行 DAG,而只是在 DB 中将其标记为成功。

例如

airflow backfill MY_tutorial -m -s 2016-10-04 -e 2017-01-22T14:28:30

【讨论】:

不。当我们设置 'dag.catchup'=True 时,调度程序将触发从开始日期到当前日期的回填,以进行 DAG 中不存在或未执行的运行。开启和关闭只是暂停作业,让调度程序根据调度间隔执行。【参考方案2】:

请注意,从 1.8 版开始,Airflow 允许您使用追赶来控制此行为。在airflow.cfg 中设置catchup_by_default=Falsecatchup=False 在你的 DAG 定义中。

见https://airflow.apache.org/scheduler.html#backfill-and-catchup

【讨论】:

【参考方案3】:

Airflow 的 UI 上的 On/Off 仅显示“暂停”,这意味着,如果它打开,它只会在触发时暂停,如果关闭,它将在该日期再次继续。

【讨论】:

以上是关于气流回填澄清的主要内容,如果未能解决你的问题,请参考以下文章

如何防止气流回填 dag 运行?

如果任何任务失败,气流回填将停止

气流回填不起作用

是否可以同时进行气流回填和调度?

如何防止气流回填dag运行?

气流添加谷歌云连接