hadoop 自定义OutputFormat

Posted asker009

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了hadoop 自定义OutputFormat相关的知识,希望对你有一定的参考价值。

1、继承FileOutputFormat,复写getRecordWriter方法

/**
 * @Description:自定义outputFormat,输出数据到不同的文件
 */
public class FilterOutputFormat extends FileOutputFormat<Text, NullWritable> 
    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException 
        return new FRecordWriter(job);
    

2、实现RecordWriter

/**
 * @Description: 继承RecordWriter,实现数据输出到不同目录文件
 */
public class FRecordWriter extends RecordWriter<Text, NullWritable> 
    FSDataOutputStream out1 = null;
    FSDataOutputStream out2 = null;

    @Override
    public void write(Text key, NullWritable value) throws IOException, InterruptedException 
        // 判断是否包含“baidu”和"alibaba"字符串,输出到不同文件
        if (key.toString().contains("baidu") || key.toString().contains("alibaba")) 
            out1.write(key.toString().getBytes());
         else 
            out2.write(key.toString().getBytes());
        

    

    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException 
        IOUtils.closeStream(out1);
        IOUtils.closeStream(out2);
    

    public FRecordWriter(TaskAttemptContext job) 
        FileSystem fs;
        try 
            Path path1 = new Path("output1/a.log");
            Path path2 = new Path("output2/b.log");
            System.out.println(path1.getName());
            System.out.println(path2.getName());
            fs = FileSystem.get(job.getConfiguration());
            out1 = fs.create(path1);
            out2 = fs.create(path2);
        catch (Exception e)
            e.printStackTrace();
        

    

3、map

/**
 * @Description: 按行读取,按行写入
 */
public class FilterMapper extends Mapper<LongWritable, Text, Text, NullWritable> 
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException 
        context.write(value,NullWritable.get());
    

4、reducer

public class FilterReducer extends Reducer<Text, NullWritable,Text,NullWritable> 
    private Text newLine = new Text();
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException 

        //循环null值的values是防止key里有重复的数据没有被取出
        //Iterable<NullWritable> values迭代器里存储了key和value(虽然本例中value都是null值)
        //通过循环迭代器,迭代器里的key值也会被不断取出赋值到Text key中(公用内存地址)
        for (NullWritable value : values) 
            newLine.set(key.toString()+"\r\n");
            context.write(newLine,value);
        
    

5、driver

/**
 * @Description: 自定义输出 
 * 实现对样本按行分割,判断是否包含baidu或alibaba字符串,
 * 包含则写入目录1,不包含写入目录2,
 */
public class FilterDriver 

   public static void main(String args[]) throws Exception
       if(args.length!=2)
       
           System.err.println("使用格式:FilterDriver <input path> <output path>");
           System.exit(-1);
       


       Configuration conf = new Configuration();
       Job job = Job.getInstance(conf);

       job.setJarByClass(FilterDriver.class);
       job.setMapperClass(FilterMapper.class);
       job.setReducerClass(FilterReducer.class);

       job.setMapOutputKeyClass(Text .class);
       job.setMapOutputValueClass(NullWritable .class);

       job.setOutputKeyClass(Text.class);
       job.setOutputValueClass(NullWritable.class);

       // 要将自定义的输出格式组件设置到job中
       job.setOutputFormatClass(FilterOutputFormat.class);

       FileInputFormat.setInputPaths(job, new Path(args[0]));

       // 虽然我们自定义了outputformat,但是因为我们的outputformat继承自fileoutputformat
       // 而fileoutputformat要输出一个_SUCCESS文件,所以,在这还得指定一个输出目录
       FileOutputFormat.setOutputPath(job, new Path(args[1]));

       Path outPath = new Path(args[1]);
       FileSystem fs = FileSystem.get(conf);
       if(fs.exists(outPath))
           fs.delete(outPath,true);
       

       boolean result = job.waitForCompletion(true);
       System.exit(result ? 0 : 1);
   


 

 

以上是关于hadoop 自定义OutputFormat的主要内容,如果未能解决你的问题,请参考以下文章

hadoop 学习自定义分区

Hadoop自定义分区Partitioner

Hadoop3集群搭建之——hive添加自定义函数UDTF (一行输入,多行输出)

Hadoop 学习自定义数据类型

hadoop 自定义OutputFormat

hadoop29---自定义注解