如何将 EMR 流作业的输出写入 HDFS?
Posted
技术标签:
【中文标题】如何将 EMR 流作业的输出写入 HDFS?【英文标题】:How do I write the output of an EMR streaming job to HDFS? 【发布时间】:2013-05-08 04:27:37 【问题描述】:我看到examples 有人将 EMR 输出写入 HDFS,但我无法找到如何完成的示例。最重要的是,this documentation 似乎说 EMR 流式作业的 --output 参数必须是 S3 存储桶。
当我实际尝试运行脚本(在本例中,使用 python 流和 mrJob)时,它会引发“Invalid S3 URI”错误。
命令如下:
python my_script.py -r emr \
--emr-job-flow-id=j-JOBID --conf-path=./mrjob.conf --no-output \
--output hdfs:///my-output \
hdfs:///my-input-directory/my-files*.gz
还有回溯...
Traceback (most recent call last):
File "pipes/sampler.py", line 28, in <module>
SamplerJob.run()
File "/Library/Python/2.7/site-packages/mrjob/job.py", line 483, in run
mr_job.execute()
File "/Library/Python/2.7/site-packages/mrjob/job.py", line 501, in execute
super(MRJob, self).execute()
File "/Library/Python/2.7/site-packages/mrjob/launch.py", line 146, in execute
self.run_job()
File "/Library/Python/2.7/site-packages/mrjob/launch.py", line 206, in run_job
with self.make_runner() as runner:
File "/Library/Python/2.7/site-packages/mrjob/job.py", line 524, in make_runner
return super(MRJob, self).make_runner()
File "/Library/Python/2.7/site-packages/mrjob/launch.py", line 161, in make_runner
return EMRJobRunner(**self.emr_job_runner_kwargs())
File "/Library/Python/2.7/site-packages/mrjob/emr.py", line 585, in __init__
self._output_dir = self._check_and_fix_s3_dir(self._output_dir)
File "/Library/Python/2.7/site-packages/mrjob/emr.py", line 776, in _check_and_fix_s3_dir
raise ValueError('Invalid S3 URI: %r' % s3_uri)
ValueError: Invalid S3 URI: 'hdfs:///input/sample'
甚至可能吗?
【问题讨论】:
这是一个老问题,但可能仍然存在。通过查看 MrJob 源,EMRJobRunner 仅在输出目的地接受 S3 存储桶。当您使用“长寿”集群时,也许有一个解决方案,而是使用 HadoopJobRunner (-r hadoop
)。虽然我无法找到可行的解决方案......
【参考方案1】:
我不确定如何使用 mrJob 来完成,但是使用 hadoop 和 streaming jobs written in java,我们可以这样做:
-
启动集群
使用s3distcp从s3获取数据到集群的HDFS
以 HDFS 为输入执行我们作业的第 1 步
使用与上述相同的输入执行步骤 2 或我们的作业
...
使用EMR CLI,我们这样做:
> export jobflow=$(elastic-mapreduce --create --alive --plain-output
> --master-instance-type m1.small --slave-instance-type m1.xlarge --num-instances 21 --name "Custer Name" --bootstrap-action s3://elasticmapreduce/bootstrap-actions/configure-hadoop --args
> "--mapred-config-file,s3://myBucket/conf/custom-mapred-config-file.xml")
>
>
> elastic-mapreduce -j $jobflow --jar
> s3://us-east-1.elasticmapreduce/libs/s3distcp/1.latest/s3distcp.jar
> --arg --src --arg 's3://myBucket/input/' --arg --dest --arg 'hdfs:///input'
>
> elastic-mapreduce --jobflow $jobflow --jar s3://myBucket/bin/step1.jar
> --arg hdfs:///input --arg hdfs:///output-step1 --step-name "Step 1"
>
> elastic-mapreduce --jobflow $jobflow --jar s3://myBucket/bin/step2.jar
> --arg hdfs:///input,hdfs:///output-step1 --arg s3://myBucket/output/ --step-name "Step 2"
【讨论】:
【参考方案2】:它必须是 S3 存储桶,因为 EMR 集群在作业完成后不会正常持久化。所以,持久化输出的唯一方法是在集群之外,而下一个最近的地方是 S3。
【讨论】:
我在“keep-alive”模式下运行作业流程,因此结果可以在作业流程步骤之间保持在 HDFS 中。我的工作结构需要使用相同的(大型)数据集作为流程中许多步骤的输入。如果将数据存储在 HDFS 中,而不是每一步都从 S3 重新下载,将节省大量时间。 我明白了。我不是 python 专家,但 MRJobRunner(EMRJobRunner 的超级)代码似乎表明您不需要指定 'hdfs://' 作为输出参数的一部分,只需指定位置 - github.com/Yelp/mrjob/blob/master/mrjob/emr.py【参考方案3】:目前无法保存 MRJob EMR 作业的输出。目前在https://github.com/Yelp/mrjob/issues/887 有一个开放的功能请求。
【讨论】:
以上是关于如何将 EMR 流作业的输出写入 HDFS?的主要内容,如果未能解决你的问题,请参考以下文章
AWS EMR 文件已存在:Hadoop 作业读取和写入 S3
使用 403 写入 S3 时,在 EMR 上运行的 Spark 偶尔会失败