深入剖析 Delta Lake: MySQL CDC 实战
Posted scx_white
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了深入剖析 Delta Lake: MySQL CDC 实战相关的知识,希望对你有一定的参考价值。
前言
在初建大数据平台时,我想大家都遇到过这样一个需求,
mysql 的数据如何同步到 hdfs 中供数仓分析使用
T+1架构
在早期,业务对实时性的要求可能还不高,大家的计算也都是 T+1
的离线计算,当然也可能有 H+1
的准实时计算。大家的选型可能是 sqoop
、datax
等工具。此时只需要在离线调度平台上配置每天/每小时拉取 mysql
数据同步到 hive
的任务即可。
但该方式也有一些弊端,如:
- 实时性较差
- 全量抽取、尤其是每小时一次的抽取对
mysql
的压力很大 - 单表数据量过大时(上亿数据),
dba
不允许全量抽取,改为增量抽取后,hive
端需要和历史数据做合并。如果增量和历史全量合并执行时长更久,如果增量和最近几个月合并可能导致脏数据 - 涉及到分库分表抽取时难度较大,工具支持的并不完美,需要在
hive
端做合并,操作复杂
H+1架构
在 18
年那会,我想以上的这种方式应该是很多公司都在使用的,即使是现在也有公司在采用。
在此架构的基础上,有些公司可能会有一些大表数据需要做 H+1
的准实时需求,面临的问题是:如果仍然每个小时抽取大表数据,而拉取数据时间又可能很长,会导致 mysql
实例负载持续处于最高水位,造成 mysql
服务不可用,即使是在从库拉取,也会导致从库挂掉,mysql
同步延迟等等问题,所以此时就诞生了一种新的同步方式。
该种架构需要在 mysql
端部署 canal
等能够接收 mysql
主库的 binlog
并且能解析 binlog
的工具。然后把 binlog
根据库或者业务类型发送到不同的 kafka
等 mq
队列。而在下游,需要数据平台开发人员使用 spark
或者flink
等实时框架开发相应的消费代码,把 kafka
的 binlog
变更数据写入/更新到 hbase
或者 kudu
具有更新的db
。在后面就是在离线调度平台使用 hive
或者spark sql
来进行 etl
计算了。
当然,该种架构还是有一定的弊端
- 只是准实时,还是需要在离线调度平台每小时的抽取
hbase
数据 - 把对
mysql
的抽取压力,转移到了hbase
端。如果也有业务在用hbase
,每小时的高峰期会导致业务读/写请求变慢 - 如果每个小时从
hbase
全量抽取,走的hbase
的scan
命令,数据量大的时候极慢,可能会导致后续的整个链路超出1个小时,甚至变成H+n
。 - 如果
hbase
只存近几天的增量数据,然后使用row_number() over
和hive
的全量进行合并,也可能会导致整个链路过长,如果合并最近几个月的全量数据,可能会产生脏数据。
实时方案
以前受限于 parquet
文件写入 HDFS
文件后,要想更新数据,就只能全量写新的数据,成本很高,并且 HDFS
的设计就是一次写入多次读取。都 2020
年了,到底有没有一种架构能够在分钟级别采集到 HDFS
呢?
很明显,是有的。据我所知,目前能够支持 hdfs
更新的工具有 delta lake
、hudi
和 iceberg
(这三款都是数据湖框架,更新也是数据湖数据的更新,但是能够通过其它引擎读取,比如spark、hive、presto
)。但是目前 iceberg
还是不支持更新的 merge/upsert
操作,并且 iceberg
和 flink
目前已经在合作,据说社区内部已经在把该特性的优先级放到最高了,估计会在21
年中附近出来,而其它两款是明确支持更新操作的。我调研了一番后,发现还是 databricks
公司开源的 delta lake
上手比较简单,继承了了 spark
的易用性。
该种架构最简单粗暴,直接使用 spark
或者 flink
解析 binlog
日志,然后将数据处理后写入到 hdfs
,但是问题是一般 mysql
都是 dba
在维护,不会允许个人或者某个团队直接读取mysql binlog
,这样可能会导致重复binlog
消费,对 mysql
压力较大。最终会由 dba
或者中间件团队统一解析binlog
,然后将数据投递到 kafka
等 mq
队列,供其它业务团队消费。所以可能的架构应该是这样
由于 delta lake
和 spark
都是由 databricks
公司开源的,并且强绑定spark
,所以只能使用 spark
来操作
读取kafka解析后的消息
使用 spark structured streaming
我们可以很简单的消费 kafka
的数据
// kafka source
Dataset<Row> source = sparkSession.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", config.getKafkaServer())
.option("failOnDataLoss", "true")
.option("startingOffsets", "latest")
.option("groupIdPrefix", "inc")
.option("subscribe", config.getTopic())
.option("kafka.max.poll.interval.ms", Integer.MAX_VALUE)
.load().selectExpr("cast(key as string)", "cast(value as string)");
通过这段代码,我们就获得了 binlog
的数据,其中配置了failOnDataLoss=true
来保证我们的offset如果无法读取就强制失败,提醒负责人数据丢失
设置checkpoint地址以及触发间隔
source.writeStream()
.format("delta")
.foreachBatch(new BatchWriteFunction(config))
.trigger(Trigger.ProcessingTime(config.getTriggerTime(), TimeUnit.MINUTES))
.option("checkpointLocation", config.getCheckPointPath() + "/stream")
.queryName("all stream")
.start();
由于spark structured streaming
没有直接写入 delta table
的实现类,所以需要我们使用 foreachBatch
方式来自己实现,foreachBatch
默认实现了at least once
的语义,由于 delta lake
支持 acid
的特性,所以我们的 sink
端也保证了幂等性,最终实现了 excatly once
的语义。
继续看BatchWriteFunction
sink代码
public class BatchWriteFunction implements VoidFunction2<Dataset<String>, Long>
private final String updateSql;
private final Config config;
private ConcurrentHashMap<String, DeltaTable> cacheTable;
public BatchWriteFunction(Config config)
this.config = config;
if (config.isHourPt())
this.updateSql = "sink.dt = update.dt and sink.hour = update.hour and sink.id = update.id";
else
this.updateSql = "sink.dt = update.dt and sink.id = update.id";
cacheTable = new ConcurrentHashMap<>(2);
@Override
public void call(Dataset<String> rowDataSet, Long v2) throws Exception
if (rowDataSet.isEmpty())
log.warn("batch data size is empty ,ignore");
return;
//省略
private DeltaTable getDeltaTable(Dataset<String> rowDataSet)
DeltaTable deltaTable = cacheTable.get(config.getTableAlias());
if (deltaTable == null)
deltaTable = DeltaTable.forPath(rowDataSet.sparkSession(), config.getSinkPath());
cacheTable.put(config.getTableAlias(), deltaTable);
return deltaTable;
核心代码太长先省略,后续分段介绍
BatchWriteFunction
有三个变量
updateSql
:更新/插入时sql
条件config
:相关配置信息cacheTable
:缓存的delta table
其中在构造函数中通过判断该表是小时分区还是天分区来使用不同的更新/插入条件
具体看重写的 call
方法,有两个参数
rowDataSet
:微批时间内的数据v2
:生成的唯一id,用来写入端持久化使用来保证excatly once
语义。
在 rowDataSet
为空时,我们可以直接返回,如果在这里有多个sink,或者多个action算子时,建议使用rowDataSet.persist
持久化一下
在 rowDataSet
不为空时,由于 rowDataSet
是 json
格式的数据,我们需要解析一下
Dataset<Row> dataset = rowDataSet.sparkSession().read().json(rowDataSet);
这样我们就将 json
数据转换为具有 schema
的 Row
数据集
由于 delta lake
不支持写入时有重复的 id
进行更新同一条数据,所以我们需要做一下去重操作
StructType schema = dataset.schema();
StringBuilder otherCols = new StringBuilder();
String id = Constants.ID;
String updateTime = Constants.UPDATE_TIME;
boolean idExists = false, updateTimeExists = false;
String[] fieldNames = schema.fieldNames();
int len = fieldNames.length;
String fieldName;
for (int i = 0; i < len; i++)
fieldName = fieldNames[i];
if (fieldName.equals(id))
idExists = true;
else if (fieldName.equals(updateTime))
updateTimeExists = true;
else
if (i != len - 1)
otherCols.append(fieldName).append(", ");
else
otherCols.append(fieldName);
if (!idExists || !updateTimeExists)
rowDataSet.show(false);
throw new Exception("主键或者更新字段找不到:" + JSONObject.toJSONString(schema.fieldNames()));
otherCols.insert(0, "struct(" + updateTime + ",");
otherCols.append(") as otherCols");
//4
Dataset<Row> latestChange = dataset.toDF().selectExpr("id", otherCols.toString())
.groupBy("id")
.agg(max("otherCols").as("latest"))
.selectExpr("id", "latest.*");
代码不复杂,主要是根据 id
进行 group by
然后取时间戳最新的那条数据。需要注意的是:otherCols
对象的第一个字段要是数据的更新字段,如:gmt_modified
List<String> dtList = latestChange.toJavaRDD()
.groupBy((Function<Row, String>) v1 -> v1.getAs(Constants.DT))
.keys().collect();
StringBuilder dtStr = new StringBuilder();
int size = dtList.size();
if (size == 1)
dtStr.append("sink.dt = '").append(dtList.get(0)).append("'");
else
dtStr.append("( sink.dt = '").append(dtList.get(0)).append("'");
for (int i = 1; i < size; i++)
dtStr.append(" or sink.dt = '").append(dtList.get(i)).append("'");
dtStr.append(") ");
去重之后,我们为了使用 delta lake
提供的 snapshot isolation
特性来加快delta lake
的 merge
操作,还对这些数据进行了分区聚合
最后进行 merge
操作
DeltaTable deltaTable = getDeltaTable(rowDataSet);
//持续写入避免冲突,绝大部分冲突都在delta内部解决了
while (true)
try
//7
deltaTable.as("sink")
.merge(latestChange.as("update"), dtStr.toString() + " and " + this.updateSql)
.whenMatched(String.format("update.%s > sink.%s", Constants.UPDATE_TIME, Constants.UPDATE_TIME))
.updateAll()
.whenNotMatched()
.insertAll()
.execute();
break;
catch (DeltaConcurrentModificationException e)
log.error("merge data failed:", e.getMessage());
至于为什么用 while
循环写入,是为了保证写入冲突时,程序不挂掉。
将deltaTable
表做为sink
表,latestChange
做为最终要更新的update
数据,匹配条件是过滤的分区以及更新条件,当能够匹配时判断 update
的更新时间是否最新,如果最新则更新所有字段。当从sink表找不到数据,即无法匹配时,直接 insertAll
插入所有字段即可。
以上就是所有更新流程。当然日常我们其实也可能有其它问题,比如第一次的初始化数据怎么做,等等。
其它问题
1.如何初始化历史数据
最简单的方法当然是更新 mysql
的所有时间戳,把 binlog
重发一下。使用同步工具把mysql的数据同步到 kafka
,然后在 spark
程序中把增量 kafka
数据和历史 kafka
数据进行一个 union
写入到 delta lake
.该种方式还有一些优点,可以在后续 binlog
如果解析数据异常丢失,可以继续同步历史到 kafka
,重新消费,无需更新 mysql
数据
2.表的 schema
怎么定义的
由于我们 merge
操作使用了 updatAll
和 insertAll
,所以我们开启delta lake
的 Automatic schema evolution
,该特性会自动同步我们 mysql
中新加的字段到 delta lake
表
3.小文件问题怎么解决
在创建 sparkSession
时设置spark.delta.merge.repartitionBeforeWrite=true
,然后配置spark.sql.shuffle.partitions
大小即可,但是spark.sql.shuffle.partitions
影响着我们shuffling 操作的分区数,如果太小可能会导致任务执行过慢,太大又会导致小文件数过多,大家要酌情配置。
另外,我们可以通过其它方式合并文件数,比如我在写入程序里新增了一个kafka source
Dataset<String> configStream = sparkSession.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", config.getDataKafkaServer())
.option("failOnDataLoss", "false")
.option("startingOffsets", "latest")
.option("subscribe", Constants.CONFIG_TOPIC)
.option("groupIdPrefix", "conf")
.option("kafka.max.poll.interval.ms", Integer.MAX_VALUE)
.load().selectExpr("cast(value as string)");
//配置只需要一个并行度处理即可
configStream.repartition(1)
.writeStream()
.foreachBatch(new CompactionFunction(config))
.trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
.option("checkpointLocation", config.getCheckPointPath() + "/config")
.queryName("config stream")
.start();
然后通过在其它平台定期发送合并前一天分区的命令来合并历史分区文件数
configSet.sparkSession().read()
.format("delta")
.load(config.getSinkPath())
.where(partition)
.repartition(num)
.write()
.option("dataChange", "false")
.format("delta")
.mode("overwrite")
.partitionBy(config.isHourPt() ? "dt,hour" : "dt")
.option("replaceWhere", partition)
.save(config.getSinkPath());
核心,大概就这么多。有问题大家可以留言评论
关注公众号获取最新文章
以上是关于深入剖析 Delta Lake: MySQL CDC 实战的主要内容,如果未能解决你的问题,请参考以下文章
深入剖析 Delta Lake: schema validation