我在 AWS 中有一个现有的 EMR 集群。我想从气流运行 dag 到现有的 aws 集群
Posted
技术标签:
【中文标题】我在 AWS 中有一个现有的 EMR 集群。我想从气流运行 dag 到现有的 aws 集群【英文标题】:I have an existing EMR cluster in AWS. I want to run a dag from airflow to an aws existing cluster 【发布时间】:2019-10-14 15:15:16 【问题描述】:我有一台气流机,它有 apache-airflow==1.10.5 版本。我知道如何运行一个 dag,它会自动创建一个集群并运行该步骤并终止集群。在气流 UI 中使用连接我能够实现这一点。但是要在现有的 aws emr 集群上运行 dag,我无法知道我需要在连接中传递哪些参数。
AIRFLOW UI --> Admin --> Connections --> Created Conn ID (EMR Default1), conn type Elastic Map reduce。
[2019-10-14 12:12:40,919] taskinstance.py:1051 ERROR - Parameter validation failed:
Missing required parameter in input: "Instances"
Traceback (most recent call last):
File "/root/anaconda3/envs/airflow/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 926, in _run_raw_task
result = task_copy.execute(context=context)
File "/root/anaconda3/envs/airflow/lib/python3.6/site-packages/airflow/contrib/operators/emr_create_job_flow_operator.py", line 68, in execute
response = emr.create_job_flow(self.job_flow_overrides)
File "/root/anaconda3/envs/airflow/lib/python3.6/site-packages/airflow/contrib/hooks/emr_hook.py", line 55, in create_job_flow
response = self.get_conn().run_job_flow(**config)
File "/root/anaconda3/envs/airflow/lib/python3.6/site-packages/botocore/client.py", line 314, in _api_call
return self._make_api_call(operation_name, kwargs)
File "/root/anaconda3/envs/airflow/lib/python3.6/site-packages/botocore/client.py", line 586, in _make_api_call
api_params, operation_model, context=request_context)
File "/root/anaconda3/envs/airflow/lib/python3.6/site-packages/botocore/client.py", line 621, in _convert_to_request_dict
api_params, operation_model)
File "/root/anaconda3/envs/airflow/lib/python3.6/site-packages/botocore/validate.py", line 291, in serialize_to_request
raise ParamValidationError(report=report.generate_report())
botocore.exceptions.ParamValidationError: Parameter validation failed:
Missing required parameter in input: "Instances"
[2019-10-14 12:12:40,920] taskinstance.py:1082 INFO - Marking task as FAILED.
【问题讨论】:
【参考方案1】:在第一种情况下,您也可以通过扩展SparkSubmitOperator
运算符来实现,而不是使用 UI 动态创建/终止集群。启动 EMR 集群后,您可以将 EMR master 中的 *.xml(例如 core-site.xml)文件复制到气流节点上的某个位置,然后在 spark-submit 任务中指向这些文件在气流中。至少我们那天在我们的产品中这样做了。为了在逻辑上扩展它,如果您计划重用现有集群,您只需要知道这些 *.xml 文件已经存储在哪里。然后,其余的将是相同的。触发任务时只需要引用这些文件即可。
更多详情
我不知道任何这样的文档,所以我只能建议您探索以下内容,这是基于我收集到的知识:
我们需要为 spark-submit 编写一个自定义插件。作为这个自定义插件模块的一部分,让我们定义一个CustomSparkSubmitOperator
类。它需要扩展BaseOperator
。你可以找到很多关于在气流中编写自定义插件的文章。 This 可能是一个不错的起点。 Here,您可以查看BaseOperator
的更多详情。
在BaseOperator
中,您会找到一个名为pre_execute
的方法。在此方法中执行以下操作是一个可行的选择:
一个。等到您的集群启动。如果您传递 cluster-id,您可以使用 boto3 轻松完成此操作。
b.集群启动后,获取 EMR 主节点的 ip,并将匹配 /etc/hadoop/conf/*-site.xml
的内容复制到您的气流节点。这可以通过python中的子进程调用来实现。
获得 xml 文件后,在 execute
方法中,只需使用 SparkSubmitHook
提交您的 spark-job。您需要确保气流节点上的 spark 二进制文件使用此路径进行 spark-submit。
如果需要,您可以在 post_execute
方法中清理集群。
【讨论】:
感谢您的回复。您有关于在现有集群上触发任务时如何引用 *.xml 的文档吗?以上是关于我在 AWS 中有一个现有的 EMR 集群。我想从气流运行 dag 到现有的 aws 集群的主要内容,如果未能解决你的问题,请参考以下文章