50000字,数仓建设保姆级教程,离线和实时一网打尽(理论+实战) 下

Posted 园陌

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了50000字,数仓建设保姆级教程,离线和实时一网打尽(理论+实战) 下相关的知识,希望对你有一定的参考价值。

本文大纲:

因内容较多,本文将直接从第五章开始,完整版文档请点击下方链接:

数仓建设保姆级教程PDF文档

前四章内容在上方链接获取

第五章 实时数仓建设核心

1. 实时计算初期

虽然实时计算在最近几年才火起来,但是在早期也有部分公司有实时计算的需求,但是数据量比较少,所以在实时方面形成不了完整的体系,基本所有的开发都是具体问题具体分析,来一个需求做一个,基本不考虑它们之间的关系,开发形式如下:

如上图所示,拿到数据源后,会经过数据清洗,扩维,通过Flink进行业务逻辑处理,最后直接进行业务输出。把这个环节拆开来看,数据源端会重复引用相同的数据源,后面进行清洗、过滤、扩维等操作,都要重复做一遍,唯一不同的是业务的代码逻辑是不一样的。

随着产品和业务人员对实时数据需求的不断增多,这种开发模式出现的问题越来越多:

  1. 数据指标越来越多,“烟囱式”的开发导致代码耦合问题严重。

  2. 需求越来越多,有的需要明细数据,有的需要 OLAP 分析。单一的开发模式难以应付多种需求。

  3. 每个需求都要申请资源,导致资源成本急速膨胀,资源不能集约有效利用。

  4. 缺少完善的监控系统,无法在对业务产生影响之前发现并修复问题。

大家看实时数仓的发展和出现的问题,和离线数仓非常类似,后期数据量大了之后产生了各种问题,离线数仓当时是怎么解决的?离线数仓通过分层架构使数据解耦,多个业务可以共用数据,实时数仓是否也可以用分层架构呢?当然是可以的,但是细节上和离线的分层还是有一些不同,稍后会讲到。

2. 实时数仓建设

从方法论来讲,实时和离线是非常相似的,离线数仓早期的时候也是具体问题具体分析,当数据规模涨到一定量的时候才会考虑如何治理。分层是一种非常有效的数据治理方式,所以在实时数仓如何进行管理的问题上,首先考虑的也是分层的处理逻辑。

实时数仓的架构如下图:

从上图中我们具体分析下每层的作用:

  • 数据源:在数据源的层面,离线和实时在数据源是一致的,主要分为日志类和业务类,日志类又包括用户日志,埋点日志以及服务器日志等。

  • 实时明细层:在明细层,为了解决重复建设的问题,要进行统一构建,利用离线数仓的模式,建设统一的基础明细数据层,按照主题进行管理,明细层的目的是给下游提供直接可用的数据,因此要对基础层进行统一的加工,比如清洗、过滤、扩维等。

  • 汇总层:汇总层通过Flink的简洁算子直接可以算出结果,并且形成汇总指标池,所有的指标都统一在汇总层加工,所有人按照统一的规范管理建设,形成可复用的汇总结果。

我们可以看出,实时数仓和离线数仓的分层非常类似,比如 数据源层,明细层,汇总层,乃至应用层,他们命名的模式可能都是一样的。但仔细比较不难发现,两者有很多区别:

  • 与离线数仓相比,实时数仓的层次更少一些:

    • 从目前建设离线数仓的经验来看,数仓的数据明细层内容会非常丰富,处理明细数据外一般还会包含轻度汇总层的概念,另外离线数仓中应用层数据在数仓内部,但实时数仓中,app 应用层数据已经落入应用系统的存储介质中,可以把该层与数仓的表分离

    • 应用层少建设的好处:实时处理数据的时候,每建一个层次,数据必然会产生一定的延迟

    • 汇总层少建的好处:在汇总统计的时候,往往为了容忍一部分数据的延迟,可能会人为的制造一些延迟来保证数据的准确。举例,在统计跨天相关的订单事件中的数据时,可能会等到 00:00:05 或者 00:00:10 再统计,确保 00:00 前的数据已经全部接受到位了,再进行统计。所以,汇总层的层次太多的话,就会更大的加重人为造成的数据延迟。
  • 与离线数仓相比,实时数仓的数据源存储不同:

    • 在建设离线数仓的时候,基本整个离线数仓都是建立在 Hive 表之上。但是,在建设实时数仓的时候,同一份表,会使用不同的方式进行存储。比如常见的情况下,明细数据或者汇总数据都会存在 Kafka 里面,但是像城市、渠道等维度信息需要借助 Hbase,MySQL 或者其他 KV 存储等数据库来进行存储

3. Lambda架构的实时数仓

Lambda和Kappa架构的概念已在前文中解释,不了解的小伙伴可点击链接:一文读懂大数据实时计算

下图是基于 Flink 和 Kafka 的 Lambda 架构的具体实践,上层是实时计算,下层是离线计算,横向是按计算引擎来分,纵向是按实时数仓来区分:

Lambda架构是比较经典的架构,以前实时的场景不是很多,以离线为主,当附加了实时场景后,由于离线和实时的时效性不同,导致技术生态是不一样的。Lambda架构相当于附加了一条实时生产链路,在应用层面进行一个整合,双路生产,各自独立。这在业务应用中也是顺理成章采用的一种方式。

双路生产会存在一些问题,比如加工逻辑double,开发运维也会double,资源同样会变成两个资源链路。因为存在以上问题,所以又演进了一个Kappa架构。

4. Kappa架构的实时数仓

Kappa架构相当于去掉了离线计算部分的Lambda架构,具体如下图所示:

Kappa架构从架构设计来讲比较简单,生产统一,一套逻辑同时生产离线和实时。但是在实际应用场景有比较大的局限性,因为实时数据的同一份表,会使用不同的方式进行存储,这就导致关联时需要跨数据源,操作数据有很大局限性,所以在业内直接用Kappa架构生产落地的案例不多见,且场景比较单一。

关于 Kappa 架构,熟悉实时数仓生产的同学,可能会有一个疑问。因为我们经常会面临业务变更,所以很多业务逻辑是需要去迭代的。之前产出的一些数据,如果口径变更了,就需要重算,甚至重刷历史数据。对于实时数仓来说,怎么去解决数据重算问题?

Kappa 架构在这一块的思路是:首先要准备好一个能够存储历史数据的消息队列,比如 Kafka,并且这个消息队列是可以支持你从某个历史的节点重新开始消费的。接着需要新起一个任务,从原来比较早的一个时间节点去消费 Kafka 上的数据,然后当这个新的任务运行的进度已经能够和现在的正在跑的任务齐平的时候,你就可以把现在任务的下游切换到新的任务上面,旧的任务就可以停掉,并且原来产出的结果表也可以被删掉。

