Hadoop序列化与Writable源码分析

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Hadoop序列化与Writable源码分析相关的知识,希望对你有一定的参考价值。

序列化的概念
     1.序列化(Serialization)是指把结构化对象转化为字节流。
     2.反序列化(Deserialization)是序列化的逆过程,即把字节流转回结构化对象


Hadoop序列化的特点
     1.序列化格式特点
         ——紧凑:高效使用 存储空间
         ——快速:读写数据的额外开销小
         ——可扩展:可透明地读取老格式的数据
         ——互操作:支持多语言的交互
注:hadoop1.x的序列化仅满足了紧凑和快速的特点。


     2.
序列化在分布式环境的两大作用:进程间通信,永久存储。
     3.Hadoop节点间通信
技术分享

这里之所以要提到Writable,是因为Writable接口是hadoop中序列化对象的一个通用接口,在package org.apache.hadoop.io中定义了大量的可序列化对象。

 

[java] view plain copy
 
  1. public interface Writable {  
  2.     
  3.   void write(DataOutput out) throws IOException;  
  4.   
  5.    
  6.   void readFields(DataInput in) throws IOException;  
  7. }  


一个类要支持可序列化只需要实现这个接口即可。下面是Writable类的层次结构,如下图:

 

技术分享

下面我们一点一点来看,先是IntWritable和LongWritable

技术分享

从图中我们可以看到WritableComparable接口继承了Writable和Comparable接口,以支持比较功能。正如层次图中看到,IntWritable、LongWritable、ByteWritable等 基本类型都实现了这个接口,IntWritable和LongWritable的readFields()都直接从实现了DataInput接口的输入流中读取二进制数据并重构成int型和long型,而write()方法则直接将int类型数据和long类型数据直接转换成二进制流,IntWritable和LongWritable都含有相应的Comparator内部类(如上图中所示IntWritable聚合了Comparator;LongWritable聚合了Comparator和DecresingComparator),这是用来支持在不反序列化的情况下直接比较数据流中的数据的功能,这是一个优化,无需反序列化创建对象后再比较。

我们再通过一张草图来理解一下Comparator在这里面的位置:

技术分享

如上图所示,Comparator只是一个比较器,就类似于java集合中Collection和Collections(只是提供了一些处理集合数据的方法的工具类)的关系。WritableComparator并不是继承于WritableComparable,只是在WritableComparator中聚合了WritableComparable的对象而已。

 

下面我们来看看IntWritable的代码:

 

[java] view plain copy
 
  1. public class IntWritable implements WritableComparable {  
  2.   private int value;  
  3.   
  4.   public IntWritable() {}  
  5.   
  6.   public IntWritable(int value) { set(value); }  
  7.   
  8.   /** Set the value of this IntWritable. */  
  9.   public void set(int value) { this.value = value; }  
  10.   
  11.   /** Return the value of this IntWritable. */  
  12.   public int get() { return value; }  
  13.   
  14.   public void readFields(DataInput in) throws IOException {  
  15.     value = in.readInt();  
  16.   }  
  17.   
  18.   public void write(DataOutput out) throws IOException {  
  19.     out.writeInt(value);  
  20.   }  
  21.   
  22.   /** Returns true iff <code>o</code> is a IntWritable with the same value. */  
  23.   public boolean equals(Object o) {  
  24.     if (!(o instanceof IntWritable))  
  25.       return false;  
  26.     IntWritable other = (IntWritable)o;  
  27.     return this.value == other.value;  
  28.   }  
  29.   
  30.   public int hashCode() {  
  31.     return value;  
  32.   }  
  33.   
  34.   /** Compares two IntWritables. */  
  35.   public int compareTo(Object o) {  
  36.     int thisValue = this.value;  
  37.     int thatValue = ((IntWritable)o).value;  
  38.     return (thisValue<thatValue ? -1 : (thisValue==thatValue ? 0 : 1));  
  39.   }  
  40.   
  41.   public String toString() {  
  42.     return Integer.toString(value);  
  43.   }  
  44.   
  45.   /** A Comparator optimized for IntWritable. */   
  46.   public static class Comparator extends WritableComparator {  
  47.     public Comparator() {  
  48.       super(IntWritable.class);  
  49.     }  
  50.   
  51.     public int compare(byte[] b1, int s1, int l1,  
  52.                        byte[] b2, int s2, int l2) {  
  53.       int thisValue = readInt(b1, s1);  
  54.       int thatValue = readInt(b2, s2);  
  55.       return (thisValue<thatValue ? -1 : (thisValue==thatValue ? 0 : 1));  
  56.     }  
  57.   }  
  58.   
  59.   static {                                        // register this comparator  
  60.     WritableComparator.define(IntWritable.class, new Comparator());  
  61.   }  
  62. }  


