Day21:增量处理与任务流调度
Posted 保护胖丁
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Day21:增量处理与任务流调度相关的知识,希望对你有一定的参考价值。
知识点01:回顾
-
学员考勤管理需求,有哪些维度与指标?
- 核心目标:提高学员学习质量
- 主题目标:基于不同维度统计分析考勤指标
- 指标:出勤人数、出勤率、迟到人数、迟到率、请假人数、请假率、旷课人数、旷课率
- 维度:时间、班级
-
学员考勤管理数据来源及有哪些表?
-
来源:学员管理系统:mysql
-
事实表:出勤和请假事实
- 学员打卡信息表
- 学员请假信息表
-
维度表:合法性判断
- 班级作息时间表
- 班级排课信息表
- 班级总人数表
-
-
学员考勤管理主题如何设计,每层的功能是什么?
- ODS:事实表
- DIM:维度表
- DWD:没有设计
- DWM
- 班级出勤状态表
- step1:先基于学员打卡信息表得到学员出勤状态表
- step2:再基于学员出勤状态表得到班级出勤状态表
- 班级请假状态表
- 直接基于学员请假信息表进行分组聚合
- 班级旷课状态表
- 班级总人数表 - 班级出勤状态表 - 班级请假状态表
- 结果
- 班级出勤状态表
- 班级请假状态表
- 班级旷课状态表
- 班级出勤状态表
- DWS:实现所有指标的合并
- DWM层三张表与DIM中的总人数表进行关联
- 关联字段:班级id、上课日期
- 基于天维度下的指标的结果
- APP
- 构建月、年维度的结果
-
什么是CBO,如何使用CBO?
- CBO:基于代价的优化器,会根据所有方案付出的代价,选择最优的方案来实现
- step1:先通过分析优化器anlayze构建数据的元数据
- step2:通过CBO进行优化
知识点02:目标
核心目标:实现离线数据仓库所有程序的自动化运行
- 实现增量处理以及增量脚本的开发
- 增量
- 增量采集:采集昨天的数据
- 增量处理:处理昨天的输出
- 增量导出:导出昨天的结果
- 脚本:怎么解决增量的问题?
- 增量
- 自动化运行
- 需求1:程序A定时运行:每天00:05运行
- 需求2:程序A运行结束,程序B自动运行
- 任务流调度工具来实现:Oozie
知识点03:增量业务需求
-
目标:了解增量业务场景及需求
-
路径
- step1:离线与实时
- step2:离线需求
- step3:增量与全量
-
实施
- 离线与实时
- 离线:以时间为单位来实现数据的处理:采集、计算
- 场景:每天处理一次,每个小时处理一次,每个月,每年
- 特点:时效性比较低,一般都是分钟级别
- 工具:Hadoop生态圈
- Sqoop、HDFS、Hive、MapReduce、SparkCore、SparkSQL、Tez、Impala、Sqoop、MySQL
- 实时:以数据为单位的实现数据的处理:采集、计算
- 场景:产生一条数据就要立刻采集以及处理一条数据
- 特点:时效性非常高,一般都是毫秒级别
- 工具:实时生态圈
- Flume、Canal、Kafka、SparkStreaming/Flink、Redis、Hbase
- 离线:以时间为单位来实现数据的处理:采集、计算
- 离线需求
- 实现离线采集、离线计算、离线结果保存
- 所有过程:都是增量的过程
- 增量与全量
- 全量:每次都所有数据进行处理
- 一般用于数据迁移、维度表的更新
- 增量:每次对最新【新增、更新】的数据进行处理
- 工作中主要的场景
- 全量:每次都所有数据进行处理
- 离线与实时
-
小结
- 了解增量处理的需求及应用场景
知识点04:增量采集:方案
-
目标:掌握增量采集的实现方案
-
实施
-
具体方案也取决于使用的采集工具
-
Flume:增量文件采集
- exec:tail命令,动态的获取文件的尾部
- tail命令,自动读取文件的尾部
- taildir:动态实时监控多个文件
- 记录文件的采集位置:taildir_position.json
- 实现增量采集
- exec:tail命令,动态的获取文件的尾部
-
Sqoop:增量采集数据库
-
方式一:按照某一列自增的int值来实现:append
- 要求:必须有一列自增的int值,必须有主键
- 特点:只能采集新增的数据
-
方式二:按照数据变化的时间列的值来实现:lastmodifield
- 要求:必须有一列时间列,时间列随着数据的更新而自动更新
- 特点:能采集新增和更新的数据
-
方式三:通过指定目录分区采集到对应的HDFS目录下
-
要求:表中有两个字段
- create_time:创建时间
- 新增的数据
- create_time:创建时间
-
-
update_time:更新时间
- 更新的数据- 怎么解决更新和新增数据的问题:通过SQL的过滤
-
-
-e "select * from table where substr(create_time,1,10) = '2021-05-15' or substr(update_time,1,10) = '2021-05-15'“
- 增量要求目录是提前存在,追加新增的数据进入,没有使用官方提供的增量,目录不能提前存在
--target-dir /nginx/log/2021-05-15/
-
问题:如何通过Sqoop将数据采集到Hive的分区表中?
-
方式一:Sqoop提供的方案
-
–hive-partition-key daystr:指定分区的字段是哪个字段
-
–hive-partition-value 2021-05-15:指定导入哪个分区
-
原理
-
根据指定的参数,在HDFS中创建一个目录:key=value
table/daystr=2021-05-15
-
在Hive中加载分区即可
-
-
问题:无法构建多级分区
-
-
方式二:通过手动指定HDFS方式来代替
--target-dir /nginx/log/daystr=2021-05-15/hourstr=00
-
-
小结
- 增量采集的方案有几种?
- 方式一:append
- 自增的int列,只能采集新增的
- 方式二:lastmodifield
- 时间变化的列,能采集新增的和更新的
- 方式三:通过SQLwhere过滤
- create_time和update_time,能采集新增的和更新的
- 输出目录不能提前存储,必须写入对应的指定分区存储目录
- 方式一:append
- 增量采集的方案有几种?
知识点05:增量采集:实现
-
目标:实现数据的增量采集
-
实施
-
创建MySQL测试数据表
create database if not exists db_order; use db_order; drop table if exists tb_order; create table tb_order( id varchar(10) primary key, pid varchar(10) not null, userid varchar(10) not null, price double not null, create_time varchar(20) not null );
-
插入测试数据
insert into tb_order values('o00001','p00001','u00001',100,'2021-05-13 00:01:01'); insert into tb_order values('o00002','p00002','u00002',100,'2021-05-13 10:01:02'); insert into tb_order values('o00003','p00003','u00003',100,'2021-05-13 11:01:03'); insert into tb_order values('o00004','p00004','u00004',100,'2021-05-13 23:01:04'); insert into tb_order values('o00005','p00005','u00001',100,'2021-05-14 00:01:01'); insert into tb_order values('o00006','p00006','u00002',100,'2021-05-14 10:01:02'); insert into tb_order values('o00007','p00007','u00003',100,'2021-05-14 11:01:03'); insert into tb_order values('o00008','p00008','u00004',100,'2021-05-14 23:01:04');
-
第一次增量采集:假设今天15号
sqoop import \\ --connect jdbc:mysql://node3:3306/db_order \\ --username root \\ --password-file file:///export/data/sqoop.passwd \\ --query "select * from tb_order where substring(create_time,1,10) = '2021-05-14' and \\$CONDITIONS " \\ --delete-target-dir \\ --target-dir /nginx/logs/tb_order/daystr=2021-05-14 \\ --fields-terminated-by '\\t' \\ -m 1
-
15号MySQL有新的数据生成
insert into tb_order values('o00009','p00005','u00001',100,'2021-05-15 00:01:01'); insert into tb_order values('o00010','p00006','u00002',100,'2021-05-15 10:01:02'); insert into tb_order values('o00011','p00007','u00003',100,'2021-05-15 11:01:03'); insert into tb_order values('o00012','p00008','u00004',100,'2021-05-15 23:01:04');
-
第二次增量采集:假设今天16号
sqoop import \\ --connect jdbc:mysql://node3:3306/db_order \\ --username root \\ --password-file file:///export/data/sqoop.passwd \\ --query "select * from tb_order where substring(create_time,1,10) = '2021-05-15' and \\$CONDITIONS " \\ --delete-target-dir \\ --target-dir /nginx/logs/tb_order/daystr=2021-05-15 \\ --fields-terminated-by '\\t' \\ -m 1
-
-
小结
- 实现数据的增量采集
知识点06:增量处理
-
目标:实现数据的增量处理
-
路径
- step1:ETL实现
- step2:数据处理
-
实施
-
Hive中建表
use default; drop table default.tb_order; create table if not exists default.tb_order( id string , pid string, userid string, price double , create_time string ) partitioned by (daystr string) row format delimited fields terminated by '\\t' location '/user/hive/warehouse/tb_order';
-
第一次处理
-
假设今天15号:对14号的数据进行处理
-
增量ETL
-
如果ETL发生在进入Hive之前,ETL工作中都是通过SparkCore/MR程序进行处理
yarn jar etl.jar main_class \\ #输入目录:数据采集的目录 /nginx/logs/tb_order/daystr=2021-05-14 \\ #输出目录:构建Hive表的目录 /user/hive/warehouse/tb_order/daystr=2021-05-14
-
这个程序一般不会给参数,昨天的日期都是在ETL程序中动态获取的
//Input Path inputPath = new Path("/nginx/logs/tb_order/daystr="+yesterday) TextInputFormat.setInputPaths(job,inputPath) //Output Path outputPath = new Path("/user/hive/warehouse/tb_order/daystr="+yesterday) TextOutputFormat.setOutputPath(job,outputPath)
-
-
模拟数据ETL的过程
hdfs dfs -cp /nginx/logs/tb_order/daystr=2021-05-14 /user/hive/warehouse/tb_order/
-
加载分区
- 第一种方式:修改Hive元数据,默认用于修复添加分区,不常用
msck repair table tb_order;
- 这个命令在Hive2.1版本中是个bug,不能用
-
-
增量处理
select daystr, count(id) as order_number, sum(price) as order_price from default.tb_order where daystr='2021-05-14' group by daystr;
-
-
第二次处理
-
假设今天16号:对15号的数据进行处理
-
增量ETL
-
模拟数据ETL的过程
hdfs dfs -cp /nginx/logs/tb_order/daystr=2021-05-15 /user/hive/warehouse/tb_order/
-
加载分区
- 添加分区元数据信息,常用的方式
-
alter table tb_order add if not exists partition(daystr='2021-05-15');
-
要求:HDFS上目录的名称必须为分区字段=值
/user/hive/warehouse/tb_order/daystr=2021-05-15
- Hive自动根据分区默认的目录名来关联的
-
场景
/user/hive/warehouse/tb_order/2021-05-15
-
实现:通过location关键字手动指定分区对应的HDFS
alter table tb_order add if not exists partition(daystr='2021-05-15') location '/user/hive/warehouse/tb_order/2021-05-15';
-
增量处理
select daystr, count(id) as order_number, sum(price) as order_price from default.tb_order where daystr='2021-05-15' group by daystr;
-
-
-
小结
- 实现数据的增量处理
知识点07:增量导出
-
目标:实现增量分析结果的增量导出
-
路径
- step1:建表
- step2:第一次导出
- step3:第二次导出
-
实施
-
建表
-
Hive中建立APP层结果表
drop table if exists tb_order_rs; create table if not exists default.tb_order_rs( daystr string, order_number int, order_price double ) row format delimited fields terminated by '\\t';
-
MySQL中创建导出结果表
use db_order; drop table if exists db_order.tb_order_rs; create table db_order.tb_order_rs( daystr varchar(20) primary key, order_number int, order_price double );
-
-
第一次导出
-
第一次分析的结果写入HiveAPP层的结果表
insert into table tb_order_rs select daystr, count(id) as order_number, sum(price) as order_price from default.tb_order where daystr='2021-05-14' group by daystr;
-
第一次导出到MySQL
sqoop export \\ --connect jdbc:mysql://node3:3306/db_order \\ --username root \\ --password 123456 \\ --table tb_order_rs \\ --hcatalog-database default \\ --hcatalog-table tb_order_rs \\ --input-fields-terminated-by '\\t' \\ --update-key daystr \\ --update-mode allowinsert \\ -m 1
-
查看MySQL
-
-
-
第二次导出
-
第二次分析的结果写入HiveAPP层的结果表
insert into table tb_order_rs select daystr, count(id) as order_number, sum(price) as order_price from default.tb_order where daystr='2021-05-15' group by daystr;
-
第二次导出到MySQL
sqoop export \\ --connect jdbc:mysql://node3:3306/db_order \\ --username root \\ --password 123456 \\ --table tb_order_rs \\ --hcatalog-database default \\ --hcatalog-table tb_order_rs \\ --input-fields-terminated-by '\\t' \\ --update-key daystr \\ --update-mode allowinsert \\ -m 1
-
查看MySQL
-
-
小结
- 实现增量分析结果的增量导出
知识点08:增量采集脚本开发
-
目标:实现增量采集脚本的开发
-
路径
- step1:分析
- step2:实现
-
实施
-
分析
- 需求:实现增量的采集,默认采集昨天的数据,允许采集指定日期的数据,并且实现模拟ETL过程
- 如果给定了参数,处理参数日期的数据
- 如果没有给参数,默认处理昨天的数据
- 需求:实现增量的采集,默认采集昨天的数据,允许采集指定日期的数据,并且实现模拟ETL过程
-
实现
-
创建脚本
mkdir /export/data/shell vim /export/data/shell/01.collect.sh
-
开发脚本
#!/bin/bash #step1:先获取要采集的数据时间,规则:如果没有给参数,就默认处理昨天的日期,如果给了参数,就参数对应的日期 if [ $# -ne 0 ] then #参数个数不为0 if [ $# -ne 1 ] then echo "参数至多只能有一个,为处理的日期,请重新运行!" exit 100 else #参数个数只有1个,就用第一个参数作为处理的日期 yesterday=$1 fi else #参数个数为0,默认处理昨天的日期 yesterday=`date -d '-1 day' +%Y-%m-%d` fi echo "step1:要处理的日期是:${yesterday}" echo "step2:开始运行采集的程序" #step2:运行增量采集 SQOOP_HOME=/export/server/sqoop-1.4.6-cdh5.14.0 $SQOOP_HOME/bin/sqoop import \\ --connect jdbc:mysql://node3:3306/db_order \\ --username root \\ --password-file file:///export/data/sqoop.passwd \\ --query "select * from tb_order where substring(create_time,1,10) = '${yesterday}' and \\$CONDITIONS " \\ --delete-target-dir \\ --target-dir /nginx/logs/tb_order/daystr=${yesterday} \\ --fields-terminated-by '\\t' \\ -m 1 echo "step2:采集的程序运行结束" echo "step3:开始运行ETL" #模拟ETL的过程,将采集的新增的数据移动到表的目录下 HADOOP_HOME=/export/server/hadoop-2.6.0-cdh5.14.0 #先判断结果是否存在,如果已经存在,先删除再移动 $HADOOP_HOME/bin/hdfs dfs -test -e /user/hive/warehouse/tb_order/daystr=${yesterday} if [ $? -eq 0 ] then #存在 $HADOOP_HOME/bin/hdfs dfs -rm -r /user/hive/warehouse/tb_order/daystr=${yesterday} $HADOOP_HOME/bin/hdfs dfs -cp /nginx/logs/tb_order/daystr=${yesterday} /user/hive/warehouse/tb_order/ else #不存在 $HADOOP_HOME/bin/hdfs dfs -cp /nginx/logs/tb_order/daystr=${yesterday} /user/hive/warehouse/tb_order/ fi echo "step3:ETL结束"
-
-
-
小结
- 实现增量采集脚本的开发
知识点09:增量处理脚本开发
-
目标:实现增量处理脚本开发
-
路径
- step1:分析
- step2:实现
-
实施
-
分析
- step1:申明分区
- step2:增量处理,将结果保存早结果表中
-
实现
-
创建脚本
vim /export/data/shell/02.analysis.sh vim /export/data/shell/02.analysis.sql
-
Shell脚本
#!/bin/bash #step1:先获取要采集的数据时间,规则:如果没有给参数,就默认处理昨天的日期,如果给了参数,就参数对应的日期 if [ $# -ne 0 ] then #参数个数不为0 if [ $# -ne 1 ] then echo "参数至多只能有一个,为处理的日期,请重新运行!" exit 100 else #参数个数只有1个,就用第一个参数作为处理的日期 yesterday=$1 fi else #参数个数为0,默认处理昨天的日期 yesterday=`date -d '-1 day' +%Y-%m-%d` fi echo "step1:要处理的日期是:${yesterday}" echo "step2:开始运行分析" #step2:运行分析程序 HIVE_HOME=/export/server/hive-1.1.0-cdh5.14.0 $HIVE_HOME/bin/hive --hiveconf yest=${yesterday} -f /export/data/shell/02.analysis.sql echo "step2:分析的程序运行结束"
-
SQL文件
create table if not exists default.tb_order( id string , pid string, userid string, price double , create_time string ) partitioned by (daystr string) row format delimited fields terminated by '\\t' location '/user/hive/warehouse/tb_order'; alter table default.tb_order add if not exists partition (daystr='${hiveconf:yest}'); create table if not exists default.tb_order_rs( daystr string, order_number int, order_price double ) row format delimited fields terminated by '\\t'; insert into table default.tb_order_rs select daystr, count(id) as order_number, sum(price) as order_price from default.tb_order where daystr='${hiveconf:yest}' group by daystr;
-
-
-
小结
- 实现增量处理脚本开发
知识点10:增量导出脚本开发
-
目标:实现增量导出脚本的开发
-
路径
- step1:分析
- step2:实现
-
实施
-
分析
- 实现将Hive结果表的数据增量导出到MySQL中
-
实现
-
创建脚本
vim /export/data/shell/03.export.sh
-
Shell脚本
#!/bin/bash echo "step1:开始运行导出的程序" #step2:运行增量采集 SQOOP_HOME=/export/server/sqoop-1.4.6-cdh5.14.0 $SQOOP_HOME/bin/sqoop export \\ --connect jdbc:mysql://node3:3306/db_order \\ --username root \\ --password 123456 \\ --table tb_order_rs \\ --hcatalog-database default \\ --hcatalog-table tb_order_rs \\ --input-fields-terminated-by '\\t' \\ --update-key daystr \\ --update-mode allowinsert \\ -m 1 echo "step1:导出的程序运行结束"
-
-
-
小结
- 实现增量导出脚本的开发
知识点11:任务流调度需求
-
目标:了解任务流调度实际工具需求
-
路径
- step1:整体需求
- step2:调度类型
-
实施
-
整体需求
-
相同的业务线,有不同的需求会有多个程序来实现,这多个程序共同完成的需求,组合在一起就是工作流或者叫做任务流
-
基于工作流来实现任务流的自动化运行
-
-
- **需求1:基于时间的任务运行**
- Job1和Job2是在每天固定的时间去采集昨天的数据
- 每天00:00
- **需求2:基于运行依赖关系的任务运行**
- Job3必须等待Job1运行成功,才能运行
- Job5必须等待Job3和Job4都运行成功才能运行
-
调度类型
- 定时调度:基于某种时间的规律进行调度运行
- 依赖调度:基于某种依赖关系进行调度运行
-
小结
- 任务调度的常见的两种调度类型是什么?
- 定时调度
- 依赖调度
- 任务调度的常见的两种调度类型是什么?
知识点12:任务流调度工具
-
目标:了解常见的任务流调度工具
-
实施
-
Linux Crontab:Linux中自带的一个工具
-
优点
- 简单,不用做额外的部署,能实现大多数的定时需求
- crontab -e
-
缺点
- 只能做定时任务的执行
-
语法
* * * * * command 分钟 小时 日 月 周几
-
-
Oozie:Cloudera公司研发的Hadoop生态圈的调度工具
- 官网:oozie.apache.org
- 优点
- 功能很强大,能满足几乎所有常规的任务流调度的需求
- 支持DAG流程调度
- 缺点
- 本身不是分布式的工具,依赖于MapReduce来实现分布式
- 原生的交互开发接口不友好
- 整体的监控不完善
- 学习成本比较高
-
-
Zeus:阿里巴巴最早基于Hadoop1研发的一个调度系统,目前市场上的Zeus一般都是携程版本的Zeus
-
优点
- 交互非常友好
- 使用非常简单
- 分布式的,功能相对也比较全面
-
缺点
- Bug非常多,阿里没有继续研发Zeus,不支持Hadoop2
-
-
Azkaban:LinkedIn公司研发的分布式调度工具
- 优点
- 重点着重于自身的调度功能的研发,其他的辅助性功能都通过插件来完成
- 自身也是分布式调度系统
- 界面交互性比较友好
- 开发交互性:properties或者JSON
- 缺点
- 3.x版本开始才支持完全分布式
- 优点
-
小结
- 了解常见的任务流调度工具
知识点13:Oozie的基本介绍
- 目标:掌握Oozie的功能、特点及应用场景
- 路径
- step1:功能
- step2:特点
- step3:应用
- step4:原理
- 实施
- 功能
- Oozie是一个专门为管理Hadoop生态的程序调度而设计的工作流调度系统
- 基于DAG实现依赖调度:WorkFlow
- 基于定时器实现定时调度:Coordinator
- 特点
- 优点:功能全面
- 缺点:部署相对复杂、原生开发方式过于复杂
- 应用
- 基于Hadoop平台的分布式离线任务流调度
- 原理
- 底层依赖于MapReduce,将工作流变成MapReduce程序,提交个YARN
- 由YARN来将不同的工作流分配到不同的机器上运行,用于构建分布式调度系统
- 功能
- 小结
- 了解Oozie的功能、特点、应用场景和基本原理
知识点14:Oozie的使用方式
-
目标:了解Oozie的使用方式
-
路径
- step1:原生方式
- step2:集成Hue
-
实施
-
原生方式
-
这种方式,是通过自己写代码的方式来实现工作流的开发,效率低,容易出问题,不用
-
实现一个效果:4个程序
- 第一个程序:shell脚本,定时运行的
- 第二个程序:Spark程序,必须等第一个程序运行完才能运行
- 第三个程序:MapReduce程序,必须等第二个程序运行才能运行’
- 第四个程序:Hive,必须等第三个程序运行完才能运行
-
先要开发一个XML文件
- 控制节点:start、end、kill
- 控制程序运行的流程
- start:开始节点
- end:终止节点
- kill:强制退出节点
- fork
- join
- 程序节点:action
<start to="first"> <action name="first"> <shell> <path>xxx.sh</path> <args></args> </shell> <ok to="second"> </ok> <error to="kill"></error> </action> <action name ="second"> <spark> <jar></jar> <class></class> …… </spark> <ok to="third"> </ok> <error to="kill"></error> <action> <action name ="forth"> <hive> <scprit></scprit> <path></path> …… </hive> 《分布式技术原理与算法解析》学习笔记Day11
- 控制节点:start、end、kill
-
-