自定义Writable
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了自定义Writable相关的知识,希望对你有一定的参考价值。
自定义Writable
Hadoop虽然 已经实现了一些非常有用的Writable,而且你可以使用他们的组合做很多事情,但是如果你想构造一些更加复杂的结果,你可以自定义Writable来达到你的目的,我们以注释的方式对自定义Writable进行讲解
- /**
- * 自定义Writable通常都要实现Writable接口
- * 如果有比较大小的业务,最好是实现WritableComparable接口
- * @author 廖*民
- * time : 2015年1月13日下午1:39:12
- * @version
- */
- public class EmployeeWritable implements WritableComparable<EmployeeWritable>{
- //姓名
- private Text name;
- //角色
- private Text role;
- //必须提供无参构造方法(一定要创建name和role对象否则会报空指针异常)
- public EmployeeWritable() {
- name = new Text();
- role = new Text();
- }
- //构造函数
- public EmployeeWritable(Text name, Text role) {
- this.name = name;
- this.role = role;
- }
- public Text getName() {
- return name;
- }
- public void setName(Text name) {
- this.name = name;
- }
- public Text getRole() {
- return role;
- }
- public void setRole(Text role) {
- this.role = role;
- }
- /**
- * 调用成员对象本身的readFields()方法,从输入流中反序列化每一个成员对象
- */
- public void readFields(DataInput dataInput) throws IOException {
- name.readFields(dataInput);
- role.readFields(dataInput);
- }
- /**
- * 通过成员对象本身的write方法,序列化每一个成员对象到输出流中
- */
- public void write(DataOutput dataOutput) throws IOException {
- name.write(dataOutput);
- role.write(dataOutput);
- }
- /**
- * 如果实现了WritableComparable接口必须实现compareTo方法,用于比较
- */
- public int compareTo(EmployeeWritable employeeWritable) {
- int cmp = name.compareTo(employeeWritable.name);
- //如果不相等
- if (cmp != 0){
- return cmp;
- }
- //如果名字相等就比较角色
- return role.compareTo(employeeWritable.role);
- }
- /**
- * MapReduce需要一个分割者(Partitioner)把Map的输出作为输入分成一块块的喂给多个reduce
- * 默认的是HashPatitioner,它是通过对象的hashCode函数进行分割。
- * 所以hashCode的好坏决定了分割是否均匀,它是一个很关键的方法
- */
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + ((name == null) ? 0 : name.hashCode());
- result = prime * result + ((role == null) ? 0 : role.hashCode());
- return result;
- }
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- EmployeeWritable other = (EmployeeWritable) obj;
- if (name == null) {
- if (other.name != null)
- return false;
- } else if (!name.equals(other.name))
- return false;
- if (role == null) {
- if (other.role != null)
- return false;
- } else if (!role.equals(other.role))
- return false;
- return true;
- }
- /**
- * 自定义自己的输出类型
- */
- @Override
- public String toString() {
- return "EmployeeWritable [姓名=" + name + ", 角色=" + role + "]";
- }
- }
自定义RawComparatorWritable
上面的EmployeeWritable已经可以跑的很溜了,但是还是有优化的空间,当作为MapReduce里的key,需要进行比较时,因为他已经被序列化,想要比较他们,那么首先要反序列化成一个对象,然后再调用compareTo对象进行比较,但是这样效率太低了,有没有可能可以直接比较序列化后的结构呢,答案是肯定的。
我们只需要把EmployeeWritable序列化后的结果拆分成成员对象,然后比较成员对象即可,看代码:
- public static class Comparator extends WritableComparator{
- private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();
- protected Comparator() {
- super(EmployeeWritable.class);
- }
- @Override
- public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
- try {
- int nameL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);
- int nameL2 = WritableUtils.decodeVIntSize(b2[2]) + readVInt(b2, s2);
- //和compareTo方法一样,先比较name
- int cmp = TEXT_COMPARATOR.compare(b1, s1, nameL1, b2, s2, nameL2);
- if (cmp != 0){
- return cmp;
- }
- //再比较role
- return TEXT_COMPARATOR.compare(b1, s1+nameL1, l1-nameL1, b2, s2+nameL2, l2-nameL2);
- } catch (Exception e) {
- throw new IllegalArgumentException();
- }
- }
- static {
- //注册raw comparator,更像是绑定,这样MapReduce使用EmployeeWritable时就会直接调用Comparator
- WritableComparator.define(EmployeeWritable.class, new Comparator());
- }
- }
我们没有直接去实现RawComparator而是继承于WritableComparator,因为WritableComparator提供了很多便捷的方法,并且对compare有个默认的实现。写compare方法时一定要小心谨慎,因为都是在字节上操作,可以好好参考下源代码里的一些Writable中的Comparator的写法,另外多看下WritableUtils也是有必要的,它里面有很多简便的方法可以使用。
自定义Comparators
有时候,除了默认的Comparator,你可能还需要一些自定义的Comparator来生成不同的排序队列,看一下下面这个示例,只比较name,两个compare是同一个意思,都是比较name的大小:
- public static class NameComparator extends WritableComparator{
- private static final Text.Comparator TEXT_COMPARATOR= new Text.Comparator();
- protected NameComparator() {
- super(EmploeeWritable.class);
- }
- @Override
- public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
- try {
- int nameL1= WritableUtils.decodeVIntSize(b1[s1])+readVInt(b1,s1);
- int nameL2=WritableUtils.decodeVIntSize(b2[s2])+readVInt(b2,s2);
- return TEXT_COMPARATOR.compare(b1,s1,nameL1,b2,s2,nameL2);
- } catch (IOException e) {
- throw new IllegalArgumentException();
- }
- }
- @Override
- public int compare(WritableComparable a, WritableComparable b) {
- if(a instanceof EmploeeWritable && b instanceof EmploeeWritable){
- return ((EmploeeWritable)a).name.compareTo(((EmploeeWritable)b).name);
- }
- return super.compare(a,b);
- }
- }
以上是关于自定义Writable的主要内容,如果未能解决你的问题,请参考以下文章
我们如何在 Hive 中为自定义 Writable 类型编写自定义 ObjectInspector?
大数据之Hadoop(MapReduce):自定义bean对象实现序列化接口(Writable)