Python工业项目实战02:数仓设计及数据采集

Posted 黑马程序员官方

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python工业项目实战02:数仓设计及数据采集相关的知识,希望对你有一定的参考价值。

知识点01:课程回顾

  1. 一站制造项目的需求是什么?

    • 行业:工业物联网
    • 项目:加油站服务商数据分析平台
    • 需求
      • 提高服务质量:安装、维修、巡检、改造工单分析,回访分析
      • 合理规划成本运算:收益分析、报销分析、物料成本
  2. 一站制造项目的技术选型是什么?

    • 数据来源:Oracle【CRM系统、客服系统、报销系统】
    • 数据采集:Sqoop
    • 数据存储:Hive数据仓库
    • 数据计算:SparkSQL【离线+实时、DSL + SQL】
    • 数据应用:mysql + Grafana
    • 调度工具:Airflow
    • 服务监控:Prometheus
    • 资源容器:Docker
  3. Docker中的基本容器管理命令是什么?

    • 启动和关闭:docker start|stop 容器名称
    • 进入和退出
      • docker exec -it 容器名称 bash
      • exit
  4. 问题

    • DG连接问题

      • 原理:JDBC:用Java代码连接数据库

      • Hive/SparkSQL:端口有区别

        • 可以为同一个端口,只要不在同一台机器

        • 项目:一台机器

          • HiveServer:10000

            hiveserver.port = 10000
            
          • SparkSQL:10001

            start-thriftserver.sh --hiveserver.prot = 10001
            
      • MySQL:hostname、port、username、password

      • Oracle:hostname、port、username、password、sid

      • 驱动导入

        • 自动导入:MYSQL、Oracle
        • 手动导入:Hive、SparkSQL
          • step1:清空所有自带的包
          • step2:导入所有的包
            • hive-2.1.0
            • hive-2.1.0-spark
    • CS模式设计问题

      • Thrift启动问题
      • CS模式:客户端服务端模式
        • Client:客户端
          • Hive:Beeline、Hue
          • SparkSQL
        • Server:服务端
          • Hive:Hiveserver2【负责解析SQL语句】
            • HiveServer作为Metastore的客户端
            • MetaStore作为HiveServer的服务端
          • SparkSQL:ThriftServer【负责解析SQL语句转换为SparkCore程序】
            • 放入hive-site.xml文件到Spark的conf目录的目的?
              • 让SparkSQL能够访问Hive的元数据服务的地址:metastore、
              • 为了访问Hive
            • 不放行不行:可以
            • 启动ThriftServer或者HiveServer
              • docker start hadoop
              • docker start hive
              • docker start spark
    • 问题:思路

      • 现象:异常

        • Python:error:xxxxxx
        • Java:throw Exception:xxxxxxxxx
        • 进程没有明显报错:找日志文件
          • 日志文件:logs
          • 查看日志:tail -100f logs/xxxxxxxx.log
      • 分析错误

        • ArrayoutofIndex
        • NullException
        • ClassNotFound
        • 自己先尝试解决
        • 如果解决不了,就问老师

知识点02:课程目标

  1. 数据仓库设计
    • 建模:维度建模:【事实表、维度表】
    • 分层:ODS、DW【DWD、DWM、DWS】、APP
    • 掌握本次项目中数仓的分层
      • ODS、DWD、DWB、DWS、ST、DM
  2. 业务系统流程和数据来源
    • 数据源
    • 常见的数据表
  3. 数据采集
    • 核心1:实现自动化增量采集
    • 核心2:Sqoop采集中的一个特殊问题以及解决方案

