Hadoop中的MapReduce——分布式离线计算框架
Posted 阳哥赚钱很牛
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Hadoop中的MapReduce——分布式离线计算框架相关的知识,希望对你有一定的参考价值。
上次的文章中,我们使用Java代码实现了MapReduce,这次我们正式的来做MapReduce编程
另外,MapReduce不是只能由Java写,用python、scala也可以
运行三个实例进程:
分布式计算程序的时候 先分map后合reduce
* MapTask----将计算程序分布的运行在多台电脑上 并且是并行运行 互相之前不干扰
* ReduceTask----将不同电脑上map计算出来的结果汇总起来 也可以有个reduce 并且
* 多个reduce之前是并行运行 互不干扰的
* 但是reduce的执行需要依赖于Map阶段的数据
* MRAppMaster----YARN 管理MapTask和ReduceTask进行调度
做Mapreduce编程的话:三步编程法
* 1、编写Mapper类 也就是我们的MapTask任务
* 2、编写Reducer类 也就是我们的reduceTask任务
* 3、编写Driver驱动类 关联MapTask和ReduceTask任务并且提交运行【注意】:
* 1、MapReduce程序处理的数据大部分都是HDFS上的文件数据
* 2、Map阶段和reduce阶段数据都需要通过key-value键值对的形式进行输入和输出
* 3、一般情况下map阶段的输出的key-value键值对就是reduce阶段输入的key-value键值对
* 4、一般情况下一个MapReduce程序只能有一个map和一个reduce,如果程序逻辑复杂,那么需要编写多个MR程序去串行运行
上次我们写的java文件流来去HDFS上的文件,对于结果输出,输出路径不能提前存在,否则会报错。那么我们如果想多次执行,每次都需要删那个输出文件。我们现在可以改进一下:
//通过Job指定文件输入
FileInputFormat.setInputPaths(job,new Path("hdfs://192.168.10.3:9000/wordcount.txt"));
//通过job指定文件输出到什么地方
Path outPath = new Path("hdfs://192.168.100.3:9000/output");
//输出路径不能提前存在 否则报错
FileSystem fs = FileSystem.get(new URI("hdfs://192.168.10.3:9000"), conf, "root");
if (fs.exists(outPath)){
fs.delete(outPath,true);
}
FileOutputFormat.setOutputPath(job,outPath);
//job提交运行
boolean result = job.waitForCompletion(true);
System.exit(result?0:1);
使用if判断是否存在这个输出文件,若存在则提前删除。
一、MapReduce中的序列化
1.什么是序列化、反序列化?
序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储(持久化)和网络传输。
反序列化就是将收到字节序列(或其他数据传输协议)或者是硬盘的持久化数据,转换成内存中的对象。
2.序列化的原因以及JAVA序列化和大数据序列化的区别
Java的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带很多额外的信息(各种校验信息,header,继承体系等),不便于在网络中高效传输。所以,hadoop自己开发了一套序列化机制(Writable),精简、高效。
翻译一下 : 一般来说,“活的”对象只生存在内存里,关机断电就没有了。而且“活的”对象只能由本地的进程使用,不能被发送到网络上的另外一台计算机。然而序列化可以存储"活的”"对象,可以将“活的”对象发送到远程计算机。.
Hadoop序列化特点:
(1)紧凑:高效使用存储空间。
(2)快速:读写数据的额外开销小。
(3)可扩展:随着通信协议的升级而可升级。
(4)互操作:支持多语言的交互。
3.常用的数据序列化类型
常用的数据类型对应的hadoop数据序列化类型
Java类型 | Hadoop Writable类型 |
boolean | BooleanWritable |
byte | ByteWritable |
int | IntWritable |
float | FloatWritable |
long | LongWritable |
double | DoubleWritable |
string | Text |
map | MapWritable |
array | ArrayWritable |
4.自定义bean对象实现序列化接口
数据序列化类型远不止上面那些已有的类型,还可以是用户自定义的对象。
自定义bean对象要想序列化传输,必须实现序列化接口,需要注意以下7项。
(1)必须实现Writable接口
(2)反序列化时,需要反射调用空参构造函数,所以必须有空参构造
(3)重写序列化方法
(4)重写反序列化方法
(5)注意反序列化的顺序和序列化的顺序完全一致
(6)要想把结果显示在文件中,需要重写toString(),且用”\\t”分开,方便后续用
(7)如果需要将自定义的bean放在key中传输,则还需要实现comparable接口,因为mapreduce框中的shuffle过程一定会对key进行排序
给大家看一眼我们要操作的文本数据:
我们现在来自定义一个bean对象:
**
* JavaBean对象主要的目的是为了封装手机号的上行流量和下行流量
* 然后在Map阶段当作我们key-value键值对的value输出到reduce阶段
*
*FlowBean是无法直接当作MR程序的key-value键值对的 除非Javabean对象是Hadoop的一个序列化对象
* 1、让自定义Javabean对象实现一个接口Writable(hadoop的序列化接口 实现这个接口那么Javabean对象就可实现序列化)
* 2、重写这个接口中的两个方法:序列化的方法--怎么将对象序列化成二进制 反序列化的方法:怎么将二进制转成Javabean对象
*/
public class FlowBean implements Writable{
private String phone;
private int upFlow;
private int downFlow;
private int sumFlow;
public String getPhone() {
return phone;
}
public void setPhone(String phone) {
this.phone = phone;
}
public int getUpFlow() {
return upFlow;
}
public void setUpFlow(int upFlow) {
this.upFlow = upFlow;
}
public int getDownFlow() {
return downFlow;
}
public void setDownFlow(int downFlow) {
this.downFlow = downFlow;
}
public int getSumFlow() {
return sumFlow;
}
public void setSumFlow(int sumFlow) {
this.sumFlow = sumFlow;
}
@Override
public String toString() {
return "FlowBean{" +
"phone='" + phone + '\\'' +
", upFlow=" + upFlow +
", downFlow=" + downFlow +
", sumFlow=" + sumFlow +
'}';
}
/**
* 序列化的方法:将Java对象的属性值怎么序列化写出
* @param dataOutput
* @throws IOException
*/
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(this.phone);//将一个String类型的属性序列化写出二进制数据
dataOutput.writeInt(upFlow);
dataOutput.writeInt(downFlow);
dataOutput.writeInt(sumFlow);
}
/**
* 反序列化的方法:将二进制代码转成Javabean对象属性的值
* 反序列化的时候 读取二进制数据值的时候不能随便读
* 你序列化写出的时候先写出的哪个属性的值 那么你就先读哪个属性的值
* @param dataInput
* @throws IOException
*/
@Override
public void readFields(DataInput dataInput) throws IOException {
this.phone = dataInput.readUTF();
this.upFlow = dataInput.readInt();
this.downFlow = dataInput.readInt();
this.sumFlow = dataInput.readInt();
}
}
二、编写Mapper程序
map阶段:
* 读取到每一行数据 将每一行中的数据手机号 上行流量 和下行流量获取出来
* map阶段再去输出中间键值对数据的时候 以手机号为key 以上行流量和下行流量为value发送给reduce
/**
* 计算每一个手机号总的上行流量 总的下行流量 总流量
* 两个阶段:
* map阶段:
* 读取到每一行数据 将每一行中的数据手机号 上行流量 和下行流量获取出来‘
* map阶段再去输出中间键值对数据的时候 以手机号为key 以上行流量和下行流量为value发送给reduce
* reduce阶段
* reduce根据手机号(key)将这个手机对应的所有的上行流量和下行流量获取到 然后累加即可
*
*/
public class MapperTest extends Mapper<LongWritable, Text,Text,FlowBean> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] fields = line.split("\\t");
FlowBean flowBean = new FlowBean();
String phone = fields[1];
int upFlow = Integer.parseInt(fields[fields.length - 3]);//倒数第三个数据
int downFlow = Integer.parseInt(fields[fields.length - 2]);
flowBean.setPhone(phone);
flowBean.setUpFlow(upFlow);
flowBean.setDownFlow(downFlow);
/**
* 将数据以手机号为key flowbean对象为value写出到reduce
*/
context.write(new Text(phone),flowBean);
}
}
三、Reduce程序编写
public class ReduceTest extends Reducer<Text,FlowBean, NullWritable,FlowBean> {
@Override
protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
Iterator<FlowBean> iterator = values.iterator();
int upSum=0;
int downSum = 0;
while (iterator.hasNext()){
FlowBean bean = iterator.next();
upSum+=bean.getUpFlow();
downSum+=bean.getDownFlow();
}
int sum = upSum+downSum;
FlowBean flowBean = new FlowBean();
flowBean.setPhone(key.toString());
flowBean.setUpFlow(upSum);
flowBean.setDownFlow(downSum);
flowBean.setSumFlow(sum);
context.write(NullWritable.get(),flowBean);
}
}
四、Driver驱动类编写
public class Driver {
public static void main(String[] args) throws Exception {
//获取job
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//设置jar包
job.setJarByClass(Driver.class);
//关联mapper和reducer
job.setMapperClass(MapperTest.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
job.setReducerClass(ReduceTest.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(FlowBean.class);
//关联文件的输入和文件的输出
FileInputFormat.setInputPaths(job,new Path("hdfs://192.168.10.3:9000/phone_data.txt"));
Path outPath = new Path("hdfs://192.168.100.3:9000/output1");
FileSystem fs = FileSystem.get(new URI("hdfs://192.168.10.3:9000"), conf, "root");
if (fs.exists(outPath)){
fs.delete(outPath,true);
}
FileOutputFormat.setOutputPath(job,outPath);
//提交运行
boolean result = job.waitForCompletion(true);
System.exit(result?0:1);
}
}
五、总结
Hadoop中序列化机制
主要的作用是将MR程序中产生的数据以序列化类型在网络中 不同的电脑中数据传递
* 常用序列化数据类型
* LongWritable---long
* IntWritable----int
* NullWritable---null值
如果我们想用一个自定义的Javabean对象充当mr程序的key-value键值对的输入和输出 那么Javabean对象必须实现hadoop的序列化机制
* 1、实现接口Writable
* 2、重写write方法--序列化写出的方法----将Java对象的属性值序列化写出
* 3、重写readFields方法---反序列化的方法 ----将二进制数据反序列化成Javabean对象的属性值
* 要求:反序列化的时候 反序列化的顺序必须是write方法写出数据的顺序
* 4、如果我们reduce阶段也是输出的JavaBean对象 那么在文件当中数据的格式就是Javabean对象的toString()方法
以上是关于Hadoop中的MapReduce——分布式离线计算框架的主要内容,如果未能解决你的问题,请参考以下文章