Cleaning Data with MapReduce for Visualization
Posted by quyangzhangsiyuan
Following the previous post, which covered stage 1 (cleaning the data and importing it into Hive), this post covers the remaining two stages.
2. Data processing:
· Count the Top 10 most-visited videos/articles by number of visits (video/article)
· Find the Top 10 most popular courses by city (ip)
· Find the Top 10 most popular courses by traffic (traffic)
3. Data visualization: import the statistics into a MySQL database and present them graphically.
2. Data processing
· Count the Top 10 most-visited videos/articles by number of visits (video/article)
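One note before the code, since the record layout is never spelled out in this post: judging from the field indexes used below, the first job reads the output of the previous cleaning step (space-separated, with the video/article id as the sixth field), while the later ip/traffic jobs read result.txt as comma-separated records of roughly the form ip,time,day,traffic,type,id. Treat this as an assumption inferred from the code rather than a documented format; a purely hypothetical record (not real data) would look like:

10.170.54.3,2016-11-10 00:01:02,20161110,1005,video,5552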
package mapreduce;

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class GetVideoResult {

    public static void main(String[] args) {
        try {
            // Job 1: count the visits of each video/article id
            Job job = Job.getInstance();
            job.setJobName("GetVideoResult");
            job.setJarByClass(GetVideoResult.class);
            job.setMapperClass(doMapper.class);
            job.setReducerClass(doReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
            Path in = new Path("hdfs://192.168.137.67:9000/mymapreducelShiYan/out1/part-r-00000");
            Path out = new Path("hdfs://192.168.137.67:9000/mymapreducelShiYan/out1.2");
            FileInputFormat.addInputPath(job, in);
            FileOutputFormat.setOutputPath(job, out);
            //System.exit(job.waitForCompletion(true) ? 0 : 1);
            if (job.waitForCompletion(true)) {
                // Job 2: sort the counts in descending order and keep the Top 10
                Job job1 = Job.getInstance();
                job1.setJobName("Sort");
                job1.setJarByClass(GetVideoResult.class);
                job1.setMapperClass(doMapper1.class);
                job1.setReducerClass(doReducer1.class);
                job1.setOutputKeyClass(IntWritable.class);
                job1.setOutputValueClass(Text.class);
                job1.setSortComparatorClass(IntWritableDecreasingComparator.class);
                job1.setInputFormatClass(TextInputFormat.class);
                job1.setOutputFormatClass(TextOutputFormat.class);
                Path in1 = new Path("hdfs://192.168.137.67:9000/mymapreducelShiYan/out1.2/part-r-00000");
                Path out1 = new Path("hdfs://192.168.137.67:9000/mymapreducelShiYan/out1.3");
                FileInputFormat.addInputPath(job1, in1);
                FileOutputFormat.setOutputPath(job1, out1);
                System.exit(job1.waitForCompletion(true) ? 0 : 1);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // Emits (id, 1) for every record of the cleaned data
    public static class doMapper extends Mapper<Object, Text, Text, IntWritable> {
        public static Text word = new Text();
        public static final IntWritable id = new IntWritable(1);

        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String[] data = value.toString().split(" ");
            word.set(data[5]);
            //id.set(Integer.parseInt(data[5]));
            context.write(word, id);
        }
    }

    // Sums the counts and writes them as (count, id) so the next job can sort by count
    public static class doReducer extends Reducer<Text, IntWritable, IntWritable, Text> {
        private static IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            result.set(sum);
            context.write(result, key);
        }
    }

    public static class doMapper1 extends Mapper<Object, Text, IntWritable, Text> {
        private static Text goods = new Text();
        private static IntWritable num = new IntWritable();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String arr[] = line.split(" ");
            num.set(Integer.parseInt(arr[0]));
            goods.set(arr[1]);
            context.write(num, goods);
        }
    }

    // Writes only the first ten records, i.e. the Top 10 after the descending sort
    public static class doReducer1 extends Reducer<IntWritable, Text, IntWritable, Text> {
        private static IntWritable result = new IntWritable();
        int i = 0;

        public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            for (Text value : values) {
                if (i < 10) {
                    context.write(key, value);
                    i++;
                }
            }
        }
    }

    // Reverses IntWritable's natural order so the shuffle sorts counts in descending order
    private static class IntWritableDecreasingComparator extends IntWritable.Comparator {
        public int compare(WritableComparable a, WritableComparable b) {
            return -super.compare(a, b);
        }

        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            return -super.compare(b1, s1, l1, b2, s2, l2);
        }
    }
}
I originally implemented this with two separate classes, first summing and then sorting. After looking around online I found that two Jobs can be chained inside one class, so both steps now live in a single driver. Also, MapReduce sorts keys in ascending order by default while we need descending order, so a custom comparator is plugged in (an alternative trick is sketched below).
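For reference, a common alternative to the decreasing comparator (not what the code above uses) is to negate the count in the sort job's mapper and negate it back when writing the result, so the default ascending sort already yields descending counts. A minimal sketch, meant as a drop-in replacement for doMapper1/doReducer1 in the driver above and assuming the same space-separated "count id" input:

// Sketch only: descending Top 10 without a custom comparator, by negating the key
public static class NegMapper extends Mapper<Object, Text, IntWritable, Text> {
    private final IntWritable negCount = new IntWritable();
    private final Text item = new Text();

    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        String[] arr = value.toString().split(" ");   // same space-separated assumption as doMapper1
        negCount.set(-Integer.parseInt(arr[0]));      // negated, so ascending sort == descending counts
        item.set(arr[1]);
        context.write(negCount, item);
    }
}

public static class NegReducer extends Reducer<IntWritable, Text, IntWritable, Text> {
    private final IntWritable count = new IntWritable();
    private int emitted = 0;

    @Override
    protected void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        for (Text value : values) {
            if (emitted < 10) {                       // keep only the Top 10
                count.set(-key.get());                // restore the original positive count
                context.write(count, value);
                emitted++;
            }
        }
    }
}

With these two classes, the second job would not need setSortComparatorClass at all.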
· Find the Top 10 most popular courses by city (ip)
package mapreduce;

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class GetVideoResultip {

    public static void main(String[] args) {
        try {
            // Job 1: count visits per (ip, id) pair
            Job job = Job.getInstance();
            job.setJobName("GetVideoResult");
            job.setJarByClass(GetVideoResultip.class);
            job.setMapperClass(doMapper.class);
            job.setReducerClass(doReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
            Path in = new Path("hdfs://192.168.137.67:9000/mymapreducel/in/result.txt");
            Path out = new Path("hdfs://192.168.137.67:9000/mymapreducelShiYan/out2.1");
            FileInputFormat.addInputPath(job, in);
            FileOutputFormat.setOutputPath(job, out);
            //System.exit(job.waitForCompletion(true) ? 0:1);
            if (job.waitForCompletion(true)) {
                // Job 2: sort the counts in descending order and keep the Top 10
                Job job1 = Job.getInstance();
                job1.setJobName("Sort");
                job1.setJarByClass(GetVideoResultip.class);
                job1.setMapperClass(doMapper1.class);
                job1.setReducerClass(doReducer1.class);
                job1.setOutputKeyClass(IntWritable.class);
                job1.setOutputValueClass(Text.class);
                job1.setSortComparatorClass(IntWritableDecreasingComparator.class);
                job1.setInputFormatClass(TextInputFormat.class);
                job1.setOutputFormatClass(TextOutputFormat.class);
                Path in1 = new Path("hdfs://192.168.137.67:9000/mymapreducelShiYan/out2.1/part-r-00000");
                Path out1 = new Path("hdfs://192.168.137.67:9000/mymapreducelShiYan/out2.2");
                FileInputFormat.addInputPath(job1, in1);
                FileOutputFormat.setOutputPath(job1, out1);
                System.exit(job1.waitForCompletion(true) ? 0 : 1);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // Emits ("ip id", 1) for every record
    public static class doMapper extends Mapper<Object, Text, Text, IntWritable> {
        public static Text word = new Text();
        public static final IntWritable id = new IntWritable(1);

        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String[] data = value.toString().split(",");
            String str = data[0] + " " + data[5];
            System.out.println(str);
            word.set(str);
            //id.set(Integer.parseInt(data[5]));
            context.write(word, id);
        }
    }

    // Sums the counts and writes them as (count, "ip id")
    public static class doReducer extends Reducer<Text, IntWritable, IntWritable, Text> {
        private static IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            result.set(sum);
            context.write(result, key);
        }
    }

    public static class doMapper1 extends Mapper<Object, Text, IntWritable, Text> {
        private static Text goods = new Text();
        private static IntWritable num = new IntWritable();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String arr[] = line.split(" ");
            String str = arr[1] + " " + arr[2];
            num.set(Integer.parseInt(arr[0]));
            goods.set(str);
            context.write(num, goods);
        }
    }

    // Writes only the first ten records after the descending sort
    public static class doReducer1 extends Reducer<IntWritable, Text, IntWritable, Text> {
        private static IntWritable result = new IntWritable();
        int i = 0;

        public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            for (Text value : values) {
                if (i < 10) {
                    context.write(key, value);
                    i++;
                }
            }
        }
    }

    // Reverses IntWritable's natural order so counts are sorted in descending order
    private static class IntWritableDecreasingComparator extends IntWritable.Comparator {
        public int compare(WritableComparable a, WritableComparable b) {
            return -super.compare(a, b);
        }

        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            return -super.compare(b1, s1, l1, b2, s2, l2);
        }
    }
}
· Find the Top 10 most popular courses by traffic (traffic)
package mapreduce;

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class GetVideoResulttraffic {

    public static void main(String[] args) {
        try {
            // Job 1: sum the traffic of each video/article id
            Job job = Job.getInstance();
            job.setJobName("GetVideoResult");
            job.setJarByClass(GetVideoResulttraffic.class);
            job.setMapperClass(doMapper.class);
            job.setReducerClass(doReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
            Path in = new Path("hdfs://192.168.137.67:9000/mymapreducel/in/result.txt");
            Path out = new Path("hdfs://192.168.137.67:9000/mymapreducelShiYan/out3.1");
            FileInputFormat.addInputPath(job, in);
            FileOutputFormat.setOutputPath(job, out);
            //System.exit(job.waitForCompletion(true) ? 0:1);
            if (job.waitForCompletion(true)) {
                // Job 2: sort the traffic totals in descending order and keep the Top 10
                Job job1 = Job.getInstance();
                job1.setJobName("Sort");
                job1.setJarByClass(GetVideoResulttraffic.class);
                job1.setMapperClass(doMapper1.class);
                job1.setReducerClass(doReducer1.class);
                job1.setOutputKeyClass(IntWritable.class);
                job1.setOutputValueClass(Text.class);
                job1.setSortComparatorClass(IntWritableDecreasingComparator.class);
                job1.setInputFormatClass(TextInputFormat.class);
                job1.setOutputFormatClass(TextOutputFormat.class);
                Path in1 = new Path("hdfs://192.168.137.67:9000/mymapreducelShiYan/out3.1/part-r-00000");
                Path out1 = new Path("hdfs://192.168.137.67:9000/mymapreducelShiYan/out3.2");
                FileInputFormat.addInputPath(job1, in1);
                FileOutputFormat.setOutputPath(job1, out1);
                System.exit(job1.waitForCompletion(true) ? 0 : 1);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // Emits (id, traffic) for every record
    public static class doMapper extends Mapper<Object, Text, Text, IntWritable> {
        public static Text word = new Text();
        public static final IntWritable id = new IntWritable();

        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String[] data = value.toString().split(",");
            //String str=data[0]+" "+data[5];
            data[3] = data[3].substring(0, data[3].length() - 1);   // drop the trailing character of the traffic field
            word.set(data[5]);
            id.set(Integer.parseInt(data[3]));
            context.write(word, id);
        }
    }

    // Sums the traffic and writes it as (total, id)
    public static class doReducer extends Reducer<Text, IntWritable, IntWritable, Text> {
        private static IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            result.set(sum);
            context.write(result, key);
        }
    }

    public static class doMapper1 extends Mapper<Object, Text, IntWritable, Text> {
        private static Text goods = new Text();
        private static IntWritable num = new IntWritable();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String arr[] = line.split(" ");
            num.set(Integer.parseInt(arr[0]));
            goods.set(arr[1]);
            context.write(num, goods);
        }
    }

    // Writes only the first ten records after the descending sort
    public static class doReducer1 extends Reducer<IntWritable, Text, IntWritable, Text> {
        private static IntWritable result = new IntWritable();
        int i = 0;

        public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            for (Text value : values) {
                if (i < 10) {
                    context.write(key, value);
                    i++;
                }
            }
        }
    }

    // Reverses IntWritable's natural order so totals are sorted in descending order
    private static class IntWritableDecreasingComparator extends IntWritable.Comparator {
        public int compare(WritableComparable a, WritableComparable b) {
            return -super.compare(a, b);
        }

        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            return -super.compare(b1, s1, l1, b2, s2, l2);
        }
    }
}
3. The statistics were not imported into MySQL; instead, the MapReduce output was visualized directly with ECharts.
The data is first processed with MapReduce, then visualized in a JSP page.
package mapreduce3;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class Pai {

    // The Top 10 names and values are kept in static lists so the JSP can read them after run()
    public static List<String> Names = new ArrayList<String>();
    public static List<String> Values = new ArrayList<String>();

    // Descending comparator for the IntWritable keys
    public static class Sort extends WritableComparator {
        public Sort() {
            super(IntWritable.class, true);
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            return -a.compareTo(b);
        }
    }

    public static class Map extends Mapper<Object, Text, IntWritable, Text> {
        private static Text Name = new Text();
        private static IntWritable num = new IntWritable();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String arr[] = line.split(" ");
            if (!arr[0].startsWith(" ")) {
                num.set(Integer.parseInt(arr[0]));
                Name.set(arr[1]);
                context.write(num, Name);
            }
        }
    }

    public static class Reduce extends Reducer<IntWritable, Text, IntWritable, Text> {
        private static IntWritable result = new IntWritable();
        int i = 0;

        public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            for (Text val : values) {
                if (i < 10) {
                    // Collect the first ten (i.e. largest) records for the JSP page
                    i = i + 1;
                    Names.add(val.toString());
                    Values.add(key.toString());
                }
                context.write(key, val);
            }
        }
    }

    public static int run() throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.137.67:9000");
        FileSystem fs = FileSystem.get(conf);
        Job job = new Job(conf, "OneSort");
        job.setJarByClass(Pai.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setSortComparatorClass(Sort.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        Path in = new Path("hdfs://192.168.137.67:9000/mymapreducelShiYan/out1.2/part-r-00000");
        Path out = new Path("hdfs://192.168.137.67:9000/mymapreducelShiYan/out1.4");
        FileInputFormat.addInputPath(job, in);
        // Delete the output directory if it already exists, so the page can be re-run
        fs.delete(out, true);
        FileOutputFormat.setOutputPath(job, out);
        return (job.waitForCompletion(true) ? 0 : 1);
    }
}
zhu.jsp
<%@page import="mapreduce3.Pai"%> <%@page import="mapreduce3.GetVideoResult"%> <%@ page language="java" import="java.util.*" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%> <!DOCTYPE html> <html> <head> <meta charset="UTF-8"> <title>Insert title here</title> <script src="${pageContext.request.contextPath}/resource/echarts.js"></script> </head> <body> <% Pai ss= new Pai(); ss.run(); String[] a=new String[11]; String[] b=new String[11]; int i=0,j=0; for(i = 0 ; i < 10 ; i++) { a[i] = ss.Values.get(i); b[i] = ss.Names.get(i); } %> <div id="main" style="width: 600px;height:400px;"></div> <script type="text/javascript"> // 基于准备好的dom,初始化echarts实例 var myChart = echarts.init(document.getElementById(‘main‘)); // 指定图表的配置项和数据 var option = { title: { text: ‘最受欢迎的文章/视频 TOP10‘ }, tooltip: {}, legend: { data:[‘统计‘] }, xAxis: { data: [ <% for( i=0;i<10;i++) { %><%=b[i]%>,<% } %>] }, yAxis: {}, series: [{ name: ‘最受欢迎的文章‘, type: ‘bar‘, data: [ <% for( i=0;i<10;i++) { %><%=a[i]%>,<% } %> ] }] }; // 使用刚指定的配置项和数据显示图表。 myChart.setOption(option); </script> </body> </html>
The other data-processing classes follow the same pattern as the ones shown above, so their code is not repeated here; only the JSP files are listed. If you want a different kind of chart, copy an example from the ECharts website into the JSP and adjust it.
zhe.jsp
<%@page import="mapreduce3.Pai1"%> <%@page import="mapreduce3.GetVideoResult"%> <%@ page language="java" import="java.util.*" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%> <!DOCTYPE html> <html> <head> <meta charset="UTF-8"> <title>Insert title here</title> <script src="${pageContext.request.contextPath}/resource/echarts.js"></script> </head> <body> <% Pai1 ss= new Pai1(); ss.run(); String[] a=new String[11]; String[] b=new String[11]; int i=0,j=0; for(i = 0 ; i < 10 ; i++) { a[i] = ss.Values.get(i); b[i] = ss.Names.get(i); } %> <div id="main" style="width: 600px;height:400px;"></div> <script type="text/javascript"> // 基于准备好的dom,初始化echarts实例 var myChart = echarts.init(document.getElementById(‘main‘)); // 指定图表的配置项和数据 var option = { title: { text: ‘按照地市最受欢迎‘ }, tooltip: {}, legend: { data:[‘统计‘] }, xAxis: { data: [
<%
for( i=0;i<10;i++)
{
%>‘<%=b[i]%>‘,
<%
}
%>
] }, yAxis: {}, series: [{ name: ‘最受欢迎的文章‘, type: ‘line‘, data: [ <% for( i=0;i<10;i++) { %><%=a[i]%>,<% } %> ] }] }; // 使用刚指定的配置项和数据显示图表。 myChart.setOption(option); </script> </body> </html>
tu.jsp
<%@page import="mapreduce3.Pai2"%> <%@page import="mapreduce3.GetVideoResult"%> <%@ page language="java" import="java.util.*" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%> <!DOCTYPE html> <html> <head> <meta charset="UTF-8"> <title>Insert title here</title> <script src="${pageContext.request.contextPath}/resource/echarts.js"></script> </head> <body> <% Pai2 ss= new Pai2(); ss.run(); String[] a=new String[11]; String[] b=new String[11]; int i=0,j=0; for(i = 0 ; i < 10 ; i++) { a[i] = ss.Values.get(i); b[i] = ss.Names.get(i); } %> <div id="main" style="width: 600px;height:400px;"></div> <script type="text/javascript"> // 基于准备好的dom,初始化echarts实例 var myChart = echarts.init(document.getElementById(‘main‘)); // 指定图表的配置项和数据 option = { title : { text: ‘按照流量最受欢迎‘, x:‘center‘ }, tooltip : { trigger: ‘item‘, formatter: "{a} <br/>{b} : {c} ({d}%)" }, legend: { orient: ‘vertical‘, left: ‘left‘, data: [ <% for( i=0;i<10;i++) { %>‘<%=b[i]%>‘, <% } %> ] }, series : [ { name: ‘访问来源‘, type: ‘pie‘, radius : ‘55%‘, center: [‘50%‘, ‘60%‘], data:[ <% for( i=0;i<10;i++) { %>{value:<%=a[i]%>,name:‘<%=b[i]%>‘}, <% } %> ], itemStyle: { emphasis: { shadowBlur: 10, shadowOffsetX: 0, shadowColor: ‘rgba(0, 0, 0, 0.5)‘ } } } ] }; // 使用刚指定的配置项和数据显示图表。 myChart.setOption(option); </script> </body> </html>