hadoop 自定义OutputFormat
Posted asker009
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了hadoop 自定义OutputFormat相关的知识,希望对你有一定的参考价值。
1、继承FileOutputFormat，重写getRecordWriter方法
/** * @Description:自定义outputFormat,输出数据到不同的文件 */ public class FilterOutputFormat extends FileOutputFormat<Text, NullWritable> @Override public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException return new FRecordWriter(job);
2、实现RecordWriter（按内容把记录写入不同文件）
/** * @Description: 继承RecordWriter,实现数据输出到不同目录文件 */ public class FRecordWriter extends RecordWriter<Text, NullWritable> FSDataOutputStream out1 = null; FSDataOutputStream out2 = null; @Override public void write(Text key, NullWritable value) throws IOException, InterruptedException // 判断是否包含“baidu”和"alibaba"字符串,输出到不同文件 if (key.toString().contains("baidu") || key.toString().contains("alibaba")) out1.write(key.toString().getBytes()); else out2.write(key.toString().getBytes()); @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException IOUtils.closeStream(out1); IOUtils.closeStream(out2); public FRecordWriter(TaskAttemptContext job) FileSystem fs; try Path path1 = new Path("output1/a.log"); Path path2 = new Path("output2/b.log"); System.out.println(path1.getName()); System.out.println(path2.getName()); fs = FileSystem.get(job.getConfiguration()); out1 = fs.create(path1); out2 = fs.create(path2); catch (Exception e) e.printStackTrace();
3、Mapper
/** * @Description: 按行读取,按行写入 */ public class FilterMapper extends Mapper<LongWritable, Text, Text, NullWritable> @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException context.write(value,NullWritable.get());
4、Reducer
public class FilterReducer extends Reducer<Text, NullWritable,Text,NullWritable> private Text newLine = new Text(); @Override protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException //循环null值的values是防止key里有重复的数据没有被取出 //Iterable<NullWritable> values迭代器里存储了key和value(虽然本例中value都是null值) //通过循环迭代器,迭代器里的key值也会被不断取出赋值到Text key中(公用内存地址) for (NullWritable value : values) newLine.set(key.toString()+"\r\n"); context.write(newLine,value);
5、Driver
/** * @Description: 自定义输出 * 实现对样本按行分割,判断是否包含baidu或alibaba字符串, * 包含则写入目录1,不包含写入目录2, */ public class FilterDriver public static void main(String args[]) throws Exception if(args.length!=2) System.err.println("使用格式:FilterDriver <input path> <output path>"); System.exit(-1); Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(FilterDriver.class); job.setMapperClass(FilterMapper.class); job.setReducerClass(FilterReducer.class); job.setMapOutputKeyClass(Text .class); job.setMapOutputValueClass(NullWritable .class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); // 要将自定义的输出格式组件设置到job中 job.setOutputFormatClass(FilterOutputFormat.class); FileInputFormat.setInputPaths(job, new Path(args[0])); // 虽然我们自定义了outputformat,但是因为我们的outputformat继承自fileoutputformat // 而fileoutputformat要输出一个_SUCCESS文件,所以,在这还得指定一个输出目录 FileOutputFormat.setOutputPath(job, new Path(args[1])); Path outPath = new Path(args[1]); FileSystem fs = FileSystem.get(conf); if(fs.exists(outPath)) fs.delete(outPath,true); boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1);
以上是关于hadoop 自定义OutputFormat的主要内容,如果未能解决你的问题,请参考以下文章