Hadoop-序列化接口Writable和SequenceFile
Posted 健哥说编程
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Hadoop-序列化接口Writable和SequenceFile相关的知识,希望对你有一定的参考价值。
序列化Writable接口
Writable接口的两个主要方法:
一个是将状态写入DataOutput。一个是将状态从DataInput中读取出来。
以下是Writable类的继承关系:
int类型
Java基本类型与hadoop的writable子类的对应关系:
1:char类型封装到IntWritable类型中。
2:int基本类型对应两种:
IntWritable 为4个字节。
VIntWritable 1-5个字节。
3:long类型可以封装到两个类型中:LongWritable, 8个字节或是VLongWritable,1-9个字节。
变长字节VIntWritable和VLongWritable比定长字节Intwritable和LongWritable更能节省空间。
Text类型
1:Text最多可以保存2G的数据,因为它是使用整数来保存字节的。
2:与String不同,Text是可变的,可以通过调用set重用。如:Text txt = new Text();txt.set(“Jack”);
NullWritable
NullWritable是一个单例的对象。通过调用NullWritable.get()来获取它的实例。
WritableUtils
在某些操作中,可以使用WritableUtils类进行某些操作。此类是一个工具类。
示例:
//通过WritableUtils写数据
ByteArrayOutputStream array = new ByteArrayOutputStream();
DataOutputStream out = new DataOutputStream(array);
WritableUtils.writeVInt(out, 34);
//通过writableUtils读取数据
DataInputStream in =
new DataInputStream(new ByteArrayInputStream(array.toByteArray()));
int x = WritableUtils.readVInt(in);
System.err.println(x);//34
在Hadoop中有很多的工具类,如:ReflectionUtils类。
上面这两个工具类,都在hadoop-common.jar中的org.apache.hadoop.io包下。
8、SequenceFile的读写操作
首先必须要清楚一点:
Hadoop中的SequenceFile与Java中的SequenceInputStream不是同一个概念。Java中的SequenceInputStream是指多个InputStream的序列。而hadoop中的SequenceFile是以Key-value对保存形式数据的文件对象。
以下是SequenceFile的JavaDoc:
SequenceFiles are flat files consisting of binary key/value pairs.
SequenceFile provides SequenceFile.Writer, SequenceFile.Reader and Sorter classes for writing, reading and sorting respectively.
There are three SequenceFile Writers based on the CompressionType used to compress key/value pairs:
1) Writer : Uncompressed records.
2) RecordCompressWriter : Record-compressed files, only compress values.
3) BlockCompressWriter : Block-compressed files, both keys & values are collected in 'blocks' separately and compressed. The size of the 'block' is configurable.
通过上面的JavaDoc可知:
1:SequenceFile以扁平的形式保存二进制的key/value键值对。
2:SequenceFile提供了Writer\Reader\Sorter类,以便于读写排序。
SequenceFile.Writer类的方法:
append(Writable key,writable value) ; 向序列文件中追加数据
sync() 向序列文件中添加一个同步点
hflush()将client的数据写出的文件中
SequenceFile.Reader类的方法:
next(Writable key,Writable value) : boolean 用于读取序列文件中的key/value值,如果已经读取到文件最后,则返回false
seek(int) 设置读取行首的边界。
syncSeen():bololean判断是否是同步点。
开发SequenceFile的救命,只需要添加一个依赖:
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.6</version>
<scope>provided</scope>
</dependency>
1、SequenceFile的写操作示例
使用SequenceFile.Writer w = SequenceFile.create创建Writer对象。
package cn.wangjian;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
/**
* 写SequenceFile的示例
* @author wangjian
*/
public class Demo01_SequenceFileWriter {
public static void main(String[] args) throws Exception {
Configuration config = new Configuration();
FileSystem fs = FileSystem.get(config);
//创建SequenceFile的key和Value
IntWritable key = new IntWritable();
Text text = new Text();
//声明写入数据的对象
SequenceFile.Writer w = null;
try {
//创建写数据的对象
w = SequenceFile.createWriter(fs, config, new Path("/test/s1.txt"), key.getClass(), text.getClass());
for (int i = 0; i < 10; i++) {
key.set(i * 2);
text.set("this is line " + i);
//向文件中追加数据保存key/value对
w.append(key, text);
}
} finally {
if (w != null) {
IOUtils.closeStream(w);//关闭输入流
}
}
}
}
将上面的程序打包,然后发布到linux平台,执行:
$ hadoop jar seq.jar cn.wangjian.Demo01_SequenceFileWriter
查看hdfs上的文件:
[wangjian@hadoop41 test]$ hdfs dfs -ls /test
Found 2 items
-rw-r--r-- 1 wangjian supergroup 458 2018-05-04 11:14 /test/s1.txt
对于上面的s1.txt文件,由于这是一个序列文件,所以,通过-cat查看此文件时,显示乱码。Hadoop中-text命令,用于显示SequenceFile中的数据,如下:
[wangjian@hadoop41 test]$ hdfs dfs -text /test/s1.txt
18/05/04 11:37:32 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
18/05/04 11:37:32 INFO compress.CodecPool: Got brand-new decompressor [.deflate]
0this is line 0
2this is line 1
4this is line 2
6this is line 3
8this is line 4
10this is line 5
12this is line 6
14this is line 7
16this is line 8
18this is line 9
Configuration config = new Configuration();
config.set("fs.defaultFS", "hdfs://hadoop41:8020");
其他的保持不变。
运行完成以后,就可以通过-text显示文件中的内容。
【注意】:
由于使用了hdfs的协议 ,所以,在本地,必须要有hdfs的依赖,才可以在本地运行,即:
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.7.6</version>
<scope>provided</scope>
</dependency>
2、SequenceFile的读操作
使用SequenceFile.Reader用于读取序列文件中的数据。它的next(key,value)一次读取一个key和一个value,如果读取到了文件的最后,则返回false,以下是读取数据的完整代码:
package cn.wangjian;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
/**
* @author wangjian
*/
public class Demo02_SequenceFileReader {
public static void main(String[] args) throws Exception {
Configuration config = new Configuration();
FileSystem fs =FileSystem.get(config);
//声明Reader
SequenceFile.Reader reader =null;
try {
//声明Reader对象,传递需要读取的文件
reader = new SequenceFile.Reader(fs, new Path(args[0]),config);
IntWritable key = new IntWritable();
Text value = new Text();
//获取偏移量
long position = reader.getPosition();
//读取数据,如果读取到文件最后,则返回false
while(reader.next(key, value)) {
System.out.println(position+" "+key+" "+value);
position = reader.getPosition();//再获取下一次的偏移量
}
}finally {
IOUtils.closeStream(reader);
}
}
}
执行以下命令,获得以下结果:
[wangjian@hadoop41 test]$ hadoop jar seq.jar cn.wangjian.Demo02_SequenceFileReader /test/s1.txt
128 0 this is line 0
161 2 this is line 1
194 4 this is line 2
227 6 this is line 3
260 8 this is line 4
293 10 this is line 5
Configuration config = new Configuration();
config.set("fs.defaultFS", "hdfs://hadoop41:8020");
然后,添加运行时参数:
运行的结果如下:
128 0 写入数据行号:0
170 2 写入数据行号:1
212 4 写入数据行号:2
254 6 写入数据行号:3
3、通过Reader判断key和value的类型
由于在某些情况下,我们可能不知道具体SequenceFile的key/value类型,所以,可以通过SequenceFile.Reader.getkeyClass()来获取key/value的具体类型。注意,在根据获取到的字节码以后,通过ReflectionUtils反射实例化,并强制类型转换成Writable类型:
//获取key和value类型,注意强制类型转换
Writable key =
(Writable) ReflectionUtils.newInstance(reader.getKeyClass(), config);
Writable value =
(Writable) ReflectionUtils.newInstance(reader.getValueClass(), config);
4、新的API之Writer.Option
在<权威指南>中,关于序列文件的写,有以下代码:
w = SequenceFile.createWriter(fs, config, new Path("/test/s1.txt"), key.getClass(), text.getClass());
通过API可知,createWriter方法,已经被废弃。建议使用:
public static Writer createWriter(Configuration conf, Writer.Option... opts
) throws IOException {}
注意第二个参数为:Writer.Option的可变长参数:
现在我们修改<权威指南>上的代码,即使用Writer.Option,请注意红色字体部分:
package cn.wangjian;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.io.SequenceFile.Writer.Option;
import org.apache.hadoop.io.Text;
/**
* 写SequenceFile的示例
* @author wangjian
*/
public class Demo01_SequenceFileWriter {
// 声明一些字符串数组,用于保存到SequenceFile中
private static String[] strs = { "Jack and Mary is friends", "Mary And Rose is Friends",
"So Jack and Rose is Friends", "What? Are your sure?", "Is that sure you are" };
public static void main(String[] args) throws Exception {
Configuration config = new Configuration();
config.set("fs.defaultFS", "hdfs://hadoop41:8020");
//如果使用Option,则fs对象,就可以不用了
//FileSystem fs = FileSystem.get(config);
// 创建SequenceFile的key和Value
IntWritable key = new IntWritable();
Text text = new Text();
// 声明写入数据的对象
SequenceFile.Writer w = null;
try {
// 创建写数据的对象
Option file = Writer.file(new Path(args[0]));
Option keyOption = Writer.keyClass(key.getClass());
Option valueOption = Writer.valueClass(text.getClass());
w = SequenceFile.createWriter(config, file,keyOption,valueOption);
for (int i = 0; i < 10; i++) {
key.set(strs[i].length());// 以字符串的长度为key
text.set(strs[i]);// 以数据为value
w.append(key, text);
}
} finally {
if (w != null) {
IOUtils.closeStream(w);// 关闭输入流
}
}
}
}
测试:
生成SequenceFile文件:
[wangjian@hadoop41 test]$ hadoop jar seq.jar cn.wangjian.Demo01_SequenceFileWriter /test/s3.seq
查看生成的SequenceFile文件:
[wangjian@hadoop41 test]$ hdfs dfs -ls /test
-rw-r--r-- 1 wangjian supergroup 348 2018-05-04 13:38 /test/s3.seq
使用-text命令,查看SequenceFile里面的内容:
[wangjian@hadoop41 test]$ hdfs dfs -text /test/s3.seq
24Jack and Mary is friends
24Mary And Rose is Friends
27So Jack and Rose is Friends
20What? Are your sure?
20Is that sure you are
我们可以使用系统自带的hadoop-mapreduce-example-*.jar,中的sort对SequenceFile进行排序。排序的命令如下:
hadoop jar \
/app/hadoop-2.7.5/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.5.jar \
sort -r 1 \
-inFormat org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat \
-outFormat org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat \
-outKey org.apache.hadoop.io.IntWritable \
-outValue org.apache.hadoop.io.Text \
/test/s3.seq /out004
请注意,sort只能对SequenceFile进行排序。关于sort功能,参数的具体含义为:
-r为reduce的个数。
-inFormat为输入的文件类型,默认就是SequenceFileInputFormat
-outFormat为输出的文件类型,依然是SequenceFileOutputFormat即为序列化文件
-outKey,-outValue为输出的key/value的具体类型。
最后的两个参数为输入文件和输出目录。
在上面的文件,执行完成以后,使用-text查看输出的数据,可见,已经根据key值进行了排序:
[wangjian@hadoop41 test]$ hdfs dfs -text /out004/*
20Is that sure you are
20What? Are your sure?
24Mary And Rose is Friends
24Jack and Mary is friends
27So Jack and Rose is Friends
9、使用-text查看序列文件的内容
序列文件,即SequenceFile是以key/value形式保存数据的文件:
[wangjian@hadoop41 test]$ hdfs dfs -text /test/s2.txt
0写入数据行号:0
2写入数据行号:1
4写入数据行号:2
6写入数据行号:3
8写入数据行号:4
以上是关于Hadoop-序列化接口Writable和SequenceFile的主要内容,如果未能解决你的问题,请参考以下文章
大数据之Hadoop(MapReduce):自定义bean对象实现序列化接口(Writable)
Hadoop详解——MapReduce原理和执行过程,远程Debug,Writable序列化接口,MapReduce程序编写