自定义Writable

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了自定义Writable相关的知识,希望对你有一定的参考价值。

自定义Writable

Hadoop虽然 已经实现了一些非常有用的Writable,而且你可以使用他们的组合做很多事情,但是如果你想构造一些更加复杂的结果,你可以自定义Writable来达到你的目的,我们以注释的方式对自定义Writable进行讲解

 

[java] view plain copy
 
  1. /** 
  2.  * 自定义Writable通常都要实现Writable接口 
  3.  * 如果有比较大小的业务,最好是实现WritableComparable接口 
  4.  * @author 廖*民 
  5.  * time : 2015年1月13日下午1:39:12 
  6.  * @version 
  7.  */  
  8. public class EmployeeWritable implements WritableComparable<EmployeeWritable>{  
  9.   
  10.     //姓名  
  11.     private Text name;  
  12.     //角色  
  13.     private Text role;  
  14.       
  15.     //必须提供无参构造方法(一定要创建name和role对象否则会报空指针异常)  
  16.     public EmployeeWritable() {  
  17.           
  18.         name = new Text();  
  19.         role = new Text();  
  20.     }  
  21.   
  22.     //构造函数  
  23.     public EmployeeWritable(Text name, Text role) {  
  24.         this.name = name;  
  25.         this.role = role;  
  26.     }  
  27.   
  28.   
  29.     public Text getName() {  
  30.         return name;  
  31.     }  
  32.   
  33.     public void setName(Text name) {  
  34.         this.name = name;  
  35.     }  
  36.   
  37.     public Text getRole() {  
  38.         return role;  
  39.     }  
  40.   
  41.     public void setRole(Text role) {  
  42.         this.role = role;  
  43.     }  
  44.   
  45.       
  46.     /** 
  47.      * 调用成员对象本身的readFields()方法,从输入流中反序列化每一个成员对象 
  48.      */  
  49.     public void readFields(DataInput dataInput) throws IOException {  
  50.   
  51.         name.readFields(dataInput);  
  52.         role.readFields(dataInput);  
  53.     }  
  54.   
  55.     /** 
  56.      * 通过成员对象本身的write方法,序列化每一个成员对象到输出流中 
  57.      */  
  58.     public void write(DataOutput dataOutput) throws IOException {  
  59.         name.write(dataOutput);  
  60.         role.write(dataOutput);  
  61.     }  
  62.   
  63.     /** 
  64.      * 如果实现了WritableComparable接口必须实现compareTo方法,用于比较 
  65.      */  
  66.     public int compareTo(EmployeeWritable employeeWritable) {  
  67.           
  68.         int cmp = name.compareTo(employeeWritable.name);  
  69.         //如果不相等  
  70.         if (cmp != 0){  
  71.             return cmp;  
  72.         }  
  73.         //如果名字相等就比较角色  
  74.         return role.compareTo(employeeWritable.role);  
  75.     }  
  76.   
  77.     /** 
  78.      * MapReduce需要一个分割者(Partitioner)把Map的输出作为输入分成一块块的喂给多个reduce 
  79.      * 默认的是HashPatitioner,它是通过对象的hashCode函数进行分割。 
  80.      * 所以hashCode的好坏决定了分割是否均匀,它是一个很关键的方法 
  81.      */  
  82.     @Override  
  83.     public int hashCode() {  
  84.         final int prime = 31;  
  85.         int result = 1;  
  86.         result = prime * result + ((name == null) ? 0 : name.hashCode());  
  87.         result = prime * result + ((role == null) ? 0 : role.hashCode());  
  88.         return result;  
  89.     }  
  90.   
  91.     @Override  
  92.     public boolean equals(Object obj) {  
  93.         if (this == obj)  
  94.             return true;  
  95.         if (obj == null)  
  96.             return false;  
  97.         if (getClass() != obj.getClass())  
  98.             return false;  
  99.         EmployeeWritable other = (EmployeeWritable) obj;  
  100.         if (name == null) {  
  101.             if (other.name != null)  
  102.                 return false;  
  103.         } else if (!name.equals(other.name))  
  104.             return false;  
  105.         if (role == null) {  
  106.             if (other.role != null)  
  107.                 return false;  
  108.         } else if (!role.equals(other.role))  
  109.             return false;  
  110.         return true;  
  111.     }  
  112.   
  113.     /** 
  114.      * 自定义自己的输出类型 
  115.      */  
  116.     @Override  
  117.     public String toString() {  
  118.         return "EmployeeWritable [姓名=" + name + ", 角色=" + role + "]";  
  119.     }  
  120.       
  121.       
  122.   
  123. }  



 

 

 