知识点03:数仓设计回顾

  • 目标:了解数据仓库设计的核心知识点

  • 路径

    • step1:分层
    • step2:建模
  • 实施

    • 分层

      • 什么是分层?
        • 本质:规范化数据的处理流程
        • 实现:每一层在Hive中就是一个数据库
      • 为什么要分层?
        • 清晰数据结构:每一个数据分层都有它的作用域,这样我们在使用表的时候能更方便地定位和理解。
        • 数据血缘追踪:简单来讲可以这样理解,我们最终给业务诚信的是一能直接使用的张业务表,但是它的来源有很多,如果有一张来源表出问题了,我们希望能够快速准确地定位到问题,并清楚它的危害范围。
        • 减少重复开发:规范数据分层,开发一些通用的中间层数据,能够减少极大的重复计算。
        • 把复杂问题简单化:一个复杂的任务分解成多个步骤来完成,每一层只处理单一的步骤,比较简单和容易理解。
        • 屏蔽原始数据的异常对业务的影响:不必改一次业务就需要重新接入数据
      • 怎么分层?
        • ODS:原始数据层/操作数据层,最接近与原始数据的层次,数据基本与原始数据保持一致
        • DW:数据仓库层,实现数据的处理转换
          • DWD:实现ETL
          • DWM:轻度聚合
          • DWS:最终聚合
        • ADS/APP/DA:数据应用层
    • 建模

      • 什么是建模?

        • 本质:决定了数据存储的方式,表的设计
      • 为什么要建模?

        • 大数据系统需要数据模型方法来帮助更好地组织和存储数据,以便在性能、成本、效率和质量之间取得最佳平衡。
        • 性能:良好的数据模型能帮助我们快速查询所需要的数据,减少数据的I/O吞吐
        • 成本:良好的数据模型能极大地减少不必要的数据冗余,也能实现计算结果复用,极大地降低大数据系统中的存储和计算成本
        • 效率:良好的数据模型能极大地改善用户使用数据的体验,提高使用数据的效率
        • 质量:良好的数据模型能改善数据统计口径的不一致性,减少数据计算错误的可能性
      • 有哪些建模方法?

        • ER模型:从全企业的高度设计一个 3NF 【三范式】模型,用实体关系模型描述企业业务,满足业务需求的存储
        • 维度模型:从分析决策的需求出发构建模型,为分析需求服务,重点关注用户如何更快速的完成需求分析,具有较好的大规模复杂查询的响应性能
        • Data Vault:ER 模型的衍生,基于主题概念将企业数据进行结构化组织,并引入了更进一步的范式处理来优化模型,以应对源系统变更的扩展性
        • Anchor:一个高度可扩展的模型,核心思想是所有的扩展知识添加而不是修改,因此将模型规范到 6NF,基本变成了 k-v 结构化模型
      • 怎么构建维度模型步骤?

        • a.选择业务过程:你要做什么?
        • b.声明粒度:你的分析基于什么样的颗粒度?
        • c.确认环境的维度:你的整体有哪些维度?
        • d.确认用于度量的事实:你要基于这些维度构建哪些指标?
      • 具体的实施流程是什么?

        • a.需求调研:业务调研和数据调研

          • 业务调研:明确分析整个业务实现的过程
          • 数据调研:数据的内容是什么
      • b.划分主题域:面向业务将业务划分主题

        • 构建哪些主题域以及每个主题域中有哪些主题
      • 服务域:工单主题、回访主题、物料主题

        • c.构建维度总线矩阵:明确每个业务主题对应的维度关系

          主题域:主题时间维度地区维度
          工单主题YY
          回访主题NY
          物料主题YN
      • d.明确指标统计:明确所有原生指标与衍生指标

        • 工单主题:安装工单个数、维修工单个数……

        • 回访主题:用户满意个数、不满意个数、服务态度不满意个数、技术能力不满意个数

        • e.定义事实与维度规范

          • 分层规范
          • 开发规范
        • ……

      • f.代码开发

    • 事实表

      • 表的分类

        • 事务事实表:原始的事务事实的数据表,原始业务数据表
        • 周期快照事实表:周期性对事务事实进行聚合的结果
        • 累计快照事实表:随着时间的变化,事实是不定的,不断完善的过程
      • 无事实事实表:特殊的事实表,里面没有事实,是多个维度的组合,用于求事实的差值

      • 值的分类

        • 可累加事实:在任何维度下指标的值都可以进行累加
        • 半可累加事实:在一定维度下指标的值都可以进行累加
        • 不可累加事实:在任何维度下指标的值都不可以进行累加
      • 维度表

        • 维度设计模型
          • 雪花模型:维度表拥有子维度表,部分维度表关联在维度表中,间接的关联事实表
          • 星型模型/星座模型:维度表没有子维度,直接关联在事实表上,星座模型中有多个事实
        • 上卷与下钻
          • 上卷:从小维度到一个大的维度,颗粒度从细到粗
          • 下钻:从大维度到一个小的维度,颗粒度从粗到细
      • 拉链表

        • 功能:解决事实中渐变维度发生变化的问题,通过时间来标记维度的每一种状态,存储所有状态

        • 实现

          • step1:先采集所有增量数据到更新表中
          • step2:将更新表的数据与老的拉链表的数据进行合并写入一张临时表
          • step3:将临时表的结果覆盖到拉链表中
  • 小结

    • 了解数据仓库设计的核心知识点