代码最后的static块调用WritableComparator的静态方法define()来注册上面这个Comparator,就是将其加入WritableComparator的comparators成员中,comparators是HashMap类型且是static的,这样,就告诉WritableComparator,当我们使用WritableComparator.get(IntWritable.class);方法的时候,返回我注册的这个Comparator给我(对于IntWritable来说就是IntWritable.Comparator),然后我就可以使用comparator.compare(byte[] b1,int s1,int l1,byte[] b2,int s2,int l2)来比较b1和b2,而不需要将它反序列化成对象。Comparaotr.compare(byte[] b1,int s1,int l1,byte[] b2,int s2,int l2);中有个readInt()方法,它是从WritableComparator继承而来的,它将IntWritable的value从byte数组中通过移位转换出来。

 

注:当comparators中没有注册要比较的类的Comparator,则会返回一个默认的Comparator,然后使用这个默认的Comparator的compare(byte[] b1,int s1,int l1,byte[] b2, int s2,int l2)方法比较b1、b2的时候还要序列化成对象的,详情见后面细讲WritableComparator。

 

LongWritable的方法基本和IntWritable一样,区别就是LongWritable的值是long型,且多了一个额外的LongWritable.DecresingComparator,它继承于LongWritable.Comparator,只是它的的比较方法返回值与使用LongWritable.Comparator比较相反(取负),这个应该是为降序排序做准备的。

 

[java] view plain copy
 
  1. public class LongWritable implements WritableComparable {  
  2.   private long value;  
  3.   
  4.   public LongWritable() {}  
  5.   
  6.   public LongWritable(long value) { set(value); }  
  7.   
  8.   /** Set the value of this LongWritable. */  
  9.   public void set(long value) { this.value = value; }  
  10.   
  11.   /** Return the value of this LongWritable. */  
  12.   public long get() { return value; }  
  13.   
  14.   public void readFields(DataInput in) throws IOException {  
  15.     value = in.readLong();  
  16.   }  
  17.   
  18.   public void write(DataOutput out) throws IOException {  
  19.     out.writeLong(value);  
  20.   }  
  21.   
  22.   /** Returns true iff <code>o</code> is a LongWritable with the same value. */  
  23.   public boolean equals(Object o) {  
  24.     if (!(o instanceof LongWritable))  
  25.       return false;  
  26.     LongWritable other = (LongWritable)o;  
  27.     return this.value == other.value;  
  28.   }  
  29.   
  30.   public int hashCode() {  
  31.     return (int)value;  
  32.   }  
  33.   
  34.   /** Compares two LongWritables. */  
  35.   public int compareTo(Object o) {  
  36.     long thisValue = this.value;  
  37.     long thatValue = ((LongWritable)o).value;  
  38.     return (thisValue<thatValue ? -1 : (thisValue==thatValue ? 0 : 1));  
  39.   }  
  40.   
  41.   public String toString() {  
  42.     return Long.toString(value);  
  43.   }  
  44.   
  45.   /** A Comparator optimized for LongWritable. */   
  46.   public static class Comparator extends WritableComparator {  
  47.     public Comparator() {  
  48.       super(LongWritable.class);  
  49.     }  
  50.   
  51.     public int compare(byte[] b1, int s1, int l1,  
  52.                        byte[] b2, int s2, int l2) {  
  53.       long thisValue = readLong(b1, s1);  
  54.       long thatValue = readLong(b2, s2);  
  55.       return (thisValue<thatValue ? -1 : (thisValue==thatValue ? 0 : 1));  
  56.     }  
  57.   }  
  58.   
  59.   /** A decreasing Comparator optimized for LongWritable. */   
  60.   public static class DecreasingComparator extends Comparator {  
  61.     public int compare(WritableComparable a, WritableComparable b) {  
  62.       return -super.compare(a, b);  
  63.     }  
  64.     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {  
  65.       return -super.compare(b1, s1, l1, b2, s2, l2);  
  66.     }  
  67.   }  
  68.   
  69.   static {                                       // register default comparator  
  70.     WritableComparator.define(LongWritable.class, new Comparator());  
  71.   }  
  72.   
  73. }  

