Hadoop-序列化接口Writable和SequenceFile

Posted 健哥说编程

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Hadoop-序列化接口Writable和SequenceFile相关的知识,希望对你有一定的参考价值。

序列化Writable接口

Writable接口的两个主要方法:

 

一个是将状态写入DataOutput。一个是将状态从DataInput中读取出来。

 

以下是Writable类的继承关系:

 



int类型

Java基本类型与hadoopwritable子类的对应关系:

1char类型封装到IntWritable类型中。

2int基本类型对应两种:

IntWritable 4个字节。

VIntWritable 1-5个字节。

3long类型可以封装到两个类型中:LongWritable8个字节或是VLongWritable1-9个字节。

   变长字节VIntWritableVLongWritable比定长字节IntwritableLongWritable更能节省空间。

 

Text类型

1Text最多可以保存2G的数据,因为它是使用整数来保存字节的。

2:与String不同,Text是可变的,可以通过调用set重用。如:Text txt = new Text();txt.set(Jack);

 

NullWritable

NullWritable是一个单例的对象。通过调用NullWritable.get()来获取它的实例。

WritableUtils

在某些操作中,可以使用WritableUtils类进行某些操作。此类是一个工具类。

示例:

//通过WritableUtils写数据

ByteArrayOutputStream array = new ByteArrayOutputStream();

DataOutputStream out = new DataOutputStream(array);

WritableUtils.writeVInt(out, 34);

//通过writableUtils读取数据

DataInputStream in =

 new DataInputStream(new ByteArrayInputStream(array.toByteArray()));

int x =  WritableUtils.readVInt(in);

System.err.println(x);//34

 

Hadoop中有很多的工具类,如:ReflectionUtils类。

上面这两个工具类,都在hadoop-common.jar中的org.apache.hadoop.io包下。

 

8SequenceFile的读写操作

首先必须要清楚一点:

Hadoop中的SequenceFileJava中的SequenceInputStream不是同一个概念。Java中的SequenceInputStream是指多个InputStream的序列。而hadoop中的SequenceFile是以Key-value对保存形式数据的文件对象。

以下是SequenceFileJavaDoc

SequenceFiles are flat files consisting of binary key/value pairs.

SequenceFile provides SequenceFile.Writer, SequenceFile.Reader and Sorter classes for writing, reading and sorting respectively.

There are three SequenceFile Writers based on the CompressionType used to compress key/value pairs:

1) Writer : Uncompressed records.

2) RecordCompressWriter : Record-compressed files, only compress values.

3) BlockCompressWriter : Block-compressed files, both keys & values are collected in 'blocks' separately and compressed. The size of the 'block' is configurable.

通过上面的JavaDoc可知:

1SequenceFile以扁平的形式保存二进制的key/value键值对。

2SequenceFile提供了Writer\Reader\Sorter类,以便于读写排序。

 

SequenceFile.Writer类的方法:

append(Writable key,writable value) ; 向序列文件中追加数据

sync() 向序列文件中添加一个同步点

hflush()client的数据写出的文件中

SequenceFile.Reader类的方法:

next(Writable key,Writable value) : boolean 用于读取序列文件中的key/value值,如果已经读取到文件最后,则返回false

seek(int) 设置读取行首的边界。

syncSeen():bololean判断是否是同步点。

 

开发SequenceFile的救命,只需要添加一个依赖:

<dependency>

<groupId>org.apache.hadoop</groupId>

<artifactId>hadoop-common</artifactId>

<version>2.7.6</version>

<scope>provided</scope>

</dependency>

 

1、SequenceFile的写操作示例

使用SequenceFile.Writer w = SequenceFile.create创建Writer对象。

 

package cn.wangjian;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IOUtils;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.SequenceFile;

import org.apache.hadoop.io.Text;

/**

 * 写SequenceFile的示例

 * @author wangjian

 */

public class Demo01_SequenceFileWriter {

public static void main(String[] args) throws Exception {

Configuration config = new Configuration();

FileSystem fs = FileSystem.get(config);

//创建SequenceFile的key和Value

IntWritable key = new IntWritable();

Text text = new Text();

//声明写入数据的对象

SequenceFile.Writer w = null;

try {

//创建写数据的对象

w = SequenceFile.createWriter(fs, config, new Path("/test/s1.txt"), key.getClass(), text.getClass());

for (int i = 0; i < 10; i++) {

key.set(i * 2);

text.set("this is line " + i);

//向文件中追加数据保存key/value对

w.append(key, text);

}

} finally {

if (w != null) {

IOUtils.closeStream(w);//关闭输入流

}

}

}

}

 

