如何限制 Airflow 一次只运行一个 DAG 实例?

Posted

技术标签:

【中文标题】如何限制 Airflow 一次只运行一个 DAG 实例?【英文标题】:How to limit Airflow to run only one instance of a DAG run at a time? 【发布时间】:2018-08-20 05:50:42 【问题描述】:

我希望 DAG 中的任务在下一次运行的第一个任务执行之前全部完成。

我的 max_active_runs = 1,但this 仍然会发生。

default_args = 
    'depends_on_past': True,
    'wait_for_downstream': True,
    'max_active_runs': 1,
    'start_date': datetime(2018, 03, 04),
    'owner': 't.n',
    'email': ['t.n@example.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=4)


dag = DAG('example', default_args=default_args, schedule_interval = schedule_interval)

(我所有的任务都依赖于之前的任务。Airflow版本是1.8.0)

谢谢

【问题讨论】:

【参考方案1】:

我更改为将max_active_runs 作为DAG() 的参数而不是default_arguments,它起作用了。

感谢 SimonD 给了我这个想法,虽然在你的回答中没有直接指出它。

【讨论】:

这工作在哪个版本?我正在使用 1.10.14。它仍在同时运行所有 dag。 这是 1.8.0 的答案【参考方案2】:

您已将'max_active_runs': 1 放入default_args 参数中,但未放入正确的位置。

max_active_runs 是 DAG 的构造函数参数,不应放入 default_args 字典中。

这是一个示例 DAG,它显示了您需要将其移动到的位置:

dag_args =  
    'owner': 'Owner',
    # 'max_active_runs': 1, # <--- Here is where you had it.
    'depends_on_past': False,
    'start_date': datetime(2018, 01, 1, 12, 00),
    'email_on_failure': False


sched = timedelta(hours=1)
dag = DAG(
          job_id, 
          default_args=dag_args, 
          schedule_interval=sched, 
          max_active_runs=1 # <---- Here is where it is supposed to be
      ) 

如果您的 dag 正在运行的任务实际上是子 dag,那么您可能也需要将 max_active_runs 传递给子 dag,但不能 100% 确定这一点。

【讨论】:

我将 max_active_runs 作为默认参数传递。想知道是不是问题。将尝试在 DAG 函数中传递它,看看它是否有效。 如果你想为你所有的 DAG 设置这个,你可以使用 set max_active_runs_per_dag【参考方案3】:

您可以使用 xcoms 来执行此操作。首先将 2 个 python 运算符作为 DAG 的“开始”和“结束”。设置流为:

开始 ---> 所有任务 ----> 结束

'end' 总是压入一个变量

last_success = context['execution_date'] 到 xcom (xcom_push)。 (要求 PythonOperators 中的 provide_context = True)。

'start' 将始终检查 xcom (xcom_pull) 以查看是否存在 last_success 变量,其值等于前一个 DagRun 的 execution_date 或 DAG 的 start_date(让进程启动)。

已关注this answer

【讨论】:

似乎是一个很有前途的解决方法。会试试这个。【参考方案4】:

实际上你应该使用 DAG_CONCURRENCY=1 作为环境变量。为我工作。

【讨论】:

DAG_CONCURRENCY 指定在一个 DAG 中同时运行多少个任务实例,而不是根据我的理解运行多少个 DAG 实例。

以上是关于如何限制 Airflow 一次只运行一个 DAG 实例?的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Airflow 中设置 DAG 之间的依赖关系?

如何控制 Airflow 安装的并行性或并发性?

如何在 mfc 中限制一次只运行一个应用程序实例

如何使用 Python 在 Airflow 中成功触发另一个 DAG 时触发 DAG?

Airflow:如何指定资源池的定量使用?

Airflow DAG - 如何首先检查BQ(必要时删除)然后运行数据流作业?