大数据项目之电商数仓-用户行为数据仓库
Posted _TIM_
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大数据项目之电商数仓-用户行为数据仓库相关的知识,希望对你有一定的参考价值。
数据仓库分层
- 把复杂问题简单化,把一个复杂的任务分解成多个步骤来完成,每一层只处理单一的步骤,比较简单和容易理解
- 清晰的数据结构,每一层都有它的作用域,这样我们在使用表的时候能更方便的定位和理解。 便于维护数据的准确性,当数据出现问题的时候,可以不用修复所有的数据,只需要从有问题的步骤开始修复
- 减少重复开发,规范数据分层,通过中间层数据,能够减少极大的重复计算,增加一次计算结果的复用性
- 隔离原始数据,使得真是数据与统计数据接耦
分层结构图
- ODS层(原始数据层)
原始数据层,存放原始数据,直接加载原始日志、数据,数据保持原貌不做处理。 - DWD层(明细数据层)
结构和粒度与ODS
层保持一致,对ODS
层数据进行清洗(去除空值,脏数据,超过极限范围的数据),也有公司叫DWI
。 - DWS层(服务数据层)
以DWD
为基础,进行轻度汇总。一般聚集到以用户当日,设备当日,商家当日,商品当日等等的粒度。在这层通常会有以某一个维度为线索,组成跨主题的宽表,比如,一个用户的当日的签到数、收藏数、评论数、抽奖数、订阅数、点赞数、浏览商品数、添加购物车数、下单数、支付数、退款数、点击广告数组成的多列表。 - ADS层(数据应用层)
数据应用层,也有公司或书把这层命名为APP
层、DAL
层等。面向实际的数据需求,以DWD
或者DWS
层的数据为基础,组成的各种统计报表。统计结果最终同步到RDS
以供BI
或应用系统查询使用。
Hive运行引擎Tez
性能优于MapReduce
,用Hive
直接编写程序,假设有四个有依赖关系的MapReduce
作业,绿色是Rgmallce Task
,云状表示写屏蔽,需要将中间结果持久化写到HDFS
。Tez
可以将多个有依赖的作业转换为一个作业,这样只需写一次HDFS
,且中间节点较少,从而大大提升DAG
作业的性能。
数仓搭建之ODS & DWD
ODS层
原始数据层,存放原始数据,直接加载原始日志、数据,数据保持原貌不做处理。
创建启动日志表ods_start_log
hive (gmall)>
drop table if exists ods_start_log;
CREATE EXTERNAL TABLE `ods_start_log`(`line` string)
PARTITIONED BY (`dt` string)
STORED AS
INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION '/warehouse/gmall/ods/ods_start_log';
创建事件日志表ods_event_log
hive (gmall)>
drop table if exists ods_event_log;
CREATE EXTERNAL TABLE `ods_event_log`(`line` string)
PARTITIONED BY (`dt` string)
STORED AS
INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION '/warehouse/gmall/ods/ods_event_log';
ODS层加载数据脚本
#!/bin/bash
# 定义变量方便修改
APP=gmall
hive=/opt/module/hive/bin/hive
# 如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一天
if [ -n $1 ] ;then
log_date=$1
else
log_date=`date -d "-1 day" +%F`
fi
echo "===日志日期为 $log_date==="
$hive -e "load data inpath '/origin_data/gmall/log/topic_start/$log_date' into table "$APP".ods_start_log partition(dt='$log_date')"
$hive -e "load data inpath '/origin_data/gmall/log/topic_event/$log_date' into table "$APP".ods_event_log partition(dt='$log_date')"
DWD层数据解析
对ODS
层数据进行清洗(去除空值,脏数据,超过极限范围的数据,行式存储改为列存储,改压缩格式)
创建启动日志基础明细表
其中event_name
和event_json
用来对应事件名和整个事件。这个地方将原始日志1对多的形式拆分出来了。操作的时候我们需要将原始日志展平,需要用到UDF
和UDTF
。
hive (gmall)>
drop table if exists dwd_base_start_log;
CREATE EXTERNAL TABLE `dwd_base_start_log`(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`event_name` string,
`event_json` string,
`server_time` string)
PARTITIONED BY (`dt` string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_base_start_log/';
创建事件日志基础明细表
hive (gmall)>
drop table if exists dwd_base_event_log;
CREATE EXTERNAL TABLE `dwd_base_event_log`(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`event_name` string,
`event_json` string,
`server_time` string)
PARTITIONED BY (`dt` string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_base_event_log/';
自定义UDF函数(解析公共字段)
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.json.JSONException;
import org.json.JSONObject;
public class BaseFieldUDF extends UDF
public String evaluate(String line, String jsonkeysString)
// 0 准备一个sb
StringBuilder sb = new StringBuilder();
// 1 切割jsonkeys mid uid vc vn l sr os ar md
String[] jsonkeys = jsonkeysString.split(",");
// 2 处理line 服务器时间 | json
String[] logContents = line.split("\\\\|");
// 3 合法性校验
if (logContents.length != 2 || StringUtils.isBlank(logContents[1]))
return "";
// 4 开始处理json
try
JSONObject jsonObject = new JSONObject(logContents[1]);
// 获取cm里面的对象
JSONObject base = jsonObject.getJSONObject("cm");
// 循环遍历取值
for (int i = 0; i < jsonkeys.length; i++)
String filedName = jsonkeys[i].trim();
if (base.has(filedName))
sb.append(base.getString(filedName)).append("\\t");
else
sb.append("").append("\\t");
sb.append(jsonObject.getString("et")).append("\\t");
sb.append(logContents[0]).append("\\t");
catch (JSONException e)
e.printStackTrace();
return sb.toString();
自定义UDTF函数(解析具体事件字段)
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.json.JSONArray;
import org.json.JSONException;
import java.util.ArrayList;
public class EventJsonUDTF extends GenericUDTF
//该方法中,我们将指定输出参数的名称和参数类型:
@Override
public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException
ArrayList<String> fieldNames = new ArrayList<String>();
ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
fieldNames.add("event_name");
fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
fieldNames.add("event_json");
fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
//输入1条记录,输出若干条结果
@Override
public void process(Object[] objects) throws HiveException
// 获取传入的et
String input = objects[0].toString();
// 如果传进来的数据为空,直接返回过滤掉该数据
if (StringUtils.isBlank(input))
return;
else
try
// 获取一共有几个事件(ad/facoriters)
JSONArray ja = new JSONArray(input);
if (ja == null)
return;
// 循环遍历每一个事件
for (int i = 0; i < ja.length(); i++)
String[] result = new String[2];
try
// 取出每个的事件名称(ad/facoriters)
result[0] = ja.getJSONObject(i).getString("en");
// 取出每一个事件整体
result[1] = ja.getString(i);
catch (JSONException e)
continue;
// 将结果返回
forward(result);
catch (JSONException e)
e.printStackTrace();
//当没有记录处理的时候该方法会被调用,用来清理代码或者产生额外的输出
@Override
public void close() throws HiveException
业务术语
- 用户
用户以设备为判断标准,在移动统计中,每个独立设备认为是一个独立用户。android
系统根据IMEI
号,ios
系统根据OpenUDID
来标识一个独立用户,每部手机一个用户。 - 新增用户
首次联网使用应用的用户。如果一个用户首次打开某app
,那这个用户定义为新增用户;卸载再安装的设备,不会被算作一次新增。新增用户包括日新增用户、周新增用户、月新增用户。 - 活跃用户
打开应用的用户即为活跃用户,不考虑用户的使用情况。每天一台设备打开多次会被计为一个活跃用户。 - 周(月)活跃用户
某个自然周(月)内启动过应用的用户,该周(月)内的多次启动只记一个活跃用户。 - 月活跃率
月活跃用户与截止到该月累计的用户总和之间的比例。 - 沉默用户
用户仅在安装当天(次日)启动一次,后续时间无再启动行为。该指标可以反映新增用户质量和用户与APP
的匹配程度。 - 版本分布
不同版本的周内各天新增用户数,活跃用户数和启动次数。利于判断App
各个版本之间的优劣和用户行为习惯。 - 本周回流用户
上周未启动过应用,本周启动了应用的用户。 - 连续n周活跃用户
连续n周,每周至少启动一次。 - 忠诚用户
连续活跃5周以上的用户 - 连续活跃用户
连续2周及以上活跃的用户 - 近期流失用户
连续n(2<= n <= 4)
周没有启动应用的用户(第n+1
周没有启动过) - 留存用户
某段时间内的新增用户,经过一段时间后,仍然使用应用的被认作是留存用户;这部分用户占当时新增用户的比例即是留存率。例如,5月份新增用户200,这200人在6月份启动过应用的有100人,7月份启动过应用的有80人,8月份启动过应用的有50人;则5月份新增用户一个月后的留存率是50%,二个月后的留存率是40%,三个月后的留存率是25%。 - 用户新鲜度
每天启动应用的新老用户比例,即新增用户数占活跃用户数的比例。 - 单次使用时长
每次启动使用的时间长度。 - 日使用时长
累计一天内的使用时间长度。 - 启动次数计算标准
IOS
平台应用退到后台就算一次独立的启动;Android
平台我们规定,两次启动之间的间隔小于30秒,被计算一次启动。用户在使用过程中,若因收发短信或接电话等退出应用30秒又再次返回应用中,那这两次行为应该是延续而非独立的,所以可以被算作一次使用行为,即一次启动。业内大多使用30秒这个标准,但用户还是可以自定义此时间间隔。
以上是关于大数据项目之电商数仓-用户行为数据仓库的主要内容,如果未能解决你的问题,请参考以下文章