如何限制 Airflow 一次只运行一个 DAG 实例?
Posted
技术标签:
【中文标题】如何限制 Airflow 一次只运行一个 DAG 实例?【英文标题】:How to limit Airflow to run only one instance of a DAG run at a time? 【发布时间】:2018-08-20 05:50:42 【问题描述】:我希望 DAG 中的任务在下一次运行的第一个任务执行之前全部完成。
我的 max_active_runs = 1,但this 仍然会发生。
default_args =
'depends_on_past': True,
'wait_for_downstream': True,
'max_active_runs': 1,
'start_date': datetime(2018, 03, 04),
'owner': 't.n',
'email': ['t.n@example.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 3,
'retry_delay': timedelta(minutes=4)
dag = DAG('example', default_args=default_args, schedule_interval = schedule_interval)
(我所有的任务都依赖于之前的任务。Airflow版本是1.8.0)
谢谢
【问题讨论】:
【参考方案1】:我更改为将max_active_runs
作为DAG()
的参数而不是default_arguments,它起作用了。
感谢 SimonD 给了我这个想法,虽然在你的回答中没有直接指出它。
【讨论】:
这工作在哪个版本?我正在使用 1.10.14。它仍在同时运行所有 dag。 这是 1.8.0 的答案【参考方案2】:您已将'max_active_runs': 1
放入default_args
参数中,但未放入正确的位置。
max_active_runs
是 DAG 的构造函数参数,不应放入 default_args
字典中。
这是一个示例 DAG,它显示了您需要将其移动到的位置:
dag_args =
'owner': 'Owner',
# 'max_active_runs': 1, # <--- Here is where you had it.
'depends_on_past': False,
'start_date': datetime(2018, 01, 1, 12, 00),
'email_on_failure': False
sched = timedelta(hours=1)
dag = DAG(
job_id,
default_args=dag_args,
schedule_interval=sched,
max_active_runs=1 # <---- Here is where it is supposed to be
)
如果您的 dag 正在运行的任务实际上是子 dag,那么您可能也需要将 max_active_runs
传递给子 dag,但不能 100% 确定这一点。
【讨论】:
我将 max_active_runs 作为默认参数传递。想知道是不是问题。将尝试在 DAG 函数中传递它,看看它是否有效。 如果你想为你所有的 DAG 设置这个,你可以使用 set max_active_runs_per_dag【参考方案3】:您可以使用 xcoms 来执行此操作。首先将 2 个 python 运算符作为 DAG 的“开始”和“结束”。设置流为:
开始 ---> 所有任务 ----> 结束
'end' 总是压入一个变量
last_success = context['execution_date'] 到 xcom (xcom_push)。 (要求 PythonOperators 中的 provide_context = True)。
'start' 将始终检查 xcom (xcom_pull) 以查看是否存在 last_success 变量,其值等于前一个 DagRun 的 execution_date 或 DAG 的 start_date(让进程启动)。
已关注this answer
【讨论】:
似乎是一个很有前途的解决方法。会试试这个。【参考方案4】:实际上你应该使用 DAG_CONCURRENCY=1 作为环境变量。为我工作。
【讨论】:
DAG_CONCURRENCY 指定在一个 DAG 中同时运行多少个任务实例,而不是根据我的理解运行多少个 DAG 实例。以上是关于如何限制 Airflow 一次只运行一个 DAG 实例?的主要内容,如果未能解决你的问题,请参考以下文章