知识点04:分层整体设计

  • 目标掌握油站分析项目中的分层整体设计

  • 实施

    • ODS:原始数据层:最接近于原始数据的层次,直接采集写入层次:原始事务事实表

    • DWD:明细数据层:对ODS层的数据根据业务需求实现ETL以后的结果:ETL以后事务事实表

    • DWB:基础数据层:类似于以前讲解的DWM,轻度聚合

      • 关联:将主题事实的表进行关联,所有与这个主题相关的字段合并到一张表
      • 聚合:基于主题的事务事实构建基础指标
      • 主题事务事实表
    • ST:数据应用层:类似于以前讲解的APP,存储每个主题基于维度分析聚合的结果:周期快照事实表

      • 供数据分析的报表
    • DM:数据集市:按照不同部门的数据需求,将暂时没有实际主题需求的数据存储

      • 做部门数据归档,方便以后新的业务需求的迭代开发
    • DWS:维度数据层:类似于以前讲解的DIM:存储维度数据表

    • 数据仓库设计方案

      • 从上到下:在线教育:先明确需求和主题,然后基于主题的需求采集数据,处理数据
        • 场景:数据应用比较少,需求比较简单
      • 上下到上:一站制造:将整个公司所有数据统一化在数据仓库中存储准备,根据以后的需求,动态直接获取数据
        • 场景:数据应用比较多,业务比较复杂
  • 小结

    • 掌握油站分析项目中的分层整体设计
      • ODS:原始数据层
      • DWD:明细数据层
      • DWB:轻度汇总层
      • ST:数据应用层
      • DM:数据集市层
      • DWS:维度数据层

知识点05:分层具体功能

  • 目标:掌握油站分析的每层的具体功能
  • 实施
    • ODS
      • 数据内容:存储所有原始业务数据,基本与Oracle数据库中的业务数据保持一致
      • 数据来源:使用Sqoop从Oracle中同步采集
      • 存储设计:Hive分区表,avro文件格式存储,保留3个月
    • DWD
      • 数据内容:存储所有业务数据的明细数据
      • 数据来源:对ODS层的数据进行ETL扁平化处理得到
      • 存储设计:Hive分区表,orc文件格式存储,保留所有数据
    • DWB
      • 数据内容:存储所有事实与维度的基本关联、基本事实指标等数据
      • 数据来源:对DWD层的数据进行清洗过滤、轻度聚合以后的数据
      • 存储设计:Hive分区表,orc文件格式存储,保留所有数据
    • ST
      • 数据内容:存储所有报表分析的事实数据
      • 数据来源:基于DWB和DWS层,通过对不同维度的统计聚合得到所有报表事实的指标
    • DM
      • 数据内容:存储不同部门所需要的不同主题的数据
      • 数据来源:对DW层的数据进行聚合统计按照不同部门划分
    • DWS
      • 数据内容:存储所有业务的维度数据:日期、地区、油站、呼叫中心、仓库等维度表
      • 数据来源:对DWD的明细数据中抽取维度数据
      • 存储设计:Hive普通表,orc文件 + Snappy压缩
      • 特点:数量小、很少发生变化、全量采集
  • 小结
    • 掌握油站分析的每层的具体功能

知识点06:业务系统结构

  • 目标了解一站制造中的业务系统结构

  • 实施

    • 数据来源

      • 业务流程
      • 油站站点联系呼叫中心,申请工单
        • 呼叫中心分派工单给工程师
      • 工程师完成工单
        • 工程师费用报销
        • 呼叫中心回访工单

- **ERP系统**:企业资源管理系统,存储整个公司所有资源的信息
  - 所有的工程师、物品、设备产品供应链、生产、销售、财务的信息都在ERP系统中
- **CISS系统**:客户服务管理系统,存储所有用户、运营数据
  - 工单信息、用户信息
- **呼叫中心系统**:负责实现所有客户的需求申请、调度、回访等
  - 呼叫信息、分配信息、回访信息
  • 组织结构

    • 运营部(编制人数300人)
      • 负责服务策略制定和实施,对服务网络运营过程管理。部门职能包括物料管理、技术支持、服务效率管理、服务质量控制、服务标准化和可视化实施等工作。承担公司基础服务管理方面具体目标责任
    • 综合管理部(编制人数280人)
      • 下属部门有呼叫中心、信息运维、人事行政、绩效考核与培训、企划部等部门。负责公司市场部、运营部、财务部等专业业务以外的所有职能类工作,包括行政后勤管理、劳动关系、绩效考核与培训、企划宣传、采购需求管理、信息建设及数据分析、公司整体目标和绩效管理等工作。
    • 市场部(编制人数50人)
      • 负责客户需求开发、服务产品开发、市场拓展与销售管理工作,执行销售策略、承担公司市场、销售方面具体目标责任。
    • 财务部(编制人数10人)
      • 负责服务公司财务收支、费用报销、报表统计、财务分析等财务管理工作
    • 市场销售服务中心(编制人数4000人)
      • 负责服务产品销售,设备的安装、维护、修理、改造等工作,严格按照公司管理标准实施日常服务工作
  • 业务流程

  • 小结

    • 了解一站制造中的业务系统结构

知识点07:业务系统数据

  • 目标熟悉业务系统核心数据表

  • 实施

    • 切换查看数据库

  • 查看数据表

    • CISS_BASE:基础数据表
      • 报销项目核算、地区信息、服务商信息、设备信息、故障分类、出差补助信息、油站基础信息等
    • CISS_SERVICE、CISS_S:服务数据表
      • 来电受理单信息、改派记录信息、故障更换材料明细信息、综合报销信息、服务单信息、安装单、维修单、改造单信息
    • CISS_MATERIAL、CISS_M:仓储物料表
      • 物料申明明细信息、网点物料调配申请等
    • ORG:组织机构数据
      • 部门信息、员工信息等
    • EOS:字典信息表
      • 存放不同状态标识的字典
  • 核心数据表

