MapReduce统计上行流量下行流量及流量之和,并且到集群上运行
Posted 飝鱻?
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MapReduce统计上行流量下行流量及流量之和,并且到集群上运行相关的知识,希望对你有一定的参考价值。
MapReduce统计上行流量、下行流量及流量之和
数据集需求分析
数据
1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200
1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200
1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200
1363157993044 18211575961 94-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99 iface.qiyi.com 视频网站 15 12 1527 2106 200
1363157993055 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200
1363157995033 15920133257 5C-0E-8B-C7-BA-20:CMCC 120.197.40.4 sug.so.360.cn 信息安全 20 20 3156 2936 200
1363157983019 13719199419 68-A1-B7-03-07-B1:CMCC-EASY 120.196.100.82 4 0 240 0 200
1363157984041 13660577991 5C-0E-8B-92-5C-20:CMCC-EASY 120.197.40.4 s19.cnzz.com 站点统计 24 9 6960 690 200
1363157973098 15013685858 5C-0E-8B-C7-F7-90:CMCC 120.197.40.4 rank.ie.sogou.com 搜索引擎 28 27 3659 3538 200
1363157986029 15989002119 E8-99-C4-4E-93-E0:CMCC-EASY 120.196.100.99 www.umeng.com 站点统计 3 3 1938 180 200
1363157992093 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 15 9 918 4938 200
1363157986041 13480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4 3 3 180 180 200
1363157984040 13602846565 5C-0E-8B-8B-B6-00:CMCC 120.197.40.4 2052.flash2-http.qq.com 综合门户 15 12 1938 2910 200
1363157995093 13922314466 00-FD-07-A2-EC-BA:CMCC 120.196.100.82 img.qfc.cn 12 12 3008 3720 200
1363157982040 13502468823 5C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99 y0.ifengimg.com 综合门户 57 102 7335 110349 200
1363157986072 18320173382 84-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99 input.shouji.sogou.com 搜索引擎 21 18 9531 2412 200
1363157990043 13925057413 00-1F-64-E1-E6-9A:CMCC 120.196.100.55 t3.baidu.com 搜索引擎 69 63 11058 48243 200
1363157988072 13760778710 00-FD-07-A4-7B-08:CMCC 120.196.100.82 2 2 120 120 200
1363157985079 13823070001 20-7C-8F-70-68-1F:CMCC 120.196.100.99 6 3 360 180 200
1363157985069 13600217502 00-1F-64-E2-E8-B1:CMCC 120.196.100.55 18 138 1080 186852 200
需求
- 统计每个电话号码的总的上行流量、下行流量及总流量
- 按照号码的前三位(如 136、137)进行分区操作
分析
- 将电话作为key,这样就可以按照key进行分组
- 在reduce阶段进行求和汇总
自定义一个数据类型
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;
/**
 * Writable value type carrying one phone number's traffic counters so they
 * can be shuffled between the map and reduce phases.
 *
 * NOTE(review): judging by the mapper, sum_low holds the first traffic column
 * and sum_up the second — confirm which is upstream vs. downstream against
 * the actual input file's column order.
 */
public class Bean implements WritableComparable<Bean> {
    private int sum_low;   // first traffic counter (per-record, then per-phone total)
    private int sum_up;    // second traffic counter
    private int sum_bean;  // total traffic; 0 in map output, sum_low + sum_up after reduce

    /**
     * Orders beans by total traffic.
     * Uses Integer.compare instead of the original subtraction, which can
     * overflow and silently invert the ordering for large/negative values.
     */
    @Override
    public int compareTo(Bean o) {
        return Integer.compare(this.sum_bean, o.sum_bean);
    }

    /** Serializes the three counters; order must mirror readFields. */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(sum_low);
        dataOutput.writeInt(sum_up);
        dataOutput.writeInt(sum_bean);
    }

    /** Deserializes the three counters in the same order as write. */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        sum_low = dataInput.readInt();
        sum_up = dataInput.readInt();
        sum_bean = dataInput.readInt();
    }

    /** Refills this instance so a single reusable object can serve many records. */
    public void set(int sum_low, int sum_up, int sum_bean) {
        this.sum_low = sum_low;
        this.sum_up = sum_up;
        this.sum_bean = sum_bean;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        Bean bean = (Bean) o;
        return sum_low == bean.sum_low &&
                sum_up == bean.sum_up &&
                sum_bean == bean.sum_bean;
    }

    @Override
    public int hashCode() {
        return Objects.hash(sum_low, sum_up, sum_bean);
    }

    /**
     * Tab-separated line consumed by TextOutputFormat: low, up, total.
     * Fix: uses a real tab ("\t"); the blog-escaped "\\t" would print a
     * literal backslash-t instead of separating the columns.
     */
    @Override
    public String toString() {
        return sum_low + "\t" + sum_up + "\t" + sum_bean;
    }

    public int getSum_low() {
        return sum_low;
    }

    public void setSum_low(int sum_low) {
        this.sum_low = sum_low;
    }

    public int getSum_up() {
        return sum_up;
    }

    public void setSum_up(int sum_up) {
        this.sum_up = sum_up;
    }

    public int getSum_bean() {
        return sum_bean;
    }

    public void setSum_bean(int sum_bean) {
        this.sum_bean = sum_bean;
    }
}
Map阶段
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* 1363157986072 18320173382 84-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99 input.shouji.sogou.com 搜索引擎 21 18 9531 2412 200
* 求每个手机号的上行流量之和、下行流量之和、上下行流量之和
*/
/**
 * Map phase: parses one tab-separated log line per call and emits
 * (phone number, Bean(flow1, flow2, 0)); the per-phone totals are
 * accumulated later in the reduce phase.
 *
 * Example record:
 * 1363157986072 18320173382 84-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99 input.shouji.sogou.com 搜索引擎 21 18 9531 2412 200
 */
