HIVE大数据实战项目---用户行为分析

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了HIVE大数据实战项目---用户行为分析相关的知识,希望对你有一定的参考价值。

参考技术A 相关精彩专题链接: 数据成就更好的你

一、项目需求
本案例的数据为小程序运营数据,以行业常见指标对用户行为进行分析,包括UV、PV、新增用户分析、留存分析、复购分析等内容。

项目需求如下:
1.日访问量分析,并观察其走势
2.不同行为类型的访问量分析
3.一天中不同时间段的访问量分析(时间段按小时划分)
4.每日新增用户情况分析
5.用户留存分析
6.复购分析
7.商品排行榜分析
8.利用sqoop将数据分析结果导入mysql存储

二、数据介绍
1.用户行为信息表

2.查看具体的数据格式
a.用户信息:head -n 3 behavior.txt

b.去除首行,首行为标题行,hive导入数据时不需要此行:
sed -i "1d" behavior.txt

三、创建表
创建用户行为表(需结合数据格式)

四、用户行为分析:pv/uv
1.日访问量分析,并观察其走势

2.不同行为类型的访问量分析

3.一天中不同时间段的访问量分析(时间段按小时划分)

五、获客分析
获客分析:观察每日新增用户情况。新用户的定义:第一次访问网站

六、用户留存分析
留存定义:
1月1日,新增用户200人;
次日留存:第2天,1月2日,这200人里面有100人活跃,则次日留存率为:100 / 200 = 50%
2日留存:第3天,1月3日,这200名新增用户里面有80人活跃, 第3日新增留存率为:80/200 = 40%; 以此类推

留存分析结果如下:
例:2019-11-28日的新增7610个用户,次日这些新增用户有6026个再次访问网页,留存率为79.19%,第4天,有5980个用户再次访问,留存率为78.58%

七、复购分析
指在单位时间段内,重复购买率=再次购买人数/总购买人数。
例如在一个月内,有100个客户成交,其中有20个是回头客,则重复购买率为20%。
此处的回头客定义为:按天去重,即一个客户一天产生多笔交易付款,则算一次购买,除非在统计周期内另外一天也有购买的客户才是回头客。

1.用户的购买次数统计

2.复购率计算

八、商品排行榜信息
1.商品的销售数量top10,排名需考虑并列排名的情况

2.商品的浏览次数top10,排名需考虑并列排名的情况

3.商品的收藏次数top10,排名需考虑并列排名的情况

4.城市购买力排名

九、利用sqoop将数据分析结果导入mysql存储

1.在mysql创建一张表,字段类型、顺序都和hive中的表一样

2.测试sqoop连接mysql是否成功

3.利用sqoop将数据分析结果导入mysql存储

4.mysql中查询导入结果,看结果是否正确

大数据实战:用户流量分析系统

文章出处:http://blog.csdn.net/sdksdk0/article/details/51628874

作者:朱培

---------------------------------------------------------------------------------------------------------------

 

本文是结合Hadoop中的mapreduce来对用户数据进行分析,统计用户的手机号码、上行流量、下行流量、总流量的信息,同时可以按照总流量大小对用户进行分组排序等。是一个非常简洁易用的hadoop项目,主要用户进一步加强对MapReduce的理解及实际应用。文末提供源数据采集文件和系统源码。

本案例非常适合hadoop初级人员学习以及想入门大数据云计算、数据分析等领域的朋友进行学习。

一、待分析的数据源

以下是一个待分析的文本文件,里面有非常多的用户浏览信息,保扩用户手机号码,上网时间,机器序列号,访问的IP,访问的网站,上行流量,下行流量,总流量等信息。这里只截取一小段,具体文件在文末提供下载链接。

技术分享


二、基本功能实现

想要统计出用户的上行流量、下行流量、总流量信息,我们需要建立一个bean类来对数据进行封装。于是新建应该Java工程,导包,或者直接建立一个MapReduce工程。在这里面建立一个FlowBean.java文件。
 
[html] view plain copy
 
 print?
  1.        private long upFlow;  
  2. private long dFlow;  
  3. private long sumFlow;  