- 运营分析
  - 工单分析、安装分析、维修分析、巡检分析、改造分析、来电受理分析
- 提高服务质量
  - 回访分析
- 运营成本核算
  - 收入、支持分析
  • 小结

    • 熟悉业务系统核心数据表

知识点08:全量与增量分析

  • 目标了解全量表与增量表数据采集需求
  • 实施
    • 全量表
      • 所有维度数据表
      • 场景:不会经常发生变化的数据表,例如维度数据表等
      • 数据表:组织机构信息、地区信息、服务商信息、数据字典等
      • 表名:参考文件《full_import_tables.txt》
    • 增量表
      • 所有事务事实的数据表
      • 场景:经常发生变化的数据表,例如业务数据、用户行为数据等
      • 数据表:工单数据信息、呼叫中心信息、物料仓储信息、报销费用信息等
      • 表名:参考文件《incr_import_tables.txt》
  • 小结
    • 了解全量表与增量表数据采集需求

知识点09:Sqoop命令回顾

  • 目标:掌握Sqoop常用命令的使用

  • 路径

    • step1:语法
    • step2:数据库参数
    • step3:导入参数
    • step4:导出参数
    • step5:其他参数
  • 实施

    • 语法

      sqoop import | export \\
      --数据库连接参数
      --HDFS或者Hive的连接参数
      --配置参数
      
    • 数据库参数

      • –connect jdbc:mysql://hostname:3306
      • –username
      • –password
      • –table
      • –columns
      • –where
      • -e/–query
    • 导入参数

      • –delete-target-dir
      • –target-dir
      • –hcatalog-database
      • –hcatalog-table
    • 导出参数

      • –export-dir
      • –hcatalog-database
      • –hcatalog-table
    • 其他参数

      • -m
    • 连接Oracle语法

      --connect jdbc:oracle:thin:@OracleServer:OraclePort:OracleSID
      
    • 测试采集Oracle数据

      • 进入

        docker exec -it sqoop bash
        
      • 测试

        sqoop import \\
        --connect jdbc:oracle:thin:@oracle.bigdata.cn:1521:helowin \\
        --username ciss \\
        --password 123456 \\
        --table CISS4.CISS_BASE_AREAS \\
        --target-dir /test/full_imp/ciss4.ciss_base_areas \\
        --fields-terminated-by "\\t" \\
        -m 1
        
      • 查看结果

  • 小结

    • 掌握Sqoop常用命令的使用

知识点10:YARN资源调度及配置

  • 目标实现YARN的资源调度配置

  • 实施

    • 常用端口记住:排错

      • NameNode:8020,50070
    • ResourceManager:8032,8088

      • JobHistoryServer:19888
      • Master:7077,8080
      • HistoryServer:18080
    • YARN调度策略

      • FIFO:不用
        • 单队列,队列内部FIFO,所有资源只给一个程序运行
      • Capacity:Apache
        • 多队列,队列内部FIFO,资源分配给不同的队列,队列内部所有资源只给一个程序运行
      • Fair:CDH
        • 多队列,队列内部共享资源,队列内部的资源可以给多个程序运行
    • YARN面试题

      • 程序提交成功,但是不运行而且不报错,什么问题,怎么解决?
        • 资源问题:APPMaster就没有启动
        • 环境问题
          • NodeManager进程问题:进程存在,但不工作
          • 机器资源不足导致YARN或者HDFS服务停止:磁盘超过90%,所有服务不再工作
          • 解决:实现监控告警:80%,邮件告警
      • YARN中程序运行失败的原因遇到过哪些?
        • 代码逻辑问题
        • 资源问题:Container
          • Application / Driver:管理进程
          • MapTask和ReduceTask / Executor:执行进程
        • 解决问题:配置进程给定更多的资源
    • 问题1:程序已提交YARN,但是无法运行,报错:Application is added to the scheduler and is not activated. User’s AM resource limit exceeded.

      yarn.scheduler.capacity.maximum-am-resource-percent=0.8
      
      • 配置文件:$HADOOP_HOME/etc/hadoop/capacity-scheduler.xml
      • 属性功能:指定队列最大可使用的资源容量大小百分比,默认为0.2,指定越大,AM能使用的资源越多
    • 问题2:程序提交,运行失败,报错:无法申请Container

      yarn.scheduler.minimum-allocation-mb=512
      
      • 配置文件:$HADOOP_HOME/etc/hadoop/yarn-site.xml
      • 属性功能:指定AM为每个Container申请的最小内存,默认为1G,申请不足1G,默认分配1G,值过大,会导致资源不足,程序失败,该值越小,能够运行的程序就越多
    • 问题3:怎么提高YARN集群的并发度?

      • 物理资源、YARN资源、Container资源、进程资源

      • YARN资源配置

        yarn.nodemanager.resource.cpu-vcores=8
        yarn.nodemanager.resource.memory-mb=8192
        
      • Container资源

        yarn.scheduler.minimum-allocation-vcores=1
        yarn.scheduler.maximum-allocation-vcores=32
        yarn.scheduler.minimum-allocation-mb=1024
        yarn.scheduler.maximum-allocation-mb=8192
        
      • MR Task资源

        mapreduce.map.cpu.vcores=1
        mapreduce.map.memory.mb=1024
        mapreduce.reduce.cpu.vcores=1
        mapreduce.reduce.memory.mb=1024
        
      • Spark Executor资源

        --driver-memory  #分配给Driver的内存,默认分配1GB
        --driver-cores   #分配给Driver运行的CPU核数,默认分配1核
        --executor-memory #分配给每个Executor的内存数,默认为1G,所有集群模式都通用的选项
        --executor-cores  #分配给每个Executor的核心数,YARN集合和Standalone集群通用的选项
        --total-executor-cores NUM  #Standalone模式下用于指定所有Executor所用的总CPU核数
        --num-executors NUM #YARN模式下用于指定Executor的个数,默认启动2个
        
    • 实现:修改问题1中的配置属性

      • 注意:修改完成,要重启YARN

  • 小结

    • 实现YARN的资源调度配置

