Day21:增量处理与任务流调度

Posted 保护胖丁

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Day21:增量处理与任务流调度相关的知识,希望对你有一定的参考价值。

知识点01:回顾

  1. 学员考勤管理需求,有哪些维度与指标?

    • 核心目标:提高学员学习质量
    • 主题目标:基于不同维度统计分析考勤指标
    • 指标:出勤人数、出勤率、迟到人数、迟到率、请假人数、请假率、旷课人数、旷课率
    • 维度:时间、班级
  2. 学员考勤管理数据来源及有哪些表?

    • 来源:学员管理系统:mysql

    • 事实表:出勤和请假事实

      • 学员打卡信息表
      • 学员请假信息表
    • 维度表:合法性判断

      • 班级作息时间表
      • 班级排课信息表
      • 班级总人数表
  3. 学员考勤管理主题如何设计,每层的功能是什么?

    • ODS:事实表
    • DIM:维度表
    • DWD:没有设计
    • DWM
      • 班级出勤状态表
        • step1:先基于学员打卡信息表得到学员出勤状态表
        • step2:再基于学员出勤状态表得到班级出勤状态表
      • 班级请假状态表
        • 直接基于学员请假信息表进行分组聚合
      • 班级旷课状态表
        • 班级总人数表 - 班级出勤状态表 - 班级请假状态表
      • 结果
        • 班级出勤状态表
        • 班级请假状态表
        • 班级旷课状态表
    • DWS:实现所有指标的合并
      • DWM层三张表与DIM中的总人数表进行关联
      • 关联字段:班级id、上课日期
      • 基于天维度下的指标的结果
    • APP
      • 构建月、年维度的结果

在这里插入图片描述

  1. 什么是CBO,如何使用CBO?

    • CBO:基于代价的优化器,会根据所有方案付出的代价,选择最优的方案来实现
    • step1:先通过分析优化器anlayze构建数据的元数据
    • step2:通过CBO进行优化

知识点02:目标

核心目标:实现离线数据仓库所有程序的自动化运行

  1. 实现增量处理以及增量脚本的开发
    • 增量
      • 增量采集:采集昨天的数据
      • 增量处理:处理昨天的输出
      • 增量导出:导出昨天的结果
    • 脚本:怎么解决增量的问题?
  2. 自动化运行
    • 需求1:程序A定时运行:每天00:05运行
    • 需求2:程序A运行结束,程序B自动运行
    • 任务流调度工具来实现:Oozie

知识点03:增量业务需求

  • 目标了解增量业务场景及需求

  • 路径

    • step1:离线与实时
    • step2:离线需求
    • step3:增量与全量
  • 实施

    • 离线与实时
      • 离线:以时间为单位来实现数据的处理:采集、计算
        • 场景:每天处理一次,每个小时处理一次,每个月,每年
        • 特点:时效性比较低,一般都是分钟级别
        • 工具:Hadoop生态圈
          • Sqoop、HDFS、Hive、MapReduce、SparkCore、SparkSQL、Tez、Impala、Sqoop、MySQL
      • 实时:以数据为单位的实现数据的处理:采集、计算
        • 场景:产生一条数据就要立刻采集以及处理一条数据
        • 特点:时效性非常高,一般都是毫秒级别
        • 工具:实时生态圈
          • Flume、Canal、Kafka、SparkStreaming/Flink、Redis、Hbase
    • 离线需求
      • 实现离线采集、离线计算、离线结果保存
      • 所有过程:都是增量的过程
    • 增量与全量
      • 全量:每次都所有数据进行处理
        • 一般用于数据迁移、维度表的更新
      • 增量:每次对最新【新增、更新】的数据进行处理
        • 工作中主要的场景
  • 小结

    • 了解增量处理的需求及应用场景