另外,ByteWritable、BooleanWritable、FloatWritable、DoubleWritable基本都一样。

 

然后我们看VIntWritable和VLongWritable,这两个基本类基本一样而且VIntWritable的value编码的时候也是使用VLongWritable的value编解码时的方法,主要区别是VIntWritable对象使用int型的value成员,而VLongWritable使用long型的value成员,这是由它们的取值范围决定的。它们都没有ComParator,不像上面的类。

我们只看VLongWritable即可,源码如下:

 

[java] view plain copy
 
  1. public class VLongWritable implements WritableComparable {  
  2.   private long value;  
  3.   
  4.   public VLongWritable() {}  
  5.   
  6.   public VLongWritable(long value) { set(value); }  
  7.   
  8.   /** Set the value of this LongWritable. */  
  9.   public void set(long value) { this.value = value; }  
  10.   
  11.   /** Return the value of this LongWritable. */  
  12.   public long get() { return value; }  
  13.   
  14.   public void readFields(DataInput in) throws IOException {  
  15.     value = WritableUtils.readVLong(in);  
  16.   }  
  17.   
  18.   public void write(DataOutput out) throws IOException {  
  19.     WritableUtils.writeVLong(out, value);  
  20.   }  
  21.   
  22.   /** Returns true iff <code>o</code> is a VLongWritable with the same value. */  
  23.   public boolean equals(Object o) {  
  24.     if (!(o instanceof VLongWritable))  
  25.       return false;  
  26.     VLongWritable other = (VLongWritable)o;  
  27.     return this.value == other.value;  
  28.   }  
  29.   
  30.   public int hashCode() {  
  31.     return (int)value;  
  32.   }  
  33.   
  34.   /** Compares two VLongWritables. */  
  35.   public int compareTo(Object o) {  
  36.     long thisValue = this.value;  
  37.     long thatValue = ((VLongWritable)o).value;  
  38.     return (thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1));  
  39.   }  
  40.   
  41.   public String toString() {  
  42.     return Long.toString(value);  
  43.   }  
  44.   
  45. }  


从源码中我们可以发现,它编码的时候使用的是WritableUtils.writeVLong()方法,WritableUtils是关于编解码用的,暂时只看关于VIntWritable和VLongWritable的。

 

VIntWritable中value的编码实际也是使用writeVLong(),代码如下:

 

[java] view plain copy
 
  1. public static void writeVInt(DataOutput stream, int i) throws IOException {  
  2.    writeVLong(stream, i);  
  3.  }  
  4.    


首先,序列化大小对比如下图:

技术分享

 


VIntWritable的长度是1-5,VLongWritable的长度是1-9,如果数值在【-112,127】时,使用1byte表示,即编码后的1byte存储的就是这个数值。如果不在这个范围内,则需要更多的byte,而第一个byte将被用作存储长度,其他byte存储数值。

writeVlong()的操作过程如下图:

技术分享

WritableUtils.writeVLong()的源码如下:

 