知识点11:MR的Uber模式

  • 目标:了解MR的Uber模式的配置及应用

  • 实施

    • Spark为什么要比MR要快

      • MR慢
    • 只有Map和Reduce阶段,每个阶段的结果都必须写入磁盘
      - 如果要实现Map1 -> Map2 -> Reduce1 -> Reduce2

      • Mapreduce1:Map1
        • MapReduce2:Map2 -> Reduce1
      • Mapreduce3:Reduce2
        • MapReduce程序处理是进程级别:MapTask进程、ReduceTask进程
    • 问题:MR程序运行在YARN上时,有一些轻量级的作业要频繁的申请资源再运行,性能比较差怎么办?

      • Uber模式
    • 功能:Uber模式下,程序只申请一个AM Container:所有Map Task和Reduce Task,均在这个Container中顺序执行

      [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-aD4Iv85r-1671506284227)(Day1007_数仓设计及数据采集.assets/image-20210822091155998.png)]

      • 默认不开启
    • 配置:$HADOOP_HOME/etc/hadoop/mapred-site.xml

      mapreduce.job.ubertask.enable=true
      #必须满足以下条件
      mapreduce.job.ubertask.maxmaps=9
      mapreduce.job.ubertask.maxreduces=1
      mapreduce.job.ubertask.maxbytes=128M
      yarn.app.mapreduce.am.resource.cpu-vcores=1
      yarn.app.mapreduce.am.resource.mb=1536M
      
    • 特点

      • Uber模式的进程为AM,所有资源的使用必须小于AM进程的资源
      • Uber模式条件不满足,不执行Uber模式
      • Uber模式,会禁用推测执行机制
  • 小结

    • 了解MR的Uber模式的配置及应用

知识点12:Sqoop采集数据格式问题

  • 目标掌握Sqoop采集数据时的问题

  • 路径

    • step1:现象
    • step2:问题
    • step3:原因
    • step4:解决
  • 实施

    • 现象

      • step1:查看Oracle中CISS_SERVICE_WORKORDER表的数据条数

        select count(1) as cnt from CISS_SERVICE_WORKORDER;
        
      • step2:采集CISS_SERVICE_WORKORDER的数据到HDFS上

      sqoop import
      –connect jdbc:oracle:thin:@oracle.bigdata.cn:1521:helowin
      –username ciss
      –password 123456
      –table CISS4.CISS_SERVICE_WORKORDER
      –delete-target-dir
      –target-dir /test/full_imp/ciss4.ciss_service_workorder
      –fields-terminated-by “\\001”
      -m 1

      
      - step3:Hive中建表查看数据条数
      
      - 进入Hive容器
      
        ```
      docker exec -it hive bash
        ```
      
      - 连接HiveServer
      
        ```
        beeline -u jdbc:hive2://hive.bigdata.cn:10000 -n root -p 123456
        ```
      
      - 创建测试表
      
        ```sql
        create external table test_text(
        line string
        )
        location '/test/full_imp/ciss4.ciss_service_workorder';
        ```
      
      - 统计行数
      
        ```
        select count(*) from test_text;
        ```
      
        
      
      
    • 问题:Sqoop采集完成后导致HDFS数据与Oracle数据量不符

    • 原因

      • sqoop以文本格式导入数据时,默认的换行符是特殊字符

      • Oracle中的数据列中如果出现了\\n、\\r、\\t等特殊字符,就会被划分为多行

      • Oracle数据

        id			name				age
        001			zhang\\nsan			18
        
      • Sqoop遇到特殊字段就作为一行

        001			zhang
        san			18
        
      • Hive

        id			name				age
        001			zhang 
        san			18
        
    • 解决

      • 方案一:删除或者替换数据中的换行符
        • –hive-drop-import-delims:删除换行符
        • –hive-delims-replacement char:替换换行符
        • 不建议使用:侵入了原始数据
      • 方案二:使用特殊文件格式:AVRO格式
  • 小结

    • 掌握Sqoop采集数据时的问题

