气流 DAG 步骤依赖项

Posted

技术标签:

【中文标题】气流 DAG 步骤依赖项【英文标题】:Airflow DAG Steps Dependencies 【发布时间】:2021-09-28 11:46:11 【问题描述】:

我的 Airflow DAG 如下所示:

with DAG(
    dag_id='dag',
    default_args=
        'owner': 'airflow',
        'depends_on_past': False,
        'email': ['airflow@example.com'],
        'email_on_failure': False,
        'email_on_retry': False,
    ,
    dagrun_timeout=timedelta(hours=2),
    start_date=datetime(2021, 9, 28, 11),
    schedule_interval='10 * * * *'
) as dag:

    create_job_flow = EmrCreateJobFlowOperator(
        task_id='create_job_flow',
        job_flow_overrides=JOB_FLOW_OVERRIDES,
        aws_conn_id='aws_default',
        emr_conn_id='emr_default',
    )

    job_step = EmrAddStepsOperator(
        task_id='job_step',
        job_flow_id=create_job_flow.output,
        aws_conn_id='aws_default',
        steps=JOB_SETP,
    )

    job_step_sensor = EmrStepSensor(
        task_id='job_step_sensor',
        job_flow_id=create_job_flow.output,
        step_id=" task_instance.xcom_pull(task_ids='job_step', key='return_value')[0] ",
        aws_conn_id='aws_default',
    )

    read_file = PythonOperator(
        task_id="read_file",
        python_callable=get_date_information
    )

    alter_partitions = PythonOperator(
        task_id="alter_partitions",
        python_callable=update_partitions
    )

    remove_cluster = EmrTerminateJobFlowOperator(
        task_id='remove_cluster',
        job_flow_id=create_job_flow.output,
        aws_conn_id='aws_default',
    )


    create_job_flow.set_downstream(job_step)
    job_step.set_downstream(job_step_sensor)
    job_step_sensor.set_downstream(read_file)
    read_file.set_downstream(alter_partitions)
    alter_partitions.set_downstream(remove_cluster)

所以这基本上是创建一个 EMR 集群,在其中启动一个步骤并感知该步骤。然后执行一些 Python 函数,最后终止集群。 Airflow UI 中的 DAG 视图如下:

这里,create_job_flow 也指向 remove_cluster(可能是因为 job_flow_id 引用了 create_job_flow),而我只将 alter_partitions 的下游设置为 remove_cluster。是否会发生这种情况,在到达 job_step 之前,它会删除集群,因为在这种情况下,集群在执行 split_job_step 之前已经被删除,因此这就是问题所在。有什么方法可以删除 create_job_flow 和 remove_cluster 之间的链接?还是会等待完成 alter_partitions 然后执行 remove_cluster?

【问题讨论】:

【参考方案1】:

“remove_cluster”任务将等到“alter_partitions”任务完成。 “create_job_flow”和“remove_cluster”之间(以及“create_job_flow”和“job_step_sensor”之间)的额外边缘是TaskFlow API和XComArg概念的一个特性,即使用运算符的.output属性。 (查看this documentation 以获取另一个示例。)

在“remove_cluster”和“job_step_sensor”任务中,job_flow_id=create_job_flow.output 是一个输入参数。在幕后,当操作员的.output 在模板化字段中用作另一个任务的输入时,会自动创建依赖关系。此功能可确保使用其他任务的 XComs 的任务之间以前隐含的任务依赖关系现在是显式的。

此管道将按写入和所需顺序执行(假设trigger_rule 是“all_success”,这是默认值)。在“create_job_flow”和“alter_partitions”任务都完成(实际上是串行执行)之前,“remove_cluster”任务不会执行。

【讨论】:

以上是关于气流 DAG 步骤依赖项的主要内容,如果未能解决你的问题,请参考以下文章

安装气流时出错:默认情况下,Airflow 的依赖项之一安装 GPL

气流与 DAG 外部的 BigQuery 交互,而不使用 BigQueryOperators

气流 DAG EMR EmrCreateJobFlowOperator 不执行任何操作

我在 AWS 中有一个现有的 EMR 集群。我想从气流运行 dag 到现有的 aws 集群

气流 - Pytest - 未找到夹具“dag”

在气流上部署 dag 文件的有效方法