正在寻找将 aws pig 步骤注入已经运行的 emr 的 boto3 python 示例?

Posted

技术标签:

【中文标题】正在寻找将 aws pig 步骤注入已经运行的 emr 的 boto3 python 示例?【英文标题】:Looking for a boto3 python example of injecting a aws pig step into an already running emr? 【发布时间】:2016-08-15 14:43:32 【问题描述】:

我正在寻找已经运行的 AWS EMR 的良好 BOTO3 示例,我希望将 Pig Step 注入该 EMR。之前我用的是boto2.42版本的:

from boto.emr.connection import EmrConnection
from boto.emr.step import InstallPigStep, PigStep

#  AWS_ACCESS_KEY = '' # REQUIRED
#  AWS_SECRET_KEY = '' # REQUIRED
#  conn = EmrConnection(AWS_ACCESS_KEY, AWS_SECRET_KEY)

# loop next element on bucket_compare list 

pig_file = 's3://elasticmapreduce/samples/pig-apache/do-reports2.pig'
INPUT = 's3://elasticmapreduce/samples/pig-apache/input/access_log_1'
OUTPUT = '' # REQUIRED, S3 bucket for job output

pig_args = ['-p', 'INPUT=%s' % INPUT,
             '-p', 'OUTPUT=%s' % OUTPUT]
pig_step = PigStep('Process Reports', pig_file, pig_args=pig_args)
steps = [InstallPigStep(), pig_step]

conn.run_jobflow(name='prs-dev-test', steps=steps,
             hadoop_version='2.7.2-amzn-2', ami_version='latest',
             num_instances=2, keep_alive=False)

现在的主要问题是,BOTO3 不使用:from boto.emr.connection import EmrConnection,也不是 from boto.emr.step import InstallPigStep,PigStep 和我找不到等效的一组模块?

【问题讨论】:

【参考方案1】:

经过一番检查,我发现了一种非常简单的方法,可以使用 awscli 和 subprocess 模块从 Python 中注入 Pig Script 命令。可以导入 awscli 和子进程,然后将所需的 PIG 步骤封装并注入到已经运行的 EMR 中:

import awscli
import subprocess


cmd='aws emr add-steps --cluster-id j-GU07FE0VTHNG --steps Type=PIG,Name="AggPigProgram",ActionOnFailure=CONTINUE,Args=[-f,s3://dev-end2end-test/pig_scripts/AggRuleBag.pig,-p,INPUT=s3://dev-end2end-test/input_location,-p,OUTPUT=s3://end2end-test/output_location]'

push=subprocess.Popen(cmd, shell=True, stdout = subprocess.PIPE)
print(push.returncode)

当然,您必须使用以下方式找到您的 JobFlowID:

aws emr list-clusters --active

使用与上述相同的子进程和推送命令。当然,您可以将监控添加到您心中的喜悦中,而不仅仅是打印声明。

【讨论】:

我也在想办法做到这一点。【参考方案2】:

这里是如何为现有的 emr 集群作业流程添加一个新步骤,用于猪作业 sing boto3

注意:你的脚本日志文件,输入输出目录应该有 格式中的完整路径 's3://<bucket>/<directory>/<file_or_key>'

emrcon = boto3.client("emr")    
cluster_id1 = cluster_status_file_content #Retrieved from S3, where it was recorded on creation
    
                step_id = emrcon.add_job_flow_steps(JobFlowId=str(cluster_id1),
                                                    Steps=[
                                                            'Name': str(pig_job_name),
                                                            'ActionOnFailure': 'CONTINUE',
                                                            'HadoopJarStep': 
                                                                'Jar': 'command-runner.jar',
                                                                'Args': ['pig', "-l", str(pig_log_file_full_path), "-f", str(pig_job_run_script_full_path), "-p", "INPUT=" + str(pig_input_dir_full_path), 
                                                                                        "-p", "OUTPUT=" + str(pig_output_dir_full_path) ]
                                                            
                                                        ]
                                                    )

请查看屏幕截图以进行监控-

【讨论】:

以上是关于正在寻找将 aws pig 步骤注入已经运行的 emr 的 boto3 python 示例?的主要内容,如果未能解决你的问题,请参考以下文章

寻找有关如何使用 python 启动 AWS EMR 集群以运行 pyspark 步骤的示例

使用 java.lang.NoClassDefFoundError 在 AWS EMR 上运行 Pig UDF:org/apache/pig/LoadFunc

在 AWS EMR 上使用 pig 的 Java 堆空间

Pig Script 上的多个 AWS 账户

Pig 的“转储”在 AWS 上不起作用

如何在运行时通过 AWS 注入密钥