Flink Real-Time Data Warehouse Project in Practice, Part 4: Log Data Splitting to DWD

Posted by 一阵暖风



Flink Real-Time Data Warehouse Project in Practice, Part 4: Log Data Splitting, Traffic Domain (DWD)

DWD layer design points:
(1) The DWD layer is designed according to dimensional modeling theory; it stores the fact tables of the dimensional model.
(2) DWD table names follow the convention dwd_<data domain>_<table name>; for example, dwd_traffic_page_log is the page log table of the traffic domain.

1. Raw Transaction Fact Tables in the Traffic Domain

1.1 Main Tasks

1.1.1 Data Cleansing (ETL)

Some records may be partially lost in transit, which leaves their JSON structure incomplete (for example, a record truncated to {"common":{"ar": no longer parses), so this dirty data must be filtered out.

1.1.2 Fixing the New/Returning Visitor Flag

The is_new field under the common section of the log marks visitor status: 1 means a new visitor, 0 a returning one. Because the reliability of data collected by front-end tracking cannot be guaranteed, returning visitors may be mis-marked as new, so the flag needs to be repaired.
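Concretely, the repair (step 5 of the code below) keys the stream by mid and keeps each device's last visit date in Flink keyed state:
(1) is_new is "1" and the state is empty: a genuine new visitor; record the current date in state.
(2) is_new is "1" and the state holds a date other than today: a returning visitor mis-marked as new; rewrite is_new to "0".
(3) is_new is "1" and the state holds today's date: a repeat visit on the visitor's first day; the flag stays "1".
(4) is_new is "0" and the state is empty: a returning visitor whose first visit predates this job; backfill the state with yesterday's date so the device keeps being treated as old.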

1.1.3 Splitting the Log Stream

This section splits the log data into five transaction fact tables and writes them to Kafka; each table's target topic is listed after the table names below.
Traffic-domain page view transaction fact table
Traffic-domain launch transaction fact table
Traffic-domain action transaction fact table
Traffic-domain display transaction fact table
Traffic-domain error transaction fact table
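Each table corresponds to one DWD Kafka topic, matching the sink topics defined at the end of the code:
Page view -> dwd_traffic_page_log
Launch -> dwd_traffic_start_log
Display -> dwd_traffic_display_log
Action -> dwd_traffic_action_log
Error -> dwd_traffic_error_log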

1.2 Flow Diagram

(Diagram image omitted.)

1.3 Code

The code comes from Shangguigu (尚硅谷); follow the 尚硅谷 WeChat official account and reply "大数据" to get the source code and materials.

Only the main pipeline is shown here; for the utility classes and their full implementations, download the source.
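As a rough reference only, here is a minimal sketch of what MyKafkaUtil and DateFormatUtil could look like. The broker address (taken from the hadoop102:9092 used in the test commands below), the use of SimpleStringSchema, and the method bodies are assumptions of this sketch, not the 尚硅谷 implementation (which, among other things, needs a deserializer that tolerates null Kafka records, something SimpleStringSchema does not).

package com.atguigu.utils;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class MyKafkaUtil {

    //Assumed broker list; matches the hadoop102:9092 address in the test commands below
    private static final String BOOTSTRAP_SERVERS = "hadoop102:9092";

    //Build a string-deserializing Kafka source for the given topic and consumer group
    public static FlinkKafkaConsumer<String> getFlinkKafkaConsumer(String topic, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);
        props.setProperty("group.id", groupId);
        return new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props);
    }

    //Build a string-serializing Kafka sink for the given topic
    public static FlinkKafkaProducer<String> getFlinkKafkaProducer(String topic) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);
        return new FlinkKafkaProducer<>(topic, new SimpleStringSchema(), props);
    }
}

//Shown in the same snippet for brevity; in the real project this is a separate file
class DateFormatUtil {
    //Convert an epoch-millisecond timestamp to a yyyy-MM-dd date string
    //(SimpleDateFormat is not thread-safe, so this sketch creates one per call)
    public static String toDate(Long ts) {
        return new SimpleDateFormat("yyyy-MM-dd").format(new Date(ts));
    }
}

The main job follows.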

package com.atguigu.app.dwd;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.utils.DateFormatUtil;
import com.atguigu.utils.MyKafkaUtil;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

//Data flow: web/app -> nginx -> log server (.log) -> Flume -> Kafka(ODS) -> FlinkApp -> Kafka(DWD)
//Programs:      Mock(lg.sh) -> Flume(f1) -> Kafka(ZK) -> BaseLogApp -> Kafka(ZK)
public class BaseLogApp {

    public static void main(String[] args) throws Exception {

        //TODO 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); //In production, set this to the partition count of the Kafka topic

        //1.1 Enable checkpointing
        //env.enableCheckpointing(5 * 60000L, CheckpointingMode.EXACTLY_ONCE);
        //env.getCheckpointConfig().setCheckpointTimeout(10 * 60000L);
        //env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
        //env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L));

        //1.2 Set the state backend
        //env.setStateBackend(new HashMapStateBackend());
        //env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/211126/ck");
        //System.setProperty("HADOOP_USER_NAME", "atguigu");

        //TODO 2. Consume the Kafka topic_log topic to create the source stream
        String topic = "topic_log";
        String groupId = "base_log_app_211126";
        DataStreamSource<String> kafkaDS = env.addSource(MyKafkaUtil.getFlinkKafkaConsumer(topic, groupId));

        //TODO 3. Filter out non-JSON records & convert each line into a JSONObject
        //Anonymous subclass so Flink can capture the OutputTag's generic type
        OutputTag<String> dirtyTag = new OutputTag<String>("Dirty") {};
        SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.process(new ProcessFunction<String, JSONObject>() {
            @Override
            public void processElement(String value, Context ctx, Collector<JSONObject> out) throws Exception {
                try {
                    JSONObject jsonObject = JSON.parseObject(value);
                    out.collect(jsonObject);
                } catch (Exception e) {
                    //Parsing failed: the record is dirty, route it to the side output
                    ctx.output(dirtyTag, value);
                }
            }
        });
        //Get the dirty-data side output and print it
        DataStream<String> dirtyDS = jsonObjDS.getSideOutput(dirtyTag);
        dirtyDS.print("Dirty>>>>>>>>>>>>");

        //TODO 4. Key the stream by mid (device id)
        KeyedStream<JSONObject, String> keyedStream = jsonObjDS.keyBy(json -> json.getJSONObject("common").getString("mid"));

        //TODO 5. Use keyed state to fix the new/returning visitor flag
        SingleOutputStreamOperator<JSONObject> jsonObjWithNewFlagDS = keyedStream.map(new RichMapFunction<JSONObject, JSONObject>() {
            private ValueState<String> lastVisitState;

            @Override
            public void open(Configuration parameters) throws Exception {
                lastVisitState = getRuntimeContext().getState(new ValueStateDescriptor<String>("last-visit", String.class));
            }

            @Override
            public JSONObject map(JSONObject value) throws Exception {

                //Get the is_new flag & ts, and convert the timestamp to a date string
                String isNew = value.getJSONObject("common").getString("is_new");
                Long ts = value.getLong("ts");
                String curDate = DateFormatUtil.toDate(ts);

                //Get the date stored in state
                String lastDate = lastVisitState.value();

                //Check whether the is_new flag is "1"
                if ("1".equals(isNew)) {
                    if (lastDate == null) {
                        //First visit ever: a genuine new visitor, remember the date
                        lastVisitState.update(curDate);
                    } else if (!lastDate.equals(curDate)) {
                        //Already visited on an earlier date: mis-marked, rewrite the flag to "0"
                        value.getJSONObject("common").put("is_new", "0");
                    }
                } else if (lastDate == null) {
                    //Returning visitor with empty state: first visit predates this job, backfill yesterday
                    lastVisitState.update(DateFormatUtil.toDate(ts - 24 * 60 * 60 * 1000L));
                }
                return value;
            }
        });

        //TODO 6. Split the stream with side outputs: page logs stay on the main stream; start, display, action, and error logs go to side outputs
        OutputTag<String> startTag = new OutputTag<String>("start") {};
        OutputTag<String> displayTag = new OutputTag<String>("display") {};
        OutputTag<String> actionTag = new OutputTag<String>("action") {};
        OutputTag<String> errorTag = new OutputTag<String>("error") {};
        SingleOutputStreamOperator<String> pageDS = jsonObjWithNewFlagDS.process(new ProcessFunction<JSONObject, String>() {
            @Override
            public void processElement(JSONObject value, Context ctx, Collector<String> out) throws Exception {

                //Try to get the error field
                String err = value.getString("err");
                if (err != null) {
                    //Write the whole record to the error side output
                    ctx.output(errorTag, value.toJSONString());
                }

                //Remove the error field
                value.remove("err");

                //Try to get the start field
                String start = value.getString("start");
                if (start != null) {
                    //Write the record to the start side output
                    ctx.output(startTag, value.toJSONString());
                } else {

                    //Get the common info, page id, and timestamp
                    String common = value.getString("common");
                    String pageId = value.getJSONObject("page").getString("page_id");
                    Long ts = value.getLong("ts");

                    //Try to get the display data
                    JSONArray displays = value.getJSONArray("displays");
                    if (displays != null && displays.size() > 0) {
                        //Iterate over the displays & write each one to the display side output
                        for (int i = 0; i < displays.size(); i++) {
                            JSONObject display = displays.getJSONObject(i);
                            display.put("common", common);
                            display.put("page_id", pageId);
                            display.put("ts", ts);
                            ctx.output(displayTag, display.toJSONString());
                        }
                    }

                    //Try to get the action data
                    JSONArray actions = value.getJSONArray("actions");
                    if (actions != null && actions.size() > 0) {
                        //Iterate over the actions & write each one to the action side output
                        for (int i = 0; i < actions.size(); i++) {
                            JSONObject action = actions.getJSONObject(i);
                            action.put("common", common);
                            action.put("page_id", pageId);
                            ctx.output(actionTag, action.toJSONString());
                        }
                    }

                    //Remove displays and actions & emit the page log on the main stream
                    value.remove("displays");
                    value.remove("actions");
                    out.collect(value.toJSONString());
                }
            }
        });

        //TODO 7. Extract the side-output streams
        DataStream<String> startDS = pageDS.getSideOutput(startTag);
        DataStream<String> displayDS = pageDS.getSideOutput(displayTag);
        DataStream<String> actionDS = pageDS.getSideOutput(actionTag);
        DataStream<String> errorDS = pageDS.getSideOutput(errorTag);

        //TODO 8. Print each stream and write it to its Kafka topic
        pageDS.print("Page>>>>>>>>>>");
        startDS.print("Start>>>>>>>>");
        displayDS.print("Display>>>>");
        actionDS.print("Action>>>>>>");
        errorDS.print("Error>>>>>>>>");

        String page_topic = "dwd_traffic_page_log";
        String start_topic = "dwd_traffic_start_log";
        String display_topic = "dwd_traffic_display_log";
        String action_topic = "dwd_traffic_action_log";
        String error_topic = "dwd_traffic_error_log";

        pageDS.addSink(MyKafkaUtil.getFlinkKafkaProducer(page_topic));
        startDS.addSink(MyKafkaUtil.getFlinkKafkaProducer(start_topic));
        displayDS.addSink(MyKafkaUtil.getFlinkKafkaProducer(display_topic));
        actionDS.addSink(MyKafkaUtil.getFlinkKafkaProducer(action_topic));
        errorDS.addSink(MyKafkaUtil.getFlinkKafkaProducer(error_topic));

        //TODO 9. Start the job
        env.execute("BaseLogApp");
    }
}

1.4 Testing

1.4.1 Testing dirty data

[root@hadoop102 kafka]# bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic topic_log
>"common":"ar":
>

Result: in the IDEA console the record is printed on the Dirty side-output stream; nothing is written to the Kafka DWD topics.

1.4.2 Testing err and start data

Input:

[root@hadoop102 kafka]# bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic topic_log
>"common":"ar":
>"common":"ar":"110000","ba":"Xiaomi","ch":"xiaomi","is_new":"1","md":"Xiaomi Mix2 ","mid":"mid_1818969","os":"android 11.0","uid":"513","vc":"v2.1.134","err":"error_code":2633,"msg":" Exception in thread \\\\  java.net.SocketTimeoutException\\\\n \\\\tat com.atgugu.gmall2020.mock.bean.log.AppError.main(AppError.java:xxxxxx)","start":"entry":"notice","loading_time":12438,"open_ad_id":7,"open_ad_ms":4407,"open_ad_skip_ms":0,"ts":1651217959000
>

Output:

[root@hadoop102 kafka]# bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic dwd_traffic_start_log
"common":"ar":"110000","uid":"513","os":"Android 11.0","ch":"xiaomi","is_new":"1","md":"Xiaomi Mix2 ","mid":"mid_1818969","vc":"v2.1.134","ba":"Xiaomi","start":"entry":"notice","open_ad_skip_ms":0,"open_ad_ms":4407,"loading_time":12438,"open_ad_id":7,"ts":1651217959000
[root@hadoop102 kafka]# bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic dwd_traffic_error_log
"common":"ar":"110000","uid":"513","os":"Android 11.0","ch":"xiaomi","is_new":"1","md":"Xiaomi Mix2 ","mid":"mid_1818969","vc":"v2.1.134","ba":"Xiaomi","err":"msg":" Exception in thread \\\\  java.net.SocketTimeoutException\\\\n \\\\tat com.atgugu.gmall2020.mock.bean.log.AppError.main(AppError.java:xxxxxx)","error_code":2633,"start":"entry":"notice","open_ad_skip_ms":0,"open_ad_ms":4407,"loading_time":12438,"open_ad_id":7,"ts":1651217959000

IDEA console output (screenshot omitted).

1.4.3 Testing page, display, and action data

Input:

"common":"ar":"500000","uid":"981","os":"ios 13.3.1","ch":"Appstore","is_new":"1","md":"iPhone Xs Max","mid":"mid_7030190","vc":"v2.0.1","ba":"iPhone","err":"msg":" Exception in thread \\\\  java.net.SocketTimeoutException\\\\n \\\\tat com.atgugu.gmall2020.mock.bean.log.AppError.main(AppError.java:xxxxxx)","error_code":1559,"page":"page_id":"good_detail","item":"5","during_time":7045,"item_type":"sku_id","last_page_id":"good_list","source_type":"activity","displays":["display_type":"query","item":"15","item_type":"sku_id","pos_id":1,"order":1,"display_type":"query","item":"26","item_type":"sku_id","pos_id":3,"order":2,"display_type":"query","item":"31","item_type":"sku_id","pos_id":2,"order":3,"display_type":"promotion","item":"29","item_type":"sku_id","pos_id":5,"order":4,"display_type":"query","item":"9","item_type":"sku_id","pos_id":2,"order":5,"display_type":"recommend","item":"1","item_type":"sku_id","pos_id":1,"order":6],"actions":["item":"5","action_id":"favor_add","item_type":"sku_id","ts":1651217964522],"ts":1651217961000

Output:

dwd_traffic_page_log
[root@hadoop102 kafka]# bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic dwd_traffic_page_log
"common":"ar":"500000","uid":"981","os":"iOS 13.3.1","ch":"Appstore","is_new":"1","md":"iPhone Xs Max","mid":"mid_7030190","vc":"v2.0.1","ba":"iPhone","page":"page_id":"good_detail","item":"5","during_time":7045,"item_type":"sku_id","last_page_id":"good_list","source_type":"activity","ts":1651217961000
dwd_traffic_display_log
[root@hadoop102 kafka]# bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic  dwd_traffic_display_log
"display_type":"query","page_id":"good_detail","item":"15","common":"\\"ar\\":\\"500000\\",\\"uid\\":\\"981\\",\\"os\\":\\"iOS 13.3.1\\",\\"ch\\":\\"Appstore\\",\\"is_new\\":\\"1\\",\\"md\\":\\"iPhone Xs Max\\",\\"mid\\":\\"mid_7030190\\",\\"vc\\":\\"v2.0.1\\",\\"ba\\":\\"iPhone\\"","item_type":"sku_id","pos_id":1,"order":1,"ts":1651217961000
"display_type":"query","page_id":"good_detail","item":"26","common":"\\"ar\\":\\"500000\\",\\"uid\\":\\"981\\",\\"os\\":\\"iOS 13.3.1\\",\\"ch\\":\\"Appstore\\",\\"is_new\\":\\"1\\",\\"md\\":\\"iPhone Xs Max\\",\\"mid\\":\\"mid_7030190\\",\\"vc\\":\\"v2.0.1\\",\\"ba\\":\\"iPhone\\"","item_type":"sku_id","pos_id":3,"order":2,"ts":1651217961000
"display_type":"query","page_id":"good_detail","item":"31","common":"\\"ar\\":\\"500000\\",\\"uid\\":\\"981\\",\\"os\\":\\"iOS 13.3.1\\",\\"ch\\":\\"Appstore\\",\\"is_new\\":\\"1\\",\\"md\\":\\"iPhone Xs Max\\",\\"mid\\":\\"mid_7030190\\",\\"vc\\":\\"v2.0.1\\",\\"ba\\":\\"iPhone\\"","item_type":"sku_id","pos_id":2,"order":3,"ts":1651217961000
"display_type":"promotion","page_id":"good_detail","item":"29","common":"\\"ar\\":\\"500000\\",\\"uid\\":\\"981\\",\\"os\\":\\"iOS 13.3.1\\",\\"ch\\":\\"Appstore\\",\\"is_new\\":\\"1\\",\\"md\\":\\"iPhone Xs Max\\",\\"mid\\":\\"mid_7030190\\",\\"vc\\":\\"v2.0.1\\",\\"ba\\":\\"iPhone\\"","item_type":"sku_id","pos_id":5,"order":4,"ts":1651217961000
"display_type":"query","page_id":"good_detail","item":"9","common":"\\"ar\\":\\"500000\\",\\"uid\\":\\"981\\",\\"os\\":\\"iOS 13.3.1\\",\\"ch\\":\\"Appstore\\",\\"is_new\\":\\"1\\",\\"md\\":\\"iPhone Xs Max\\",\\"mid\\":\\"mid_7030190\\",\\"vc\\":\\"v2.0.1\\",\\"ba\\":\\"iPhone\\"","item_type":"sku_id","pos_id":2,"order":5,"ts":1651217961000
"display_type":"recommend","page_id":"good_detail","item":"1","common":"\\"ar\\":\\"500000\\",\\"uid\\":\\"981\\",\\"os\\":\\"iOS 13.3.1\\",\\"ch\\":\\"Appstore\\",\\"is_new\\":\\"1\\",\\"md\\":\\"iPhone Xs Max\\",\\"mid\\":\\"mid_7030190\\",\\"vc\\":\\"v2.0.1\\",\\"ba\\":\\"iPhone\\"","item_type":"sku_id","pos_id":1,"order":6,"ts":1651217961000
dwd_traffic_action_log
[root@hadoop102 kafka]# bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic dwd_traffic_action_log
"page_id":"good_detail","item":"5","common":"\\"ar\\":\\"500000\\",\\"uid\\":\\"981\\",\\"os\\":\\"iOS 13.3.1\\",\\"ch\\":\\"Appstore\\",\\"is_new\\":\\"1\\",\\"md\\":\\"iPhone Xs Max\\",\\"mid\\":\\"mid_7030190\\",\\"vc\\":\\"v2.0.1\\",\\"ba\\":\\"iPhone\\"","action_id":"favor_add","item_type":"sku_id","ts":1651217964522
dwd_traffic_error_log
"common":"ar":"500000","uid":"981","os":"iOS 13.3.1","ch":"Appstore","is_new":"1","md":"iPhone Xs Max","mid":"mid_7030190","vc":"v2.0.1","ba":"iPhone","err":"msg":" Exception in thread \\\\  java.net.SocketTimeoutException\\\\n \\\\tat com.atgugu.gmall2020.mock.bean.log.AppError.main(AppError.java:xxxxxx)","error_code":1559,"page":"page_id":"good_detail","item":"5","during_time":7045,"item_type":"sku_id","last_page_id":"good_list","source_type":"activity","displays":["display_type":"query","item":"15","item_type":"sku_id","pos_id":1,"order":1,"display_type":"query","item":"26","item_type":"sku_id","pos_id":3,"order":2,"display_type":"query","item":"31","item_type":"sku_id","pos_id":2,"order":3,"display_type":"promotion","item":"29","item_type":"sku_id","pos_id":5,"order":4,"display_type":"query","item":"9","item_type":"sku_id","pos_id":2,"order":5,"display_type":"recommend","item":"1","item_type":"sku_id","pos_id":1,"order":6],"actions":["item":"5","action_id":"favor_add","item_type":"sku_id","ts":1651217964522],"ts":1651217961000

IDEA console output (screenshot omitted).
