大数据项目之电商数仓-用户行为数据仓库

Posted _TIM_

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大数据项目之电商数仓-用户行为数据仓库相关的知识,希望对你有一定的参考价值。

数据仓库分层

  • 把复杂问题简单化,把一个复杂的任务分解成多个步骤来完成,每一层只处理单一的步骤,比较简单和容易理解
  • 清晰的数据结构,每一层都有它的作用域,这样我们在使用表的时候能更方便的定位和理解。 便于维护数据的准确性,当数据出现问题的时候,可以不用修复所有的数据,只需要从有问题的步骤开始修复
  • 减少重复开发,规范数据分层,通过中间层数据,能够减少极大的重复计算,增加一次计算结果的复用性
  • 隔离原始数据,使得真是数据与统计数据接耦

分层结构图

  • ODS层(原始数据层)
    原始数据层,存放原始数据,直接加载原始日志、数据,数据保持原貌不做处理。
  • DWD层(明细数据层)
    结构和粒度与ODS层保持一致,对ODS层数据进行清洗(去除空值,脏数据,超过极限范围的数据),也有公司叫DWI
  • DWS层(服务数据层)
    DWD为基础,进行轻度汇总。一般聚集到以用户当日,设备当日,商家当日,商品当日等等的粒度。在这层通常会有以某一个维度为线索,组成跨主题的宽表,比如,一个用户的当日的签到数、收藏数、评论数、抽奖数、订阅数、点赞数、浏览商品数、添加购物车数、下单数、支付数、退款数、点击广告数组成的多列表。
  • ADS层(数据应用层)
    数据应用层,也有公司或书把这层命名为APP层、DAL层等。面向实际的数据需求,以DWD或者DWS层的数据为基础,组成的各种统计报表。统计结果最终同步到RDS以供BI或应用系统查询使用。

Hive运行引擎Tez

性能优于MapReduce,用Hive直接编写程序,假设有四个有依赖关系的MapReduce作业,绿色是Rgmallce Task,云状表示写屏蔽,需要将中间结果持久化写到HDFSTez可以将多个有依赖的作业转换为一个作业,这样只需写一次HDFS,且中间节点较少,从而大大提升DAG作业的性能。

数仓搭建之ODS & DWD

ODS层

原始数据层,存放原始数据,直接加载原始日志、数据,数据保持原貌不做处理。
创建启动日志表ods_start_log

hive (gmall)> 
drop table if exists ods_start_log;
CREATE EXTERNAL TABLE  `ods_start_log`(`line` string)
PARTITIONED BY (`dt` string)
STORED AS
  INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION '/warehouse/gmall/ods/ods_start_log';

创建事件日志表ods_event_log

hive (gmall)> 
drop table if exists ods_event_log;
CREATE EXTERNAL TABLE  `ods_event_log`(`line` string)
PARTITIONED BY (`dt` string)
STORED AS
  INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION '/warehouse/gmall/ods/ods_event_log';

ODS层加载数据脚本

#!/bin/bash

# 定义变量方便修改
APP=gmall
hive=/opt/module/hive/bin/hive

# 如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一天
if [ -n $1 ] ;then
 log_date=$1
else 
 log_date=`date  -d "-1 day"  +%F`  
fi 

echo "===日志日期为 $log_date==="
$hive -e "load data inpath '/origin_data/gmall/log/topic_start/$log_date' into table "$APP".ods_start_log partition(dt='$log_date')"
$hive -e "load data inpath '/origin_data/gmall/log/topic_event/$log_date' into table "$APP".ods_event_log partition(dt='$log_date')"

DWD层数据解析

ODS层数据进行清洗(去除空值,脏数据,超过极限范围的数据,行式存储改为列存储,改压缩格式)

创建启动日志基础明细表
其中event_nameevent_json用来对应事件名和整个事件。这个地方将原始日志1对多的形式拆分出来了。操作的时候我们需要将原始日志展平,需要用到UDFUDTF

hive (gmall)> 
drop table if exists dwd_base_start_log;
CREATE EXTERNAL TABLE `dwd_base_start_log`(
	`mid_id` string,
	`user_id` string, 
	`version_code` string, 
	`version_name` string, 
	`lang` string, 
	`source` string, 
	`os` string, 
	`area` string, 
	`model` string,
	`brand` string, 
	`sdk_version` string, 
	`gmail` string, 
	`height_width` string, 
	`app_time` string, 
	`network` string, 
	`lng` string, 
	`lat` string, 
	`event_name` string, 
	`event_json` string, 
	`server_time` string)
PARTITIONED BY (`dt` string)
stored as  parquet
location '/warehouse/gmall/dwd/dwd_base_start_log/';

创建事件日志基础明细表

hive (gmall)> 
drop table if exists dwd_base_event_log;
CREATE EXTERNAL TABLE `dwd_base_event_log`(
	`mid_id` string,
	`user_id` string, 
	`version_code` string, 
	`version_name` string, 
	`lang` string, 
	`source` string, 
	`os` string, 
	`area` string, 
	`model` string,
	`brand` string, 
	`sdk_version` string, 
	`gmail` string, 
	`height_width` string, 
	`app_time` string, 
	`network` string, 
	`lng` string, 
	`lat` string, 
	`event_name` string, 
	`event_json` string, 
	`server_time` string)
PARTITIONED BY (`dt` string)
stored as  parquet
location '/warehouse/gmall/dwd/dwd_base_event_log/';

自定义UDF函数(解析公共字段)

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.json.JSONException;
import org.json.JSONObject;

