Hadoop中的KeyValueInputFormat
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Hadoop中的KeyValueInputFormat相关的知识,希望对你有一定的参考价值。
一:背景
有时候,我们可以不以偏移量和行文本内容来作为数据源到MapTask的输入格式,而使用键值对的形式,使用KeyValueInputFormat就可以完成这种需求。
二:技术实现
数据源如下
操作代码如下:
- public class MyKeyValueTextInputFormat {
- // 定义输入路径
- private static final String INPUT_PATH = "hdfs://liaozhongmin:9000/hello";
- // 定义输出路径
- private static final String OUT_PATH = "hdfs://liaozhongmin:9000/out";
- public static void main(String[] args) {
- try {
- // 创建配置信息
- Configuration conf = new Configuration();
- //设置行的分隔符,这里是制表符,第一个制表符前面的是Key,第一个制表符后面的内容都是value
- conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, "\t");
- /**********************************************/
- //对Map端输出进行压缩
- /*conf.setBoolean("mapred.compress.map.output", true);
- //设置map端输出使用的压缩类
- conf.setClass("mapred.map.output.compression.codec", GzipCodec.class, CompressionCodec.class);
- //对reduce端输出进行压缩
- conf.setBoolean("mapred.output.compress", true);
- //设置reduce端输出使用的压缩类
- conf.setClass("mapred.output.compression.codec", GzipCodec.class, CompressionCodec.class);*/
- // 添加配置文件(我们可以在编程的时候动态配置信息,而不需要手动去改变集群)
- /*
- * conf.addResource("classpath://hadoop/core-site.xml");
- * conf.addResource("classpath://hadoop/hdfs-site.xml");
- * conf.addResource("classpath://hadoop/hdfs-site.xml");
- */
- // 创建文件系统
- FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);
- // 如果输出目录存在,我们就删除
- if (fileSystem.exists(new Path(OUT_PATH))) {
- fileSystem.delete(new Path(OUT_PATH), true);
- }
- // 创建任务
- Job job = new Job(conf, MyKeyValueTextInputFormat.class.getName());
- //1.1 设置输入目录和设置输入数据格式化的类
- FileInputFormat.setInputPaths(job, INPUT_PATH);
- job.setInputFormatClass(KeyValueTextInputFormat.class);
- //1.2 设置自定义Mapper类和设置map函数输出数据的key和value的类型
- job.setMapperClass(MyKeyValueInputFormatMapper.class);
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(LongWritable.class);
- //1.3 设置分区和reduce数量(reduce的数量,和分区的数量对应,因为分区为一个,所以reduce的数量也是一个)
- job.setPartitionerClass(HashPartitioner.class);
- job.setNumReduceTasks(1);
- //1.4 排序、分组
- //1.5 归约
- //2.1 Shuffle把数据从Map端拷贝到Reduce端。
- //2.2 指定Reducer类和输出key和value的类型
- job.setReducerClass(MyKeyValueInputFormatReducer.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(LongWritable.class);
- //2.3 指定输出的路径和设置输出的格式化类
- FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
- job.setOutputFormatClass(TextOutputFormat.class);
- // 提交作业 退出
- System.exit(job.waitForCompletion(true) ? 0 : 1);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- /**
- * 自定义Mapper类
- * @author 廖钟民
- * time : 2015年1月15日下午8:00:01
- * @version
- */
- public static class MyKeyValueInputFormatMapper extends Mapper<Text, Text, Text, LongWritable>{
- /**
- * 输入数据是
- * hello you
- * hello me
- * you me love
- *
- * 进入map的键值对应该是<hello,you> <hello,me> <you,me love>每个键值对分别调用map()函数
- */
- protected void map(Text key, Text value, Mapper<Text, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException {
- //把key和value都当做key写出去
- context.write(key, new LongWritable(1));
- context.write(value, new LongWritable(1));
- }
- }
- /**
- * map()函数的输出结果为:
- *<hello,1> <you,1> <hello,1> <me,1> <you,1> <me love,1>
- *排序分组后的结果为:
- *<hello,{1,1}> <me,{1}> <me love,{1}> <you,{1,1}>
- */
- /**
- * 自定义Reducer类
- * @author 廖钟民
- * time : 2015年1月15日下午8:00:12
- * @version
- */
- public static class MyKeyValueInputFormatReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
- @Override
- protected void reduce(Text key, Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException,
- InterruptedException {
- int sum = 0;
- //遍历统计
- for (LongWritable s : values){
- sum += s.get();
- }
- context.write(key, new LongWritable(sum));
- }
- }
- }
程序运行结果:
以上是关于Hadoop中的KeyValueInputFormat的主要内容,如果未能解决你的问题,请参考以下文章
hadoop 作业中的 org.apache.hadoop.fs.BlockMissingException