我在 AWS 中有一个现有的 EMR 集群。我想从气流运行 dag 到现有的 aws 集群

Posted

技术标签:

【中文标题】我在 AWS 中有一个现有的 EMR 集群。我想从气流运行 dag 到现有的 aws 集群【英文标题】:I have an existing EMR cluster in AWS. I want to run a dag from airflow to an aws existing cluster 【发布时间】:2019-10-14 15:15:16 【问题描述】:

我有一台气流机,它有 apache-airflow==1.10.5 版本。我知道如何运行一个 dag,它会自动创建一个集群并运行该步骤并终止集群。在气流 UI 中使用连接我能够实现这一点。但是要在现有的 aws emr 集群上运行 dag,我无法知道我需要在连接中传递哪些参数。

AIRFLOW UI --> Admin --> Connections --> Created Conn ID (EMR Default1), conn type Elastic Map reduce。

[2019-10-14 12:12:40,919] taskinstance.py:1051 ERROR - Parameter validation failed:
Missing required parameter in input: "Instances"
Traceback (most recent call last):
  File "/root/anaconda3/envs/airflow/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 926, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/root/anaconda3/envs/airflow/lib/python3.6/site-packages/airflow/contrib/operators/emr_create_job_flow_operator.py", line 68, in execute
    response = emr.create_job_flow(self.job_flow_overrides)
  File "/root/anaconda3/envs/airflow/lib/python3.6/site-packages/airflow/contrib/hooks/emr_hook.py", line 55, in create_job_flow
    response = self.get_conn().run_job_flow(**config)
  File "/root/anaconda3/envs/airflow/lib/python3.6/site-packages/botocore/client.py", line 314, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/root/anaconda3/envs/airflow/lib/python3.6/site-packages/botocore/client.py", line 586, in _make_api_call
    api_params, operation_model, context=request_context)
  File "/root/anaconda3/envs/airflow/lib/python3.6/site-packages/botocore/client.py", line 621, in _convert_to_request_dict
    api_params, operation_model)
  File "/root/anaconda3/envs/airflow/lib/python3.6/site-packages/botocore/validate.py", line 291, in serialize_to_request
    raise ParamValidationError(report=report.generate_report())
botocore.exceptions.ParamValidationError: Parameter validation failed:
Missing required parameter in input: "Instances"
[2019-10-14 12:12:40,920] taskinstance.py:1082 INFO - Marking task as FAILED.

【问题讨论】:

【参考方案1】:

在第一种情况下,您也可以通过扩展SparkSubmitOperator 运算符来实现,而不是使用 UI 动态创建/终止集群。启动 EMR 集群后,您可以将 EMR master 中的 *.xml(例如 core-site.xml)文件复制到气流节点上的某个位置,然后在 spark-submit 任务中指向这些文件在气流中。至少我们那天在我们的产品中这样做了。为了在逻辑上扩展它,如果您计划重用现有集群,您只需要知道这些 *.xml 文件已经存储在哪里。然后,其余的将是相同的。触发任务时只需要引用这些文件即可。

更多详情

我不知道任何这样的文档,所以我只能建议您探索以下内容,这是基于我收集到的知识:

    我们需要为 spark-submit 编写一个自定义插件。作为这个自定义插件模块的一部分,让我们定义一个CustomSparkSubmitOperator 类。它需要扩展BaseOperator。你可以找到很多关于在气流中编写自定义插件的文章。 This 可能是一个不错的起点。 Here,您可以查看BaseOperator的更多详情。

    BaseOperator 中,您会找到一个名为pre_execute 的方法。在此方法中执行以下操作是一个可行的选择:

    一个。等到您的集群启动。如果您传递 cluster-id,您可以使用 boto3 轻松完成此操作。

    b.集群启动后,获取 EMR 主节点的 ip,并将匹配 /etc/hadoop/conf/*-site.xml 的内容复制到您的气流节点。这可以通过python中的子进程调用来实现。

    获得 xml 文件后,在 execute 方法中,只需使用 SparkSubmitHook 提交您的 spark-job。您需要确保气流节点上的 spark 二进制文件使用此路径进行 spark-submit。

    如果需要,您可以在 post_execute 方法中清理集群。

【讨论】:

感谢您的回复。您有关于在现有集群上触发任务时如何引用 *.xml 的文档吗?

以上是关于我在 AWS 中有一个现有的 EMR 集群。我想从气流运行 dag 到现有的 aws 集群的主要内容,如果未能解决你的问题,请参考以下文章

将文件从 AWS EMR 集群中的映射器上传到 S3

打开/关闭 AWS EMR 集群

结合 AWS EMR 输出

AWS EMR 上的持续集成

aws emr 上的 spark 集群找不到 spark-env.sh

如何在现有的 Apache Spark 独立集群上安装 Apache Zeppelin