Data Warehouse: DWD Detail Layer Operations by Example

DWD (Data Warehouse Detail): the detail layer. Its structure and granularity stay consistent with the original tables; it cleans the ODS-layer data (removing null values, dirty records, and values outside reasonable limits).
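A schematic sketch of that kind of ODS-to-DWD cleaning; the table and column names here (ods_example, dwd_example, id, amount) are purely illustrative and not part of this project:

-- Illustrative only: drop null keys and out-of-range values while copying ODS data into DWD
insert overwrite table dwd_example partition (dt='2020-06-25')
select id, amount
from ods_example
where dt = '2020-06-25'
  and id is not null                  -- drop null keys
  and amount between 0 and 1000000;   -- drop values outside the expected range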

The DWD layer's data comes from the ODS raw data layer. The ODS Hive table has only one field, which stores the raw log lines one per row. The following uses event logs (e.g. product-click events and detail-display events) as an example; a raw log line looks like this:

1593095829089|{
    "cm":{
        "ln":"-89.3",
        "sv":"V2.6.6",
        "os":"8.0.3",
        "g":"SU1Z29ZJ@gmail.com",
        "mid":"1",
        "nw":"3G",
        "l":"en",
        "vc":"3",
        "hw":"640*1136",
        "ar":"MX",
        "uid":"1",
        "t":"1593002588300",
        "la":"-16.2",
        "md":"sumsung-3",
        "vn":"1.2.2",
        "ba":"Sumsung",
        "sr":"D"
    },
    "ap":"app",
    "et":[
        {
            "ett":"1593077273840",
            "en":"display",
            "kv":{
                "goodsid":"0",
                "action":"2",
                "extend1":"2",
                "place":"1",
                "category":"93"
            }
        },
        {
            "ett":"1593052169678",
            "en":"loading",
            "kv":{
                "extend2":"",
                "loading_time":"54",
                "action":"1",
                "extend1":"",
                "type":"1",
                "type1":"102",
                "loading_way":"1"
            }
        },
        {
            "ett":"1593013890514",
            "en":"notification",
            "kv":{
                "ap_time":"1593003516686",
                "action":"4",
                "type":"2",
                "content":""
            }
        },
        {
            "ett":"1592999171192",
            "en":"error",
            "kv":{
                "errorDetail":"java.lang.NullPointerException\n    at cn.lift.appIn.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)\n at cn.lift.dfdf.web.AbstractBaseController.validInbound",
                "errorBrief":"at cn.lift.appIn.control.CommandUtil.getInfo(CommandUtil.java:67)"
            }
        },
        {
            "ett":"1593002958311",
            "en":"comment",
            "kv":{
                "p_comment_id":1,
                "addtime":"1593079427374",
                "praise_count":188,
                "other_id":0,
                "comment_id":9,
                "reply_count":193,
                "userid":3,
                "content":"涂士震嫩庙胞洪邮骗具捶赣锗塌舅捕沥爷"
            }
        },
        {
            "ett":"1593052803303",
            "en":"favorites",
            "kv":{
                "course_id":4,
                "id":0,
                "add_time":"1593044515996",
                "userid":7
            }
        },
        {
            "ett":"1593095771819",
            "en":"praise",
            "kv":{
                "target_id":8,
                "id":5,
                "type":4,
                "add_time":"1593000096852",
                "userid":8
            }
        }]
}

The data format is server time|event JSON. The JSON contains the common fields cm, the data source ap, and the event array et. Because events are reported in batches, a single line is a JSON array holding several events of different types, distinguished by the en field (e.g. display marks a product-click event). Processing therefore takes two steps. First, the long JSON string in the ODS table is parsed into the individual columns of a DWD base detail table, using a UDTF to explode the event array so that every event becomes a row in that base table. Then, for each event type, the matching rows are filtered out, the kv payload is extracted into columns, and the result is written into that event's own DWD detail table.
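For reference, the source ODS table only needs a single string column plus a date partition. A minimal sketch, assuming the table name ods_event_log, the column name line, and the partition field dt used by the load script below; the storage location is an assumption following the same path convention as the DWD tables:

drop table if exists ods_event_log;
CREATE EXTERNAL TABLE ods_event_log(`line` string)
PARTITIONED BY (`dt` string)
location '/warehouse/gmall/ods/ods_event_log/';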

1 Base event detail table

The base event detail table contains events of every type. We first need a UDF that splits the long log line into a regular format: one string whose fields are separated by \t (tab). Downstream, Hive's built-in split function turns that string into an array, and individual fields are picked out by index.

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.json.JSONException;
import org.json.JSONObject;

public class BaseFieldUDF extends UDF {

    public String evaluate(String line, String keysStr) {
        // keysStr is the comma-separated list of common (cm) fields to extract
        String[] keysArr = keysStr.split(",");

        // Raw log format: serverTime|json ("|" must be escaped because split takes a regex)
        String[] logContent = line.split("\\|");
        if (logContent.length != 2 || StringUtils.isBlank(logContent[1])) {
            return "";
        }

        StringBuffer sb = new StringBuffer();
        try {
            // Append the common fields in the order given by keysStr
            JSONObject jsonObject = new JSONObject(logContent[1]);
            JSONObject cm = jsonObject.getJSONObject("cm");
            for (int i = 0; i < keysArr.length; i++) {
                String key = keysArr[i].trim();
                if (cm.has(key)) {
                    sb.append(cm.getString(key)).append("\t");
                } else {
                    // Keep column positions stable even when a key is missing
                    sb.append("").append("\t");
                }
            }

            // Append the event array as-is; it is exploded later by the UDTF
            sb.append(jsonObject.getString("et")).append("\t");

            // Append the server time taken from the prefix
            sb.append(logContent[0]).append("\t");

        } catch (JSONException e) {
            e.printStackTrace();
        }

        return sb.toString();
    }

}
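Once the UDF is registered (the function name base_analizer and the ods_event_log table follow the load script further below), its tab-delimited output is consumed with split and positional indexing. A minimal sketch; the shortened key list and the date literal are only illustrative:

select
    split(base_analizer(line, 'mid,uid,vc'), '\t')[0] as mid_id,
    split(base_analizer(line, 'mid,uid,vc'), '\t')[1] as user_id,
    split(base_analizer(line, 'mid,uid,vc'), '\t')[2] as version_code
from ods_event_log
where dt = '2020-06-25'
limit 10;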

Next we define a UDTF to explode the event array. It receives one row with one column, the event array, and returns multiple rows of two columns: the first column is the event name, which is later used to filter rows into the per-event detail tables, and the second column is the event's detail JSON with its kv payload.

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.json.JSONArray;
import org.json.JSONException;

import java.util.ArrayList;
import java.util.List;

public class EventJsonUDTF extends GenericUDTF {

    @Override
    public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
        // Declare the two output columns, event_name and event_json, both strings
        List<String> fieldNames = new ArrayList<>();
        List<ObjectInspector> fieldTypes = new ArrayList<>();

        fieldNames.add("event_name");
        fieldTypes.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

        fieldNames.add("event_json");
        fieldTypes.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldTypes);
    }

    @Override
    public void process(Object[] objects) throws HiveException {
        // The single input column is the event array (the "et" field)
        String input = objects[0].toString();

        if (StringUtils.isBlank(input)) {
            return;
        }
        try {
            JSONArray ja = new JSONArray(input);

            for (int i = 0; i < ja.length(); i++) {
                String[] result = new String[2];
                try {
                    result[0] = ja.getJSONObject(i).getString("en");
                    result[1] = ja.getString(i);
                } catch (JSONException e) {
                    // A malformed event should not abort the whole array
                    continue;
                }
                // Emit one output row per event: one input row fans out into many rows
                forward(result);
            }
        } catch (JSONException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void close() throws HiveException {

    }
}
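In a query the UDTF is attached with lateral view, so each base row's event array fans out into one output row per event. A minimal usage sketch; the shortened key list and the date literal are only illustrative, while the function names and the ops alias follow the load script below:

select event_name, event_json
from
(
    select split(base_analizer(line, 'mid,uid'), '\t')[2] as ops
    from ods_event_log
    where dt = '2020-06-25'
) sdk_log
lateral view flat_analizer(ops) tmp_k as event_name, event_json;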

Next, create the table that stores the base event detail data. The event_name and event_json columns hold exactly the output of the UDTF.

drop table if exists dwd_base_event_log;
CREATE EXTERNAL TABLE dwd_base_event_log(
`mid_id` string,
`user_id` string, 
`version_code` string, 
`version_name` string, 
`lang` string, 
`source` string, 
`os` string, 
`area` string, 
`model` string,
`brand` string, 
`sdk_version` string, 
`gmail` string, 
`height_width` string, 
`app_time` string, 
`network` string, 
`lng` string, 
`lat` string, 
`event_name` string, 
`event_json` string, 
`server_time` string)
PARTITIONED BY (`dt` string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_base_event_log/';

The data is then loaded into the base detail table with a script. A few notes:

① The SQL has to register our custom UDF base_analizer and UDTF flat_analizer before they are used.

② The where clause adds base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la') <> '' because our UDF returns an empty string for malformed records, and those rows are filtered out here.

③ Since the partition field is given the static value do_date, enabling non-strict dynamic-partition mode does not actually appear to be necessary.

④ The UDTF's two output columns are produced with: lateral view flat_analizer(ops) tmp_k as event_name, event_json

⑤ Because this is a partitioned table, insert overwrite only replaces the data in the target partition, not the data in every partition of the table (see the quick check sketched after these notes).
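A quick way to check after a run that the target partition exists and that the other partitions kept their row counts; a sketch, not part of the original script:

show partitions dwd_base_event_log;

select dt, count(*)
from dwd_base_event_log
group by dt;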

#!/bin/bash

# Variables, defined here so they are easy to change
APP=gmall
hive=/opt/module/hive/bin/hive

# If a date argument is given, use it; otherwise default to yesterday
if [ -n "$1" ] ;then
        do_date=$1
else 
        do_date=`date -d "-1 day" +%F`  
fi 

sql="
        add jar /opt/module/hive/hivefunction-1.0-SNAPSHOT.jar;

        create temporary function base_analizer as 'com.atguigu.udf.BaseFieldUDF';
        create temporary function flat_analizer as 'com.atguigu.udtf.EventJsonUDTF';

        set hive.exec.dynamic.partition.mode=nonstrict;

        insert overwrite table "$APP".dwd_base_event_log 
        PARTITION (dt='$do_date')
        select
        mid_id,
        user_id,
        version_code,
        version_name,
        lang,
        source ,
        os ,
        area ,
        model ,
        brand ,
        sdk_version ,
        gmail ,
        height_width ,
        app_time ,
        network ,
        lng ,
        lat ,
        event_name , 
        event_json , 
        server_time  
        from
        (
        select
        split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[0]   as mid_id,
        split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[1]   as user_id,
        split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[2]   as version_code,
        split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[3]   as version_name,
        split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[4]   as lang,
        split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[5]   as source,
        split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[6]   as os,
        split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[7]   as area,
        split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[8]   as model,
        split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[9]   as brand,
        split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[10]  as sdk_version,
        split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[11]  as gmail,
        split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[12]  as height_width,
        split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[13]  as app_time,
        split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[14]  as network,
        split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[15]  as lng,
        split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[16]  as lat,
        split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[17]  as ops,
        split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[18]  as server_time
        from "$APP".ods_event_log where dt='$do_date' and base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la')<>''
        ) sdk_log lateral view flat_analizer(ops) tmp_k as event_name, event_json;
"

$hive -e "$sql"

 

2 Event-specific detail tables

The event-specific detail tables have largely the same columns as the base event detail table, with two changes:

① The event_name column is dropped, since each table stores only one event type and no longer needs event_name to distinguish events.

② The kv payload describing the event is extracted from event_json into dedicated columns.

Taking the product-click table as an example, the DDL is shown below. event_name is removed, and five new columns from the kv payload are added: action, goodsid, place, extend1, and category.

drop table if exists dwd_display_log;
CREATE EXTERNAL TABLE dwd_display_log(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`action` string,
`goodsid` string,
`place` string,
`extend1` string,
`category` string,
`server_time` string
)
PARTITIONED BY (dt string)
location '/warehouse/gmall/dwd/dwd_display_log/';

A second script then loads the data from the base event detail table into each event-specific detail table. Below is the full script covering product click, news detail, loading, ad, notification and the other events; it is long, but the processing logic for every event type is identical.

① get_json_object(event_json,'$.kv.action') is a built-in Hive function that extracts a value from a JSON string; the $ symbol refers to the JSON document itself (a short sketch follows these notes).

② where dt='$do_date' and event_name='display' uses the event name produced in the previous step to route rows into the corresponding event detail table.
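A minimal illustration of get_json_object on a hand-written JSON literal (the literal is made up for the example, not taken from the real data):

select get_json_object('{"kv":{"action":"2","goodsid":"0"}}', '$.kv.action');  -- returns 2
select get_json_object('{"kv":{"action":"2","goodsid":"0"}}', '$.kv');        -- returns the nested kv object as a string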

#!/bin/bash

# Variables, defined here so they are easy to change
APP=gmall
hive=/opt/module/hive/bin/hive

# If a date argument is given, use it; otherwise default to yesterday
if [ -n "$1" ] ;then
        do_date=$1
else 
        do_date=`date -d "-1 day" +%F`  
fi 

sql="
set hive.exec.dynamic.partition.mode=nonstrict;

insert overwrite table "$APP".dwd_display_log
PARTITION (dt='$do_date')
select 
        mid_id,
        user_id,
        version_code,
        version_name,
        lang,
        source,
        os,
        area,
        model,
        brand,
        sdk_version,
        gmail,
        height_width,
        app_time,
        network,
        lng,
        lat,
        get_json_object(event_json,'$.kv.action') action,
        get_json_object(event_json,'$.kv.goodsid') goodsid,
        get_json_object(event_json,'$.kv.place') place,
        get_json_object(event_json,'$.kv.extend1') extend1,
        get_json_object(event_json,'$.kv.category') category,
        server_time
from "$APP".dwd_base_event_log
where dt='$do_date' and event_name='display';


insert overwrite table "$APP".dwd_newsdetail_log
PARTITION (dt='$do_date')
select 
        mid_id,
        user_id,
        version_code,
        version_name,
        lang,
        source,
        os,
        area,
        model,
        brand,
        sdk_version,
        gmail,
        height_width,
        app_time,
        network,
        lng,
        lat,
        get_json_object(event_json,'$.kv.entry') entry,
        get_json_object(event_json,'$.kv.action') action,
        get_json_object(event_json,'$.kv.goodsid') goodsid,
        get_json_object(event_json,'$.kv.showtype') showtype,
        get_json_object(event_json,'$.kv.news_staytime') news_staytime,
        get_json_object(event_json,'$.kv.loading_time') loading_time,
        get_json_object(event_json,'$.kv.type1') type1,
        get_json_object(event_json,'$.kv.category') category,
        server_time
from "$APP".dwd_base_event_log
where dt='$do_date' and event_name='newsdetail';


insert overwrite table "$APP".dwd_loading_log
PARTITION (dt='$do_date')
select 
        mid_id,
        user_id,
        version_code,
        version_name,
        lang,
        source,
        os,
        area,
        model,
        brand,
        sdk_version,
        gmail,
        height_width,
        app_time,
        network,
        lng,
        lat,
        get_json_object(event_json,'$.kv.action') action,
        get_json_object(event_json,'$.kv.loading_time') loading_time,
        get_json_object(event_json,'$.kv.loading_way') loading_way,
        get_json_object(event_json,'$.kv.extend1') extend1,
        get_json_object(event_json,'$.kv.extend2') extend2,
        get_json_object(event_json,'$.kv.type') type,
        get_json_object(event_json,'$.kv.type1') type1,
        server_time
from "$APP".dwd_base_event_log
where dt='$do_date' and event_name='loading';


insert overwrite table "$APP".dwd_ad_log
PARTITION (dt='$do_date')
select 
        mid_id,
        user_id,
        version_code,
        version_name,
        lang,
        source,
        os,
        area,
        model,
        brand,
        sdk_version,
        gmail,
        height_width,
        app_time,
        network,
        lng,
        lat,
        get_json_object(event_json,'$.kv.entry') entry,
        get_json_object(event_json,'$.kv.action') action,
        get_json_object(event_json,'$.kv.content') content,
        get_json_object(event_json,'$.kv.detail') detail,
        get_json_object(event_json,'$.kv.source') ad_source,
        get_json_object(event_json,'$.kv.behavior') behavior,
        get_json_object(event_json,'$.kv.newstype') newstype,
        get_json_object(event_json,'$.kv.show_style') show_style,
        server_time
from "$APP".dwd_base_event_log
where dt='$do_date' and event_name='ad';


insert overwrite table "$APP".dwd_notification_log
PARTITION (dt='$do_date')
select 
        mid_id,
        user_id,
        version_code,
        version_name,
        lang,
        source,
        os,
        area,
        model,
        brand,
        sdk_version,
        gmail,
        height_width,
        app_time,
        network,
        lng,
        lat,
        get_json_object(event_json,'$.kv.action') action,
        get_json_object(event_json,'$.kv.noti_type') noti_type,
        get_json_object(event_json,'$.kv.ap_time') ap_time,
        get_json_object(event_json,'$.kv.content') content,
        server_time
from "$APP".dwd_base_event_log
where dt='$do_date' and event_name='notification';


insert overwrite table "$APP".dwd_active_foreground_log
PARTITION (dt='$do_date')
select 
        mid_id,
        user_id,
        version_code,
        version_name,
        lang,
        source,
        os,
        area,
        model,
        brand,
        sdk_version,
        gmail,
        height_width,
        app_time,
        network,
        lng,
        lat,
        get_json_object(event_json,'$.kv.push_id') push_id,
        get_json_object(event_json,'$.kv.access') access,
        server_time
from "$APP".dwd_base_event_log
where dt='$do_date' and event_name='active_foreground';


insert overwrite table "$APP".dwd_active_background_log
PARTITION (dt='$do_date')
select 
        mid_id,
        user_id,
        version_code,
        version_name,
        lang,
        source,
        os,
        area,
        model,
        brand,
        sdk_version,
        gmail,
        height_width,
        app_time,
        network,
        lng,
        lat,
        get_json_object(event_json,'$.kv.active_source') active_source,
        server_time
from "$APP".dwd_base_event_log
where dt='$do_date' and event_name='active_background';


insert overwrite table "$APP".dwd_comment_log
PARTITION (dt='$do_date')
select 
        mid_id,
        user_id,
        version_code,
        version_name,
        lang,
        source,
        os,
        area,
        model,
        brand,
        sdk_version,
        gmail,
        height_width,
        app_time,
        network,
        lng,
        lat,
        get_json_object(event_json,'$.kv.comment_id') comment_id,
        get_json_object(event_json,'$.kv.userid') userid,
        get_json_object(event_json,'$.kv.p_comment_id') p_comment_id,
        get_json_object(event_json,'$.kv.content') content,
        get_json_object(event_json,'$.kv.addtime') addtime,
        get_json_object(event_json,'$.kv.other_id') other_id,
        get_json_object(event_json,'$.kv.praise_count') praise_count,
        get_json_object(event_json,'$.kv.reply_count') reply_count,
        server_time
from "$APP".dwd_base_event_log
where dt='$do_date' and event_name='comment';


insert overwrite table "$APP".dwd_favorites_log
PARTITION (dt='$do_date')
select 
        mid_id,
        user_id,
        version_code,
        version_name,
        lang,
        source,
        os,
        area,
        model,
        brand,
        sdk_version,
        gmail,
        height_width,
        app_time,
        network,
        lng,
        lat,
        get_json_object(event_json,'$.kv.id') id,
        get_json_object(event_json,'$.kv.course_id') course_id,
        get_json_object(event_json,'$.kv.userid') userid,
        get_json_object(event_json,'$.kv.add_time') add_time,
        server_time
from "$APP".dwd_base_event_log
where dt='$do_date' and event_name='favorites';


insert overwrite table "$APP".dwd_praise_log
PARTITION (dt='$do_date')
select 
        mid_id,
        user_id,
        version_code,
        version_name,
        lang,
        source,
        os,
        area,
        model,
        brand,
        sdk_version,
        gmail,
        height_width,
        app_time,
        network,
        lng,
        lat,
        get_json_object(event_json,'$.kv.id') id,
        get_json_object(event_json,'$.kv.userid') userid,
        get_json_object(event_json,'$.kv.target_id') target_id,
        get_json_object(event_json,'$.kv.type') type,
        get_json_object(event_json,'$.kv.add_time') add_time,
        server_time
from "$APP".dwd_base_event_log
where dt='$do_date' and event_name='praise';


insert overwrite table "$APP".dwd_error_log
PARTITION (dt='$do_date')
select 
        mid_id,
        user_id,
        version_code,
        version_name,
        lang,
        source,
        os,
        area,
        model,
        brand,
        sdk_version,
        gmail,
        height_width,
        app_time,
        network,
        lng,
        lat,
        get_json_object(event_json,'$.kv.errorBrief') errorBrief,
        get_json_object(event_json,'$.kv.errorDetail') errorDetail,
        server_time
from "$APP".dwd_base_event_log
where dt='$do_date' and event_name='error';
"

$hive -e "$sql"

 