将上面的程序打包,然后发布到linux平台,执行:

$ hadoop jar seq.jar cn.wangjian.Demo01_SequenceFileWriter

查看hdfs上的文件:

[wangjian@hadoop41 test]$ hdfs dfs -ls /test

Found 2 items

-rw-r--r--   1 wangjian supergroup        458 2018-05-04 11:14 /test/s1.txt

对于上面的s1.txt文件,由于这是一个序列文件,所以,通过-cat查看此文件时,显示乱码。Hadoop-text命令,用于显示SequenceFile中的数据,如下:

[wangjian@hadoop41 test]$ hdfs dfs -text /test/s1.txt

18/05/04 11:37:32 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library

18/05/04 11:37:32 INFO compress.CodecPool: Got brand-new decompressor [.deflate]

0this is line 0

2this is line 1

4this is line 2

6this is line 3

8this is line 4

10this is line 5

12this is line 6

14this is line 7

16this is line 8

18this is line 9

 

Configuration config = new Configuration();

config.set("fs.defaultFS", "hdfs://hadoop41:8020");

其他的保持不变。

运行完成以后,就可以通过-text显示文件中的内容。

【注意】:

由于使用了hdfs的协议 ,所以,在本地,必须要有hdfs的依赖,才可以在本地运行,即:

<dependency>

<groupId>org.apache.hadoop</groupId>

<artifactId>hadoop-hdfs</artifactId>

<version>2.7.6</version>

<scope>provided</scope>

</dependency>

 

2、SequenceFile的读操作

使用SequenceFile.Reader用于读取序列文件中的数据。它的next(key,value)一次读取一个key和一个value,如果读取到了文件的最后,则返回false,以下是读取数据的完整代码:

package cn.wangjian;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IOUtils;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.SequenceFile;

import org.apache.hadoop.io.Text;

/**

 * @author wangjian

 */

public class Demo02_SequenceFileReader {

public static void main(String[] args) throws Exception {

Configuration config = new Configuration();

FileSystem fs =FileSystem.get(config);

//声明Reader

SequenceFile.Reader reader  =null;

try {

//声明Reader对象,传递需要读取的文件

reader = new SequenceFile.Reader(fs, new Path(args[0]),config);

IntWritable key = new IntWritable();

Text value = new Text();

//获取偏移量

long position = reader.getPosition();

//读取数据,如果读取到文件最后,则返回false

while(reader.next(key, value)) {

System.out.println(position+" "+key+" "+value);

position = reader.getPosition();//再获取下一次的偏移量

}

}finally {

IOUtils.closeStream(reader);

}

}

}

执行以下命令,获得以下结果:

[wangjian@hadoop41 test]$ hadoop jar seq.jar cn.wangjian.Demo02_SequenceFileReader /test/s1.txt

128 0 this is line 0

161 2 this is line 1

194 4 this is line 2

227 6 this is line 3

260 8 this is line 4

293 10 this is line 5

 

Configuration config = new Configuration();

config.set("fs.defaultFS", "hdfs://hadoop41:8020");

然后,添加运行时参数:

 

运行的结果如下:

128 0 写入数据行号:0

170 2 写入数据行号:1

212 4 写入数据行号:2

254 6 写入数据行号:3

 

 

3、通过Reader判断keyvalue的类型

由于在某些情况下,我们可能不知道具体SequenceFilekey/value类型,所以,可以通过SequenceFile.Reader.getkeyClass()来获取key/value的具体类型。注意,在根据获取到的字节码以后,通过ReflectionUtils反射实例化,并强制类型转换成Writable类型:

//获取key和value类型,注意强制类型转换

Writable key =

(Writable) ReflectionUtils.newInstance(reader.getKeyClass(), config);

Writable value =

 (Writable) ReflectionUtils.newInstance(reader.getValueClass(), config);

 

4、新的APIWriter.Option

<权威指南>中,关于序列文件的写,有以下代码:

w = SequenceFile.createWriter(fs, config, new Path("/test/s1.txt"), key.getClass(), text.getClass());

通过API可知,createWriter方法,已经被废弃。建议使用:

 public static Writer createWriter(Configuration conf, Writer.Option... opts

                                    ) throws IOException {}

注意第二个参数为:Writer.Option的可变长参数:

现在我们修改<权威指南>上的代码,即使用Writer.Option,请注意红色字体部分:

package cn.wangjian;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IOUtils;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.SequenceFile;

import org.apache.hadoop.io.SequenceFile.Writer;

import org.apache.hadoop.io.SequenceFile.Writer.Option;

import org.apache.hadoop.io.Text;

/**

 * 写SequenceFile的示例

 * @author wangjian

 */

public class Demo01_SequenceFileWriter {

// 声明一些字符串数组,用于保存到SequenceFile中

private static String[] strs = { "Jack and Mary is friends", "Mary And Rose is Friends",

"So Jack and Rose is Friends", "What? Are your sure?", "Is that sure you are" };

public static void main(String[] args) throws Exception {

Configuration config = new Configuration();

config.set("fs.defaultFS", "hdfs://hadoop41:8020");

//如果使用Option,则fs对象,就可以不用了

//FileSystem fs = FileSystem.get(config);

// 创建SequenceFile的key和Value

IntWritable key = new IntWritable();

Text text = new Text();

// 声明写入数据的对象

SequenceFile.Writer w = null;

try {

// 创建写数据的对象

Option file = Writer.file(new Path(args[0]));

Option keyOption = Writer.keyClass(key.getClass());

Option valueOption = Writer.valueClass(text.getClass());

w = SequenceFile.createWriter(config, file,keyOption,valueOption);

for (int i = 0; i < 10; i++) {

key.set(strs[i].length());// 以字符串的长度为key

text.set(strs[i]);// 以数据为value

w.append(key, text);

}

} finally {

if (w != null) {

IOUtils.closeStream(w);// 关闭输入流

}

}

}

}

测试:

生成SequenceFile文件:

[wangjian@hadoop41 test]$ hadoop jar seq.jar cn.wangjian.Demo01_SequenceFileWriter /test/s3.seq

查看生成的SequenceFile文件:

[wangjian@hadoop41 test]$ hdfs dfs -ls /test

-rw-r--r--   1 wangjian supergroup        348 2018-05-04 13:38 /test/s3.seq

使用-text命令,查看SequenceFile里面的内容:

[wangjian@hadoop41 test]$ hdfs dfs -text /test/s3.seq

24Jack and Mary is friends

24Mary And Rose is Friends

27So Jack and Rose is Friends

20What? Are your sure?

20Is that sure you are

我们可以使用系统自带的hadoop-mapreduce-example-*.jar,中的sortSequenceFile进行排序。排序的命令如下:

hadoop jar \

/app/hadoop-2.7.5/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.5.jar \

sort -r 1 \

-inFormat org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat \

-outFormat org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat \

-outKey org.apache.hadoop.io.IntWritable \

-outValue org.apache.hadoop.io.Text \

/test/s3.seq /out004

请注意,sort只能对SequenceFile进行排序。关于sort功能,参数的具体含义为:

 -rreduce的个数。

 -inFormat为输入的文件类型,默认就是SequenceFileInputFormat

 -outFormat为输出的文件类型,依然是SequenceFileOutputFormat即为序列化文件

 -outKey,-outValue为输出的key/value的具体类型。

  最后的两个参数为输入文件和输出目录。

在上面的文件,执行完成以后,使用-text查看输出的数据,可见,已经根据key值进行了排序:

[wangjian@hadoop41 test]$ hdfs dfs -text /out004/*

20Is that sure you are

20What? Are your sure?

24Mary And Rose is Friends

24Jack and Mary is friends

27So Jack and Rose is Friends

 

 

 

 

9、使用-text查看序列文件的内容

序列文件,即SequenceFile是以key/value形式保存数据的文件:

[wangjian@hadoop41 test]$ hdfs dfs -text /test/s2.txt

0写入数据行号:0

2写入数据行号:1

4写入数据行号:2

6写入数据行号:3

8写入数据行号:4

 


以上是关于Hadoop-序列化接口Writable和SequenceFile的主要内容,如果未能解决你的问题,请参考以下文章

大数据之Hadoop(MapReduce):自定义bean对象实现序列化接口(Writable)

Hadoop-2.4.1学习之Writable及其实现

Hadoop详解——MapReduce原理和执行过程,远程Debug,Writable序列化接口,MapReduce程序编写

Hadoop 学习自定义数据类型

Hadoop---mapreduce排序和二次排序以及全排序

大数据-Hadoop生态(12)-Hadoop序列化和源码追踪