然后就是各种右键生成get,set方法,还要toString(),以及生成构造函数,(千万记得要生成一个空的构造函数,不然后面进行分析的时候会报错)。
完整代码如下:
[java] view plain copy
 
 print?
  1. package cn.tf.flow;  
  2.   
  3. import java.io.DataInput;  
  4. import java.io.DataOutput;  
  5. import java.io.IOException;  
  6.   
  7. import org.apache.hadoop.io.Writable;  
  8. import org.apache.hadoop.io.WritableComparable;  
  9.   
  10. public class FlowBean  implements WritableComparable<FlowBean>{  
  11.       
  12.     private long upFlow;  
  13.     private long dFlow;  
  14.     private long sumFlow;  
  15.     public long getUpFlow() {  
  16.         return upFlow;  
  17.     }  
  18.     public void setUpFlow(long upFlow) {  
  19.         this.upFlow = upFlow;  
  20.     }  
  21.     public long getdFlow() {  
  22.         return dFlow;  
  23.     }  
  24.     public void setdFlow(long dFlow) {  
  25.         this.dFlow = dFlow;  
  26.     }  
  27.     public long getSumFlow() {  
  28.         return sumFlow;  
  29.     }  
  30.     public void setSumFlow(long sumFlow) {  
  31.         this.sumFlow = sumFlow;  
  32.     }  
  33.     public FlowBean(long upFlow, long dFlow) {  
  34.         super();  
  35.         this.upFlow = upFlow;  
  36.         this.dFlow = dFlow;  
  37.         this.sumFlow = upFlow+dFlow;  
  38.     }  
  39.     @Override  
  40.     public void readFields(DataInput in) throws IOException {  
  41.         upFlow=in.readLong();  
  42.         dFlow=in.readLong();  
  43.         sumFlow=in.readLong();  
  44.           
  45.     }  
  46.     @Override  
  47.     public void write(DataOutput out) throws IOException {  
  48.         out.writeLong(upFlow);  
  49.         out.writeLong(dFlow);  
  50.         out.writeLong(sumFlow);  
  51.     }  
  52.     public FlowBean() {  
  53.         super();  
  54.     }  
  55.   
  56.     @Override  
  57.     public String toString() {  
  58.            
  59.         return  upFlow + "\t" + dFlow + "\t" + sumFlow;  
  60.     }  
  61.     @Override  
  62.     public int compareTo(FlowBean o) {  
  63.           
  64.         return this.sumFlow>o.getSumFlow() ? -1:1;  
  65.     }  
  66.       
  67.       
  68.   
  69. }  

然后就是这个统计的代码了,新建一个FlowCount.java.在这个类里面,我直接把Mapper和Reduce写在同一个类里面了,如果按规范的要求应该是要分开写的。
在mapper中,获取后面三段数据的值,所以我的这里length-2,length-3.
[java] view plain copy
 
 print?
  1.       public static class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {  
  2.     @Override  
  3.     protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {  
  4.   
  5.         // 拿到这行的内容转成string  
  6.         String line = value.toString();  
  7.   
  8.         String[] fields = StringUtils.split(line, "\t");  
  9.         try {  
  10.             if (fields.length > 3) {  
  11.                 // 获得手机号及上下行流量字段值  
  12.                 String phone = fields[1];  
  13.                 long upFlow = Long.parseLong(fields[fields.length - 3]);  
  14.                 long dFlow = Long.parseLong(fields[fields.length - 2]);  
  15.   
  16.                 // 输出这一行的处理结果,key为手机号,value为流量信息bean  
  17.                 context.write(new Text(phone), new FlowBean(upFlow, dFlow));  
  18.             } else {  
  19.                 return;  
  20.             }  
  21.         } catch (Exception e) {  
  22.   
  23.         }  
  24.   
  25.     }  
  26.   
  27. }  


 
在reduce中队数据进行整理,统计
 
[java] view plain copy
 
 print?
  1. public static class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {  
  2.   
  3.         @Override  
  4.         protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {  
  5.   
  6.             long upSum = 0;  
  7.             long dSum = 0;  
  8.   
  9.             for (FlowBean bean : values) {  
  10.   
  11.                 upSum += bean.getUpFlow();  
  12.                 dSum += bean.getdFlow();  
  13.             }  
  14.   
  15.             FlowBean resultBean = new FlowBean(upSum, dSum);  
  16.             context.write(key, resultBean);  
  17.   
  18.         }  
  19.   
  20.     }  


