Map-Reduce作业无法提供预期的分区文件

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Map-Reduce作业无法提供预期的分区文件相关的知识,希望对你有一定的参考价值。

在Map-Reduce作业中,我使用了五个不同的文件,其中我的数据集中包含两个类别PI下的值。在找到I特定值后,我将它们传递给I-part-r-00000文件,因此,对于P.我在reducer中使用MultipleOutputformat类来实现这一点。

我的Mapper类包含:

public class parserMapper extends Mapper<LongWritable, Text, Text, Text> {
   public void map(LongWritable key, Text value, Context context)
   throws IOException, InterruptedException {

   String IPFLAG = "";
   String[] element_data= value.toString.split(","); 

    if(element_data[0].toString().trim().equalsIgnoreCase("005010X222A1")){
        IPFLAG = "P"; 
     }

    else {
       IPFLAG = "I";
     }

   if (IPFLAG == "P") {
     context.write(new Text(IPFLAG), new Text(theData));
     } 

   else if (IPFLAG == "I") {
   context.write(new Text(IPFLAG), new Text(theData));
     }

   else{
   System.out.println("No category found");
     } 


  }

  public void run(Context context) throws IOException, InterruptedException {
        setup(context);
        while (context.nextKeyValue()) {
            map(context.getCurrentKey(), context.getCurrentValue(), context);
        }
        cleanup(context);
    }


}

减速机类包括:

public class parserReducer extends Reducer<Text, Text, Text, Text> {

    private MultipleOutputs multipleOutputs;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        multipleOutputs = new MultipleOutputs(context);
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        multipleOutputs.close();
    }

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

        Object c = null;

        try{
            if (!(key.toString().isEmpty())) {

                for (Text value : values) {

                    multipleOutputs.write(c, value, key.toString());
                }

            }
        }
        catch(Exception e){ System.out.println("Caught Exception: " + e.getMessage());}
    }


}

和驱动程序代码包括=>

public class parserDriver {

 public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("textinputformat.record.delimiter", "~"+"
"+"ISA*");
    Job job = new Job(conf);
        job.setJobName("PARSER");
        job.setJarByClass(parserDriver.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setMapperClass(parserMapper.class);
        job.setReducerClass(parserReducer.class);
//      job.setOutputFormatClass(TextOutputFormat.class);
        LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
    //  job.setOutputFormatClass(LazyOutputFormat.class);

/*       MultipleOutputs.addNamedOutput(job, "P", TextOutputFormat.class, Text.class, Text.class);
         MultipleOutputs.addNamedOutput(job, "I", TextOutputFormat.class, Text.class, Text.class);
*/
        // Pass as option -D mapred.reduce.tasks=<number>
        job.setNumReduceTasks(3);       

        /* This line is to accept the input recursively */
        //FileInputFormat.setInputDirRecursive(job, true);

        FileInputFormat.addInputPath(job, "/Users/Mohit/input");
        FileOutputFormat.setOutputPath(job, "/Users/Mohit/output");

        /*
         * Delete output file path if already exists
         */
        FileSystem fs = FileSystem.get(conf);

        if (fs.exists(outputFilePath)) {
            fs.delete(outputFilePath, true);
        }

        return job.waitForCompletion(true) ? 0: 1;
    }
}

通过这一切,我试图针对单个文件实现两个分区

file1 - > P-part-r00000,I-part-r00001

file2 - > P-part-r00002,I-part-r00003

。但我得到两个分区,反对作为此作业输入的所有文件。

file1,file2,file3,file4,file5 - > P-part-r00000,I-part-r00001

不知道我在这里缺少什么,如果有人可以帮忙吗?

答案

1)在您的驱动程序中将这些行添加到文件命名:

   job.setOutputFormatClass(TextOutputFormat.class);
   MultipleOutputs.addNamedOutput(job, "I", TextOutputFormat.class,
          Text.class, Text.class);
   MultipleOutputs.addNamedOutput(job, "P", TextOutputFormat.class,
          Text.class, Text.class);

2)更改reducer以将每个值发送到具有特定名称的文件:

@Override
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
    try{
        if (!(key.toString().isEmpty())) {

            for (Text value : values) {

                multipleOutputs.write(key.toString(), key, value);
            }

        }
    }
    catch(Exception e){ System.out.println("Caught Exception: " + e.getMessage());}
}

3)将reducer的数量更改为2以获得正好2个文件。

以上是关于Map-Reduce作业无法提供预期的分区文件的主要内容,如果未能解决你的问题,请参考以下文章

mgo 和 mongodb 的 i/o 超时

删除与分区相关的hdfs文件后无法联系hive表分区

在hadoop map-reduce中运行jar文件时出错

map-reduce 如何在 HDFS 与 S3 上工作?

节点本地映射减少作业

数据流作业:无法将列分区表复制到列分区元表:不支持