知识点13:问题解决:Avro格式

  • 目标:掌握使用Avro格式解决采集换行问题

  • 路径

    • step1:常见格式介绍
    • step2:Avro格式特点
    • step3:Sqoop使用Avro格式
    • step4:使用测试
  • 实施

    • 常见格式介绍

      类型介绍
      TextFileHive默认的文件格式,最简单的数据格式,便于查看和编辑,耗费存储空间,I/O性能较低
      SequenceFile含有键值对的二进制文件,优化磁盘利用率和I/O,并行操作数据,查询效率高,但存储空间消耗最大
      AvroFile特殊的二进制文件,设计的主要目标是为了满足schema evolution,Schema和数据保存在一起
      OrcFile列式存储,Schema存储在footer中,不支持schema evolution,高度压缩比并包含索引,查询速度非常快
      ParquetFile列式存储,与Orc类似,压缩比不如Orc,但是查询性能接近,支持的工具更多,通用性更强
      • SparkCore缺点:RDD【数据】:没有Schema
      • SparkSQL优点:DataFrame【数据 + Schema】
      • Schema:列的信息【名称、类型】
    • Avro格式特点

      • 优点
        • 二进制数据存储,性能好、效率高
        • 使用JSON描述模式,支持场景更丰富
        • Schema和数据统一存储,消息自描述
        • 模式定义允许定义数据的排序
      • 缺点
        • 只支持Avro自己的序列化格式
        • 少量列的读取性能比较差,压缩比较低
      • 场景:基于行的大规模结构化数据写入、列的读取非常多或者Schema变更操作比较频繁的场景
    • Sqoop使用Avro格式

      • 选项

        --as-avrodatafile                                     Imports data to Avro datafiles
        
      • 注意:如果使用了MR的Uber模式,必须在程序中加上以下参数避免类冲突问题

        -Dmapreduce.job.user.classpath.first=true
        
    • 使用测试

      sqoop import \\
      -Dmapreduce.job.user.classpath.first=true \\
      --connect jdbc:oracle:thin:@oracle.bigdata.cn:1521:helowin \\
      --username ciss \\
      --password 123456 \\
      --table CISS4.CISS_SERVICE_WORKORDER \\
      --delete-target-dir \\
      --target-dir /test/full_imp/ciss4.ciss_service_workorder \\
      --as-avrodatafile \\
      --fields-terminated-by "\\001" \\
      -m 1
      
      • Hive中建表

        • 进入Hive容器

          docker exec -it hive bash
          
        • 连接HiveServer

          beeline -u jdbc:hive2://hive.bigdata.cn:10000 -n root -p 123456
          
        • 创建测试表

          create external table test_avro(
          line string
          )
          stored as avro
          location '/test/full_imp/ciss4.ciss_service_workorder';
          
        • 统计行数

          select count(*) from test_avro;
          
  • 小结

    • 掌握如何使用Avro格式解决采集换行问题

知识点14:Sqoop增量采集方案回顾

  • 目标:回顾Sqoop增量采集方案

  • 路径

    • step1:Append
    • step2:Lastmodified
    • step3:特殊方式
  • 实施

    • Append

      • 要求:必须有一列自增的值,按照自增的int值进行判断

      • 特点:只能导入增加的数据,无法导入更新的数据

      • 场景:数据只会发生新增,不会发生更新的场景

      • 代码

        sqoop import \\
        --connect jdbc:mysql://node3:3306/sqoopTest \\
        --username root \\
        --password 123456 \\
        --table tb_tohdfs \\
        --target-dir /sqoop/import/test02 \\
        --fields-terminated-by '\\t' \\
        --check-column id \\
        --incremental append \\
        --last-value 0 \\
        -m 1
        
    • Lastmodified

      • 要求:必须包含动态时间变化这一列,按照数据变化的时间进行判断

      • 特点:既导入新增的数据也导入更新的数据

      • 场景:一般无法满足要求,所以不用

      • 代码

        sqoop import \\
        --connect jdbc:mysql://node3:3306/sqoopTest \\
        --username root \\
        --password 123456 \\
        --table tb_lastmode \\
        --target-dir /sqoop/import/test03 \\
        --fields-terminated-by '\\t' \\
        --incremental lastmodified \\
        --check-column lastmode \\
        --last-value '2021-06-06 16:09:32' \\
        -m 1
        
    • 特殊方式

      • 要求:每次运行的输出目录不能相同

      • 特点:自己实现增量的数据过滤,可以实现新增和更新数据的采集

      • 场景:一般用于自定义增量采集每天的分区数据到Hive

      • 代码

        sqoop  import \\
        --connect jdbc:mysql://node3:3306/db_order \\
        --username root \\
        --password-file file:///export/data/sqoop.passwd \\
        --query "select * from tb_order where substring(create_time,1,10) = '2021-09-14' or substring(update_time,1,10) = '2021-09-14' and \\$CONDITIONS " \\
        --delete-target-dir \\
        --target-dir /nginx/logs/tb_order/daystr=2021-09-14 \\
        --fields-terminated-by '\\t' \\
        -m 1
        
  • 小结

    • 回顾Sqoop增量采集方案

