您如何使用 boto3(或其他方式)在 emr 上自动化 pyspark 作业?

Posted

技术标签:

【中文标题】您如何使用 boto3(或其他方式)在 emr 上自动化 pyspark 作业?【英文标题】:How do you automate pyspark jobs on emr using boto3 (or otherwise)? 【发布时间】:2016-08-10 22:20:19 【问题描述】:

我正在创建一个作业来解析大量服务器数据,然后将其上传到Redshift 数据库。

我的工作流程如下:

从 S3 获取日志数据 使用 sparkdataframes 或 spark sql 解析数据并写回 S3 将数据从 S3 上传到 Redshift。

不过,我对如何自动执行此操作感到困惑,以便我的进程启动 EMR 集群,引导正确的安装程序,并运行包含解析和编写代码的 python 脚本。

有没有人可以与我分享任何示例、教程或经验来帮助我学习如何做到这一点?

【问题讨论】:

现在有一个来自 AWS 自己的教程 aws.amazon.com/blogs/big-data/… 。我们最终抛弃了 Cloudformation,重用了相当一部分 Python/Spark/Livy 的东西。 你好,我有类似的要求。你是如何解决或解决你的问题的 【参考方案1】:

查看 boto3 EMR 文档以创建集群。您基本上必须调用 run_job_flow 并创建运行所需程序的步骤。

import boto3    

client = boto3.client('emr', region_name='us-east-1')

S3_BUCKET = 'MyS3Bucket'
S3_KEY = 'spark/main.py'
S3_URI = 's3://bucket/key'.format(bucket=S3_BUCKET, key=S3_KEY)

# upload file to an S3 bucket
s3 = boto3.resource('s3')
s3.meta.client.upload_file("myfile.py", S3_BUCKET, S3_KEY)

response = client.run_job_flow(
    Name="My Spark Cluster",
    ReleaseLabel='emr-4.6.0',
    Instances=
        'MasterInstanceType': 'm4.xlarge',
        'SlaveInstanceType': 'm4.xlarge',
        'InstanceCount': 4,
        'KeepJobFlowAliveWhenNoSteps': True,
        'TerminationProtected': False,
    ,
    Applications=[
        
            'Name': 'Spark'
        
    ],
    BootstrapActions=[
        
            'Name': 'Maximize Spark Default Config',
            'ScriptBootstrapAction': 
                'Path': 's3://support.elasticmapreduce/spark/maximize-spark-default-config',
            
        ,
    ],
    Steps=[
    
        'Name': 'Setup Debugging',
        'ActionOnFailure': 'TERMINATE_CLUSTER',
        'HadoopJarStep': 
            'Jar': 'command-runner.jar',
            'Args': ['state-pusher-script']
        
    ,
    
        'Name': 'setup - copy files',
        'ActionOnFailure': 'CANCEL_AND_WAIT',
        'HadoopJarStep': 
            'Jar': 'command-runner.jar',
            'Args': ['aws', 's3', 'cp', S3_URI, '/home/hadoop/']
        
    ,
    
        'Name': 'Run Spark',
        'ActionOnFailure': 'CANCEL_AND_WAIT',
        'HadoopJarStep': 
            'Jar': 'command-runner.jar',
            'Args': ['spark-submit', '/home/hadoop/main.py']
        
    
    ],
    VisibleToAllUsers=True,
    JobFlowRole='EMR_EC2_DefaultRole',
    ServiceRole='EMR_DefaultRole'
)

如果您知道作业流程 ID,您还可以向正在运行的集群添加步骤:

job_flow_id = response['JobFlowId']
print("Job flow ID:", job_flow_id)

step_response = client.add_job_flow_steps(JobFlowId=job_flow_id, Steps=SomeMoreSteps)

step_ids = step_response['StepIds']

print("Step IDs:", step_ids)

更多配置请查看sparksteps。

【讨论】:

我无法理解 S3_KEY 值作为 python 文件的价值和意义。它有什么作用/? S3 密钥是您要运行的 PySpark 文件/作业。其中一个步骤将其从 S3 复制到您的集群。它不必是 python 文件。如果您正在执行 Scala 作业,则可能是 Scala。 这会创建一个作业流 ID,但它不会显示在 EMR 控制台中:-? 上面脚本中提到的“ScriptBootstrapAction”现在/应该不再需要:参见github.com/aws-samples/emr-bootstrap-actions/blob/master/spark/…【参考方案2】:

只需使用AWS Data Pipeline 即可。您可以设置 S3 存储桶以在每次将新文件放入存储桶 https://docs.aws.amazon.com/lambda/latest/dg/with-s3-example.html 时触发 lambda 函数。然后您的 Lambda 函数将激活您的 Data Pipeline https://aws.amazon.com/blogs/big-data/using-aws-lambda-for-event-driven-data-processing-pipelines/ 然后您的 Data Pipeline 使用 EmrCluster 启动一个新的 EMR 集群,然后您可以指定引导选项,然后您可以使用 EmrActivity 运行您的 EMR 命令,当一切都完成后'将终止您的 EMR 集群并停用数据管道。

【讨论】:

试过这个。不写日志就失败:(另一个半熟的 AWS 应用程序,它只是 Lambda 函数的包装器:-/ @CpILL 是的,虽然我在这里推广了 AWS Data Pipeline,但我并没有为自己使用它。在评估它之后,我认为它不够强大,所以我全神贯注于 Apache Airflow。从那以后就再也没有回头 :) 但是 AWS Data Pipeline 可以用于小而简单的事情。我使用我的答案中列出的架构创建了一些概念验证应用程序,并且在我的头撞到墙上几个小时让它工作后它工作正常:) 看起来不错。一旦它离开孵化,就会看看它。另外,我现在需要一个 AWS 解决方案:-/ 我也只是想到了一些事情。有时,AWS 日志只需要一点时间即可写入。例如,当我在 EMR 上运行 Spark 命令时,有时命令会失败。好吧,在写入日志之前还需要三分钟左右,所以我必须等待查看命令失败的原因。因此,如果您在使用 AWS Data Pipeline 时就是这种情况,那么只需刷新日志,直到看到它们进入。在不同的地方也有日志,例如,您将拥有 Data Pipeline 日志,但如果您启动一个通过管道进行 EMR,然后这些日志将在其他地方的 S3 中。 如果您仍然需要解决方案,那么我认为您应该接受我的回答。您需要 Data Pipeline 做的唯一一件事就是启动 EMR 集群并运行您的命令。这一点都不复杂,因此您不应该为 Data Pipeline 缺乏功能和健壮性而感到负担。我几乎想说你可以只使用 S3、Lambda 和 EMR 来做到这一点。 S3 触发器在新文件进入时启动 lambda,lambda 使用 boto3 使用您的 hadoop 步骤创建新的 EMR(EMR 自动终止设置为 true)。唯一的问题是,如果您的 EMR 步骤失败,那么您将不知道,因为 lambda 将被关闭。【参考方案3】:

其实, 我已经使用 AWS 的 Step Functions,它是 Lambda 函数的状态机包装器,因此您可以使用 boto3 使用 run_job_flow 启动 EMR Spark 作业,您可以使用 describe_cluaster 获取集群的状态.最后使用choice。所以你的阶梯函数看起来像这样(括号中的阶梯函数类型:

运行作业(任务)-> 等待 X 分钟(等待)-> 检查状态(任务)-> 分支(选择)[ => 返回等待,或 => 完成]

【讨论】:

所以基本上,你编写了一个 lambda 函数,它使用 boto3 来启动集群并添加一个步骤来运行 python 脚本(进行处理)? 是的,阶梯函数或类似 Airflow 的东西来处理“编排”【参考方案4】:

我在GitHub 上放了一个完整的示例,展示了如何使用 Boto3 完成所有这些操作。

长期存在的集群示例展示了如何在集群上创建和运行作业步骤,该集群从包含历史 Amazon 评论数据的公共 S3 存储桶中获取数据,对其进行一些 PySpark 处理,并将输出写回 S3桶。

创建 Amazon S3 存储桶并上传作业脚本。 创建演示使用的 AWS Identity and Access Management (IAM) 角色。 创建演示使用的 Amazon Elastic Compute Cloud (Amazon EC2) 安全组。 创建短期和长期集群并在它们上运行作业步骤。 终止集群并清理所有资源。

【讨论】:

以上是关于您如何使用 boto3(或其他方式)在 emr 上自动化 pyspark 作业?的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 boto3 在 EMR Studio 中创建笔记本?

如何使 Pyspark 脚本在 Amazon EMR 上运行以识别 boto3 模块?它说找不到模块

jar 文件的参数不正确 - 使用 Boto3 启动 EMR 集群

使用 Boto3 基于 AMI 创建 EMR 集群

有没有办法使用 boto3 中的集群名称检查 emr 集群状态?

AWS 使用 boto3 自动缩放创建 EMR 无法正常工作