AWS EMR 文件已存在:Hadoop 作业读取和写入 S3
Posted
技术标签:
【中文标题】AWS EMR 文件已存在:Hadoop 作业读取和写入 S3【英文标题】:AWS EMR File Already exists: Hadoop Job reading and writing to S3 【发布时间】:2018-04-13 02:30:12 【问题描述】:我有一个在 EMR 中运行的 Hadoop 作业,我将 S3 路径作为输入和输出传递给该作业。
当我在本地运行时,一切正常。(因为只有一个节点)
当我在 5 节点集群的 EMR 中运行时,我遇到了文件已经存在 IO 异常。
输出路径中包含时间戳,因此输出路径在 S3 中不存在。
Error: java.io.IOException: File already exists:s3://<mybucket_name>/8_9_0a4574ca-96d0-47c8-8eb8-4deb82944d4b/customer/RawFile12.txt/1523583593585/TOKENIZED/part-m-00000
我有一个非常简单的 hadoop 应用程序(主要是我的映射器),它从文件中读取每一行并转换它(使用现有库)
不确定为什么每个节点都尝试使用相同的文件名写入。
这里是映射器
public static class TokenizeMapper extends Mapper<Object,Text,Text,Text>
public void map(Object key, Text value,Mapper.Context context) throws IOException,InterruptedException
//TODO: Invoke Core Engine to transform the Data
Encryption encryption = new Encryption();
String tokenizedVal = encryption.apply(value.toString());
context.write(tokenizedVal,1);
任何我的减速器
public static class TokenizeReducer extends Reducer<Text,Text,Text,Text>
public void reduce(Text text,Iterable<Text> lines,Context context) throws IOException,InterruptedException
Iterator<Text> iterator = lines.iterator();
int counter =0;
while(iterator.hasNext())
counter++;
Text output = new Text(""+counter);
context.write(text,output);
还有我的主课
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException
long startTime = System.currentTimeMillis();
try
Configuration config = new Configuration();
String[] additionalArgs = new GenericOptionsParser(config, args).getRemainingArgs();
if (additionalArgs.length != 2)
System.err.println("Usage: Tokenizer Input_File and Output_File ");
System.exit(2);
Job job = Job.getInstance(config, "Raw File Tokenizer");
job.setJarByClass(Tokenizer.class);
job.setMapperClass(TokenizeMapper.class);
job.setReducerClass(TokenizeReducer.class);
job.setNumReduceTasks(0);
job.setOutputKeyClass(Text.class);
job.setOutputKeyClass(Text.class);
FileInputFormat.addInputPath(job, new Path(additionalArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(additionalArgs[1]));
boolean status = job.waitForCompletion(true);
if (status)
//System.exit(0);
System.out.println("Completed Job Successfully");
else
System.out.println("Job did not Succeed");
catch(Exception e)
e.printStackTrace();
finally
System.out.println("Total Time for processing =["+(System.currentTimeMillis()-startTime)+"]");
我在启动集群时传递参数
s3://<mybucket>/8_9_0a4574ca-96d0-47c8-8eb8-4deb82944d4b/customer/RawFile12.txt
s3://<mybucket>/8_9_0a4574ca-96d0-47c8-8eb8-4deb82944d4b/customer/RawFile12.txt/1523583593585/TOKENIZED
感谢任何输入。
谢谢
【问题讨论】:
只是一个查询,为什么要在输出文件路径中保留一个txt文件位置(RawFile12.txt)?我们可以不删除那部分吗? 我可以删除它。不要认为这是问题,因为即使具有文件名,密钥也不同。更多的文件夹正在被创建。通过我从路径中删除了文件名并再次遇到相同的问题。 【参考方案1】:在驱动代码中,你已经将Reducer设置为0,那么我们就不需要reducer代码了。
如果您需要在作业启动之前清除输出目录,您可以使用此 sn-p 清除该目录(如果存在):-
FileSystem fileSystem = FileSystem.get(<hadoop config object>);
if(fileSystem.exists(new Path(<pathTocheck>)))
fileSystem.delete(new Path(<pathTocheck>), true);
【讨论】:
以上是关于AWS EMR 文件已存在:Hadoop 作业读取和写入 S3的主要内容,如果未能解决你的问题,请参考以下文章