知识点15:脚本开发思路

  • 目标:实现自动化脚本开发的设计思路分析

  • 路径

    • step1:脚本目标
    • step2:实现流程
    • step3:脚本选型
    • step4:单个测试
  • 实施

    • 脚本目标:实现自动化将多张Oracle中的数据表全量或者增量采集同步到HDFS中

    • 实现流程

      • a. 获取表名
      • b.构建Sqoop命令
      • c.执行Sqoop命令
      • d.验证结果
    • 脚本选型

      • Shell:Linux原生Shell脚本,命令功能全面丰富,主要用于实现自动化Linux指令,适合于Linux中简单的自动化任务开发
      • Python:多平台可移植兼容脚本,自身库功能强大,主要用于爬虫、数据科学分析计算等,适合于复杂逻辑的处理计算场景
      • 场景:一般100行以内的代码建议用Shell,超过100行的代码建议用Python
      • 采集脚本选用:Shell
    • 单个测试

      • 创建一个文件,存放要采集的表的名称

        #创建测试目录
        mkdir -p /opt/datas/shell
        cd /opt/datas/shell/
        #创建存放表名的文件
        vim test_full_table.txt
        
        ciss4.ciss_base_areas
        ciss4.ciss_base_baseinfo
        ciss4.ciss_base_csp
        ciss4.ciss_base_customer
        ciss4.ciss_base_device
        
      • 创建脚本

        vim test_full_import_table.sh
        
      • 构建采集的Sqoop命令

        sqoop import \\
        -Dmapreduce.job.user.classpath.first=true \\
        --connect jdbc:oracle:thin:@oracle.bigdata.cn:1521:helowin \\
        --username ciss \\
        --password 123456 \\
        --table CISS4.CISS_SERVICE_WORKORDER \\
        --delete-target-dir \\
        --target-dir /test/full_imp/ciss4.ciss_service_workorder \\
        --as-avrodatafile \\
        --fields-terminated-by "\\001" \\
        -m 1
        
      • 封装脚本

        #!/bin/bash
        #export path
        source /etc/profile
        #export the tbname files
        TB_NAME=/opt/datas/shell/test_full_table.txt
        #export the import opt
        IMP_OPT="sqoop import -Dmapreduce.job.user.classpath.first=true"
        #export the jdbc opt
        JDBC_OPT="--connect jdbc:oracle:thin:@oracle.bigdata.cn:1521:helowin --username ciss --password 123456"
        
        #read tbname and exec sqoop
        while read tbname
        do
          $IMP_OPT $JDBC_OPT --table $tbname^^ --delete-target-dir --target-dir /test/full_imp/$tbname^^ --as-avrodatafile --fields-terminated-by "\\001" -m 1
        done < $TB_NAME
        
      • 添加执行权限

        chmod u+x test_full_import_table.sh
        
      • 测试执行

        sh -x test_full_import_table.sh
        
      • 检查结果

  • 小结

    • 实现自动化脚本开发的设计思路分析

知识点16:全量及增量采集脚本运行

  • 目标:实现全量采集脚本的运行

  • 实施

    • 全量目标:将所有需要将实现全量采集的表进行全量采集存储到HDFS上

      • Oracle表:组织机构信息、地区信息、服务商信息、数据字典等

      • HDFS路径

        /data/dw/ods/one_make/full_imp/表名/日期
        
    • 增量目标:将所有需要将实现全量采集的表进行增量采集存储到HDFS上

      • 工单数据信息、呼叫中心信息、物料仓储信息、报销费用信息等

      • HDFS路径

        /data/dw/ods/one_make/incr_imp/表名/日期
        
    • 运行脚本

      • 全量采集

        cd /opt/sqoop/one_make
        sh -x full_import_tables.sh 
        
        • 脚本中特殊的一些参数
      • –outdir:Sqoop解析出来的MR的Java程序等输出文件输出的文件

      • 增量采集

        cd /opt/sqoop/one_make
        sh -x incr_import_tables.sh 
        
    • 特殊问题

      • 因oracle表特殊字段类型,导致sqoop导数据任务失败
      • oracle字段类型为: clob或date等特殊类型
      • 解决方案:在sqoop命令中添加参数,指定特殊类型字段列(SERIAL_NUM)的数据类型为string
        • —map-column-java SERIAL_NUM=String
    • 查看结果

      • /data/dw/ods/one_make/full_imp:44张表
      • /data/dw/ods/one_make/incr_imp:57张表
  • 小结

    • 实现全量采集脚本的运行

