气流回填澄清

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了气流回填澄清相关的知识,希望对你有一定的参考价值。

我刚刚开始使用Airbnb的airflow,而且我还不清楚如何/何时回填。

具体来说,有2个用例让我困惑:

  1. 如果我运行airflow scheduler几分钟,停止它一分钟,然后重新启动它,我的DAG似乎在前30秒左右运行额外的任务,然后它继续正常(每10秒运行一次)。这些额外的任务是“回填”的任务,在早期的运行中无法完成吗?如果是这样,我怎么告诉气流不回填这些任务?
  2. 如果我运行airflow scheduler几分钟,然后运行airflow clear MY_tutorial,然后重新启动airflow scheduler,它似乎运行TON的额外任务。这些任务是否也以某种方式“回填”任务?或者我错过了什么。

目前,我有一个非常简单的dag:

default_args = {
    'owner': 'me',
    'depends_on_past': False,
    'start_date': datetime(2016, 10, 4),
    'email': ['airflow@airflow.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG(
    'MY_tutorial', default_args=default_args, schedule_interval=timedelta(seconds=10))

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

templated_command = """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 8)}}"
        echo "{{ params.my_param }}"
    {% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)

second_template = """
    touch ~/airflow/logs/test
    echo $(date) >> ~/airflow/logs/test
"""

t4 = BashOperator(
    task_id='write_test',
    bash_command=second_template,
    dag=dag)

t1.set_upstream(t4)
t2.set_upstream(t1)
t3.set_upstream(t1)

我在airflow配置中改变的唯一两件事是

  1. 我从使用sqlite db更改为使用postgres db
  2. 我正在使用CeleryExecutor而不是SequentialExecutor

非常感谢你的帮助!

答案

当您将DAG的调度程序切换为“on”时,调度程序将触发所有未记录状态的dag运行实例的回填,从您在“default_args”中指定的start_date开始。

例如:如果开始日期为“2017-01-21”并且您打开了“2017-01-22T00:00:00”的计划切换并且您的dag配置为每小时运行一次,那么调度程序将回填24 dag运行然后按计划的间隔开始运行。

这基本上就是你们两个问题中发生的事情。在#1中,它填写了从关闭调度程序的30秒开始的3次缺失运行。在#2中,它填充了从start_date到“now”的所有DAG运行。

有两种方法:

  1. 将start_date设置为将来的日期,以便在达到该日期后才开始计划dag运行。请注意,如果更改DAG的start_date,则必须更改DAG的名称,因为开始日期存储在气流数据库中。
  2. 使用“-m”标志从命令行手动运行回填,该标志告诉气流不要实际运行DAG,而只是在DB(https://airflow.incubator.apache.org/cli.html)中将其标记为成功。 例如airflow backfill MY_tutorial -m -s 2016-10-04 -e 2017-01-22T14:28:30
另一答案

请注意,从版本1.8开始,Airflow允许您使用catchup控制此行为。在DAG定义中的airflow.cfg或catchup_by_default=False中设置catchup=False

https://airflow.apache.org/scheduler.html#backfill-and-catchup

另一答案

Airflow上的On / Off仅显示“PAUSE”,这意味着,如果它处于ON状态,它只会在触发时暂停,如果关闭则再次在该日期继续。

以上是关于气流回填澄清的主要内容,如果未能解决你的问题,请参考以下文章

如何防止气流回填 dag 运行?

如果任何任务失败,气流回填将停止

气流回填不起作用

是否可以同时进行气流回填和调度?

如何防止气流回填dag运行?

气流添加谷歌云连接