增量更新 Hive 表数据
Posted 有关SQL
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了增量更新 Hive 表数据相关的知识,希望对你有一定的参考价值。
Hive 的更新很有趣。
Hive 的表有两种,一种是 managed table, 一种是 external table.
managed table 是 Hive 自动帮我们维护的表,自动分割底层存储文件,自动分区,这些自动化的操作,都是 Hive 封装了与 Hadoop 交互的接口。
external table 只是一种在 Hive 维护的与外部文件的映射。
managed table 与 external table 最大的区别在于删除的时候,external table 默认情况下只是删除表定义,而数据依旧在hadoop 上存储着;managed table 则是表定义连着表数据一起被删除了。
早期的时候, Hive 支持的表操作只有两种:OverWrite 和 Appand
Overwrite 并不是对某一行的数据做更新,而是对整张表做覆盖,所以感觉上 Hive 更像是在做 ETL 里面的 Staging, 而不像是最终存储计算结果的地方。Hive 超强的计算能力可以做为大数据量转换的工具,最终结果将被送到关系型数据库或者其他 Hive 实例上存储。
hortonworks 有一篇提出相关解决方案的文章,介绍了 4步走解决增量更新 Hive 表:
url如下:
https://hortonworks.com/blog/four-step-strategy-incremental-updateds-hive
1. Ingest
2. Reconcile
3. Compact
4. Purge
过程中,用到了四个 Hive 表,分别是:
base_table: 初始化装载源库来的表数据,表示最新数据
incremental_table:用来装载上一次增量更新以来,发生过更改的数据,包括新增,更新,和删除
reconcile_view:以 base_table, incremental_table 计算出来的最新数据,涉及到的操作,有删除,更新,和新增。每一次都要重复计算是不是有些多余,浪费很多对没有变更的数据的重复计算。如果有对数据有分区,只要对有数据更新的分区做增量更新,会有很大效率的提高。
reporting_table:将reconcile_view的数据,装载到 reporting_table中,用它来替换掉 base_table中的数据。
一) 取决于源数据库的服务是否支持直连抽取数据,可以有两种方法完成第一步 ingest, 即 Extract.
1. File Processing: 由源数据库自发的输出,以文件方式在合理的时间窗口导出
2. RDBMS Processing (Database Client based ETL): 由 Sqoop 来完成抽取; ETL 工具, kettle, Informatica等;
File Processing :
1. 由数据库软件自带的导入导出,将文件导出一定分隔符分割的文本文件
2. 将这些文本文件放到 Hive 映射的文件夹下面
RDBMS Processing (Database Client based ETL):
1. SQOOP: 既可以实现初始化导入,也可以完成增量导入,增量导入的实现,依赖于Sqoop 本身的 check-sum 机制。check-sum 是对 Hive 表中的一行用来做校验数据做了 hash 计算,根据匹配是否来做增量更新。
以下是文章的原文,展示了 Sqoop 的具体用法:
SQOOP is the JDBC-based utility for integrating with traditional databases.
A SQOOP Import allows for the movement of data into either HDFS (a delimited format can be defined as part of the Import definition) or directly into a Hive table.
The entire source table can be moved into HDFS or Hive using the “--table” parameter.
sqoop import
--connect jdbc:teradata://{host name or ip address}/Database=retail
--connection-manager org.apache.sqoop.teradata.TeradataConnManager
--username dbc
--password dbc
--table SOURCE_TBL
--target-dir /user/hive/incremental_table -m 1
注**
--table source_TBL: 是指关系型数据库里的原表
--target-dir :Hive 中表对应的存储目录
After the initial import, subsequent imports can leverage SQOOP’s native support for “Incremental Import” by using the “check-column”, “incremental” and “last-value” parameters.
sqoop import
--connect jdbc:teradata://{host name or ip address}/Database=retail
--connection-manager org.apache.sqoop.teradata.TeradataConnManager
--username dbc
--password dbc
--table SOURCE_TBL
--target-dir /user/hive/incremental_table -m 1
--check-column modified_date
--incremental lastmodified
--last-value {last_import_date|last_import_value}
注**
--check-column : 是指定原表中用来做增量判断条件的那一字段
--incremental lastmodified: 指定增量的模式,append 或者 lastmodified.
在数据仓库中,无论是维度表还是事实表,我们总会设计一栏自增列,作为代理键或者主键。这个时候这些键值总是自增长的,因此适合采用 append 形式,指定check-sum 列为自增列,如果有比 {last_import_value}大的值,就会被 sqoop 导入进来;
在设计数据库的时候,为了审计,我们通常也会设计一列为 timestamp 列,每对一行做了修改,就会重置这列 timestamp 为当前时间戳。如果是针对这类行数据,我们要指定的便是 lastmodified, 配合 check-sum 设置为 timestamp 列,sqoop 就会导入比{last_import_date} 大的数据行。
--last-value { last_import_date } 这是需要从程序外面传进来的
考虑到这是增量更新,那么理应把 sqoop 做成一个 Job 来自动化运行,并且记录每一次的时间,作为下次运行时要传入的 {last_import_date} 或者{last_import_value}
Alternately, you can leverage the “query” parameter, and have SQL select statements limit the import to new or changed records only.
sqoop import
--connect jdbc:teradata://{host name or ip address}/Database=retail
--connection-manager org.apache.sqoop.teradata.TeradataConnManager
--username dbc
--password dbc
--target-dir /user/hive/incremental_table -m 1
--query 'select * from SOURCE_TBL where modified_date > {last_import_date} AND $CONDITIONS’
Note: For the initial load, substitute “base_table” for “incremental_table”. For all subsequent loads, use “incremental_table”.
注**
这是前面两种全量和增量的替代写法,用指定的查询,从原关系型数据库导出数据,不同的是,全量的时候,要指定导入的 Hive 目标表是 base_table, 而增量的时候,导入的是 incremental_table.
二) Reconciliation 将新旧数据综合起来
初始化时,装载最终的目标表没有多少难度。
在这段中,主要解决的问题是增量与初始化的融合。
初始化的数据,存储在 base_table 中, 而增量数据我们已经装载到了 incremental_table 中。
将两者的数据合二为一,就可以生成与源数据库一致的最新数据。
前提是源数据库的任何数据行不接受硬删除即delete 操作,而是在行上打了一个软删除的标签,表示该行已删除。
如果是做了硬删除,那么同时也要做好删除的审计,将删除的数据行放入审计表中,一同发送给 incremental_table .
base_table
CREATE TABLE base_table (
id string,
field1 string,
field2 string,
field3 string,
field4 string,
field5 string,
modified_date string
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LOCATION '/user/hive/base_table';
incremental_table
CREATE EXTERNAL TABLE incremental_table (
id string,
field1 string,
field2 string,
field3 string,
field4 string,
field5 string,
modified_date string
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LOCATION '/user/hive/incremental_table';
reconcile_view
CREATE VIEW reconcile_view AS
SELECT t2.id, t2.field1, t2.field2, t2.field3, t2.field4, t2.field5, t2.modified_date FROM
(SELECT *,ROW_NUMBER() OVER (PARTITION BY id ORDER BY modified_date DESC) rn
FROM (SELECT * FROM base_table
UNION ALL
SELECT * FROM incremental_table)
t1) t2
WHERE rn = 1;
从最后一个view定义来解说,incremental_table 必须拥有增量记录的全部,因此硬删除操作就不会反应在 incremental_table 里头。
但是 reconcile_view 所涉及的量毕竟有限,浪费明明不会更改的那部分数据的计算。
因此如果能做好分区,仅仅对某几个分区做全量更新会更高效。
三) Compact: 物化视图,即将reconciliation_view 装载到 reporting_table 里面去
reporting_table
DROP TABLE reporting_table;
CREATE TABLE reporting_table AS
SELECT * FROM reconcile_view;
首先是要将之前的 reporting_table 删除,再重建 reporting _table, 用 reconciliation_view 填充这张表。
在这张表的基础上,可以做很多聚合,过滤等操作,进行数据二次加工。
四) Purge: 将多余的表数据清空
base_table :应该当换成 reporting_table 里面的数据
incremental_table: 清空
DROP TABLE base_table;
CREATE TABLE base_table AS
SELECT * FROM reporting_table;
hadoop fs –rm –r /user/hive/incremental_table/*
总结:
1. Oozie 可以将这4步统一做成一个工作流,方便调度
2. 可以用脚本自定义工作流,就像数据仓库的 ETL 一样
以上是关于增量更新 Hive 表数据的主要内容,如果未能解决你的问题,请参考以下文章
Hive - 如何在 Hive 中跟踪和更新增量表的上次修改日期?