public class MapTest extends Mapper<LongWritable, Text, Text, Bean> {
    // Reused output objects — Hadoop serializes them on write, so one
    // instance per mapper avoids per-record allocation.
    private final Text outKey = new Text();
    private final Bean outValue = new Bean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] fields = value.toString().split("\\t");
        int n = fields.length;
        // The phone number is always the second column.
        outKey.set(fields[1]);
        // The traffic columns are indexed from the END of the record because
        // the optional URL/category columns shift the middle fields.
        // The total slot stays 0 here; reduce computes it.
        int firstFlow = Integer.parseInt(fields[n - 3]);
        int secondFlow = Integer.parseInt(fields[n - 2]);
        outValue.set(firstFlow, secondFlow, 0);
        context.write(outKey, outValue);
    }
}
自定义分区
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
/**
 * Partitions records by the first three digits of the phone-number key so
 * each prefix lands in its own reduce task (and thus its own output file).
 * Requires the job to run with 5 reduce tasks.
 */
public class ParTest extends Partitioner<Text, Bean> {
    /**
     * @param text the phone-number key
     * @param bean the flow value (unused)
     * @param i    the number of reduce tasks (unused; must be >= 5)
     * @return 0-3 for prefixes 136-139, 4 for everything else
     */
    @Override
    public int getPartition(Text text, Bean bean, int i) {
        String phone = text.toString();
        // Guard: a malformed key shorter than 3 chars would make
        // substring(0, 3) throw; route it to the catch-all partition.
        if (phone.length() < 3) {
            return 4;
        }
        String prePhone = phone.substring(0, 3); // end index is exclusive
        switch (prePhone) {
            case "136":
                return 0;
            case "137":
                return 1;
            case "138":
                return 2;
            case "139":
                return 3;
            default:
                return 4;
        }
    }
}
Reduce阶段
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * Reduce phase: sums the per-record traffic counters of each phone number
 * and emits (phone, Bean(totalLow, totalUp, totalLow + totalUp)).
 */
public class RedTest extends Reducer<Text, Bean, Text, Bean> {
    // Reused output value to avoid allocating one Bean per key.
    private final Bean v = new Bean();

    @Override
    protected void reduce(Text key, Iterable<Bean> values, Context context) throws IOException, InterruptedException {
        // Fix: the original kept the accumulators as instance fields and
        // reset them at the end of the method — an exception mid-call would
        // leak partial sums into the next key. Locals are reset by construction.
        int sumLow = 0;
        int sumUp = 0;
        for (Bean b : values) {
            sumUp += b.getSum_up();
            sumLow += b.getSum_low();
        }
        v.set(sumLow, sumUp, sumLow + sumUp);
        context.write(key, v);
    }
}
Driver阶段
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.File;
/**
 * Driver: configures the MapReduce job (mapper, reducer, custom partitioner,
 * key/value classes, HDFS I/O paths) and submits it, blocking until it ends.
 */
public class DriTest {
    public static void main(String[] args) throws Exception {
        // Delete a stale local output directory if present — the job fails
        // fast when its output path already exists. (Both branches of the
        // original if/else called driver(); collapsed into one call.)
        File file = new File("D:\\FlowSum\\output");
        if (file.exists()) {
            delFile(file);
        }
        driver();
    }

    /** Recursively deletes a file or a directory tree (children first). */
    public static void delFile(File file) {
        File[] files = file.listFiles(); // null for plain files
        if (files != null) {
            for (File child : files) {
                delFile(child);
            }
        }
        file.delete();
    }

    public static void driver() throws Exception {
        Configuration conf = new Configuration();
        // Fix: the correct configuration key is "fs.defaultFS"; the original
        // "fs.default" is not a Hadoop key and was silently ignored.
        conf.set("fs.defaultFS", "hdfs://192.168.0.155:9000/");
        Job job = Job.getInstance(conf);
        job.setJarByClass(DriTest.class);
        job.setMapperClass(MapTest.class);
        job.setReducerClass(RedTest.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Bean.class);
        job.setOutputKeyClass(Text.class);
        // Fix: the original called setMapOutputValueClass twice and never
        // declared the final (reduce) output value class.
        job.setOutputValueClass(Bean.class);
        // Five reduce tasks — one per partition produced by ParTest.
        job.setNumReduceTasks(5);
        job.setPartitionerClass(ParTest.class);
        FileInputFormat.setInputPaths(job, "/MR/input");
        FileOutputFormat.setOutputPath(job, new Path("/MR/output"));
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
将程序打成jar包
在IDEA上打jar包的流程图
- 在集群上运行的命令:
hadoop jar MPTEST.jar FlovwBean.DriTest
- 其中 FlovwBean.DriTest 是 Driver 类的全限定类名(包名.类名),而不是文件路径
以上是关于MapReduce统计上行流量下行流量及流量之和,并且到集群上运行的主要内容,如果未能解决你的问题,请参考以下文章
MapReduce从HDFS读取文件聚合后写入HDFS的实现
MapReduce从HDFS读取文件聚合后写入HDFS的实现
MapReduce从HDFS读取文件聚合后写入HDFS的实现
大数据之Hadoop(MapReduce): 序列化案例实操