自定义RawComparatorWritable

上面的EmployeeWritable已经可以跑的很溜了,但是还是有优化的空间,当作为MapReduce里的key,需要进行比较时,因为他已经被序列化,想要比较他们,那么首先要反序列化成一个对象,然后再调用compareTo对象进行比较,但是这样效率太低了,有没有可能可以直接比较序列化后的结构呢,答案是肯定的。

我们只需要把EmployeeWritable序列化后的结果拆分成成员对象,然后比较成员对象即可,看代码:

 

[java] view plain copy
 
  1. public static class Comparator extends WritableComparator{  
  2.   
  3.         private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();  
  4.           
  5.         protected Comparator() {  
  6.             super(EmployeeWritable.class);  
  7.         }  
  8.           
  9.         @Override  
  10.         public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {  
  11.               
  12.             try {  
  13.                   
  14.                 int nameL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);  
  15.                 int nameL2 = WritableUtils.decodeVIntSize(b2[2]) + readVInt(b2, s2);  
  16.                   
  17.                 //和compareTo方法一样,先比较name  
  18.                 int cmp = TEXT_COMPARATOR.compare(b1, s1, nameL1, b2, s2, nameL2);  
  19.                   
  20.                 if (cmp != 0){  
  21.                     return cmp;  
  22.                 }  
  23.                 //再比较role  
  24.                 return TEXT_COMPARATOR.compare(b1, s1+nameL1, l1-nameL1, b2, s2+nameL2, l2-nameL2);  
  25.             } catch (Exception e) {  
  26.                 throw new IllegalArgumentException();  
  27.             }  
  28.         }  
  29.           
  30.         static {  
  31.             //注册raw comparator,更像是绑定,这样MapReduce使用EmployeeWritable时就会直接调用Comparator  
  32.             WritableComparator.define(EmployeeWritable.class, new Comparator());  
  33.               
  34.         }  
  35.           
  36.     }  

我们没有直接去实现RawComparator而是继承于WritableComparator,因为WritableComparator提供了很多便捷的方法,并且对compare有个默认的实现。写compare方法时一定要小心谨慎,因为都是在字节上操作,可以好好参考下源代码里的一些Writable中的Comparator的写法,另外多看下WritableUtils也是有必要的,它里面有很多简便的方法可以使用。

 

 

自定义Comparators

有时候,除了默认的Comparator,你可能还需要一些自定义的Comparator来生成不同的排序队列,看一下下面这个示例,只比较name,两个compare是同一个意思,都是比较name的大小:

 

[java] view plain copy
 
    1. public static class NameComparator extends WritableComparator{  
    2.         private static final Text.Comparator TEXT_COMPARATOR= new Text.Comparator();  
    3.   
    4.         protected NameComparator() {  
    5.             super(EmploeeWritable.class);  
    6.         }  
    7.   
    8.         @Override  
    9.         public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {  
    10.             try {  
    11.                 int nameL1= WritableUtils.decodeVIntSize(b1[s1])+readVInt(b1,s1);  
    12.                 int nameL2=WritableUtils.decodeVIntSize(b2[s2])+readVInt(b2,s2);  
    13.                 return TEXT_COMPARATOR.compare(b1,s1,nameL1,b2,s2,nameL2);  
    14.             } catch (IOException e) {  
    15.                 throw new IllegalArgumentException();  
    16.             }  
    17.         }  
    18.   
    19.         @Override  
    20.         public int compare(WritableComparable a, WritableComparable b) {  
    21.             if(a instanceof EmploeeWritable && b instanceof  EmploeeWritable){  
    22.                 return ((EmploeeWritable)a).name.compareTo(((EmploeeWritable)b).name);  
    23.             }  
    24.             return super.compare(a,b);  
    25.         }  
    26.     }  

以上是关于自定义Writable的主要内容,如果未能解决你的问题,请参考以下文章

我们如何在 Hive 中为自定义 Writable 类型编写自定义 ObjectInspector?

大数据之Hadoop(MapReduce):自定义bean对象实现序列化接口(Writable)

Hadoop 学习自定义数据类型

在python中为Hadoop Map Reduce创建自定义可写键/值类型?

HDFS与Lucene

Hadoop---mapreduce排序和二次排序以及全排序