Apache Manged Airflow EMR 操作员 DAG 失败

Posted

技术标签:

【中文标题】Apache Manged Airflow EMR 操作员 DAG 失败【英文标题】:Apache Manged Airflow EMR operator DAG is failing 【发布时间】:2021-08-25 07:14:55 【问题描述】:

我正在 apache 管理的气流工作流环境中创建 DAG。 DAG 基本上使用 EMR 算子,它创建 EMR 集群,运行 DAG 中定义的 spark 作业并删除 EMR 集群。我已从气流网站获取代码。这是代码的链接-https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/_modules/airflow/providers/amazon/aws/example_dags/example_emr_job_flow_manual_steps.html

错误信息:[正在创建 EMR 集群,但显示如下,错误是否与气流有关]

2021-08-25 05:00:04,520] logging_mixin.py:104 INFO - [2021-08-25 05:00:04,520] local_task_job.py:188 WARNING - State of this instance has been externally set to skipped. Terminating instance.
[2021-08-25 05:00:04,600] process_utils.py:100 INFO - Sending Signals.SIGTERM to GPID 1897
[2021-08-25 05:00:04,621] taskinstance.py:1265 ERROR - Received SIGTERM. Terminating subprocesses.
[2021-08-25 05:00:04,702] taskinstance.py:1482 ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1138, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1311, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1341, in _execute_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.7/site-packages/airflow/sensors/base.py", line 243, in execute
    time.sleep(self._get_next_poke_interval(started_at, run_duration, try_number))
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1267, in signal_handler
    raise AirflowException("Task received SIGTERM signal")
airflow.exceptions.AirflowException: Task received SIGTERM signal
[2021-08-25 05:00:04,725] taskinstance.py:1532 INFO - Marking task as FAILED. dag_id=emr_job_flow_manual_steps_dag, task_id=watch_step, execution_date=20210824T030000, start_date=20210825T030008, end_date=20210825T050004
[2021-08-25 05:00:04,793] process_utils.py:66 INFO - Process psutil.Process(pid=1897, status='terminated', exitcode=1, started='03:00:07') (1897) terminated with exit code 1

我的气流环境类:mw1.small

【问题讨论】:

【参考方案1】:

看起来您的 DAG 只是在 2 小时后超时:

start_date=20210825T030008, end_date=20210825T050004

不幸的是,MWAA 与其他 AWS 服务的集成没有很好的文档记录,但我的猜测是 MWAA 环境执行角色没有操作 EMR 集群的权限。

检查 MWAA 运行的角色并分配所需的权限,如下所述:

https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/operators/emr.html

为了成功运行这 2 个示例,您需要为 Amazon EMR 创建 IAM 服务角色(EMR_EC2_DefaultRole 和 EMR_DefaultRole)。

您可以使用 AWS CLI 创建这些角色:aws emr create-default-roles

【讨论】:

嗨 Jacek 非常感谢您的回复。我已经拥有这两个角色了。

以上是关于Apache Manged Airflow EMR 操作员 DAG 失败的主要内容,如果未能解决你的问题,请参考以下文章

在 Airflow EMR 操作员步骤中使用 Json 输入变量

AWS EMR Airflow:Postgresql 连接器

从 Airflow(使用气流 Livy 运算符)向 Livy(在 EMR 中)提交 Spark 作业

我在 AWS 中有一个现有的 EMR 集群。我想从气流运行 dag 到现有的 aws 集群

任务调度工具 Apache Airflow 初识

Airflow dag 中的 postgres_operator 问题