知识点04:增量采集:方案

  • 目标掌握增量采集的实现方案

  • 实施

    • 具体方案也取决于使用的采集工具

    • Flume:增量文件采集

      • exec:tail命令,动态的获取文件的尾部
        • tail命令,自动读取文件的尾部
      • taildir:动态实时监控多个文件
        • 记录文件的采集位置:taildir_position.json
        • 实现增量采集
    • Sqoop:增量采集数据库

      • 方式一:按照某一列自增的int值来实现:append

        • 要求:必须有一列自增的int值,必须有主键
        • 特点:只能采集新增的数据
      • 方式二:按照数据变化的时间列的值来实现:lastmodifield

        • 要求:必须有一列时间列,时间列随着数据的更新而自动更新
        • 特点:能采集新增和更新的数据
      • 方式三:通过指定目录分区采集到对应的HDFS目录下

        • 要求:表中有两个字段

          • create_time:创建时间
            • 新增的数据
      • update_time:更新时间
        - 更新的数据

        • 怎么解决更新和新增数据的问题:通过SQL的过滤
    -e "select * from table where substr(create_time,1,10) = '2021-05-15' or substr(update_time,1,10) = '2021-05-15'
  • 增量要求目录是提前存在,追加新增的数据进入,没有使用官方提供的增量,目录不能提前存在
    --target-dir /nginx/log/2021-05-15/
  • 问题:如何通过Sqoop将数据采集到Hive的分区表中?

    • 方式一:Sqoop提供的方案

      • –hive-partition-key daystr:指定分区的字段是哪个字段

      • –hive-partition-value 2021-05-15:指定导入哪个分区

      • 原理

        • 根据指定的参数,在HDFS中创建一个目录:key=value

          table/daystr=2021-05-15
          
        • 在Hive中加载分区即可

      • 问题:无法构建多级分区

    • 方式二:通过手动指定HDFS方式来代替

      --target-dir /nginx/log/daystr=2021-05-15/hourstr=00
      
  • 小结

    • 增量采集的方案有几种?
      • 方式一:append
        • 自增的int列,只能采集新增的
      • 方式二:lastmodifield
        • 时间变化的列,能采集新增的和更新的
      • 方式三:通过SQLwhere过滤
        • create_time和update_time,能采集新增的和更新的
        • 输出目录不能提前存储,必须写入对应的指定分区存储目录

知识点05:增量采集:实现

  • 目标:实现数据的增量采集

  • 实施

    • 创建MySQL测试数据表

      create database if not exists db_order;
      use db_order;
      drop table if exists tb_order;
      create table tb_order(
        id varchar(10) primary key,
        pid varchar(10) not null,
        userid varchar(10) not null,
        price double not null,
        create_time varchar(20) not null
      );
      
    • 插入测试数据

      insert into tb_order values('o00001','p00001','u00001',100,'2021-05-13 00:01:01');
      insert into tb_order values('o00002','p00002','u00002',100,'2021-05-13 10:01:02');
      insert into tb_order values('o00003','p00003','u00003',100,'2021-05-13 11:01:03');
      insert into tb_order values('o00004','p00004','u00004',100,'2021-05-13 23:01:04');
      insert into tb_order values('o00005','p00005','u00001',100,'2021-05-14 00:01:01');
      insert into tb_order values('o00006','p00006','u00002',100,'2021-05-14 10:01:02');
      insert into tb_order values('o00007','p00007','u00003',100,'2021-05-14 11:01:03');
      insert into tb_order values('o00008','p00008','u00004',100,'2021-05-14 23:01:04');
      
    • 第一次增量采集:假设今天15号

      sqoop  import \\
      --connect jdbc:mysql://node3:3306/db_order \\
      --username root \\
      --password-file file:///export/data/sqoop.passwd \\
      --query "select * from tb_order where substring(create_time,1,10) = '2021-05-14' and \\$CONDITIONS " \\
      --delete-target-dir \\
      --target-dir /nginx/logs/tb_order/daystr=2021-05-14 \\
      --fields-terminated-by '\\t' \\
      -m 1
      
    • 15号MySQL有新的数据生成

      insert into tb_order values('o00009','p00005','u00001',100,'2021-05-15 00:01:01');
      insert into tb_order values('o00010','p00006','u00002',100,'2021-05-15 10:01:02');
      insert into tb_order values('o00011','p00007','u00003',100,'2021-05-15 11:01:03');
      insert into tb_order values('o00012','p00008','u00004',100,'2021-05-15 23:01:04');
      
    • 第二次增量采集:假设今天16号

      sqoop  import \\
      --connect jdbc:mysql://node3:3306/db_order \\
      --username root \\
      --password-file file:///export/data/sqoop.passwd \\
      --query "select * from tb_order where substring(create_time,1,10) = '2021-05-15' and \\$CONDITIONS " \\
      --delete-target-dir \\
      --target-dir /nginx/logs/tb_order/daystr=2021-05-15 \\
      --fields-terminated-by '\\t' \\
      -m 1
      
  • 小结

    • 实现数据的增量采集