最后在main方法中调用执行。
[java] view plain copy
 
 print?
  1. public static void main(String[] args) throws Exception {  
  2.   
  3.         Configuration conf = new Configuration();  
  4.         Job job = Job.getInstance(conf);  
  5.   
  6.         job.setJarByClass(FlowCount.class);  
  7.   
  8.         job.setMapperClass(FlowCountMapper.class);  
  9.         job.setReducerClass(FlowCountReducer.class);  
  10.   
  11.         job.setMapOutputKeyClass(Text.class);  
  12.         job.setMapOutputValueClass(FlowBean.class);  
  13.   
  14.         job.setOutputKeyClass(Text.class);  
  15.         job.setOutputValueClass(FlowBean.class);  
  16.   
  17.         FileInputFormat.setInputPaths(job, new Path(args[0]));  
  18.         FileOutputFormat.setOutputPath(job, new Path(args[1]));  
  19.   
  20.         boolean res = job.waitForCompletion(true);  
  21.         System.exit(res ? 0 : 1);  
  22.   
  23.     }  
当然啦,还需要先在你的hdfs根目录中建立/flow/data数据,然后我那个用户的数据源上传上去。
[java] view plain copy
 
 print?
  1. bin/hadoop fs -mkdir -p /flow/data  
  2. bin/hadoop fs -put HTTP_20130313143750.dat /flow/data  
  3. bin/hadoop jar  ../lx/flow.jar  
 
把上面这个MapReduce工程打包成一个jar文件,然后用hadoop来执行这个jar文件。例如我放在~/hadoop/lx/flow.jar,然后再hadoop安装目录中执行
[java] view plain copy
 
 print?
  1. bin/hadoop jar ../lx/flowsort.jar cn/tf/flow/FlowCount  /flow/data  /flow/output  
 
最后执行结果如下:

技术分享

在这整过过程中,我们是有yarnchild的进程在执行的,如下图所示:当整个过程执行完毕之后yarnchild也会自动退出。
技术分享

三、按总流量从大到小排序

如果你上面这个基本操作以及完成了的话,按总流量排序就非常简单了。我们新建一个FlowCountSort.Java.

全部代码如下:

 

[java] view plain copy
 
 print?
  1. package cn.tf.flow;  
  2.   
  3. import java.io.IOException;  
  4.   
  5. import org.apache.commons.lang.StringUtils;  
  6. import org.apache.hadoop.conf.Configuration;  
  7. import org.apache.hadoop.fs.Path;  
  8. import org.apache.hadoop.io.LongWritable;  
  9. import org.apache.hadoop.io.Text;  
  10. import org.apache.hadoop.mapreduce.Job;  
  11. import org.apache.hadoop.mapreduce.Mapper;  
  12. import org.apache.hadoop.mapreduce.Reducer;  
  13. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  14. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  15.   
  16. public class FlowCountSort {  
  17.   
  18. public static class FlowCountSortMapper extends Mapper<LongWritable, Text, FlowBean, Text>{  
  19.           
  20.         @Override  
  21.         protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {  
  22.               
  23.             String line=value.toString();  
  24.             String[] fields=StringUtils.split(line,"\t");  
  25.               
  26.             String phone=fields[0];  
  27.             long upSum=Long.parseLong(fields[1]);  
  28.             long dSum=Long.parseLong(fields[2]);  
  29.               
  30.             FlowBean sumBean=new FlowBean(upSum,dSum);  
  31.               
  32.             context.write(sumBean, new Text(phone));  
  33.           
  34.         }     
  35. }  
  36.   
  37.     public static class FlowCountSortReducer extends Reducer<FlowBean, Text, Text, FlowBean>{  
  38.           
  39.         //进来的“一组”数据就是一个手机的流量bean和手机号  
  40.         @Override  
  41.         protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {  
  42.       
  43.             context.write(values.iterator().next(), key);  
  44.         }  
  45.     }  
  46.   
  47.     public static void main(String[] args) throws Exception {  
  48.   
  49.         Configuration conf = new Configuration();  
  50.         Job job = Job.getInstance(conf);  
  51.   
  52.         job.setJarByClass(FlowCountSort.class);  
  53.   
  54.         job.setMapperClass(FlowCountSortMapper.class);  
  55.         job.setReducerClass(FlowCountSortReducer.class);  
  56.   
  57.         job.setMapOutputKeyClass(FlowBean.class);  
  58.         job.setMapOutputValueClass(Text.class);  
  59.   
  60.         job.setOutputKeyClass(Text.class);  
  61.         job.setOutputValueClass(FlowBean.class);  
  62.   
  63.         FileInputFormat.setInputPaths(job, new Path(args[0]));  
  64.         FileOutputFormat.setOutputPath(job, new Path(args[1]));  
  65.   
  66.         boolean res = job.waitForCompletion(true);  
  67.         System.exit(res ? 0 : 1);  
  68.   
  69.     }  
  70.       
  71. }  