5. 流批结合的实时数仓

随着实时 OLAP 技术的发展,目前开源的OLAP引擎在性能,易用等方面有了很大的提升,如Doris、Presto等,加上数据湖技术的迅速发展,使得流批结合的方式变得简单。

如下图是流批结合的实时数仓:

数据从日志统一采集到消息队列,再到实时数仓,作为基础数据流的建设是统一的。之后对于日志类实时特征,实时大屏类应用走实时流计算。对于Binlog类业务分析走实时OLAP批处理。

我们看到流批结合的方式与上面几种架构的存储方式发生了变化,由Kafka换成了Iceberg,Iceberg是介于上层计算引擎和底层存储格式之间的一个中间层,我们可以把它定义成一种“数据组织格式”,底层存储还是HDFS,那么为什么加了中间层,就对流批结合处理的比较好了呢?Iceberg的ACID能力可以简化整个流水线的设计,降低整个流水线的延迟,并且所具有的修改、删除能力能够有效地降低开销,提升效率。Iceberg可以有效支持批处理的高吞吐数据扫描和流计算按分区粒度并发实时处理。

六、基于Flink SQL从0到1构建一个实时数仓

实时数仓主要解决传统数仓数据时效性低的问题,实时数仓通常会用在实时的OLAP分析,实时大屏展示,实时监控报警各个场景。虽然关于实时数仓架构及技术选型与传统的离线数仓会存在差异,但是关于数仓建设的基本方法论是一致的。接下来主要介绍Flink SQL从0到1搭建一个实时数仓的demo,涉及到数据采集、存储、计算、可视化整个流程。

1. 案例简介

本文以电商业务为例,展示实时数仓的数据处理流程。另外,本文旨在说明实时数仓的构建流程,所以不会涉及复杂的数据计算。为了保证案例的可操作性和完整性,本文会给出详细的操作步骤。为了方便演示,本文的所有操作都是在Flink SQL Cli中完成。

2. 架构设计

具体的架构设计如图所示:首先通过canal解析mysql的binlog日志,将数据存储在Kafka中。然后使用Flink SQL对原始数据进行清洗关联,并将处理之后的明细宽表写入Kafka中。维表数据存储在MySQL中,通过Flink SQL对明细宽表与维表进行join,将聚合后的数据写入MySQL,最后通过FineBI进行可视化展示。

3. 业务数据准备

1. 订单表(order_info)

CREATE TABLE `order_info` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 编号,
  `consignee` varchar(100) DEFAULT NULL COMMENT 收货人,
  `consignee_tel` varchar(20) DEFAULT NULL COMMENT 收件人电话,
  `total_amount` decimal(10,2) DEFAULT NULL COMMENT 总金额,
  `order_status` varchar(20) DEFAULT NULL COMMENT 订单状态,
  `user_id` bigint(20) DEFAULT NULL COMMENT 用户id,
  `payment_way` varchar(20) DEFAULT NULL COMMENT 付款方式,
  `delivery_address` varchar(1000) DEFAULT NULL COMMENT 送货地址,
  `order_comment` varchar(200) DEFAULT NULL COMMENT 订单备注,
  `out_trade_no` varchar(50) DEFAULT NULL COMMENT 订单交易编号(第三方支付用),
  `trade_body` varchar(200) DEFAULT NULL COMMENT 订单描述(第三方支付用),
  `create_time` datetime DEFAULT NULL COMMENT 创建时间,
  `operate_time` datetime DEFAULT NULL COMMENT 操作时间,
  `expire_time` datetime DEFAULT NULL COMMENT 失效时间,
  `tracking_no` varchar(100) DEFAULT NULL COMMENT 物流单编号,
  `parent_order_id` bigint(20) DEFAULT NULL COMMENT 父订单编号,
  `img_url` varchar(200) DEFAULT NULL COMMENT 图片路径,
  `province_id` int(20) DEFAULT NULL COMMENT 地区,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT=订单表;

2. 订单详情表(order_detail)

CREATE TABLE `order_detail` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 编号,
  `order_id` bigint(20) DEFAULT NULL COMMENT 订单编号,
  `sku_id` bigint(20) DEFAULT NULL COMMENT sku_id,
  `sku_name` varchar(200) DEFAULT NULL COMMENT sku名称(冗余),
  `img_url` varchar(200) DEFAULT NULL COMMENT 图片名称(冗余),
  `order_price` decimal(10,2) DEFAULT NULL COMMENT 购买价格(下单时sku价格),
  `sku_num` varchar(200) DEFAULT NULL COMMENT 购买个数,
  `create_time` datetime DEFAULT NULL COMMENT 创建时间,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT=订单详情表;

3. 商品表(sku_info)

CREATE TABLE `sku_info` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT skuid(itemID),
  `spu_id` bigint(20) DEFAULT NULL COMMENT spuid,
  `price` decimal(10,0) DEFAULT NULL COMMENT 价格,
  `sku_name` varchar(200) DEFAULT NULL COMMENT sku名称,
  `sku_desc` varchar(2000) DEFAULT NULL COMMENT 商品规格描述,
  `weight` decimal(10,2) DEFAULT NULL COMMENT 重量,
  `tm_id` bigint(20) DEFAULT NULL COMMENT 品牌(冗余),
  `category3_id` bigint(20) DEFAULT NULL COMMENT 三级分类id(冗余),
  `sku_default_img` varchar(200) DEFAULT NULL COMMENT 默认显示图片(冗余),
  `create_time` datetime DEFAULT NULL COMMENT 创建时间,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT=商品表;

4. 商品一级类目表(base_category1)

CREATE TABLE `base_category1` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 编号,
  `name` varchar(10) NOT NULL COMMENT 分类名称,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT=一级分类表;

5. 商品二级类目表(base_category2)

CREATE TABLE `base_category2` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 编号,
  `name` varchar(200) NOT NULL COMMENT 二级分类名称,
  `category1_id` bigint(20) DEFAULT NULL COMMENT 一级分类编号,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT=二级分类表;

6. 商品三级类目表(base_category3)

CREATE TABLE `base_category3` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 编号,
  `name` varchar(200) NOT NULL COMMENT 三级分类名称,
  `category2_id` bigint(20) DEFAULT NULL COMMENT 二级分类编号,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT=三级分类表;

7. 省份表(区域表(base_region)base_province)

