AWS EMR 文件已存在:Hadoop 作业读取和写入 S3

Posted

技术标签:

【中文标题】AWS EMR 文件已存在:Hadoop 作业读取和写入 S3【英文标题】:AWS EMR File Already exists: Hadoop Job reading and writing to S3 【发布时间】:2018-04-13 02:30:12 【问题描述】:

我有一个在 EMR 中运行的 Hadoop 作业,我将 S3 路径作为输入和输出传递给该作业。

当我在本地运行时,一切正常。(因为只有一个节点)

当我在 5 节点集群的 EMR 中运行时,我遇到了文件已经存在 IO 异常。

输出路径中包含时间戳,因此输出路径在 S3 中不存在。

Error: java.io.IOException: File already exists:s3://<mybucket_name>/8_9_0a4574ca-96d0-47c8-8eb8-4deb82944d4b/customer/RawFile12.txt/1523583593585/TOKENIZED/part-m-00000

我有一个非常简单的 hadoop 应用程序(主要是我的映射器),它从文件中读取每一行并转换它(使用现有库)

不确定为什么每个节点都尝试使用相同的文件名写入。

这里是映射器

public static class TokenizeMapper extends Mapper<Object,Text,Text,Text>
        public void map(Object key, Text value,Mapper.Context context) throws IOException,InterruptedException
            //TODO: Invoke Core Engine to transform the Data
            Encryption encryption = new Encryption();
            String tokenizedVal = encryption.apply(value.toString());
            context.write(tokenizedVal,1);
        
    

任何我的减速器

public static class TokenizeReducer extends Reducer<Text,Text,Text,Text> 
        public void reduce(Text text,Iterable<Text> lines,Context context) throws IOException,InterruptedException
            Iterator<Text> iterator = lines.iterator();
            int counter =0;
            while(iterator.hasNext())
                counter++;
            

            Text output = new Text(""+counter);
            context.write(text,output);
        
    

还有我的主课

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException 
        long startTime = System.currentTimeMillis();
        try 
            Configuration config = new Configuration();
            String[] additionalArgs = new GenericOptionsParser(config, args).getRemainingArgs();

            if (additionalArgs.length != 2) 
                System.err.println("Usage: Tokenizer Input_File and Output_File ");
                System.exit(2);
            


            Job job = Job.getInstance(config, "Raw File Tokenizer");
            job.setJarByClass(Tokenizer.class);
            job.setMapperClass(TokenizeMapper.class);
            job.setReducerClass(TokenizeReducer.class);

            job.setNumReduceTasks(0);
            job.setOutputKeyClass(Text.class);
            job.setOutputKeyClass(Text.class);

            FileInputFormat.addInputPath(job, new Path(additionalArgs[0]));
            FileOutputFormat.setOutputPath(job, new Path(additionalArgs[1]));

            boolean status = job.waitForCompletion(true);
            if (status) 
                //System.exit(0);
                System.out.println("Completed Job Successfully");
             else 
                System.out.println("Job did not Succeed");
            
        
        catch(Exception e)
            e.printStackTrace();
        
        finally
            System.out.println("Total Time for processing =["+(System.currentTimeMillis()-startTime)+"]");
        
    

我在启动集群时传递参数

s3://<mybucket>/8_9_0a4574ca-96d0-47c8-8eb8-4deb82944d4b/customer/RawFile12.txt

s3://<mybucket>/8_9_0a4574ca-96d0-47c8-8eb8-4deb82944d4b/customer/RawFile12.txt/1523583593585/TOKENIZED

感谢任何输入。

谢谢

【问题讨论】:

只是一个查询,为什么要在输出文件路径中保留一个txt文件位置(RawFile12.txt)?我们可以不删除那部分吗? 我可以删除它。不要认为这是问题,因为即使具有文件名,密钥也不同。更多的文件夹正在被创建。通过我从路径中删除了文件名并再次遇到相同的问题。 【参考方案1】:

在驱动代码中,你已经将Reducer设置为0,那么我们就不需要reducer代码了。

如果您需要在作业启动之前清除输出目录,您可以使用此 sn-p 清除该目录(如果存在):-

    FileSystem fileSystem = FileSystem.get(<hadoop config object>);

    if(fileSystem.exists(new Path(<pathTocheck>)))
    
        fileSystem.delete(new Path(<pathTocheck>), true);
    

【讨论】:

以上是关于AWS EMR 文件已存在:Hadoop 作业读取和写入 S3的主要内容,如果未能解决你的问题,请参考以下文章

在 AWS EMR 上使用 pig 的 Java 堆空间

AWS EMR 性能 HDFS 与 S3

从 S3 加载 AWS EMR

在 emr 中使用 spark 从 S3 读取 avro 失败

Spark流式传输作业不会删除随机播放文件

EMR Hadoop 长时间运行的作业被杀死