如何从 Lambda 函数在亚马逊 EMR 上执行 spark 提交?

Posted

技术标签:

【中文标题】如何从 Lambda 函数在亚马逊 EMR 上执行 spark 提交?【英文标题】:How to execute spark submit on amazon EMR from Lambda function? 【发布时间】:2018-01-29 10:47:45 【问题描述】:

我想根据 S3 上的文件上传事件在 AWS EMR 集群上执行 spark 提交作业。我正在使用 AWS Lambda 函数来捕获事件,但我不知道如何通过 Lambda 函数在 EMR 集群上提交 spark 提交作业。

我搜索的大多数答案都谈到在 EMR 集群中添加一个步骤。但我不知道我是否可以在添加的步骤中添加任何步骤来触发“spark submit --with args”。

【问题讨论】:

【参考方案1】:

你可以,我上周不得不做同样的事情!

将 boto3 用于 Python(其他语言肯定会有类似的解决方案),您可以使用定义的步骤启动集群,或者将步骤附加到已经启动的集群。

使用步骤定义集群

def lambda_handler(event, context):
    conn = boto3.client("emr")        
    cluster_id = conn.run_job_flow(
        Name='ClusterName',
        ServiceRole='EMR_DefaultRole',
        JobFlowRole='EMR_EC2_DefaultRole',
        VisibleToAllUsers=True,
        LogUri='s3n://some-log-uri/elasticmapreduce/',
        ReleaseLabel='emr-5.8.0',
        Instances=
            'InstanceGroups': [
                
                    'Name': 'Master nodes',
                    'Market': 'ON_DEMAND',
                    'InstanceRole': 'MASTER',
                    'InstanceType': 'm3.xlarge',
                    'InstanceCount': 1,
                ,
                
                    'Name': 'Slave nodes',
                    'Market': 'ON_DEMAND',
                    'InstanceRole': 'CORE',
                    'InstanceType': 'm3.xlarge',
                    'InstanceCount': 2,
                
            ],
            'Ec2KeyName': 'key-name',
            'KeepJobFlowAliveWhenNoSteps': False,
            'TerminationProtected': False
        ,
        Applications=[
            'Name': 'Spark'
        ],
        Configurations=[
            "Classification":"spark-env",
            "Properties":,
            "Configurations":[
                "Classification":"export",
                "Properties":
                    "PYSPARK_PYTHON":"python35",
                    "PYSPARK_DRIVER_PYTHON":"python35"
                
            ]
        ],
        BootstrapActions=[
            'Name': 'Install',
            'ScriptBootstrapAction': 
                'Path': 's3://path/to/bootstrap.script'
            
        ],
        Steps=[
            'Name': 'StepName',
            'ActionOnFailure': 'TERMINATE_CLUSTER',
            'HadoopJarStep': 
                'Jar': 's3n://elasticmapreduce/libs/script-runner/script-runner.jar',
                'Args': [
                    "/usr/bin/spark-submit", "--deploy-mode", "cluster",
                    's3://path/to/code.file', '-i', 'input_arg', 
                    '-o', 'output_arg'
                ]
            
        ],
    )
    return "Started cluster ".format(cluster_id)

将步骤附加到已运行的集群

根据here

def lambda_handler(event, context):
    conn = boto3.client("emr")
    # chooses the first cluster which is Running or Waiting
    # possibly can also choose by name or already have the cluster id
    clusters = conn.list_clusters()
    # choose the correct cluster
    clusters = [c["Id"] for c in clusters["Clusters"] 
                if c["Status"]["State"] in ["RUNNING", "WAITING"]]
    if not clusters:
        sys.stderr.write("No valid clusters\n")
        sys.stderr.exit()
    # take the first relevant cluster
    cluster_id = clusters[0]
    # code location on your emr master node
    CODE_DIR = "/home/hadoop/code/"

    # spark configuration example
    step_args = ["/usr/bin/spark-submit", "--spark-conf", "your-configuration",
                 CODE_DIR + "your_file.py", '--your-parameters', 'parameters']

    step = "Name": "what_you_do-" + time.strftime("%Y%m%d-%H:%M"),
            'ActionOnFailure': 'CONTINUE',
            'HadoopJarStep': 
                'Jar': 's3n://elasticmapreduce/libs/script-runner/script-runner.jar',
                'Args': step_args
            
        
    action = conn.add_job_flow_steps(JobFlowId=cluster_id, Steps=[step])
    return "Added step: %s"%(action)

【讨论】:

如果我需要为 mey 代码文件添加 s3 路径,这个 script-runner.jar 是什么? s3n://elasticmapreduce 存储桶由 Amazon 提供。除了引用它,你不需要做任何事情。 这个 spark-submit 动作是从 lambda 函数同步调用还是只是添加工作流而不实际调用它??【参考方案2】:

如果您想使用 spark submit 命令执行 Spark jar,则使用 AWS Lambda 函数 python 代码:

from botocore.vendored import requests

import json

def lambda_handler(event, context):

headers =  "content-type": "application/json" 

  url = 'http://ip-address.ec2.internal:8998/batches'

  payload = 

    'file' : 's3://Bucket/Orchestration/RedshiftJDBC41.jar 
s3://Bucket/Orchestration/mysql-connector-java-8.0.12.jar 

s3://Bucket/Orchestration/SparkCode.jar',

    'className' : 'Main Class Name',

    'args' : [event.get('rootPath')]

  

  res = requests.post(url, data = json.dumps(payload), headers = headers, verify = False)

  json_data = json.loads(res.text)

  return json_data.get('id')

【讨论】:

能不能把第一句的英文改一下,格式化一下代码? 这使用 livy 提交 Spark 作业。虽然作业可以这样运行,但是很多集群没有配置 Livy,因此这种方法有它的局限性

以上是关于如何从 Lambda 函数在亚马逊 EMR 上执行 spark 提交?的主要内容,如果未能解决你的问题,请参考以下文章

亚马逊EMR火花上的蜂巢

如何使用 AWS Lambda 在 AWS EMR 上运行 PySpark

创建 EMR 集群时出错,EMR 服务角色无效

AWS Lambda 函数写入 S3

在 Amazon EMR-4 上的 Tez 上运行 Pig

使用 lambda 函数删除关联的 cloudformation 堆栈时 EMR 集群未终止