正在寻找将 aws pig 步骤注入已经运行的 emr 的 boto3 python 示例?
Posted
技术标签:
【中文标题】正在寻找将 aws pig 步骤注入已经运行的 emr 的 boto3 python 示例?【英文标题】:Looking for a boto3 python example of injecting a aws pig step into an already running emr? 【发布时间】:2016-08-15 14:43:32 【问题描述】:我正在寻找已经运行的 AWS EMR 的良好 BOTO3 示例,我希望将 Pig Step 注入该 EMR。之前我用的是boto2.42版本的:
from boto.emr.connection import EmrConnection
from boto.emr.step import InstallPigStep, PigStep
# AWS_ACCESS_KEY = '' # REQUIRED
# AWS_SECRET_KEY = '' # REQUIRED
# conn = EmrConnection(AWS_ACCESS_KEY, AWS_SECRET_KEY)
# loop next element on bucket_compare list
pig_file = 's3://elasticmapreduce/samples/pig-apache/do-reports2.pig'
INPUT = 's3://elasticmapreduce/samples/pig-apache/input/access_log_1'
OUTPUT = '' # REQUIRED, S3 bucket for job output
pig_args = ['-p', 'INPUT=%s' % INPUT,
'-p', 'OUTPUT=%s' % OUTPUT]
pig_step = PigStep('Process Reports', pig_file, pig_args=pig_args)
steps = [InstallPigStep(), pig_step]
conn.run_jobflow(name='prs-dev-test', steps=steps,
hadoop_version='2.7.2-amzn-2', ami_version='latest',
num_instances=2, keep_alive=False)
现在的主要问题是,BOTO3 不使用:from boto.emr.connection
import EmrConnection
,也不是 from boto.emr.step
import InstallPigStep
,PigStep 和我找不到等效的一组模块?
【问题讨论】:
【参考方案1】:经过一番检查,我发现了一种非常简单的方法,可以使用 awscli 和 subprocess 模块从 Python 中注入 Pig Script 命令。可以导入 awscli 和子进程,然后将所需的 PIG 步骤封装并注入到已经运行的 EMR 中:
import awscli
import subprocess
cmd='aws emr add-steps --cluster-id j-GU07FE0VTHNG --steps Type=PIG,Name="AggPigProgram",ActionOnFailure=CONTINUE,Args=[-f,s3://dev-end2end-test/pig_scripts/AggRuleBag.pig,-p,INPUT=s3://dev-end2end-test/input_location,-p,OUTPUT=s3://end2end-test/output_location]'
push=subprocess.Popen(cmd, shell=True, stdout = subprocess.PIPE)
print(push.returncode)
当然,您必须使用以下方式找到您的 JobFlowID:
aws emr list-clusters --active
使用与上述相同的子进程和推送命令。当然,您可以将监控添加到您心中的喜悦中,而不仅仅是打印声明。
【讨论】:
我也在想办法做到这一点。【参考方案2】:这里是如何为现有的 emr 集群作业流程添加一个新步骤,用于猪作业 sing boto3
注意:你的脚本日志文件,输入输出目录应该有
格式中的完整路径
's3://<bucket>/<directory>/<file_or_key>'
emrcon = boto3.client("emr")
cluster_id1 = cluster_status_file_content #Retrieved from S3, where it was recorded on creation
step_id = emrcon.add_job_flow_steps(JobFlowId=str(cluster_id1),
Steps=[
'Name': str(pig_job_name),
'ActionOnFailure': 'CONTINUE',
'HadoopJarStep':
'Jar': 'command-runner.jar',
'Args': ['pig', "-l", str(pig_log_file_full_path), "-f", str(pig_job_run_script_full_path), "-p", "INPUT=" + str(pig_input_dir_full_path),
"-p", "OUTPUT=" + str(pig_output_dir_full_path) ]
]
)
请查看屏幕截图以进行监控-
【讨论】:
以上是关于正在寻找将 aws pig 步骤注入已经运行的 emr 的 boto3 python 示例?的主要内容,如果未能解决你的问题,请参考以下文章
寻找有关如何使用 python 启动 AWS EMR 集群以运行 pyspark 步骤的示例
使用 java.lang.NoClassDefFoundError 在 AWS EMR 上运行 Pig UDF:org/apache/pig/LoadFunc