知识点17:Schema备份及上传

  • 目标:了解如何实现采集数据备份

  • 实施

    • 需求:将每张表的Schema进行上传到HDFS上,归档并且备份

    • Avro文件本地存储

      workhome=/opt/sqoop/one_make
      --outdir $workhome/java_code
      
    • Avro文件HDFS存储

      hdfs_schema_dir=/data/dw/ods/one_make/avsc
      hdfs dfs -put $workhome/java_code/*.avsc $hdfs_schema_dir
      
    • Avro文件本地打包

      local_schema_backup_filename=schema_$biz_date.tar.gz
      tar -czf $local_schema_backup_filename ./java_code/*.avsc
      
    • Avro文件HDFS备份

      hdfs_schema_backup_filename=$hdfs_schema_dir/avro_schema_$biz_date.tar.gz
      hdfs dfs -put $local_schema_backup_filename $hdfs_schema_backup_filename
      
    • 运行测试

      cd /opt/sqoop/one_make/
      ./upload_avro_schema.sh 
      
    • 验证结果

      /data/dw/ods/one_make/avsc/
      *.avsc
      schema_20210101.tar.gz
      
  • 小结

    • 了解如何实现采集数据备份

知识点18:Python脚本

  • 目标:了解如果使用Python脚本如何实现

  • 实施

    • 原理本质

      • 问题:所有的操作是Sqoop、HDFS等命令操作,如何能通过Python代码控制?

      • 解决:本质上是使用Python执行了Linux的Shell命令来实现的

      • 导包

        # 用于实现执行系统操作的包
        import os
        # 用于实现执行Linux的命令的包
        import subprocess
        # 用于实现日期获取解析的包
        import datetime
        # 用于执行时间操作的包
        import time
        # 用于做日志记录的包
        import logging
        
    • 核心代码解析

      • subprocess

        call(String:LinuxCommand):用于提交Linux命令的方法
        
      • logging

        basicConfig(level,filename,filemode,format):用于配置日志记录的方式
        info(Messege):用于记录具体的日志内容
        
      • time

        sleep(15) :休眠15s
        
  • 小结

    • 了解如果使用Python脚本如何实现

p_filename ./java_code/*.avsc
```

  • Avro文件HDFS备份

    hdfs_schema_backup_filename=$hdfs_schema_dir/avro_schema_$biz_date.tar.gz
    hdfs dfs -put $local_schema_backup_filename $hdfs_schema_backup_filename
    
  • 运行测试

    cd /opt/sqoop/one_make/
    ./upload_avro_schema.sh 
    
  • 验证结果

    /data/dw/ods/one_make/avsc/
    *.avsc
    schema_20210101.tar.gz
    
  • 小结

    • 了解如何实现采集数据备份

知识点18:Python脚本

  • 目标:了解如果使用Python脚本如何实现

  • 实施

    • 原理本质

      • 问题:所有的操作是Sqoop、HDFS等命令操作,如何能通过Python代码控制?

      • 解决:本质上是使用Python执行了Linux的Shell命令来实现的

      • 导包

        # 用于实现执行系统操作的包
        import os
        # 用于实现执行Linux的命令的包
        import subprocess
        # 用于实现日期获取解析的包
        import datetime
        # 用于执行时间操作的包
        import time
        # 用于做日志记录的包
        import logging
        
    • 核心代码解析

      • subprocess

        call(String:LinuxCommand):用于提交Linux命令的方法
        
      • logging

        basicConfig(level,filename,filemode,format):用于配置日志记录的方式
        info(Messege):用于记录具体的日志内容
        
      • time

        sleep(15) :休眠15s
        
  • 小结

    • 了解如果使用Python脚本如何实现

以上是关于Python工业项目实战02:数仓设计及数据采集的主要内容,如果未能解决你的问题,请参考以下文章

Python工业项目实战 05:数仓事实层DWB层构建

Python工业项目实战01:项目介绍及环境构建

Python工业项目实战 06:数仓主题应用层ST层构建

Python工业项目实战 04:数仓维度层DWS层构建

Python工业项目实战03:ODS层及DWD层构建

大数据项目实战之在线教育(01数仓需求)