气流 DAG 步骤依赖项
Posted
技术标签:
【中文标题】气流 DAG 步骤依赖项【英文标题】:Airflow DAG Steps Dependencies 【发布时间】:2021-09-28 11:46:11 【问题描述】:我的 Airflow DAG 如下所示:
with DAG(
dag_id='dag',
default_args=
'owner': 'airflow',
'depends_on_past': False,
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
,
dagrun_timeout=timedelta(hours=2),
start_date=datetime(2021, 9, 28, 11),
schedule_interval='10 * * * *'
) as dag:
create_job_flow = EmrCreateJobFlowOperator(
task_id='create_job_flow',
job_flow_overrides=JOB_FLOW_OVERRIDES,
aws_conn_id='aws_default',
emr_conn_id='emr_default',
)
job_step = EmrAddStepsOperator(
task_id='job_step',
job_flow_id=create_job_flow.output,
aws_conn_id='aws_default',
steps=JOB_SETP,
)
job_step_sensor = EmrStepSensor(
task_id='job_step_sensor',
job_flow_id=create_job_flow.output,
step_id=" task_instance.xcom_pull(task_ids='job_step', key='return_value')[0] ",
aws_conn_id='aws_default',
)
read_file = PythonOperator(
task_id="read_file",
python_callable=get_date_information
)
alter_partitions = PythonOperator(
task_id="alter_partitions",
python_callable=update_partitions
)
remove_cluster = EmrTerminateJobFlowOperator(
task_id='remove_cluster',
job_flow_id=create_job_flow.output,
aws_conn_id='aws_default',
)
create_job_flow.set_downstream(job_step)
job_step.set_downstream(job_step_sensor)
job_step_sensor.set_downstream(read_file)
read_file.set_downstream(alter_partitions)
alter_partitions.set_downstream(remove_cluster)
所以这基本上是创建一个 EMR 集群,在其中启动一个步骤并感知该步骤。然后执行一些 Python 函数,最后终止集群。 Airflow UI 中的 DAG 视图如下:
这里,create_job_flow 也指向 remove_cluster(可能是因为 job_flow_id 引用了 create_job_flow),而我只将 alter_partitions 的下游设置为 remove_cluster。是否会发生这种情况,在到达 job_step 之前,它会删除集群,因为在这种情况下,集群在执行 split_job_step 之前已经被删除,因此这就是问题所在。有什么方法可以删除 create_job_flow 和 remove_cluster 之间的链接?还是会等待完成 alter_partitions 然后执行 remove_cluster?
【问题讨论】:
【参考方案1】:“remove_cluster”任务将等到“alter_partitions”任务完成。 “create_job_flow”和“remove_cluster”之间(以及“create_job_flow”和“job_step_sensor”之间)的额外边缘是TaskFlow API和XComArg
概念的一个特性,即使用运算符的.output
属性。 (查看this documentation 以获取另一个示例。)
在“remove_cluster”和“job_step_sensor”任务中,job_flow_id=create_job_flow.output
是一个输入参数。在幕后,当操作员的.output
在模板化字段中用作另一个任务的输入时,会自动创建依赖关系。此功能可确保使用其他任务的 XComs
的任务之间以前隐含的任务依赖关系现在是显式的。
此管道将按写入和所需顺序执行(假设trigger_rule
是“all_success”,这是默认值)。在“create_job_flow”和“alter_partitions”任务都完成(实际上是串行执行)之前,“remove_cluster”任务不会执行。
【讨论】:
以上是关于气流 DAG 步骤依赖项的主要内容,如果未能解决你的问题,请参考以下文章
安装气流时出错:默认情况下,Airflow 的依赖项之一安装 GPL
气流与 DAG 外部的 BigQuery 交互,而不使用 BigQueryOperators
气流 DAG EMR EmrCreateJobFlowOperator 不执行任何操作