这个主要就是使用了FlowBean.java中的代码来实现的,主要是继承了WritableComparable<FlowBean>接口来实现,然后重写了compareTo()方法。

 

 

[html] view plain copy
 
 print?
  1. @Override  
  2.     public int compareTo(FlowBean o) {  
  3.           
  4.         return this.sumFlow>o.getSumFlow() ? -1:1;  
  5.     }  
  6.       

按照同样的方法对这个文件打成jar包,然后使用hadoop的相关语句进行执行就可以了。

 

 

[java] view plain copy
 
 print?
  1. bin/hadoop jar ../lx/flowsort.jar cn/tf/flow/FlowCountSort  /flow/output  /flow/sortoutput  

结果图:

 


技术分享

四、按用户号码区域进行分类

流量汇总之后的结果需要按照省份输出到不同的结果文件中,需要解决两个问题:

 1、如何让mr的最终结果产生多个文件: 原理:MR中的结果文件数量由reduce
  task的数量绝对,是一一对应的 做法:在代码中指定reduce task的数量
 
 
  2、如何让手机号进入正确的文件 原理:让不同手机号数据发给正确的reduce task,就进入了正确的结果文件
  要自定义MR中的分区partition的机制(默认的机制是按照kv中k的hashcode%reducetask数)
  做法:自定义一个类来干预MR的分区策略——Partitioner的自定义实现类

主要代码与前面的排序是非常类似的,只要在main方法中添加如下两行代码就可以了。

 

[java] view plain copy
 
 print?
  1.         //指定自定义的partitioner  
  2. job.setPartitionerClass(ProvincePartioner.class);  
  3.   
  4. job.setNumReduceTasks(5);  


这里我们需要新建一个ProvincePartioner.java来处理号码分类的逻辑。

 

 

[java] view plain copy
 
 print?
  1. public class ProvincePartioner extends Partitioner<Text, FlowBean>{  
  2.       
  3.       
  4. private static HashMap<String, Integer> provinceMap = new HashMap<String, Integer>();  
  5.       
  6.     static {  
  7.           
  8.         provinceMap.put("135", 0);  
  9.         provinceMap.put("136", 1);  
  10.         provinceMap.put("137", 2);  
  11.         provinceMap.put("138", 3);        
  12.     }  
  13.       
  14.     @Override  
  15.     public int getPartition(Text key, FlowBean value, int numPartitions) {  
  16.   
  17.         String prefix = key.toString().substring(0, 3);  
  18.         Integer partNum = provinceMap.get(prefix);  
  19.         if(partNum == null) partNum=4;  
  20.           
  21.         return partNum;  
  22.     }  
  23.   
  24. }  


执行方法和前面也是一样的。从执行的流程中我们可以看到这里启动了5个reduce task,因为我这里数据量比较小,所以只启动了一个map task。

 

技术分享

 

到这里,整个用户流量分析系统就全部结束了。关于大数据的更多内容,欢迎关注。点击左上角头像下方“点击关注".感谢您的支持!

 

 

数据源下载地址:http://download.csdn.net/detail/sdksdk0/9545935

源码项目地址:https://github.com/sdksdk0/HDFS_MapReduce

 





















以上是关于HIVE大数据实战项目---用户行为分析的主要内容,如果未能解决你的问题,请参考以下文章

大数据项目之电商数仓-用户行为数据采集

大数据项目之电商数仓-用户行为数据采集

大数据项目之电商数仓-用户行为数据采集

spark项目实战(一~~九)

大数据项目之电商数仓-用户行为数据仓库

大数据项目之电商数仓-用户行为数据仓库