public class BaseFieldUDF extends UDF 

    public String evaluate(String line, String jsonkeysString) 
        
        // 0 准备一个sb
        StringBuilder sb = new StringBuilder();

        // 1 切割jsonkeys  mid uid vc vn l sr os ar md
        String[] jsonkeys = jsonkeysString.split(",");

        // 2 处理line   服务器时间 | json
        String[] logContents = line.split("\\\\|");

        // 3 合法性校验
        if (logContents.length != 2 || StringUtils.isBlank(logContents[1])) 
            return "";
        

        // 4 开始处理json
        try 
            JSONObject jsonObject = new JSONObject(logContents[1]);

            // 获取cm里面的对象
            JSONObject base = jsonObject.getJSONObject("cm");

            // 循环遍历取值
            for (int i = 0; i < jsonkeys.length; i++) 
                String filedName = jsonkeys[i].trim();

                if (base.has(filedName)) 
                    sb.append(base.getString(filedName)).append("\\t");
                 else 
                    sb.append("").append("\\t");
                
            

            sb.append(jsonObject.getString("et")).append("\\t");
            sb.append(logContents[0]).append("\\t");
         catch (JSONException e) 
            e.printStackTrace();
        

        return sb.toString();
    

自定义UDTF函数(解析具体事件字段)

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.json.JSONArray;
import org.json.JSONException;

import java.util.ArrayList;

public class EventJsonUDTF extends GenericUDTF 

    //该方法中,我们将指定输出参数的名称和参数类型:
    @Override
    public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException 

        ArrayList<String> fieldNames = new ArrayList<String>();
        ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();

        fieldNames.add("event_name");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        fieldNames.add("event_json");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    

    //输入1条记录,输出若干条结果
    @Override
    public void process(Object[] objects) throws HiveException 

        // 获取传入的et
        String input = objects[0].toString();

        // 如果传进来的数据为空,直接返回过滤掉该数据
        if (StringUtils.isBlank(input)) 
            return;
         else 

            try 
                // 获取一共有几个事件(ad/facoriters)
                JSONArray ja = new JSONArray(input);

                if (ja == null)
                    return;

                // 循环遍历每一个事件
                for (int i = 0; i < ja.length(); i++) 
                    String[] result = new String[2];

                    try 
                        // 取出每个的事件名称(ad/facoriters)
                        result[0] = ja.getJSONObject(i).getString("en");

                        // 取出每一个事件整体
                        result[1] = ja.getString(i);
                     catch (JSONException e) 
                        continue;
                    

                    // 将结果返回
                    forward(result);
                
             catch (JSONException e) 
                e.printStackTrace();
            
        
    

    //当没有记录处理的时候该方法会被调用,用来清理代码或者产生额外的输出
    @Override
    public void close() throws HiveException 

    


业务术语

  1. 用户
    用户以设备为判断标准,在移动统计中,每个独立设备认为是一个独立用户。android系统根据IMEI号,ios系统根据OpenUDID来标识一个独立用户,每部手机一个用户。
  2. 新增用户
    首次联网使用应用的用户。如果一个用户首次打开某app,那这个用户定义为新增用户;卸载再安装的设备,不会被算作一次新增。新增用户包括日新增用户、周新增用户、月新增用户。
  3. 活跃用户
    打开应用的用户即为活跃用户,不考虑用户的使用情况。每天一台设备打开多次会被计为一个活跃用户。
  4. 周(月)活跃用户
    某个自然周(月)内启动过应用的用户,该周(月)内的多次启动只记一个活跃用户。
  5. 月活跃率
    月活跃用户与截止到该月累计的用户总和之间的比例。
  6. 沉默用户
    用户仅在安装当天(次日)启动一次,后续时间无再启动行为。该指标可以反映新增用户质量和用户与APP的匹配程度。
  7. 版本分布
    不同版本的周内各天新增用户数,活跃用户数和启动次数。利于判断App各个版本之间的优劣和用户行为习惯。
  8. 本周回流用户
    上周未启动过应用,本周启动了应用的用户。
  9. 连续n周活跃用户
    连续n周,每周至少启动一次。
  10. 忠诚用户
    连续活跃5周以上的用户
  11. 连续活跃用户
    连续2周及以上活跃的用户
  12. 近期流失用户
    连续n(2<= n <= 4)周没有启动应用的用户(第n+1周没有启动过)
  13. 留存用户
    某段时间内的新增用户,经过一段时间后,仍然使用应用的被认作是留存用户;这部分用户占当时新增用户的比例即是留存率。例如,5月份新增用户200,这200人在6月份启动过应用的有100人,7月份启动过应用的有80人,8月份启动过应用的有50人;则5月份新增用户一个月后的留存率是50%,二个月后的留存率是40%,三个月后的留存率是25%。
  14. 用户新鲜度
    每天启动应用的新老用户比例,即新增用户数占活跃用户数的比例。
  15. 单次使用时长
    每次启动使用的时间长度。
  16. 日使用时长
    累计一天内的使用时间长度。
  17. 启动次数计算标准
    IOS平台应用退到后台就算一次独立的启动;Android平台我们规定,两次启动之间的间隔小于30秒,被计算一次启动。用户在使用过程中,若因收发短信或接电话等退出应用30秒又再次返回应用中,那这两次行为应该是延续而非独立的,所以可以被算作一次使用行为,即一次启动。业内大多使用30秒这个标准,但用户还是可以自定义此时间间隔。

以上是关于大数据项目之电商数仓-用户行为数据仓库的主要内容,如果未能解决你的问题,请参考以下文章

大数据项目之电商数仓-用户行为数据采集

大数据项目之电商数仓-用户行为数据仓库

大数据项目之电商数仓-用户行为数据仓库

大数据项目之电商数仓-用户行为数据仓库

大数据项目之电商数仓-用户行为数据仓库

大数据项目之电商数仓-业务数据仓库