[java] view plain copy
 
  1. public static void writeVLong(DataOutput stream, long i) throws IOException {  
  2.     if (i >= -112 && i <= 127) {  
  3.       stream.writeByte((byte)i);  
  4.       return;//-112到127的数值只用一个byte  
  5.     }  
  6.         
  7.     int len = -112;  
  8.     if (i < 0) {  
  9.       i ^= -1L; // take one‘s complement‘~1=(11111111)2得到这个i_2,i_2+1=[i],可想一下负数的反码如何能得到正数(连符号一起取反+1)  
  10.       len = -120;  
  11.     }  
  12.         
  13.     long tmp = i;//到这里,i一定是正数,这个数介于【0,2^64-1】  
  14.     while (tmp != 0) {//然后用循环计算一下长度,i越大,实际长度就越大,偏离长度起始值[原来len]越大,len值越小。  
  15.       tmp = tmp >> 8;  
  16.       len--;  
  17.     }  
  18.       //现在,我们显然计算出了一个能表示其长度的值len,只要看其偏离长度起始值多少即可。  
  19.     stream.writeByte((byte)len);  
  20.         
  21.     len = (len < -120) ? -(len + 120) : -(len + 112);//计算出了长度,不包含第一个byte【表示长度的byte】  
  22.         
  23.     for (int idx = len; idx != 0; idx--) {//这里将i的二进制码从左到右8位8位的拿出来,然后写入到流中。  
  24.       int shiftbits = (idx - 1) * 8;  
  25.       long mask = 0xFFL << shiftbits;  
  26.       stream.writeByte((byte)((i & mask) >> shiftbits));  
  27.     }  
  28.   }  

现在知道它是怎么写出去的了,再看看 它是怎么读进来的,这显然是个反过程,WritableUtils.readVLong()源码如下:

 

 

[java] view plain copy
 
  1. public static long readVLong(DataInput stream) throws IOException {  
  2.     byte firstByte = stream.readByte();  
  3.     int len = decodeVIntSize(firstByte);  
  4.     if (len == 1) {  
  5.       return firstByte;  
  6.     }  
  7.     long i = 0;  
  8.     for (int idx = 0; idx < len-1; idx++) {  
  9.       byte b = stream.readByte();  
  10.       i = i << 8;  
  11.       i = i | (b & 0xFF);  
  12.     }  
  13.     return (isNegativeVInt(firstByte) ? (i ^ -1L) : i);  
  14.   }  

这显然就是读出字节表示长度,然后从输入流中一个byte一个byte读出来,&0xFF是为了不让系统自动类型转换,然后在^-1L即连符号一起取反。
WritableUtils.decodeVIntSize()就是获取编码的长度,源码如下:

 

 

[java] view plain copy
 
  1. public static int decodeVIntSize(byte value) {  
  2.     if (value >= -112) {  
  3.       return 1;  
  4.     } else if (value < -120) {  
  5.       return -119 - value;  
  6.     }  
  7.     return -111 - value;  
  8.   }  

显然,就是按照上面图中的反过程,使用了-119和-111只是为了获取编码长度而不是实际数值长度(不包含表示长度的第一个byte)而已。

 

 

下面我们继续前面提到的WritableComparator,它实现了RawComparator接口,RawComparator的源码很简单,如下:

 

[java] view plain copy
 
  1. public interface RawComparator<T> extends Comparator<T> {  
  2.   
  3.   public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);  
  4.   
  5. }  


WritableComparator是RawComparator实例的工厂(注册了Writable的实现类),它为这些Writable实现类提供了反序列化用的方法,这些方法都比较简单,比较难的是readVInt()和readVLong(),也就是上面说到的两个内容,WritableComparator还提供了compare()的默认实现,它会反序列化后才进行比较。如果WritableComparator.get()没有得到注册的Comparator,则会创建一个新的Comparator(其实就是一个WritableComparator的实例),然后当你使用public int compare(byte[] b1,int s1,int l1,byte[] b2,ints2,int l2)进行比较的时候,它会去使用你要比较Writable的实现的readFields()方法读出value来。

 