知识点06:增量处理

  • 目标实现数据的增量处理

  • 路径

    • step1:ETL实现
    • step2:数据处理
  • 实施

    • Hive中建表

      use default;
      drop table default.tb_order;
      create table if not exists default.tb_order(
        id string ,
        pid string,
        userid string,
        price double ,
        create_time string
      )
      partitioned  by (daystr string)
      row format delimited fields terminated by '\\t'
      location '/user/hive/warehouse/tb_order';
      
    • 第一次处理

      • 假设今天15号:对14号的数据进行处理

      • 增量ETL

        • 如果ETL发生在进入Hive之前,ETL工作中都是通过SparkCore/MR程序进行处理

          yarn jar etl.jar   main_class \\
          #输入目录:数据采集的目录
          /nginx/logs/tb_order/daystr=2021-05-14  \\
          #输出目录:构建Hive表的目录
          /user/hive/warehouse/tb_order/daystr=2021-05-14  
          
          • 这个程序一般不会给参数,昨天的日期都是在ETL程序中动态获取的

            //Input
            Path inputPath = new Path("/nginx/logs/tb_order/daystr="+yesterday)
            TextInputFormat.setInputPaths(job,inputPath)
            //Output
            Path outputPath = new Path("/user/hive/warehouse/tb_order/daystr="+yesterday)
            TextOutputFormat.setOutputPath(job,outputPath)
            
        • 模拟数据ETL的过程

          hdfs dfs -cp /nginx/logs/tb_order/daystr=2021-05-14  /user/hive/warehouse/tb_order/
          
        • 加载分区

          • 第一种方式:修改Hive元数据,默认用于修复添加分区,不常用
          msck repair table tb_order;
          
          • 这个命令在Hive2.1版本中是个bug,不能用
      • 增量处理

        select 
          daystr,
          count(id) as order_number,
          sum(price) as order_price
        from default.tb_order
        where daystr='2021-05-14'
        group by daystr;
        
    • 第二次处理

      • 假设今天16号:对15号的数据进行处理

      • 增量ETL

        • 模拟数据ETL的过程

          hdfs dfs -cp /nginx/logs/tb_order/daystr=2021-05-15  /user/hive/warehouse/tb_order/
          
        • 加载分区

          • 添加分区元数据信息,常用的方式
      alter table tb_order add if not exists partition(daystr='2021-05-15');
      
      • 要求:HDFS上目录的名称必须为分区字段=值

        /user/hive/warehouse/tb_order/daystr=2021-05-15
        
        • Hive自动根据分区默认的目录名来关联的
      • 场景

          /user/hive/warehouse/tb_order/2021-05-15
      
      • 实现:通过location关键字手动指定分区对应的HDFS

        alter table tb_order add if not exists partition(daystr='2021-05-15') location '/user/hive/warehouse/tb_order/2021-05-15'; 
        
      • 增量处理

        select 
          daystr,
          count(id) as order_number,
          sum(price) as order_price
        from default.tb_order
        where daystr='2021-05-15'
        group by daystr;
        
  • 小结

    • 实现数据的增量处理

知识点07:增量导出

  • 目标实现增量分析结果的增量导出

  • 路径

    • step1:建表
    • step2:第一次导出
    • step3:第二次导出
  • 实施

    • 建表

      • Hive中建立APP层结果表

        drop table if exists tb_order_rs;
        create table if not exists default.tb_order_rs(
          daystr string,
          order_number int,
          order_price double
        )
        row format delimited fields terminated by '\\t';
        
      • MySQL中创建导出结果表

        use db_order;
        drop table if exists db_order.tb_order_rs;
        create table db_order.tb_order_rs(
          daystr varchar(20) primary key,
          order_number int,
          order_price double
        );
        
    • 第一次导出

      • 第一次分析的结果写入HiveAPP层的结果表

        insert into table tb_order_rs
        select 
          daystr,
          count(id) as order_number,
          sum(price) as order_price
        from default.tb_order
        where daystr='2021-05-14'
        group by daystr;
        
      • 第一次导出到MySQL

        sqoop export \\
        --connect jdbc:mysql://node3:3306/db_order \\
        --username root \\
        --password 123456 \\
        --table tb_order_rs \\
        --hcatalog-database default \\
        --hcatalog-table tb_order_rs \\
        --input-fields-terminated-by '\\t' \\
        --update-key daystr \\
        --update-mode allowinsert \\
        -m 1
        
      • 查看MySQL

