2021年大数据Hadoop(二十):MapReduce的排序和序列化
Posted Lansonli
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了2021年大数据Hadoop(二十):MapReduce的排序和序列化相关的知识,希望对你有一定的参考价值。
全网最详细的Hadoop文章系列,强烈建议收藏加关注!
后面更新文章都会列出历史文章目录,帮助大家回顾知识重点。
目录
本系列历史文章
2021年大数据Hadoop(十九):MapReduce分区
2021年大数据Hadoop(十八):MapReduce程序运行模式和深入解析
2021年大数据Hadoop(十七):MapReduce编程规范及示例编写
2021年大数据Hadoop(十六):MapReduce计算模型介绍
2021年大数据Hadoop(十五):Hadoop的联邦机制 Federation
2021年大数据Hadoop(十三):HDFS意想不到的其他功能
2021年大数据Hadoop(十一):HDFS的元数据辅助管理
2021年大数据Hadoop(八):HDFS的Shell命令行使用
2021年大数据Hadoop(七):HDFS分布式文件系统简介
2021年大数据Hadoop(六):全网最详细的Hadoop集群搭建
2021年大数据Hadoop(二):Hadoop发展简史和特性优点
前言
2021年全网最详细的大数据笔记,轻松带你从入门到精通,该栏目每天更新,汇总知识分享
MapReduce的排序和序列化
概述
序列化(Serialization)是指把结构化对象转化为字节流。
反序列化(Deserialization)是序列化的逆过程。把字节流转为结构化对象。
当要在进程间传递对象或持久化对象的时候,就需要序列化对象成字节流,反之当要将接收到或从磁盘读取的字节流转换为对象,就要进行反序列化。
Java的序列化(Serializable)是一个重量级序列化框架,一个对象被序列化后,会附带很多额外的信息(各种校验信息,header,继承体系…),不便于在网络中高效传输;所以,hadoop自己开发了一套序列化机制(Writable),精简,高效。不用像java对象类一样传输多层的父子关系,需要哪个属性就传输哪个属性值,大大的减少网络传输的开销。
Writable是Hadoop的序列化格式,hadoop定义了这样一个Writable接口。
一个类要支持可序列化只需实现这个接口即可。
public interface Writable {
void write(DataOutput out) throws IOException;
void readFields(DataInput in) throws IOException;
}
另外 Writable 有一个子接口是 WritableComparable, WritableComparable 是既可实现序列化, 也可以对key进行比较, 我们这里可以通过自定义 Key 实现 WritableComparable 来实现我们的排序功能.
// WritableComparable分别继承Writable和Comparable
public interface WritableComparable<T> extends Writable, Comparable<T> {
}
//Comparable
public interface Comparable<T> {
int compareTo(T var1);
}
Comparable接口中的comparaTo方法用来定义排序规则,用于将当前对象与方法的参数进行比较。
例如:o1.compareTo(o2);
如果指定的数与参数相等返回0。
如果指定的数小于参数返回 -1。
如果指定的数大于参数返回 1。
返回正数的话,当前对象(调用compareTo方法的对象o1)要排在比较对象(compareTo传参对象o2)后面,返回负数的话,放在前面。
需求
数据格式如下
a 1
a 9
b 3
a 7
b 8
b 10
a 5
要求:
第一列按照字典顺序进行排列
第一列相同的时候, 第二列按照升序进行排列
分析
实现自定义的bean来封装数据,并将bean作为map输出的key来传输
MR程序在处理数据的过程中会对数据排序(map输出的kv对传输到reduce之前,会排序),排序的依据是map输出的key。所以,我们如果要实现自己需要的排序规则,则可以考虑将排序因素放到key中,让key实现接口:WritableComparable,然后重写key的compareTo方法。
如果自定义的JavaBean要参与MapReduce运算,则必须进行序列化,必须实现Writable接口,如果该JavaBean作为K2,则必须实现WritableComparable接口,让JavaBean具有排序的功能
实现
自定义类型和比较器
public class SortBean implements WritableComparable<SortBean>{
private String word;
private int num;
public String getWord() {
return word;
}
public void setWord(String word) {
this.word = word;
}
public int getNum() {
return num;
}
public void setNum(int num) {
this.num = num;
}
@Override
public String toString() {
return word + "\\t"+ num ;
}
//实现比较器,指定排序的规则
/*
规则:
第一列(word)按照字典顺序进行排列 // aac aad
第一列相同的时候, 第二列(num)按照升序进行排列
*/
/*
a 1
a 5
b 3
b 8
*/
@Override
public int compareTo(SortBean sortBean) {
//先对第一列排序: Word排序
int result = this.word.compareTo(sortBean.word);
//如果第一列相同,则按照第二列进行排序
if(result == 0){
return this.num - sortBean.num;
}
return result;
}
//实现序列化
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(word);
out.writeInt(num);
}
//实现反序列
@Override
public void readFields(DataInput in) throws IOException {
this.word = in.readUTF();
this.num = in.readInt();
}
}
编写Mapper代码
public class SortMapper extends Mapper<LongWritable,Text,SortBean,NullWritable> {
/*
map方法将K1和V1转为K2和V2:
K1 V1
0 a 3
5 b 7
----------------------
K2 V2
SortBean(a 3) NullWritable
SortBean(b 7) NullWritable
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1:将行文本数据(V1)拆分,并将数据封装到SortBean对象,就可以得到K2
String[] split = value.toString().split("\\t");
SortBean sortBean = new SortBean();
sortBean.setWord(split[0]);
sortBean.setNum(Integer.parseInt(split[1]));
//2:将K2和V2写入上下文中
context.write(sortBean, NullWritable.get());
}
}
编写Reducer代码
public class SortReducer extends Reducer<SortBean,NullWritable,SortBean,NullWritable> {
//reduce方法将新的K2和V2转为K3和V3
@Override
protected void reduce(SortBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
context.write(key, NullWritable.get());
}
}
编写主类代码
public class SortRunner {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
//1:创建job对象
Job job = Job.getInstance(conf, "mapreduce_sort");
//2:指定job所在的jar包
job.setJarByClass(SortRunner.class);
//3:指定源文件的读取方式类和源文件的读取路径
job.setInputFormatClass(TextInputFormat.class);
///TextInputFormat.addInputPath(job, new Path("hdfs://node01:8020/input/sort_input"));
TextInputFormat.addInputPath(job, new Path("file:///D:\\\\input\\\\sort_input"));
//4:指定自定义的Mapper类和K2、V2类型
job.setMapperClass(SortMapper.class);
job.setMapOutputKeyClass(SortBean.class);
job.setMapOutputValueClass(NullWritable.class);
//5:指定自定义的Reducer类和K3、V3的数据类型
job.setReducerClass(SortReducer.class);
job.setOutputKeyClass(SortBean.class);
job.setOutputValueClass(NullWritable.class);
//6:指定输出方式类和结果输出路径
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job, new Path("file:///D:\\\\out\\\\sort_out"));
//7:将job提交给yarn集群
boolean bl = job.waitForCompletion(true);
System.exit(bl?0:1);
}
}
以上是关于2021年大数据Hadoop(二十):MapReduce的排序和序列化的主要内容,如果未能解决你的问题,请参考以下文章
2021年大数据Hadoop(二十五):YARN通俗介绍和基本架构
2021年大数据Hadoop(二十六):Yarn三大组件介绍
2021年大数据Hadoop(二十四):MapReduce高阶训练
2021年大数据Hadoop(二十三):MapReduce的运行机制详解