比如:VIntWritable没有注册,我们get()时他就构造一个WritableComparator的实例,然后设置key1,key2,buffer,keyClass,当你使用compare(byte[] b1,int s1,int l1,byte[]2,int s2,int l2)时,则使用VIntWritable.readField()从编码后的byte[]中读取value值再进行比较。

然后是ArrayWritable和TwoDArrayWritable,这两个Writable实现分别是对一维数组和二维数组的封装,不难想象他们都应该提供一个Writable数组和保持关于这个数组的类型,而且序列化和反序列化也将使用封装的Writable实现的readFields()方法和write()方法。

 

[java] view plain copy
 
  1. public class TwoDArrayWritable implements Writable {  
  2.   private Class valueClass;  
  3.   private Writable[][] values;  
  4.   
  5.   public TwoDArrayWritable(Class valueClass) {  
  6.     this.valueClass = valueClass;  
  7.   }  
  8.   
  9.   public TwoDArrayWritable(Class valueClass, Writable[][] values) {  
  10.     this(valueClass);  
  11.     this.values = values;  
  12.   }  
  13.   
  14.   public Object toArray() {  
  15.     int dimensions[] = {values.length, 0};  
  16.     Object result = Array.newInstance(valueClass, dimensions);  
  17.     for (int i = 0; i < values.length; i++) {  
  18.       Object resultRow = Array.newInstance(valueClass, values[i].length);  
  19.       Array.set(result, i, resultRow);  
  20.       for (int j = 0; j < values[i].length; j++) {  
  21.         Array.set(resultRow, j, values[i][j]);  
  22.       }  
  23.     }  
  24.     return result;  
  25.   }  
  26.   
  27.   public void set(Writable[][] values) { this.values = values; }  
  28.   
  29.   public Writable[][] get() { return values; }  
  30.   
  31.   public void readFields(DataInput in) throws IOException {  
  32.     // construct matrix  
  33.     values = new Writable[in.readInt()][];            
  34.     for (int i = 0; i < values.length; i++) {  
  35.       values[i] = new Writable[in.readInt()];  
  36.     }  
  37.   
  38.     // construct values  
  39.     for (int i = 0; i < values.length; i++) {  
  40.       for (int j = 0; j < values[i].length; j++) {  
  41.         Writable value;                             // construct value  
  42.         try {  
  43.           value = (Writable)valueClass.newInstance();  
  44.         } catch (InstantiationException e) {  
  45.           throw new RuntimeException(e.toString());  
  46.         } catch (IllegalAccessException e) {  
  47.           throw new RuntimeException(e.toString());  
  48.         }  
  49.         value.readFields(in);                       // read a value  
  50.         values[i][j] = value;                       // store it in values  
  51.       }  
  52.     }  
  53.   }  
  54.   
  55.   public void write(DataOutput out) throws IOException {  
  56.     out.writeInt(values.length);                 // write values  
  57.     for (int i = 0; i < values.length; i++) {  
  58.       out.writeInt(values[i].length);  
  59.     }  
  60.     for (int i = 0; i < values.length; i++) {  
  61.       for (int j = 0; j < values[i].length; j++) {  
  62.         values[i][j].write(out);  
  63.       }  
  64.     }  
  65.   }  
  66. }  

也就是那样,没什么好讲的了。

 

另外还有一些如TupleWritable,AbstractMapWritable(MapWritable,SortMapWritable),DBWritable,CompressedWritable,VersionedWritable,GenericWritable之类等,有必要时再去谈他们,其实也差不多,功能不一样而已。

以上是关于Hadoop序列化与Writable源码分析的主要内容,如果未能解决你的问题,请参考以下文章

Hadoop序列化与Writable接口

深入对比Java与Hadoop大数据序列化机制Avro

大数据-Hadoop生态(12)-Hadoop序列化和源码追踪

Hadoop-序列化接口Writable和SequenceFile

MapReduce程序之序列化原理与Writable案例

序列化与Writable实现