Apache Airflow:在单个 DAG 运行中运行所有并行任务

Posted

技术标签:

【中文标题】Apache Airflow:在单个 DAG 运行中运行所有并行任务【英文标题】:Apache Airflow: run all parallel tasks in single DAG run 【发布时间】:2019-08-12 05:17:18 【问题描述】:

我有一个有 30 个(或更多)动态创建的并行任务的 DAG。

我在该 DAG 上设置了 concurrency 选项,因此在追赶历史记录时,我只有一个 DAG Run 运行。 当我在我的服务器上运行它时,实际上只有 16 个任务并行运行,而其余 14 个任务只是等待排队。

我应该更改哪个设置,以便我只运行 1 个 DAG Run,但所有 30 多个任务并行运行?

根据this FAQ,它似乎是dag_concurrencymax_active_runs_per_dag 之一,但前者似乎已经被concurrency 设置过度驱动,而后者似乎没有效果(或者我有效搞砸了我的设置)。 这是示例代码:

import datetime as dt
import logging

from airflow.operators.dummy_operator import DummyOperator

import config

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

default_args = 
    'owner': 'airflow',
    'depends_on_past': True,
    'wait_for_downstream': True,
    'concurrency': 1,
    'retries': 0,



def print_operators(ds, **kwargs):
    logging.info(f"Task kwargs.get('task_instance_key_str', 'unknown_task_instance')")


dag = DAG(
    dag_id='test_parallelism_dag',
    start_date=dt.datetime(2019, 1, 1),
    default_args=default_args,
    schedule_interval='@daily',
    catchup=True,
    template_searchpath=[config.DAGS_PATH],
    params='schema': config.SCHEMA_DB,
    max_active_runs=1,
)

print_operators = [PythonOperator(
    task_id=f'test_parallelism_dag.print_operator_i',
    python_callable=print_operators,
    provide_context=True,
    dag=dag
) for i in range(60)]

dummy_operator_start = DummyOperator(
    task_id=f'test_parallelism_dag.dummy_operator_start',
)

dummy_operator_end = DummyOperator(
    task_id=f'test_parallelism_dag.dummy_operator_end',
)

dummy_operator_start >> print_operators >> dummy_operator_end

编辑 1: 我当前的airflow.cfg 包含:

executor = SequentialExecutor
parallelism = 32
dag_concurrency = 24
max_active_runs_per_dag = 26

我的环境变量如下(将它们全部设置为不同的,以便轻松发现哪个有帮助):

AIRFLOW__CORE__EXECUTOR=LocalExecutor
AIRFLOW__CORE__DAG_CONCURRENCY=18
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG=20
AIRFLOW__CORE__WORKER_CONCURRENCY=22

我有以下甘特图:

哪种方式提示我设置 DAG_CONCURRENCY 环境变量有效。

【问题讨论】:

那些并发任务是 SubDagOperator 类型的吗? @RyanTheCoder 不,它们只是简单的任务,PythonOperator 的。 嗨,是否可以将变量传递给 for 条件?我的意思是可以传递 xcom 或 variable.get 而不是 range(60) 吗?我问这个是因为我希望我的限制来自以前的任务。 【参考方案1】:

要更改的实际参数是airflow.cfg 中的dag_concurrency,或者用AIRFLOW__CORE__DAG_CONCURRENCY 环境变量覆盖它。

作为per docs I referred to in my question:

concurrency:Airflow 调度程序将运行不超过$concurrency 在任何给定时间您的 DAG 的任务实例。并发定义 在您的气流 DAG 中。如果你没有在你的 DAG 上设置并发, 调度程序将使用来自dag_concurrency 的默认值 进入你的airflow.cfg。

这意味着遵循简化的代码:

default_args = 
    'owner': 'airflow',
    'depends_on_past': True,
    'wait_for_downstream': True,
    'concurrency': 1,



dag = DAG(
    dag_id='test_parallelism_dag',
    default_args=default_args,
    max_active_runs=1,
)

应该改写为:

default_args = 
    'owner': 'airflow',
    'depends_on_past': True,
    'wait_for_downstream': True,



dag = DAG(
    dag_id='test_parallelism_dag',
    default_args=default_args,
    max_active_runs=1,
    concurrency=30
)

我的代码实际上有错误的假设,即 default_args 在某些时候将实际的 kwargs 替换为 DAG 构造函数。我不知道当时是什么导致我得出这个结论,但我想将concurrency 设置为1 有一些草案剩余,它实际上从未影响任何东西,实际的 DAG 并发是从配置默认值设置的,即 16。

【讨论】:

【参考方案2】:

在您的airflow.cfg 文件中更新concurrency 配置。如果是 16,则增加到 32。

如果您使用的是 Celery Executor,请将 worker_concurrency 更改为 32。

【讨论】:

我更新了我的问题。我使用 SequentialExecutor 并且并行度的默认值为 32,因此该参数超出了等式。 奇怪的是,您能够与SequentialExecutor 并行运行任务。我们设计SequentialExecutor 来串行运行任务。如果您想并行运行任务,请使用LocalExecutor 没错。我们有默认配置值,它被我忽略的 env 设置覆盖。感谢您指出这一点。确实是LocalExecutor

以上是关于Apache Airflow:在单个 DAG 运行中运行所有并行任务的主要内容,如果未能解决你的问题,请参考以下文章

对于 Apache Airflow,如何通过 CLI 手动触发 DAG 时传递参数?

Apache Airflow - 在 AWS MWAA 上解析 SQL 查询很慢

Apache Atlas 和 Airflow 集成

在 DAG 运行期间动态生成 DAG - Airflow

Google Cloud Composer (Apache Airflow) 无法访问日志文件

Apache Airflow - 如何在目标 DAG 中使用 TriggerDagRunOperator 设置 execution_date 以使用当前 execution_date