Hadoop自定义分区Partitioner

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Hadoop自定义分区Partitioner相关的知识,希望对你有一定的参考价值。

一:背景

为了使得MapReduce计算后的结果显示更加人性化,Hadoop提供了分区的功能,可以使得MapReduce计算结果输出到不同的分区中,方便查看。Hadoop提供的Partitioner组件可以让Map对Key进行分区,从而可以根据不同key来分发到不同的reduce中去处理,我们可以自定义key的分发规则,如数据文件包含不同的省份,而输出的要求是每个省份对应一个文件。

 

二:技术实现

自定义分区很简单,我们只需要继承抽象类Partitioner,实现自定义的getPartitioner()方法即可,另外还要给任务设置分区:job.setPartitionerClass(),就可以了。

 

案例

阿里巴巴旗下三个子网站site1、site2、site3,每个网站对商品销售进行了统计,现在要汇总这三个网站的销售量,数据如下:

技术分享

从上图可以看到有4种商品,很显然我们应该设置4个分区,代码如下:

 

[java] view plain copy
 
  1. public class MyPartitionerTest extends Configured implements Tool{  
  2.         // 定义输入路径  
  3.         private static String INPUT_PATH = "";  
  4.         // 定义输出路径  
  5.         private static  String OUT_PATH = "";  
  6.   
  7.         public static void main(String[] args) {  
  8.             try {  
  9.                 //运行  
  10.                 ToolRunner.run(new MyPartitionerTest(), args);  
  11.             } catch (Exception e) {  
  12.                 e.printStackTrace();  
  13.             }  
  14.               
  15.         }  
  16.     public static class MyPartitionerMapper extends Mapper<LongWritable, Text, Text, LongWritable> {  
  17.         // 创建map输出的key  
  18.         private Text product = new Text();  
  19.         // 创建map输出的value  
  20.         private LongWritable saleNum = new LongWritable();  
  21.   
  22.         @Override  
  23.         protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws IOException,  
  24.                 InterruptedException {  
  25.   
  26.             // 对行文本内容进行切分  
  27.             String[] splits = value.toString().split("\t");  
  28.             System.out.println(splits[0] +":"+ splits[1]);  
  29.             // 获取商品和销售量写出去  
  30.             product.set(splits[0]);  
  31.             saleNum.set(Long.parseLong(splits[1]));  
  32.               
  33.             // 写出去  
  34.             context.write(product, saleNum);  
  35.         }  
  36.     }  
  37.   
  38.     public static class MyPartitionerReducer extends Reducer<Text, LongWritable, Text, LongWritable> {  
  39.   
  40.         // 定义商品的销售量  
  41.         private LongWritable saleSum = new LongWritable();  
  42.   
  43.         @Override  
  44.         protected void reduce(Text key, Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException,  
  45.                 InterruptedException {  
  46.             // 定义商品的总数  
  47.             Long sum = 0L;  
  48.             // 遍历集合对商品销售进行汇总  
  49.             for (LongWritable saleNum : values) {  
  50.                 sum += saleNum.get();  
  51.             }  
  52.             // 设置商品的总销售量  
  53.             saleSum.set(sum);  
  54.             // 写出去  
  55.             context.write(key, saleSum);  
  56.         }  
  57.     }  
  58.   
  59.     public static class MyPartitioner extends Partitioner<Text, LongWritable> {  
  60.   
  61.         @Override  
  62.         public int getPartition(Text key, LongWritable value, int numPartitions) {  
  63.   
  64.             if (key.toString().equals("shoes")) // 当key为"shoes"时,分一个区  
  65.                 return 0;  
  66.             if (key.toString().equals("hat"))// 当key为"hat"时分一个区  
  67.                 return 1;  
  68.             if (key.toString().equals("stockings"))// 当key为stockings"时分一个区  
  69.                 return 2;  
  70.             // 其他的记录都分到一个区中  
  71.             return 3;  
  72.         }  
  73.   
  74.     }  
  75.   
  76.     public int run(String[] args) throws Exception {  
  77.         try {  
  78.               
  79.             //为路径设置参数  
  80.             INPUT_PATH = args[0];  
  81.             OUT_PATH = args[1];  
  82.             // 创建配置信息  
  83.             Configuration conf = new Configuration();  
  84.               
  85.   
  86.             // 创建文件系统  
  87.             FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);  
  88.             // 如果输出目录存在,我们就删除  
  89.             if (fileSystem.exists(new Path(OUT_PATH))) {  
  90.                 fileSystem.delete(new Path(OUT_PATH), true);  
  91.             }  
  92.   
  93.             // 创建任务  
  94.             Job job = new Job(conf, MyPartitionerTest.class.getName());  
  95.   
  96.             //打成jar包  
  97.             job.setJarByClass(MyPartitionerTest.class);  
  98.             //1.1   设置输入目录和设置输入数据格式化的类  
  99.             FileInputFormat.setInputPaths(job, INPUT_PATH);  
  100.             job.setInputFormatClass(TextInputFormat.class);  
  101.   
  102.             //1.2   设置自定义Mapper类和设置map函数输出数据的key和value的类型  
  103.             job.setMapperClass(MyPartitionerMapper.class);  
  104.             job.setMapOutputKeyClass(Text.class);  
  105.             job.setMapOutputValueClass(LongWritable.class);  
  106.   
  107.             // 1.3  设置分区和reduce数量(reduce的数量,和分区的数量对应,因为分区为一个,所以reduce的数量也是一个)  
  108.             job.setPartitionerClass(MyPartitioner.class);  
  109.             job.setNumReduceTasks(4);//注:这个分区的数量是我们实现要规定好的,因为我们有四种商品,所以我们分了四个区  
  110.   
  111.             //1.4   排序、分组  
  112.             //1.5   归约  
  113.             job.setCombinerClass(MyPartitionerReducer.class);  
  114.             // 2.1  Shuffle把数据从Map端拷贝到Reduce端。  
  115.             //2.2   指定Reducer类和输出key和value的类型  
  116.             job.setReducerClass(MyPartitionerReducer.class);  
  117.             job.setOutputKeyClass(Text.class);  
  118.             job.setOutputValueClass(LongWritable.class);  
  119.   
  120.             //2.3   指定输出的路径和设置输出的格式化类  
  121.             FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));  
  122.             job.setOutputFormatClass(TextOutputFormat.class);  
  123.   
  124.   
  125.             // 提交作业 退出  
  126.             System.exit(job.waitForCompletion(true) ? 0 : 1);  
  127.           
  128.         } catch (Exception e) {  
  129.             e.printStackTrace();  
  130.         }  
  131.           
  132.         return 0;  
  133.     }  
  134.   
  135. }  

打包运行程序:

 

 

[java] view plain copy
 
  1. hadoop jar MyPartitioner.jar hdfs://liaozhongmin5:9000/files/* hdfs://liaozhongmin5:9000/out  



程序运行结果:

 

技术分享

如我们所愿,程序分了四个分区!

程序运行的日志:

技术分享

有兴趣的可以去分析一下程序运行所消耗的资源:基于计算机资源分析MapReduce运行

 

注:一个有意思的问题是,我直接使用Eclipse插件连接远程的Hadoop集群去跑这个程序就会报错,提示分区非法,在网上看到有人说,如果分区数大于1的时候,就必须打成jar包才能正常运行,天知道呢,就这样吧,反正也没有更好的解释了!

以上是关于Hadoop自定义分区Partitioner的主要内容,如果未能解决你的问题,请参考以下文章

Hadoop Partitioner 中的自定义计数器

hadoop 学习自定义分区

Hadoop学习之路MapReduce自定义分区实现

Hadoop Oozie MapReduce 操作自定义分区器

MapReduce之自定义分区器Partitioner

Spark自定义分区(Partitioner)