在这里插入图片描述

  • 第二次导出

    • 第二次分析的结果写入HiveAPP层的结果表

      insert into table tb_order_rs
      select 
        daystr,
        count(id) as order_number,
        sum(price) as order_price
      from default.tb_order
      where daystr='2021-05-15'
      group by daystr;
      
    • 第二次导出到MySQL

      sqoop export \\
      --connect jdbc:mysql://node3:3306/db_order \\
      --username root \\
      --password 123456 \\
      --table tb_order_rs \\
      --hcatalog-database default \\
      --hcatalog-table tb_order_rs \\
      --input-fields-terminated-by '\\t' \\
      --update-key daystr \\
      --update-mode allowinsert \\
      -m 1
      
    • 查看MySQL
      在这里插入图片描述

  • 小结

    • 实现增量分析结果的增量导出

知识点08:增量采集脚本开发

  • 目标实现增量采集脚本的开发

  • 路径

    • step1:分析
    • step2:实现
  • 实施

    • 分析

      • 需求:实现增量的采集,默认采集昨天的数据,允许采集指定日期的数据,并且实现模拟ETL过程
        • 如果给定了参数,处理参数日期的数据
        • 如果没有给参数,默认处理昨天的数据
    • 实现

      • 创建脚本

        mkdir /export/data/shell
        vim /export/data/shell/01.collect.sh
        
      • 开发脚本

        #!/bin/bash
        
        #step1:先获取要采集的数据时间,规则:如果没有给参数,就默认处理昨天的日期,如果给了参数,就参数对应的日期
        if [ $# -ne 0 ]
        then
        	#参数个数不为0
        	if [ $# -ne 1 ]
        	then
        		echo "参数至多只能有一个,为处理的日期,请重新运行!"
        		exit 100
        	else
        		#参数个数只有1个,就用第一个参数作为处理的日期
        		yesterday=$1
        	fi
        else
        	#参数个数为0,默认处理昨天的日期
        	yesterday=`date -d '-1 day' +%Y-%m-%d`
        fi
        echo "step1:要处理的日期是:${yesterday}"
        
        echo "step2:开始运行采集的程序"
        #step2:运行增量采集
        SQOOP_HOME=/export/server/sqoop-1.4.6-cdh5.14.0
        $SQOOP_HOME/bin/sqoop  import \\
        --connect jdbc:mysql://node3:3306/db_order \\
        --username root \\
        --password-file file:///export/data/sqoop.passwd \\
        --query "select * from tb_order where substring(create_time,1,10) = '${yesterday}' and \\$CONDITIONS " \\
        --delete-target-dir \\
        --target-dir /nginx/logs/tb_order/daystr=${yesterday} \\
        --fields-terminated-by '\\t' \\
        -m 1
        
        echo "step2:采集的程序运行结束"
        
        
        echo "step3:开始运行ETL"
        #模拟ETL的过程,将采集的新增的数据移动到表的目录下
        HADOOP_HOME=/export/server/hadoop-2.6.0-cdh5.14.0
        #先判断结果是否存在,如果已经存在,先删除再移动
        $HADOOP_HOME/bin/hdfs dfs -test -e  /user/hive/warehouse/tb_order/daystr=${yesterday}
        if [ $? -eq 0 ]
        then
        	#存在
        	$HADOOP_HOME/bin/hdfs dfs -rm -r  /user/hive/warehouse/tb_order/daystr=${yesterday}
        	$HADOOP_HOME/bin/hdfs dfs -cp /nginx/logs/tb_order/daystr=${yesterday} /user/hive/warehouse/tb_order/
        else
        	#不存在
        	$HADOOP_HOME/bin/hdfs dfs -cp /nginx/logs/tb_order/daystr=${yesterday} /user/hive/warehouse/tb_order/
        fi 
        echo "step3:ETL结束"
        
  • 小结

    • 实现增量采集脚本的开发

知识点09:增量处理脚本开发

  • 目标实现增量处理脚本开发

  • 路径

    • step1:分析
    • step2:实现
  • 实施

    • 分析

      • step1:申明分区
      • step2:增量处理,将结果保存早结果表中
    • 实现

      • 创建脚本

        vim /export/data/shell/02.analysis.sh
        vim /export/data/shell/02.analysis.sql
        
      • Shell脚本

        #!/bin/bash
        
        #step1:先获取要采集的数据时间,规则:如果没有给参数,就默认处理昨天的日期,如果给了参数,就参数对应的日期
        if [ $# -ne 0 ]
        then
        	#参数个数不为0
        	if [ $# -ne 1 ]
        	then
        		echo "参数至多只能有一个,为处理的日期,请重新运行!"
        		exit 100
        	else
        		#参数个数只有1个,就用第一个参数作为处理的日期
        		yesterday=$1
        	fi
        else
        	#参数个数为0,默认处理昨天的日期
        	yesterday=`date -d '-1 day' +%Y-%m-%d`
        fi
        echo "step1:要处理的日期是:${yesterday}"
        
        echo "step2:开始运行分析"
        #step2:运行分析程序
        HIVE_HOME=/export/server/hive-1.1.0-cdh5.14.0
        $HIVE_HOME/bin/hive --hiveconf yest=${yesterday}  -f  /export/data/shell/02.analysis.sql
        
        echo "step2:分析的程序运行结束"
        
      • SQL文件

        create table if not exists default.tb_order(
          id string ,
          pid string,
          userid string,
          price double ,
          create_time string
        )
        partitioned  by (daystr string)
        row format delimited fields terminated by '\\t'
        location '/user/hive/warehouse/tb_order';
        
        alter table default.tb_order add if not exists partition (daystr='${hiveconf:yest}');
        
        
        create table if not exists default.tb_order_rs(
          daystr string,
          order_number int,
          order_price double
        )
        row format delimited fields terminated by '\\t';
        
        insert into table default.tb_order_rs
        select
          daystr,
          count(id) as order_number,
          sum(price) as order_price
        from default.tb_order
        where daystr='${hiveconf:yest}'
        group by daystr;
        
  • 小结

    • 实现增量处理脚本开发

知识点10:增量导出脚本开发

  • 目标实现增量导出脚本的开发

  • 路径

    • step1:分析
    • step2:实现
  • 实施

    • 分析

      • 实现将Hive结果表的数据增量导出到MySQL中
    • 实现

      • 创建脚本

        vim /export/data/shell/03.export.sh
        
      • Shell脚本

        #!/bin/bash
        
        echo "step1:开始运行导出的程序"
        #step2:运行增量采集
        SQOOP_HOME=/export/server/sqoop-1.4.6-cdh5.14.0
        $SQOOP_HOME/bin/sqoop export \\
        --connect jdbc:mysql://node3:3306/db_order \\
        --username root \\
        --password 123456 \\
        --table tb_order_rs \\
        --hcatalog-database default \\
        --hcatalog-table tb_order_rs \\
        --input-fields-terminated-by '\\t' \\
        --update-key daystr \\
        --update-mode allowinsert \\
        -m 1
        
        echo "step1:导出的程序运行结束"
        
  • 小结

    • 实现增量导出脚本的开发

知识点11:任务流调度需求

  • 目标:了解任务流调度实际工具需求

  • 路径

    • step1:整体需求
    • step2:调度类型
  • 实施

    • 整体需求

      • 相同的业务线,有不同的需求会有多个程序来实现,这多个程序共同完成的需求,组合在一起就是工作流或者叫做任务流

      • 基于工作流来实现任务流的自动化运行

在这里插入图片描述

- **需求1:基于时间的任务运行**

  - Job1和Job2是在每天固定的时间去采集昨天的数据
  - 每天00:00

- **需求2:基于运行依赖关系的任务运行**

  - Job3必须等待Job1运行成功,才能运行
  - Job5必须等待Job3和Job4都运行成功才能运行
  • 调度类型

    • 定时调度:基于某种时间的规律进行调度运行
    • 依赖调度:基于某种依赖关系进行调度运行
  • 小结

    • 任务调度的常见的两种调度类型是什么?
      • 定时调度
      • 依赖调度

知识点12:任务流调度工具

  • 目标:了解常见的任务流调度工具

  • 实施

    • Linux Crontab:Linux中自带的一个工具

      • 优点

        • 简单,不用做额外的部署,能实现大多数的定时需求
        • crontab -e
      • 缺点

        • 只能做定时任务的执行
      • 语法

        *			*			*			*			*			command
        分钟		小时			日			月		周几
        
    • Oozie:Cloudera公司研发的Hadoop生态圈的调度工具

      • 官网:oozie.apache.org
      • 优点
        • 功能很强大,能满足几乎所有常规的任务流调度的需求
        • 支持DAG流程调度
      • 缺点
        • 本身不是分布式的工具,依赖于MapReduce来实现分布式
        • 原生的交互开发接口不友好
        • 整体的监控不完善
        • 学习成本比较高

在这里插入图片描述

  • Zeus:阿里巴巴最早基于Hadoop1研发的一个调度系统,目前市场上的Zeus一般都是携程版本的Zeus

    • 优点

      • 交互非常友好
      • 使用非常简单
      • 分布式的,功能相对也比较全面
    • 缺点

      • Bug非常多,阿里没有继续研发Zeus,不支持Hadoop2

在这里插入图片描述

  • Azkaban:LinkedIn公司研发的分布式调度工具

    • 优点
      • 重点着重于自身的调度功能的研发,其他的辅助性功能都通过插件来完成
      • 自身也是分布式调度系统
      • 界面交互性比较友好
      • 开发交互性:properties或者JSON
    • 缺点
      • 3.x版本开始才支持完全分布式

在这里插入图片描述

  • 小结

    • 了解常见的任务流调度工具

知识点13:Oozie的基本介绍

  • 目标掌握Oozie的功能、特点及应用场景
  • 路径
    • step1:功能
    • step2:特点
    • step3:应用
    • step4:原理
  • 实施
    • 功能
      • Oozie是一个专门为管理Hadoop生态的程序调度而设计的工作流调度系统
      • 基于DAG实现依赖调度:WorkFlow
      • 基于定时器实现定时调度:Coordinator
    • 特点
      • 优点:功能全面
      • 缺点:部署相对复杂、原生开发方式过于复杂
    • 应用
      • 基于Hadoop平台的分布式离线任务流调度
    • 原理
      • 底层依赖于MapReduce,将工作流变成MapReduce程序,提交个YARN
      • 由YARN来将不同的工作流分配到不同的机器上运行,用于构建分布式调度系统
  • 小结
    • 了解Oozie的功能、特点、应用场景和基本原理

知识点14:Oozie的使用方式

  • 目标:了解Oozie的使用方式

  • 路径

    • step1:原生方式
    • step2:集成Hue
  • 实施

    • 原生方式

      • 这种方式,是通过自己写代码的方式来实现工作流的开发,效率低,容易出问题,不用

      • 实现一个效果:4个程序

        • 第一个程序:shell脚本,定时运行的
        • 第二个程序:Spark程序,必须等第一个程序运行完才能运行
        • 第三个程序:MapReduce程序,必须等第二个程序运行才能运行’
        • 第四个程序:Hive,必须等第三个程序运行完才能运行
      • 先要开发一个XML文件

        • 控制节点:start、end、kill
          • 控制程序运行的流程
        • start:开始节点
          • end:终止节点
          • kill:强制退出节点
          • fork
          • join
        • 程序节点:action
        <start to="first">
        <action name="first">
        	<shell>
        		<path>xxx.sh</path>
        		<args></args>
        	</shell>
        	<ok to="second"> </ok>
        	<error to="kill"></error>
        </action>
        <action name ="second">
        	<spark>
        		<jar></jar>
        		<class></class>
        		……
        	</spark>
        	<ok to="third"> </ok>
          <error to="kill"></error>
        <action>
            <action name ="forth">
        	<hive>
        		<scprit></scprit>
        		<path></path>
        		……
        	</hive>
        	《分布式技术原理与算法解析》学习笔记Day11

        作业流调度框架 oozie 使用

        大数据:任务调度

        PYTHON学习笔记-DAY-9

        day9--多线程与多进程

        作业流 oozie调度框架的配置与使用