数据仓库之电商数仓-- 1用户行为数据采集
Posted FunnyPrince_
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了数据仓库之电商数仓-- 1用户行为数据采集相关的知识,希望对你有一定的参考价值。
目录
一、数据仓库概念
数据仓库(Data Warehouse),是为企业制定决策,提供数据支持的。可以帮助企业改进业务流程、提高产品质量等。
数据仓库的输入数据通常包括:业务数据、用户行为数据和爬虫数据等。
业务数据:就是各行业在处理事务过程中产生的数据。如用户在电商网站中登录、下载、支付等过程中,需要和网站后台数据库进行增删改查交互,产生的数据就是业务数据。业务数据通常存储在mysql、Oracle等数据库中。
二、项目需求及架构设计
2.1 项目需求分析
项目需求
- 用户行为数据采集平台搭建;
- 业务数据采集平台搭建;
- 数据仓库纬度建模;
- 分析设备、会员、商品、地区、活动等电商核心主题,统计的报表接近100个;
- 采用
即席查询
工具,随时进行指标分析; - 对集群性能进行监控,发生异常需要报警;
- 元数据管理;
- 质量监控;
- 权限管理;
2.2 项目框架
2.2.1 技术选型
- 项目技术如何选型?
- 框架版本如何选型(Apache、CDH、HDP)
- 服务器使用物理机还是云主机?
- 如何确认集群规模?
⭐️:
技术选型主要考虑因素:数据量大小、业务需求、行业内经验、技术成熟度、开发维护成本、总成本预算。
数据采集传输:Flume, Kafka, Sqoop
, Logstash, DataX;
数据存储:MySQL, HDFS, HBase
, Redis, MongoDB;
数据计算:Hive, Tex, Spark,
Flink, Storm;
数据查询:presto, Kylin
, Impala, Druid, Clickouse, Doris;
数据可视化:Echarts
, Superset
, QuickBI, DataV;
任务调度:Azkaban
, Oozie, DolphinScheduler, Airflow;
集群监控:Zabbix
, Prometheus;
元数据管理:Atalas
;
权限管理:Ranger
, Sentry.
2.2.2 系统数据流程设计
2.2.3 框架版本选型
- 如何选择Apache/CDH/HDP版本?
1). Apache:运维麻烦,组件间兼容性需要自己调研,开源;
2). CDH:国内使用最多的版本,不开源;
3). HDP:开源,可进行二次开发,但没有CDH稳定,国内使用甚少。
- 云服务选择
1). 阿里云EMR、MaxCompute、DataWorks
2). 亚马逊云EMR
3). 腾讯云EMR
4). 华为云EMR
- 具体版本型号
Apache框架版本:
tips:
框架选型最好选择半年前的稳定版!
2.2.4 服务器选型
- 物理机:需专业运维人员;
- 云主机:若是选择阿里云,运维工作全由阿里云完成。
2.2.5 集群规模
- 如何确认集群规模?(假设:每台服务器8T磁盘,128G内存)
1). 每天日活跃用户100万,每人一天平均100条:100万*100条=1亿条;
2). 每条日志1k左右,每天1亿条:100000000 / 1024 /1024 = 约100G;
3). 半年内不扩容服务器:100G*180天=约18T;
4). 保存3个副本:18T*3=54T;
5). 预留20%~30%Buf = 54T/0.7 = 77T;
6). 约8T*10台服务器。
- 若是考虑数仓分层,数据压缩,又要怎么计算?
2.2.6 集群资源规划设计
在企业中通常会搭建一套生产集群和一套测试集群。生产集群运行生产任务,测试集群用于上线前代码编写和测试。
- 生产集群
1). 消耗内存的需分开;
2). 数据传输数据比较紧密的放在一起(Kafka、Zookeeper
);
3). 客户端尽量放在一到两台服务器上,方便外部访问;
4). 有依赖关系的尽量放在同一台服务器上(如Hive和Azkaban Executor
)。
- 测试集群
三、数据生成模块
3.1 目标数据
我们要收集和分析数据主要包括页面数据、时间数据、曝光数据、启动数据和错误数据。
3.1.1 页面日志
页面数据主要记录一个页面的用户访问情况,包括访问时间、停留时间、页面路径等信息。
3.1.2 事件日志
时间数据主要记录应用内一个具体操作行为,包括操作类型
操作对象、操作对象描述等信息。
3.1.3 曝光日志
曝光数据主要记录页面所曝光的内容,包括曝光对象,曝光类型等信息。
Y2Vf,size_20,color_FFFFFF,t_70,g_se,x_16)
3.1.4 启动日志
启动数据记录应用的启动信息。
3.1.5 错误日志
错误数据记录应用使用过程中的错误信息,包括错误编号及错误信息。
3.2数据埋点
3.2.1 主流埋点方式
目前主流的埋点方式,有代码埋点(前端/后端)、可视化埋点、全埋点三种。
代码埋点是通过调用埋点 SDK 函数,在需要埋点的业务逻辑功能位置调用接口,上报埋点数据。例如,我们对页面中的某个按钮埋点后,当这个按钮被点击时,可以在这个按钮 对应的 OnClick 函数里面调用 SDK 提供的数据发送接口,来发送数据。
可视化埋点只需要研发人员集成采集 SDK,不需要写埋点代码,业务人员就可以通过 访问分析平台的“圈选”功能,来“圈”出需要对用户行为进行捕捉的控件,并对该事件进行命名。圈选完毕后,这些配置会同步到各个用户的终端上,由采集 SDK 按照圈选的配置 自动进行用户行为数据的采集和发送。
全埋点是通过在产品中嵌入 SDK,前端自动采集页面上的全部用户行为事件,上报埋点数据,相当于做了一个统一的埋点。然后再通过界面配置哪些数据需要在系统里面进行分析。
3.2.2 埋点数据上报时机
埋点数据上报时机包括两种方式:
方式一,在离开该页面时,上传在这个页面产生的所有数据(页面、事件、曝光、错误 等)。
优点:批处理,减少了服务器接收数据压力,网络IO少;
缺点:实效性差。
方式二,每个事件、动作、错误等产生后,立即发送。
优点:响应及时;
缺点:对服务器接收数据压力比较大,网络IO增加。
本项目采用方式一埋点。
3.2.3 埋点数据日志结构
我们的日志结构大致可分为两类,一是普通页面埋点日志
,二是启动日志
。
普通页面日志结构如下,每条日志包含了,当前页面的页面信息,所有事件(动作)、 所有曝光信息以及错误信息。除此之外,还包含了一系列公共信息,包括设备信息,地理位 置,应用信息等,即下边的 common 字段。
3.3 服务器和JDK准备
分别安装 hadoop102、hadoop103、hadoop104 三台主机。
服务器和JDK的准备内容看这里==>
3.4 模拟数据
3.4.1 使用说明
- 将 application.properties、gmall2020-mock-log-2020-05-10.jar、path.json、logback.xml 上传到 hadoop102 的/opt/module/applog 目录下
1). 创建 applog
路径
[xiaobai@hadoop102 module]$ mkdir applog
2). 上传文件
3). 如图,在/opt/module/applog目录下执行以下命令生成对应的日志文件log:
java -jar gmall2020-mock-log-2021-01-22.jar
4). 在/opt/module/applog/log目录下查看生成日志:
3.4.2 集群日志生成脚本
- 在 /home/xiaobai/bin目录下创建脚本
lg.sh
:
[xiaobai@hadoop102 bin]$ vim lg.sh
#!/bin/bash
for i in hadoop102 hadoop103; do
echo "========== $i =========="
ssh $i "cd /opt/module/applog/; java -jar gmall2020-mock-log-2021-01-22.jar >/dev/null 2>&1 &"
done
注:
1). /opt/module/applog/为 jar 包及配置文件所在路径
2). /dev/null 代表 linux 的空设备文件,所有往这个文件里面写入的内容都会丢失,俗
称“黑洞”。
标准输入 0:从键盘获得输入 /proc/self/fd/0;
标准输出 1:输出到屏幕(即控制台) /proc/self/fd/1;
错误输出 2:输出到屏幕(即控制台) /proc/self/fd/2。
- 修改脚本执行权限:
[xiaobai@hadoop102 bin]$ chmod 777 lg.sh
- 将applog分发给其他节点:
[xiaobai@hadoop102 module]$ xsync applog/
- 删除hadoop104上的applog以及hadoop102上的log,并测试lg.sh:
[xiaobai@hadoop104 module]$ rm -rf applog/
[xiaobai@hadoop102 applog]$ rm -rf log
如图,hadoop102 / hadoop103加载出了log:
四、数据采集模块
4.1 集群所有进程查看脚本
- 在/home/xiaobai/bin 目录下创建脚本
xcall.sh
:
[xiaobai@hadoop102 bin]$ vim xcall.sh
#! /bin/bash
for i in hadoop102 hadoop103 hadoop104
do
echo --------- $i ----------
ssh $i "$*"
done
- 修改脚本执行权限:
[xiaobai@hadoop102 bin]$ chmod 777 xcall.sh
- 启动脚本:
[xiaobai@hadoop102 bin]$ xcall.sh jps
--------- hadoop102 ----------
3081 Jps
--------- hadoop103 ----------
2961 Jps
--------- hadoop104 ----------
3295 Jps
4.2 zookeeper安装
4.2.1 安装ZK
-
集群规划
在hadoop102、hadoop103、hadoop104三个节点上部署zookeeper。 -
解压安装
1). 解压zookeeper安装包到/opt/module/目录下:
[xiaobai@hadoop102 software]$ tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz -C /opt/module/
2). 修改/opt/module/apache-zookeeper-3.5.7-bin名称为zookeeper-3.5.7:
[xiaobai@hadoop102 module]$ mv apache-zookeeper-3.5.7-bin/ zookeeper-3.5.7
- 配置服务器编号:
1). 在/opt/module/zookeeper-3.5.7目录下创建zkData
:
[xiaobai@hadoop102 zookeeper-3.5.7]$ mkdir zkData
2). 在/opt/module/zookeeper-3.5.7/zkData目录下创建一个myid
文件:
[xiaobai@hadoop102 zookeeper-3.5.7]$ vim myid
添加myid文件,⚠️一定要在Linux里面创建!
在文件中添加域server对应的编号:
2
3). 同步/opt/module/zookeeper-3.5.7目录到hadoop103、hadop104:
[xiaobai@hadoop102 module]$ xsync zookeeper-3.5.7/
分别在hadoop103、hadoop104上修改myid文件中内容为3、4:
[xiaobai@hadoop103 zkData]$ vim myid
3
[xiaobai@hadoop104 zkData]$ vim myid
4
- 配置
zoo.cfg
文件:
1). 重命名/opt/module/zookeeper-3.5.7/conf目录下的zoo_sample.cfg为zoo.cfg
:
[xiaobai@hadoop102 conf]$ mv zoo_sample.cfg zoo.cfg
2). 打开zoo.cfg 文件:
[xiaobai@hadoop102 conf]$ vim zoo.cfg
如图,修改数据存储路径配置:
dataDir=/opt/module/zookeeper-3.5.7/zkData
如图,增加如下配置:
#######################cluster###################################
server.2=hadoop102:2888:3888
server.3=hadoop103:2888:3888
server.4=hadoop104:2888:3888
3). 同步分发zoo.cfg 配置文件:
[xiaobai@hadoop102 conf]$ xsync zoo.cfg
4). 配置参数解读:
server.A=B:C:D
A 是一个数字,表示这是第几号服务器;
集群模式下配置一个文件 myid
,这个文件在 dataDir 目录下,这个文件里面有一个数据就是 A 的值,Zookeeper 启动时读取此文件,拿到里面的数据与 zoo.cfg 里面的配置信息比较从而判断到底是哪个 server;
B 是这个服务器的地址;
C 是这个服务器 Follower 与集群中的 Leader 服务器交换信息的端口;
D 是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的Leader,而这个端口就是用来执行选举时服务器相互通信的端口。
- 集群操作
1). 分别启动zookeeper:
[xiaobai@hadoop102 zookeeper-3.5.7]$ bin/zkServer.sh start
[xiaobai@hadoop103 zookeeper-3.5.7]$ bin/zkServer.sh start
[xiaobai@hadoop104 zookeeper-3.5.7]$ bin/zkServer.sh start
2). 查看状态:
[xiaobai@hadoop102 zookeeper-3.5.7]$ bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: follower
[xiaobai@hadoop103 zookeeper-3.5.7]$ bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: follower
[xiaobai@hadoop104 zookeeper-3.5.7]$ bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: leader
4.2.2 ZK集群启动停止脚本
- 在hadoop102的/home/xiaobai/bin目录下创建脚本
zk.sh
:
在脚本中编写如下内容:
#!/bin/bash
case $1 in
"start")
for i in hadoop102 hadoop103 hadoop104
do
echo "---------- $i ------------"
ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh start"
done
;;
"stop")
for i in hadoop102 hadoop103 hadoop104
do
echo "---------- $i ------------"
ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh stop"
done
;;
"status")
for i in hadoop102 hadoop103 hadoop104
do
echo "---------- $i ------------"
ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh status"
done
;;
esac
- 增加脚本执行权限:
[xiaobai@hadoop102 bin]$ chmod 777 zk.sh
- Zookeeper 集群启动脚本:
[xiaobai@hadoop102 ~]$ zk.sh start
- Zookeeper 集群停止脚本:
[xiaobai@hadoop102 ~]$ zk.sh stop
- 查看Zookeeper 状态:
[xiaobai@hadoop102 ~]$ zk.sh status
---------- hadoop102 ------------
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: leader
---------- hadoop103 ------------
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: follower
---------- hadoop104 ------------
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: follower
4.3 Kafka安装
4.3.1 Kafka集群安装
- 解压缩kafka_2.11-2.4.1.tgz至/opt/module/目录下:
[xiaobai@hadoop102 software]$ tar -zxvf kafka_2.11-2.4.1.tgz -C /opt/module/
- 修改kafka_2.11-2.4.1名称为kafka:
[xiaobai@hadoop102 module]$ mv kafka_2.11-2.4.1/ kafka
- 在/opt/module/kafka路径下创建logs文件夹:
[xiaobai@hadoop102 kafka]$ mkdir logs
- 如图,在/opt/module/kafka/config目录下修改配置文件server.properties:
[xiaobai@hadoop102 config]$ vim server.properties
#broker 的全局唯一编号,不能重复 broker.id=0
#删除 topic 功能使能
delete.topic.enable=true
#kafka 运行日志存放的路径
log.dirs=/opt/module/kafka/logs
#配置连接 Zookeeper 集群地址 zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181
- 分发kafka:
[xiaobai@hadoop102 module]$ xsync kafka/
- 修改各自的broker.id:
[xiaobai@hadoop103 config]$ vim server.properties
broker.id=1
[xiaobai@hadoop104 config]$ vim server.properties
broker.id=2
- 配置环境变量:
[xiaobai@hadoop102 kafka]$ sudo vim /etc/profile.d/my_env.sh
#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:KAFKA_HOME/bin
[xiaobai@hadoop102 kafka]$ source /etc/profile.d/my_env.sh
- 分发my_env.sh:
[xiaobai@hadoop102 kafka]$ sudo /home/xiaobai/bin/xsync /etc/profile.d/my_env.sh
- 在hadoop103、hadoop104上使用source刷新my_env.sh :
[xiaobai@hadoop103 config]$ source /etc/profile.d/my_env.sh
[xiaobai@hadoop104 config]$ source /etc/profile.d/my_env.sh
- 启动集群:
依次在 hadoop102、hadoop103、hadoop104 节点上启动 kafka:
[xiaobai@hadoop102 kafka]$ bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties
[xiaobai@hadoop103 kafka]$ bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties
[xiaobai@hadoop104 kafka]$ bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties
- 关闭集群:
在/opt/module/kafka目录下执行以下命令:
bin/kafka-server-stop.sh stop
4.3.2 Kafka集群启动停止脚本
- 在/home/xiaobai/bin目录下创建脚本文件
kf.sh
:
#!/bin/bash
case $1 in
"start")
for i in hadoop102 hadoop103 hadoop104
do
echo "------启动 $i KAFKA------"
ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"
done
;;
"stop")
for i in hadoop102 hadoop103 hadoop104
do
echo "------停止 $i KAFKA------" ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh"
done
;;
esac
- 赋予权限:
[xiaobai@hadoop102 bin]$ chmod 777 kf.sh
- 使用脚本开启kafka:
[xiaobai@hadoop102 bin]$ kf.sh start
- 使用脚本停止kafka:
[xiaobai@hadoop102 bin]$ kf.sh stop
4.3.3 项目经验之Kafka机器数量计算
Kafka机器数量(经验公式)= 2 *(峰值生产速度 * 副本数 / 100)+ 1;
先拿到峰值生产速度,再根据设定的副本数,就能预估出需要部署Kafka的数量。
-
峰值生产速度:可以通过压测得到;
-
副本数:
默认副本数是1个,在企业里会有2-3个,通常为2;
副本多可以提高可靠性,但是会降低网络传输速率;
如峰值生产速度为50M/s,副本数为2;
Kafka机器数量 = 2 * (50 * 2 / 100)+ 1 = 3台。
4.3.4 项目经验之压力测试
Kafka压测:
用Kafka官方自带的脚本,对Kafka进行压测:
kafka-consumer-perf-test.sh
kafka-producer-perf-test.sh
Kafka压测时,在硬盘读写速度一定的情况下,可以查看到哪些地方出现了瓶颈(CPU,内存,网络 IO);一般都是网络 IO 达到瓶颈。
4.3.5 项目经验之Kafka分区数计算
- 创建一个只有 1 个分区的 topic;
- 测试这个 topic 的 producer 吞吐量和 consumer 吞吐量;
- 假设他们的值分别是 Tp 和 Tc,单位可以是 MB/s;
- 然后假设总的目标吞吐量是 Tt,那么分区数=Tt / min(Tp,Tc) 例如:producer 吞吐量=20m/s;
consumer 吞吐量=50m/s,期望吞吐量 100m/s;
分区数=100 / 20 =5 分区; 分区数一般设置为:3-10 个。
建议参考==》
4.4采集日志Flume
4.4.1 日至采集Flume安装
- 解压 apache-flume-1.9.0-bin.tar.gz 到/opt/module/目录下:
[xiaobai@hadoop102 software]$ tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /opt/module/
- 修改apache-flume-1.9.0-bin的名称为
flume
[xiaobai@hadoop102 module]$ mv apache-flume-1.9.0-bin/ flume
- 将lib文件夹下的guava-11.0.2.jar 删除以兼容hadoop3.2.2:
[xiaobai@hadoop102 lib]$ rm -rf guava-11.0.2.jar
注⚠️:删除guava-11.0.2.jar 的服务器节点,一定要配置hadoop环境变量,否则会报异常。
- 将 flume/conf 下 的 flume-env.sh.template 文 件 修 改 为
flume-env.sh
, 并 配 置flume-env.sh
文件:
[xiaobai@hadoop102 conf]$ mv flume-env.sh.template flume-env.sh
[xiaobai@hadoop102 conf]$ vim flume-env.sh
export JAVA_HOME=/opt/module/jdk1.8.0_144
- 分发flume:
[xiaobai@hadoop102 module]$ xsync flume/
4.4.2 项目经验之Flume组件选型
- source
1). flume1.6 source:
exec :
优点:可以实时监控文件变化;
缺点:有丢数据的风险。
spooling:
优点:可以实现断点续传;
缺点:不能实时监控文件变化。
2). flume 1.7 taildir source⭐️
Taildir Source
相比 Exec Source、Spooling Directory Source 的优势:
a. 可实现断点续传,多目录
;Flume1.6 以前需要自己自定义 Source 记录每次读取文件位置,实现断点续传。
b. 可以实时监控文件的变化
。
Exec Source 可以实时搜集数据,但是在 Flume 不运行或者 Shell 命令出错的情况下,数据将会丢失;Spooling Directory Source 监控目录,支持断点续传,但不能实时监控文件变化。
3). batchSize 大小如何设置?
答:Event 1K 左右时,500-1000 合适(默认为 100)
- Channel
file channel⭐️: 数据存储在磁盘中,可靠性高,效率低;
memory channel⭐️: 数据存储在内存中,效率高,可靠性差。
采用 Kafka Channel,省去了 Sink,提高了效率。KafkaChannel 数据存储在 Kafka 里面, 所以数据是存储在磁盘中。
4.4.3 日志采集 Flume 配置
-
Flume配置分析:
注:Flume 直接读 log 日志的数据,log 日志的格式是 app.yyyy-mm-dd.log。 -
Flume的具体配置如下:
1). 在/opt/module/flume/conf 目录下创建 file-flume-kafka.conf
文件:
[xiaobai@hadoop102 conf]$ vim file-flume-kafka.conf
2). 在文件配置如下内容:
#定义组件
a1.sources=r1
a1.channels=c1
#配置source (Taildirsource)
a1.source.r1.type=TAILDIR
a1.source.r1.filegroups=f1
a1.source.r1.filegroups.f1=/opt/module/applog/log/app.*
a1.source.r1.positionFile=/opt/module/flume/taildir_position.json
#配置拦截器(ETL数据清洗 判断json是否完整)
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=com.xiaobai.flume.interceptor.ETLInterceptor$Builder
#配置channel
a1.channels.c1.type=org.apache.flume.channel.kafka.kafkaChannel
a1.channels.c1.kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092
a1.channels.c1.kafka.topic=topic_log
a1.channels.c1.parseAsFlumeEvent=false
#配置sink(Kafka Channel 无Sink)
#拼接组件
a1.sources.r1.channels=c1
3). 分发file-flume-kafka.conf:
[xiaobai@hadoop102 conf]$ xsync file-flume-kafka.conf
tips: com.xiaobai.flume.interceptor.ETLInterceptor是自定义的拦截器全类名。
4.4.4 Flume拦截器
- 创建 Maven 工程 flume-interceptor;
- 创建包名: com.xiaobai.flume.interceptor;
- 在 pom.xml 文件中添加如下配置:
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.9.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.62</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
- 在com.xiaobai.flume.interceptor包下创建JSONUtils类:
package com.xiaobai.flume.interceptor;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;
public class JSONUtils {
// //test数据
// public static void main(String[] args) {
// System.out.println(isValidate("{123323"));
// }
//验证数据是否json
public static boolean isValidate(String log) {
try {
JSON.parse(log);
return true;
} catch(JSONException e){
return false;
}
}
}
- 在com.xiaobai.flume.interceptor下创建ETLInterceptor类:
package com.xiaobai.flume.interceptor;
import org.apache.flume.Context;
import org.apache.flume.Event大数据项目之电商数仓-用户行为数据采集