CREATE TABLE `base_province` (
  `id` int(20) DEFAULT NULL COMMENT id,
  `name` varchar(20) DEFAULT NULL COMMENT 省名称,
  `region_id` int(20) DEFAULT NULL COMMENT 大区id,
  `area_code` varchar(20) DEFAULT NULL COMMENT 行政区位码
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

8. 区域表(base_region)

CREATE TABLE `base_region` (
  `id` int(20) NOT NULL COMMENT 大区id,
  `region_name` varchar(20) DEFAULT NULL COMMENT 大区名称,
   PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

4. 数据处理流程

1. ods层数据同步

关于ODS层的数据同步这里就不详细展开。主要使用canal解析MySQL的binlog日志,然后将其写入到Kafka对应的topic中。由于篇幅限制,不会对具体的细节进行说明。同步之后的结果如下图所示:

2. DIM层数据准备

本案例中将维表存储在了MySQL中,实际生产中会用HBase存储维表数据。我们主要用到两张维表:区域维表和商品维表。处理过程如下:

  • 区域维表

首先将mydw.base_provincemydw.base_region这个主题对应的数据抽取到MySQL中,主要使用Flink SQL的Kafka数据源对应的canal-json格式,注意:在执行装载之前,需要先在MySQL中创建对应的表,本文使用的MySQL数据库的名字为dim,用于存放维表数据。如下:

-- -------------------------
--   省份
--   kafka Source
-- ------------------------- 
DROP TABLE IF EXISTS `ods_base_province`;
CREATE TABLE `ods_base_province` (
  `id` INT,
  `name` STRING,
  `region_id` INT ,
  `area_code`STRING
) WITH(
connector = kafka,
 topic = mydw.base_province,
 properties.bootstrap.servers = kms-3:9092,
 properties.group.id = testGroup,
 format = canal-json ,
 scan.startup.mode = earliest-offset 
) ; 

-- -------------------------
--   省份
--   MySQL Sink
-- ------------------------- 
DROP TABLE IF EXISTS `base_province`;
CREATE TABLE `base_province` (
    `id` INT,
    `name` STRING,
    `region_id` INT ,
    `area_code`STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    connector = jdbc,
    url = jdbc:mysql://kms-1:3306/dim,
    table-name = base_province, -- MySQL中的待插入数据的表
    driver = com.mysql.jdbc.Driver,
    username = root,
    password = 123qwe,
    sink.buffer-flush.interval = 1s
);

-- -------------------------
--   省份
--   MySQL Sink Load Data
-- ------------------------- 
INSERT INTO base_province
SELECT *
FROM ods_base_province;

-- -------------------------
--   区域
--   kafka Source
-- ------------------------- 
DROP TABLE IF EXISTS `ods_base_region`;
CREATE TABLE `ods_base_region` (
  `id` INT,
  `region_name` STRING
) WITH(
connector = kafka,
 topic = mydw.base_region,
 properties.bootstrap.servers = kms-3:9092,
 properties.group.id = testGroup,
 format = canal-json ,
 scan.startup.mode = earliest-offset 
) ; 

-- -------------------------
--   区域
--   MySQL Sink
-- ------------------------- 
DROP TABLE IF EXISTS `base_region`;
CREATE TABLE `base_region` (
    `id` INT,
    `region_name` STRING,
     PRIMARY KEY (id) NOT ENFORCED
) WITH (
    connector = jdbc,
    url = jdbc:mysql://kms-1:3306/dim,
    table-name = base_region, -- MySQL中的待插入数据的表
    driver = com.mysql.jdbc.Driver,
    username = root,
    password = 123qwe,
    sink.buffer-flush.interval = 1s
);

-- -------------------------
--   区域
--   MySQL Sink Load Data
-- ------------------------- 
INSERT INTO base_region
SELECT *
FROM ods_base_region;

经过上面的步骤,将创建维表所需要的原始数据已经存储到了MySQL中,接下来就需要在MySQL中创建维表,我们使用上面的两张表,创建一张视图:dim_province作为维表:

-- ---------------------------------
-- DIM层,区域维表,
-- 在MySQL中创建视图
-- ---------------------------------
DROP VIEW IF EXISTS dim_province;
CREATE VIEW dim_province AS
SELECT
  bp.id AS province_id,
  bp.name AS province_name,
  br.id AS region_id,
  br.region_name AS region_name,
  bp.area_code AS area_code
FROM base_region br 
     JOIN base_province bp ON br.id= bp.region_id;

这样我们所需要的维表:dim_province就创建好了,只需要在维表join时,使用Flink SQL创建JDBC的数据源,就可以使用该维表了。同理,我们使用相同的方法创建商品维表,具体如下:

-- -------------------------
--  一级类目表
--   kafka Source
-- ------------------------- 
DROP TABLE IF EXISTS `ods_base_category1`;
CREATE TABLE `ods_base_category1` (
  `id` BIGINT,
  `name` STRING
)WITH(
 connector = kafka,
 topic = mydw.base_category1,
 properties.bootstrap.servers = kms-3:9092,
 properties.group.id = testGroup,
 format = canal-json ,
 scan.startup.mode = earliest-offset 
) ;

-- -------------------------
--  一级类目表
--   MySQL Sink
-- ------------------------- 
DROP TABLE IF EXISTS `base_category1`;
CREATE TABLE `base_category1` (
    `id` BIGINT,
    `name` STRING,
     PRIMARY KEY (id) NOT ENFORCED
) WITH (
    connector = jdbc,
    url = jdbc:mysql://kms-1:3306/dim,
    table-name = base_category1, -- MySQL中的待插入数据的表
    driver = com.mysql.jdbc.Driver,
    username = root,
    password = 123qwe,
    sink.buffer-flush.interval = 1s
);

-- -------------------------
--  一级类目表
--   MySQL Sink Load Data
-- ------------------------- 

INSERT INTO base_category1
SELECT *
FROM ods_base_category1;

-- -------------------------
--  二级类目表
--   kafka Source
-- ------------------------- 
DROP TABLE IF EXISTS `ods_base_category2`;
CREATE TABLE `ods_base_category2` (
  `id` BIGINT,
  `name` STRING,
  `category1_id` BIGINT
)WITH(
connector = kafka,
 topic = mydw.base_category2,
 properties.bootstrap.servers = kms-3:9092,
 properties.group.id = testGroup,
 format = canal-json ,
 scan.startup.mode = earliest-offset 
) ;

-- -------------------------
--  二级类目表
--   MySQL Sink
-- ------------------------- 
DROP TABLE IF EXISTS `base_category2`;
CREATE TABLE `base_category2` (
    `id` BIGINT,
    `name` STRING,
    `category1_id` BIGINT,
     PRIMARY KEY (id) NOT ENFORCED
) WITH (
    connector = jdbc,
    url = jdbc:mysql://kms-1:3306/dim,
    table-name = base_category2, -- MySQL中的待插入数据的表
    driver = com.mysql.jdbc.Driver,
    username = root,
    password = 123qwe,
    sink.buffer-flush.interval = 1s
);

-- -------------------------
--  二级类目表
--   MySQL Sink Load Data
-- ------------------------- 
INSERT INTO base_category2
SELECT *
FROM ods_base_category2;

-- -------------------------
-- 三级类目表
--   kafka Source
-- ------------------------- 
DROP TABLE IF EXISTS `ods_base_category3`;
CREATE TABLE `ods_base_category3` (
  `id` BIGINT,
  `name` STRING,
  `category2_id` BIGINT
)WITH(
connector = kafka,
 topic = mydw.base_category3,
 properties.bootstrap.servers = kms-3:9092,
 properties.group.id = testGroup,
 format = canal-json ,
 scan.startup.mode = earliest-offset 
) ; 

-- -------------------------
--  三级类目表
--   MySQL Sink
-- ------------------------- 
DROP TABLE IF EXISTS `base_category3`;
CREATE TABLE `base_category3` (
    `id` BIGINT,
    `name` STRING,
    `category2_id` BIGINT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    connector = jdbc,
    url = jdbc:mysql://kms-1:3306/dim,
    table-name = base_category3, -- MySQL中的待插入数据的表
    driver = com.mysql.jdbc.Driver,
    username = root,
    password = 123qwe,
    sink.buffer-flush.interval = 1s
);

-- -------------------------
--  三级类目表
--   MySQL Sink Load Data
-- ------------------------- 
INSERT INTO base_category3
SELECT *
FROM ods_base_category3;

-- -------------------------
--   商品表
--   Kafka Source
-- ------------------------- 

DROP TABLE IF EXISTS `ods_sku_info`;
CREATE TABLE `ods_sku_info` (
  `id` BIGINT,
  `spu_id` BIGINT,
  `price` DECIMAL(10,0),
  `sku_name` STRING,
  `sku_desc` STRING,
  `weight` DECIMAL(10,2),
  `tm_id` BIGINT,
  `category3_id` BIGINT,
  `sku_default_img` STRING,
  `create_time` TIMESTAMP(0)
) WITH(
 connector = kafka,
 topic = mydw.sku_info,
 properties.bootstrap.servers = kms-3:9092,
 properties.group.id = testGroup,
 format = canal-json ,
 scan.startup.mode = earliest-offset 
) ; 

-- -------------------------
--   商品表
--   MySQL Sink
-- ------------------------- 
DROP TABLE IF EXISTS `sku_info`;
CREATE TABLE `sku_info` (
  `id` BIGINT,
  `spu_id` BIGINT,
  `price` DECIMAL(10,0),
  `sku_name` STRING,
  `sku_desc` STRING,
  `weight` DECIMAL(10,2),
  `tm_id` BIGINT,
  `category3_id` BIGINT,
  `sku_default_img` STRING,
  `create_time` TIMESTAMP(0),
   PRIMARY KEY (tm_id) NOT ENFORCED
) WITH (
    connector = jdbc,
    url = jdbc:mysql://kms-1:3306/dim,
    table-name = sku_info, -- MySQL中的待插入数据的表
    driver = com.mysql.jdbc.Driver,
    username = root,
    password = 123qwe,
    sink.buffer-flush.interval = 1s
);

-- -------------------------
--   商品
--   MySQL Sink Load Data
-- ------------------------- 
INSERT INTO sku_info
SELECT *
FROM ods_sku_info;

经过上面的步骤,我们可以将创建商品维表的基础数据表同步到MySQL中,同样需要提前创建好对应的数据表。接下来我们使用上面的基础表在mySQL的dim库中创建一张视图:dim_sku_info,用作后续使用的维表。

-- ---------------------------------
-- DIM层,商品维表,
-- 在MySQL中创建视图
-- ---------------------------------
CREATE VIEW dim_sku_info AS
SELECT
  si.id AS id,
  si.sku_name AS sku_name,
  si.category3_id AS c3_id,
  si.weight AS weight,
  si.tm_id AS tm_id,
  si.price AS price,
  si.spu_id AS spu_id,
  c3.name AS c3_name,
  c2.id AS c2_id,
  c2.name AS c2_name,
  c3.id AS c1_id,
  c3.name AS c1_name
FROM
(
  sku_info si 
  JOIN base_category3 c3 ON si.category3_id = c3.id
  JOIN base_category2 c2 ON c3.category2_id =c2.id
  JOIN base_category1 c1 ON c2.category1_id = c1.id
);

至此,我们所需要的维表数据已经准备好了,接下来开始处理DWD层的数据。

3. DWD层数据处理

经过上面的步骤,我们已经将所用的维表已经准备好了。接下来我们将对ODS的原始数据进行处理,加工成DWD层的明细宽表。具体过程如下:

-- -------------------------
--   订单详情
--   Kafka Source
-- ------------------------- 

DROP TABLE IF EXISTS `ods_order_detail`;
CREATE TABLE `ods_order_detail`(
  `id` BIGINT,
  `order_id` BIGINT,
  `sku_id` BIGINT,
  `sku_name` STRING,
  `img_url` STRING,
  `order_price` DECIMAL(10,2),
  `sku_num` INT,
  `create_time` TIMESTAMP(0)
) WITH(
 connector = kafka,
 topic = mydw.order_detail,
 properties.bootstrap.servers = kms-3:9092,
 properties.group.id = testGroup,
 format = canal-json ,
 scan.startup.mode = earliest-offset 
) ; 

-- -------------------------
--   订单信息
--   Kafka Source
-- -------------------------
DROP TABLE IF EXISTS `ods_order_info`;
CREATE TABLE `ods_order_info` (
  `id` BIGINT,
  `consignee` STRING,
  `consignee_tel` STRING,
  `total_amount` DECIMAL(10,2),
  `order_status` STRING,
  `user_id` BIGINT,
  `payment_way` STRING,
  `delivery_address` STRING,
  `order_comment` STRING,
  `out_trade_no` STRING,
  `trade_body` STRING,
  `create_time` TIMESTAMP(0) ,
  `operate_time` TIMESTAMP(0) ,
  `expire_time` TIMESTAMP(0) ,
  `tracking_no` STRING,
  `parent_order_id` BIGINT,
  `img_url` STRING,
  `province_id` INT
) WITH(
connector = kafka,
 topic = mydw.order_info,
 properties.bootstrap.servers = kms-3:9092,
 properties.group.id = testGroup,
 format = canal-json ,
 scan.startup.mode = earliest-offset 
) ; 

-- ---------------------------------
-- DWD层,支付订单明细表dwd_paid_order_detail
-- ---------------------------------
DROP TABLE IF EXISTS dwd_paid_order_detail;
CREATE TABLE dwd_paid_order_detail
(
  detail_id BIGINT,
  order_id BIGINT,
  user_id BIGINT,
  province_id INT,
  sku_id BIGINT,
  sku_name STRING,
  sku_num INT,
  order_price DECIMAL(10,0),
  create_time STRING,
  pay_time STRING
 ) WITH (
    connector = kafka,
    topic = dwd_paid_order_detail,
    scan.startup.mode = earliest-offset,
    properties.bootstrap.servers = kms-3:9092,
    format = changelog-json
);
-- ---------------------------------
-- DWD层,已支付订单明细表
-- 向dwd_paid_order_detail装载数据
-- ---------------------------------
INSERT INTO dwd_paid_order_detail
SELECT
  od.id,
  oi.id order_id,
  oi.user_id,
  oi.province_id,
  od.sku_id,
  od.sku_name,
  od.sku_num,
  od.order_price,
  oi.create_time,
  oi.operate_time
FROM
    (
    SELECT * 
    FROM ods_order_info
    WHERE order_status = 2 -- 已支付
    ) oi JOIN
    (
    SELECT *
    FROM ods_order_detail
    ) od 
    ON oi.id = od.order_id;

4. ADS层数据

经过上面的步骤,我们创建了一张dwd_paid_order_detail明细宽表,并将该表存储在了Kafka中。接下来我们将使用这张明细宽表与维表进行JOIN,得到我们ADS应用层数据。

  • ads_province_index

首先在MySQL中创建对应的ADS目标表:ads_province_index

CREATE TABLE ads.ads_province_index(
  province_id INT(10),
  area_code VARCHAR(100),
  province_name VARCHAR(100),
  region_id INT(10),
  region_name VARCHAR(100),
  order_amount DECIMAL(10,2),
  order_count BIGINT(10),
  dt VARCHAR(100),
  PRIMARY KEY (province_id, dt) 
) ;

向MySQL的ADS层目标装载数据:

-- Flink SQL Cli操作
-- ---------------------------------
-- 使用 DDL创建MySQL中的ADS层表
-- 指标:1.每天每个省份的订单数
--      2.每天每个省份的订单金额
-- ---------------------------------
CREATE TABLE ads_province_index(
  province_id INT,
  area_code STRING,
  province_name STRING,
  region_id INT,
  region_name STRING,
  order_amount DECIMAL(10,2),
  order_count BIGINT,
  dt STRING,
  PRIMARY KEY (province_id, dt) NOT ENFORCED  
) WITH (
    connector = jdbc,
    url = jdbc:mysql://kms-1:3306/ads,
    table-name = ads_province_index, 
    driver = com.mysql.jdbc.Driver,
    username = root,
    password = 123qwe
);
-- ---------------------------------
-- dwd_paid_order_detail已支付订单明细宽表
-- ---------------------------------
CREATE TABLE dwd_paid_order_detail
(
  detail_id BIGINT,
  order_id BIGINT,
  user_id BIGINT,
  province_id INT,
  sku_id BIGINT,
  sku_name STRING,
  sku_num INT,
  order_price DECIMAL(10,2),
  create_time STRING,
  pay_time STRING
 ) WITH (
    connector = kafka,
    topic = dwd_paid_order_detail,
    scan.startup.mode = earliest-offset,
    properties.bootstrap.servers = kms-3:9092,
    format = changelog-json
);

-- ---------------------------------
-- tmp_province_index
-- 订单汇总临时表
-- ---------------------------------
CREATE TABLE tmp_province_index(
    province_id INT,
    order_count BIGINT,-- 订单数
    order_amount DECIMAL(10,2), -- 订单金额
    pay_date DATE
)WITH (
    connector = kafka,
    topic = tmp_province_index,
    scan.startup.mode = earliest-offset,
    properties.bootstrap.servers = kms-3:9092,
    format = changelog-json
);
-- ---------------------------------
-- tmp_province_index
-- 订单汇总临时表数据装载
-- ---------------------------------
INSERT INTO tmp_province_index
SELECT
      province_id,
      count(distinct order_id) order_count,-- 订单数
      sum(order_price * sku_num) order_amount, -- 订单金额
      TO_DATE(pay_time,yyyy-MM-dd) pay_date
FROM dwd_paid_order_detail
GROUP BY province_id,TO_DATE(pay_time,yyyy-MM-dd)
;
-- ---------------------------------
-- tmp_province_index_source
-- 使用该临时汇总表,作为数据源
-- ---------------------------------
CREATE TABLE tmp_province_index_source(
    province_id INT,
    order_count BIGINT,-- 订单数
    order_amount DECIMAL(10,2), -- 订单金额
    pay_date DATE,
    proctime as PROCTIME()   -- 通过计算列产生一个处理时间列
 ) WITH (
    connector = kafka,
    topic = tmp_province_index,
    scan.startup.mode = earliest-offset,
    properties.bootstrap.servers = kms-3:9092,
    format = changelog-json
);

-- ---------------------------------
-- DIM层,区域维表,
-- 创建区域维表数据源
-- ---------------------------------
DROP TABLE IF EXISTS `dim_province`;
CREATE TABLE dim_province (
  province_id INT,
  province_name STRING,
  area_code STRING,
  region_id INT,
  region_name STRING ,
  PRIMARY KEY (province_id) NOT ENFORCED
) WITH (
    connector = jdbc,
    url = jdbc:mysql://kms-1:3306/dim,
    table-name = dim_province, 
    driver = com.mysql.jdbc.Driver,
    username = root,
    password = 123qwe,
    scan.fetch-size = 100
);

-- ---------------------------------
-- 向ads_province_index装载数据
-- 维表JOIN
-- ---------------------------------

INSERT INTO ads_province_index
SELECT
  pc.province_id,
  dp.area_code,
  dp.province_name,
  dp.region_id,
  dp.region_name,
  pc.order_amount,
  pc.order_count,
  cast(pc.pay_date as VARCHAR)
FROM
tmp_province_index_source pc
  JOIN dim_province FOR SYSTEM_TIME AS OF pc.proctime as dp 
  ON dp.province_id = pc.province_id;

当提交任务之后:观察Flink WEB UI

查看ADS层的ads_province_index表数据:

  • ads_sku_index

首先在MySQL中创建对应的ADS目标表:ads_sku_index

CREATE TABLE ads_sku_index
(
  sku_id BIGINT(10),
  sku_name VARCHAR(100),
  weight DOUBLE,
  tm_id BIGINT(10),
  price DOUBLE,
  spu_id BIGINT(10),
  c3_id BIGINT(10),
  c3_name VARCHAR(100) ,
  c2_id BIGINT(10),
  c2_name VARCHAR(100),
  c1_id BIGINT(10),
  c1_name VARCHAR(100),
  order_amount DOUBLE,
  order_count BIGINT(10),
  sku_count BIGINT(10),
  dt varchar(100),
  PRIMARY KEY (sku_id,dt)
);

向MySQL的ADS层目标装载数据:

-- ---------------------------------
-- 使用 DDL创建MySQL中的ADS层表
-- 指标:1.每天每个商品对应的订单个数
--      2.每天每个商品对应的订单金额
--      3.每天每个商品对应的数量
-- ---------------------------------
CREATE TABLE ads_sku_index
(
  sku_id BIGINT,
  sku_name VARCHAR,
  weight DOUBLE,
  tm_id BIGINT,
  price DOUBLE,
  spu_id BIGINT,
  c3_id BIGINT,
  c3_name VARCHAR ,
  c2_id BIGINT,
  c2_name VARCHAR,
  c1_id BIGINT,
  c1_name VARCHAR,
  order_amount DOUBLE,
  order_count BIGINT,
  sku_count BIGINT,
  dt varchar,
  PRIMARY KEY (sku_id,dt) NOT ENFORCED
) WITH (
    connector = jdbc,
    url = jdbc:mysql://kms-1:3306/ads,
    table-name = ads_sku_index, 
    driver = com.mysql.jdbc.Driver,
    username = root,
    password = 123qwe
);

-- ---------------------------------
-- dwd_paid_order_detail已支付订单明细宽表
-- ---------------------------------
CREATE TABLE dwd_paid_order_detail
(
  detail_id BIGINT,
  order_id BIGINT,
  user_id BIGINT,
  province_id INT,
  sku_id BIGINT,
  sku_name STRING,
  sku_num INT,
  order_price DECIMAL(10,2),
  create_time STRING,
  pay_time STRING
 ) WITH (
    connector = kafka,
    topic = dwd_paid_order_detail,
    scan.startup.mode = earliest-offset,
    properties.bootstrap.servers = kms-3:9092,
    format = changelog-json
);

-- ---------------------------------
-- tmp_sku_index
-- 商品指标统计
-- ---------------------------------
CREATE TABLE tmp_sku_index(
    sku_id BIGINT,
    order_count BIGINT,-- 订单数
    order_amount DECIMAL(10,2), -- 订单金额
 order_sku_num BIGINT,
    pay_date DATE
)WITH (
    connector = kafka,
    topic = tmp_sku_index,
    scan.startup.mode = earliest-offset,
    properties.bootstrap.servers = kms-3:9092,
    format = changelog-json
);
-- ---------------------------------
-- tmp_sku_index
-- 数据装载
-- ---------------------------------
INSERT INTO tmp_sku_index
SELECT
      sku_id,
      count(distinct order_id) order_count,-- 订单数
      sum(order_price * sku_num) order_amount, -- 订单金额
   sum(sku_num) order_sku_num,
      TO_DATE(pay_time,yyyy-MM-dd) pay_date
FROM dwd_paid_order_detail
GROUP BY sku_id,TO_DATE(pay_time,yyyy-MM-dd)
;

-- ---------------------------------
-- tmp_sku_index_source
-- 使用该临时汇总表,作为数据源
-- ---------------------------------
CREATE TABLE tmp_sku_index_source(
    sku_id BIGINT,
    order_count BIGINT,-- 订单数
    order_amount DECIMAL(10,2), -- 订单金额
    order_sku_num BIGINT,
    pay_date DATE,
    proctime as PROCTIME()   -- 通过计算列产生一个处理时间列
 ) WITH (
    connector = kafka,
    topic = tmp_sku_index,
    scan.startup.mode = earliest-offset,
    properties.bootstrap.servers = kms-3:9092,
    format = changelog-json
);
-- ---------------------------------
-- DIM层,商品维表,
-- 创建商品维表数据源
-- ---------------------------------
DROP TABLE IF EXISTS `dim_sku_info`;
CREATE TABLE dim_sku_info (
  id BIGINT,
  sku_name STRING,
  c3_id BIGINT,
  weight DECIMAL(10,2),
  tm_id BIGINT,
  price DECIMAL(10,2),
  spu_id BIGINT,
  c3_name STRING,
  c2_id BIGINT,
  c2_name STRING,
  c1_id BIGINT,
  c1_name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
    connector = jdbc,
    url = jdbc:mysql://kms-1:3306/dim,
    table-name = dim_sku_info, 
    driver = com.mysql.jdbc.Driver,
    username = root,
    password = 123qwe,
    scan.fetch-size = 100
);
-- ---------------------------------
-- 向ads_sku_index装载数据
-- 维表JOIN
-- ---------------------------------
INSERT INTO ads_sku_index
SELECT
  sku_id ,
  sku_name ,
  weight ,
  tm_id ,
  price ,
  spu_id ,
  c3_id ,
  c3_name,
  c2_id ,
  c2_name ,
  c1_id ,
  c1_name ,
  sc.order_amount,
  sc.order_count ,
  sc.order_sku_num ,
  cast(sc.pay_date as VARCHAR)
FROM
tmp_sku_index_source sc 
  JOIN dim_sku_info FOR SYSTEM_TIME AS OF sc.proctime as ds
  ON ds.id = sc.sku_id;

当提交任务之后:观察Flink WEB UI

查看ADS层的ads_sku_index表数据

5. FineBI展示

七、数据治理

数仓建设真正的难点不在于数仓设计,而在于后续业务发展起来,业务线变的庞大之后的数据治理,包括资产治理、数据质量监控、数据指标体系的建设等。

其实数据治理的范围很⼴,包含数据本⾝的管理、数据安全、数据质量、数据成本等。在DAMA 数据管理知识体系指南中,数据治理位于数据管理“车轮图”的正中央,是数据架构、数据建模、数据存储、数据安全、数据质量、元数据管理、主数据管理等10大数据管理领域的总纲,为各项数据管理活动提供总体指导策略。

1. 数据治理之道是什么

1. 数据治理需要体系建设

为发挥数据价值需要满足三个要素:合理的平台架构、完善的治理服务、体系化的运营手段

根据企业的规模、所属行业、数据量等情况选择合适的平台架构;治理服务需要贯穿数据全生命周期,保证数据在采集、加工、共享、存储、应用整个过程中的完整性、准确性、一致性和实效性;运营手段则应当包括规范的优化、组织的优化、平台的优化以及流程的优化等等方面。

2. 数据治理需要夯实基础

数据治理需要循序渐进,但在建设初期至少需要关注三个方面:数据规范、数据质量、数据安全。规范化的模型管理是保障数据可以被治理的前提条件,高质量的数据是数据可用的前提条件,数据的安全管控是数据可以共享交换的前提条件。

3. 数据治理需要IT赋能

数据治理不是一堆规范文档的堆砌,而是需要将治理过程中所产生的的规范、流程、标准落地到IT平台上,在数据生产过程中通过“以终为始”前向的方式进行数据治理,避免事后稽核带来各种被动和运维成本的增加。

4. 数据治理需要聚焦数据

数据治理的本质是管理数据,因此需要加强元数据管理和主数据管理,从源头治理数据,补齐数据的相关属性和信息,比如:元数据、质量、安全、业务逻辑、血缘等,通过元数据驱动的方式管理数据生产、加工和使用。

5. 数据治理需要建管一体化

数据模型血缘与任务调度的一致性是建管一体化的关键,有助于解决数据管理与数据生产口径不一致的问题,避免出现两张皮的低效管理模式。

2. 浅谈数据治理方式

如上面所说,数据治理的范围非常广,其中最重要的是数据质量治理,而数据质量涉及的范围也很广,贯穿数仓的整个生命周期,从数据产生->数据接入->数据存储->数据处理->数据输出->数据展示,每个阶段都需要质量治理,评价维度包括完整性、规范性、一致性、准确性、唯一性、关联性等。

在系统建设的各个阶段都应该根据标准进行数据质量检测和规范,及时进行治理,避免事后的清洗工作。

质量检测可参考以下维度:

维度 衡量标准
完整性 业务指定必须的数据是否缺失,不允许为空字符或者空值等。例如,数据源是否完整、维度取值是否完整、数据取值是否完整等
时效性 当需要使用时,数据能否反映当前事实。即数据必须及时,能够满足系统对数据时间的要求。例如处理(获取、整理、清洗、加载等)的及时性
唯一性 在指定的数据集中数据值是否唯一
参照完整性 数据项是否在父表中有定义
依赖一致性 数据项取值是否满足与其他数据项之间的依赖关系
正确性 数据内容和定义是否一致
精确性 数据精度是否达到业务规则要求的位数
技术有效性 数据项是否按已定义的格式标准组织
业务有效性 数据项是否符合已定义的
可信度 根据客户调查或客户主动提供获得
可用性 数据可用的时间和数据需要被访问时间的比例
可访问性 数据是否便于自动化读取

下面是根据美团的技术文章总结的几点具体治理方式:

1. 规范治理

规范是数仓建设的保障。为了避免出现指标重复建设和数据质量差的情况,统一按照最详细、可落地的方法进行规范建设。

(1) 词根

词根是维度和指标管理的基础,划分为普通词根与专有词根,提高词根的易用性和关联性。

  • 普通词根:描述事物的最小单元体,如:交易-trade。

  • 专有词根:具备约定成俗或行业专属的描述体,如:美元-USD。

(2) 表命名规范

通用规范

  • 表名、字段名采用一个下划线分隔词根(示例:clienttype->client_type)。

  • 每部分使用小写英文单词,属于通用字段的必须满足通用字段信息的定义。

  • 表名、字段名需以字母为开头。

  • 表名、字段名最长不超过64个英文字符。

  • 优先使用词根中已有关键字(数仓标准配置中的词根管理),定期Review新增命名的不合理性。

  • 在表名自定义部分禁止采用非标准的缩写。

表命名规则

  • 表名称 = 类型 + 业务主题 + 子主题 + 表含义 + 存储格式 + 更新频率 +结尾,如下图所示:

(3) 指标命名规范

结合指标的特性以及词根管理规范,将指标进行结构化处理。

  1. 基础指标词根,即所有指标必须包含以下基础词根:

  1. 业务修饰词,用于描述业务场景的词汇,例如trade-交易。

3.日期修饰词,用于修饰业务发生的时间区间。

4.聚合修饰词,对结果进行聚集操作。

5.基础指标,单一的业务修饰词+基础指标词根构建基础指标 ,例如:交易金额-trade_amt。

6.派生指标,多修饰词+基础指标词根构建派生指标。派生指标继承基础指标的特性,例如:安装门店数量-install_poi_cnt。

7.普通指标命名规范,与字段命名规范一致,由词汇转换即可以。

2. 架构治理

(1) 数据分层

优秀可靠的数仓体系,往往需要清晰的数据分层结构,即要保证数据层的稳定又要屏蔽对下游的影响,并且要避免链路过长,一般的分层架构如下:

(2) 数据流向

稳定业务按照标准的数据流向进行开发,即ODS-->DWD-->DWA-->APP。非稳定业务或探索性需求,可以遵循ODS->DWD->APP或者ODS->DWD->DWT->APP两个模型数据流。在保障了数据链路的合理性之后,又在此基础上确认了模型分层引用原则:

  • 正常流向:ODS>DWD->DWT->DWA->APP,当出现ODS >DWD->DWA->APP这种关系时,说明主题域未覆盖全。应将DWD数据落到DWT中,对于使用频度非常低的表允许DWD->DWA。

  • 尽量避免出现DWA宽表中使用DWD又使用(该DWD所归属主题域)DWT的表。

  • 同一主题域内对于DWT生成DWT的表,原则上要尽量避免,否则会影响ETL的效率。

  • DWT、DWA和APP中禁止直接使用ODS的表, ODS的表只能被DWD引用。

  • 禁止出现反向依赖,例如DWT的表依赖DWA的表。
3. 元数据治理

元数据可分为技术元数据和业务元数据:

技术元数据为开发和管理数据仓库的IT 人员使用,它描述了与数据仓库开发、管理和维护相关的数据,包括数据源信息、数据转换描述、数据仓库模型、数据清洗与更新规则、数据映射和访问权限等。

常见的技术元数据有:

  • 存储元数据:如表、字段、分区等信息。

  • 运行元数据:如大数据平台上所有作业运行等信息:类似于 Hive Job 日志,包括作业类型、实例名称、输入输出、 SQL 、运行参数、执行时间,执行引擎等。

  • 数据开发平台中数据同步、计算任务、任务调度等信息:包括数据同步的输入输出表和字段,以及同步任务本身的节点信息:计算任务主要有输入输出、任务本身的节点信息 任务调度主要有任务的依赖类型、依赖关系等,以及不同类型调度任务的运行日志等。

  • 数据质量和运维相关元数据:如任务监控、运维报警、数据质量、故障等信息,包括任务监控运行日志、告警配置及运行日志、故障信息等。

业务元数据为管理层和业务分析人员服务,从业务角度描述数据,包括商务术语、数据仓库中有什么数据、数据的位置和数据的可用性等,帮助业务人员更好地理解数据仓库中哪些数据是可用的以及如何使用。

  • 常见的业务元数据有维度及属性(包括维度编码,字段类型,创建人,创建时间,状态等)、业务过程、指标(包含指标名称,指标编码,业务口径,指标类型,责任人,创建时间,状态,sql等),安全等级,计算逻辑等的规范化定义,用于更好地管理和使用数据。数据应用元数据,如数据报表、数据产品等的配置和运行元数据。

元数据不仅定义了数据仓库中数据的模式、来源、抽取和转换规则等,而且是整个数据仓库系统运行的基础,元数据把数据仓库系统中各个松散的组件联系起来,组成了一个有机的整体

元数据治理主要解决三个问题

  1. 通过建立相应的组织、流程和工具,推动业务标准的落地实施,实现指标的规范定义,消除指标认知的歧义;

  2. 基于业务现状和未来的演进方式,对业务模型进行抽象,制定清晰的主题、业务过程和分析方向,构建完备的技术元数据,对物理模型进行准确完善的描述,并打通技术元数据与业务元数据的关系,对物理模型进行完备的刻画;

  3. 通过元数据建设,为使用数据提效,解决“找数、理解数、评估”难题以及“取数、数据可视化”等难题。
4. 安全治理

围绕数据安全标准,首先要有数据的分级、分类标准,确保数据在上线前有着准确的密级。第二,针对数据使用方,要有明确的角色授权标准,通过分级分类和角色授权,来保障重要数据拿不走。第三,针对敏感数据,要有隐私管理标准,保障敏感数据的安全存储,即使未授权用户绕过权限管理拿到敏感数据,也要确保其看不懂。第四,通过制定审计标准,为后续的审计提供审计依据,确保数据走不脱。

5. 数据生命周期治理

任何事物都具有一定的生命周期,数据也不例外。从数据的产生、加工、使用乃至消亡都应该有一个科学的管理办法,将极少或者不再使用的数据从系统中剥离出来,并通过核实的存储设备进行保留,不仅能够提高系统的运行效率,更好的服务客户,还能大幅度减少因为数据长期保存带来的储存成本。数据生命周期一般包含在线阶段、归档阶段(有时还会进一步划分为在线归档阶段和离线归档阶段)、销毁阶段三大阶段,管理内容包括建立合理的数据类别,针对不同类别的数据制定各个阶段的保留时间、存储介质、清理规则和方式、注意事项等。

从上图数据生命周期中各参数间的关系中我们可以了解到,数据生命周期管理可以使得高价值数据的查询效率大幅提升,而且高价格的存储介质的采购量也可以减少很多;但是随着数据的使用程度的下降,数据被逐渐归档,查询时间也慢慢的变长;最后随着数据的使用频率和价值基本没有了之后,就可以逐渐销毁了。


猜你喜欢:

  1. 美团数据平台及数仓建设实践,超十万字总结

  2. 上百本优质大数据书籍,附必读清单(大数据宝藏)

  3. 五万字 | 耗时一个月整理出这份Hadoop吐血宝典

八、数据质量建设

数据治理的范围非常广,包含数据本⾝的管理、数据安全、数据质量、数据成本等。在这么多治理内容中,大家想下最重要的治理是什么?当然是数据质量治理,因为数据质量是数据分析结论有效性和准确性的基础,也是这一切的前提。所以如何保障数据质量,确保数据可用性是数据仓库建设中不容忽视的环节。

数据质量涉及的范围也很广,贯穿数仓的整个生命周期,从数据产生->数据接入->数据存储->数据处理->数据输出->数据展示,每个阶段都需要质量治理。

在系统建设的各个阶段都应该根据标准进行数据质量检测和规范,及时进行治理,避免事后的清洗工作。

1. 为什么要进行数据质量评估

很多刚入门的数据人,拿到数据后会立刻开始对数据进行各种探查、统计分析等,企图能立即发现数据背后隐藏的信息和知识。然而忙活了一阵才颓然发现,并不能提炼出太多有价值的信息,白白浪费了大量的时间和精力。比如和数据打交道的过程中,可能会出现以下的场景:

场景一:作为数据分析人员,要统计一下近 7 天用户的购买情况,结果从数仓中统计完发现,很多数据发生了重复记录,甚至有些数据统计单位不统一。

场景二:业务看报表,发现某一天的成交 gmv 暴跌,经过排查发现,是当天的数据缺失。

造成这一情况的一个重要因素就是忽视了对数据质量的客观评估,没有制定合理的衡量标准,导致没有发现数据已出现问题。所以,进行科学、客观的数据质量衡量标准是非常必要且十分重要的。

2. 数据质量衡量标准

如何评估数据质量的好坏,业界有不同的标准,我总结了以下六个维度进行评估,包括完整性、规范性、一致性、准确性、唯一性、及时性

  1. 数据完整性

完整性指的是数据信息是否存在缺失的状况,数据缺失的情况可能是整个数据记录缺失,也可能是数据中某个字段信息的记录缺失。

  1. 数据规范性

规范性指的是描述数据遵循预定的语法规则的程度,是否符合其定义,比如数据的类型、格式、取值范围等。

  1. 数据一致性

一致性是指数据是否遵循了统一的规范,数据集合是否保持了统一的格式。数据质量的一致性主要体现在数据记录的规范和数据是否符合逻辑,一致性并不意味着数值上的绝对相同,而是数据收集、处理的方法和标准的一致。常见的一致性指标有:ID 重合度、属性一致、取值一致、采集方法一致、转化步骤一致。

  1. 数据准确性

准确性是指数据记录的信息是否存在异常或错误。和一致性不一样,存在准确性问题的数据不仅仅只是规则上的不一致,更为常见的数据准确性错误就如乱码,其次异常的大或者小的数据也是不符合条件的数据。常见的准确性指标有:缺失值占比、错误值占比、异常值占比、抽样偏差、数据噪声。

  1. 数据唯一性

唯一性指的是数据库的数据不存在重复的情形。比如真实成交 1 万条,但数据表有 3000 条重复了,成了 1.3 万条成交记录,这种数据不符合数据唯一性。

  1. 数据及时性

及时性是指数据从产生到可以查看的时间间隔,也叫数据的延时时长。比如一份数据是统计离线今日的,结果都是第二天甚至第三天才能统计完,这种数据不符合数据及时性。

还有一些其他的衡量标准,在此简单列出

维度 衡量标准
参照完整性 数据项是否在父表中有定义
依赖一致性 数据项取值是否满足与其他数据项之间的依赖关系
正确性 数据内容和定义是否一致
精确性 数据精度是否达到业务规则要求的位数
技术有效性 数据项是否按已定义的格式标准组织
业务有效性 数据项是否符合已定义的
可信度 根据客户调查或客户主动提供获得
可用性 数据可用的时间和数据需要被访问时间的比例
可访问性 数据是否便于自动化读取

3. 数据质量管理流程

本节流程如下图所示:

1. 数据资产等级

1) 等级定义

根据当数据质量

以上是关于50000字,数仓建设保姆级教程,离线和实时一网打尽(理论+实战) 下的主要内容,如果未能解决你的问题,请参考以下文章

50000字,数仓建设保姆级教程,离线和实时一网打尽(理论+实战) 下

50000字,数仓建设保姆级教程,离线和实时一网打尽(理论+实战) 下

数仓建设教程

数仓建设教程

数仓建设教程

数仓建设保姆级教程,离线和实时一网打尽(理论+实战)