深入剖析 Delta Lake: MySQL CDC 实战

Posted scx_white

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了深入剖析 Delta Lake: MySQL CDC 实战相关的知识,希望对你有一定的参考价值。

前言

在初建大数据平台时,我想大家都遇到过这样一个需求,mysql 的数据如何同步到 hdfs 中供数仓分析使用

T+1架构

在早期,业务对实时性的要求可能还不高,大家的计算也都是 T+1 的离线计算,当然也可能有 H+1 的准实时计算。大家的选型可能是 sqoopdatax 等工具。此时只需要在离线调度平台上配置每天/每小时拉取 mysql 数据同步到 hive 的任务即可。

但该方式也有一些弊端,如:

  • 实时性较差
  • 全量抽取、尤其是每小时一次的抽取对 mysql 的压力很大
  • 单表数据量过大时(上亿数据),dba 不允许全量抽取,改为增量抽取后,hive 端需要和历史数据做合并。如果增量和历史全量合并执行时长更久,如果增量和最近几个月合并可能导致脏数据
  • 涉及到分库分表抽取时难度较大,工具支持的并不完美,需要在 hive 端做合并,操作复杂

H+1架构

18 年那会,我想以上的这种方式应该是很多公司都在使用的,即使是现在也有公司在采用。
在此架构的基础上,有些公司可能会有一些大表数据需要做 H+1 的准实时需求,面临的问题是:如果仍然每个小时抽取大表数据,而拉取数据时间又可能很长,会导致 mysql 实例负载持续处于最高水位,造成 mysql 服务不可用,即使是在从库拉取,也会导致从库挂掉,mysql 同步延迟等等问题,所以此时就诞生了一种新的同步方式。


该种架构需要在 mysql 端部署 canal 等能够接收 mysql 主库的 binlog 并且能解析 binlog 的工具。然后把 binlog 根据库或者业务类型发送到不同的 kafkamq 队列。而在下游,需要数据平台开发人员使用 spark 或者flink 等实时框架开发相应的消费代码,把 kafkabinlog 变更数据写入/更新到 hbase 或者 kudu具有更新的db。在后面就是在离线调度平台使用 hive 或者spark sql来进行 etl 计算了。
当然,该种架构还是有一定的弊端

  • 只是准实时,还是需要在离线调度平台每小时的抽取 hbase 数据
  • 把对 mysql 的抽取压力,转移到了 hbase 端。如果也有业务在用hbase,每小时的高峰期会导致业务读/写请求变慢
  • 如果每个小时从 hbase 全量抽取,走的 hbasescan 命令,数据量大的时候极慢,可能会导致后续的整个链路超出1个小时,甚至变成 H+n
  • 如果 hbase 只存近几天的增量数据,然后使用row_number() overhive 的全量进行合并,也可能会导致整个链路过长,如果合并最近几个月的全量数据,可能会产生脏数据。

实时方案

以前受限于 parquet 文件写入 HDFS 文件后,要想更新数据,就只能全量写新的数据,成本很高,并且 HDFS 的设计就是一次写入多次读取。都 2020 年了,到底有没有一种架构能够在分钟级别采集到 HDFS 呢?

很明显,是有的。据我所知,目前能够支持 hdfs 更新的工具有 delta lakehudiiceberg(这三款都是数据湖框架,更新也是数据湖数据的更新,但是能够通过其它引擎读取,比如spark、hive、presto)。但是目前 iceberg
还是不支持更新的 merge/upsert 操作,并且 icebergflink 目前已经在合作,据说社区内部已经在把该特性的优先级放到最高了,估计会在21 年中附近出来,而其它两款是明确支持更新操作的。我调研了一番后,发现还是 databricks 公司开源的 delta lake 上手比较简单,继承了了 spark 的易用性。


该种架构最简单粗暴,直接使用 spark 或者 flink 解析 binlog 日志,然后将数据处理后写入到 hdfs ,但是问题是一般 mysql 都是 dba 在维护,不会允许个人或者某个团队直接读取mysql binlog,这样可能会导致重复binlog 消费,对 mysql 压力较大。最终会由 dba 或者中间件团队统一解析binlog,然后将数据投递到 kafkamq 队列,供其它业务团队消费。所以可能的架构应该是这样

由于 delta lakespark 都是由 databricks 公司开源的,并且强绑定spark,所以只能使用 spark 来操作

读取kafka解析后的消息

使用 spark structured streaming 我们可以很简单的消费 kafka 的数据

  // kafka source
        Dataset<Row> source = sparkSession.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", config.getKafkaServer())
                .option("failOnDataLoss", "true")
                .option("startingOffsets", "latest")
                .option("groupIdPrefix", "inc")
                .option("subscribe", config.getTopic())
                .option("kafka.max.poll.interval.ms", Integer.MAX_VALUE)
                .load().selectExpr("cast(key as string)", "cast(value as string)");

通过这段代码,我们就获得了 binlog 的数据,其中配置了failOnDataLoss=true 来保证我们的offset如果无法读取就强制失败,提醒负责人数据丢失

设置checkpoint地址以及触发间隔

source.writeStream()
                .format("delta")
                .foreachBatch(new BatchWriteFunction(config))
                .trigger(Trigger.ProcessingTime(config.getTriggerTime(), TimeUnit.MINUTES))
                .option("checkpointLocation", config.getCheckPointPath() + "/stream")
                .queryName("all stream")
                .start();

由于spark structured streaming 没有直接写入 delta table 的实现类,所以需要我们使用 foreachBatch 方式来自己实现,foreachBatch 默认实现了at least once 的语义,由于 delta lake 支持 acid 的特性,所以我们的 sink 端也保证了幂等性,最终实现了 excatly once 的语义。
继续看BatchWriteFunction

sink代码

public class BatchWriteFunction implements VoidFunction2<Dataset<String>, Long> 


    private final String updateSql;
    private final Config config;

    private ConcurrentHashMap<String, DeltaTable> cacheTable;


    public BatchWriteFunction(Config config) 
        this.config = config;
        if (config.isHourPt()) 
            this.updateSql = "sink.dt = update.dt and sink.hour = update.hour and sink.id = update.id";
         else 
            this.updateSql = "sink.dt = update.dt and sink.id = update.id";
        
        cacheTable = new ConcurrentHashMap<>(2);
    

    @Override
    public void call(Dataset<String> rowDataSet, Long v2) throws Exception 
        
        if (rowDataSet.isEmpty()) 
            log.warn("batch data size is empty ,ignore");
            return;
        
       	//省略
    

    private DeltaTable getDeltaTable(Dataset<String> rowDataSet) 
        DeltaTable deltaTable = cacheTable.get(config.getTableAlias());
        if (deltaTable == null) 
            deltaTable = DeltaTable.forPath(rowDataSet.sparkSession(), config.getSinkPath());
            cacheTable.put(config.getTableAlias(), deltaTable);
        
        return deltaTable;
    


核心代码太长先省略,后续分段介绍

BatchWriteFunction有三个变量

  • updateSql :更新/插入时 sql 条件
  • config :相关配置信息
  • cacheTable:缓存的delta table

其中在构造函数中通过判断该表是小时分区还是天分区来使用不同的更新/插入条件

具体看重写的 call方法,有两个参数

  • rowDataSet :微批时间内的数据
  • v2:生成的唯一id,用来写入端持久化使用来保证 excatly once 语义。

rowDataSet 为空时,我们可以直接返回,如果在这里有多个sink,或者多个action算子时,建议使用rowDataSet.persist 持久化一下
rowDataSet 不为空时,由于 rowDataSetjson 格式的数据,我们需要解析一下

        Dataset<Row> dataset = rowDataSet.sparkSession().read().json(rowDataSet);

这样我们就将 json 数据转换为具有 schemaRow 数据集

由于 delta lake 不支持写入时有重复的 id 进行更新同一条数据,所以我们需要做一下去重操作

  		StructType schema = dataset.schema();
        StringBuilder otherCols = new StringBuilder();
        String id = Constants.ID;
        String updateTime = Constants.UPDATE_TIME;
        boolean idExists = false, updateTimeExists = false;
        String[] fieldNames = schema.fieldNames();
        int len = fieldNames.length;
        String fieldName;
        for (int i = 0; i < len; i++) 
            fieldName = fieldNames[i];
            if (fieldName.equals(id)) 
                idExists = true;
             else if (fieldName.equals(updateTime)) 
                updateTimeExists = true;
             else 
                if (i != len - 1) 
                    otherCols.append(fieldName).append(", ");
                 else 
                    otherCols.append(fieldName);
                
            
        

        if (!idExists || !updateTimeExists) 
            rowDataSet.show(false);
            throw new Exception("主键或者更新字段找不到:" + JSONObject.toJSONString(schema.fieldNames()));
        
        otherCols.insert(0, "struct(" + updateTime + ",");
        otherCols.append(") as otherCols");

        //4
        Dataset<Row> latestChange = dataset.toDF().selectExpr("id", otherCols.toString())
                .groupBy("id")
                .agg(max("otherCols").as("latest"))
                .selectExpr("id", "latest.*");

代码不复杂,主要是根据 id 进行 group by 然后取时间戳最新的那条数据。需要注意的是:otherCols对象的第一个字段要是数据的更新字段,如:gmt_modified

 List<String> dtList = latestChange.toJavaRDD()
                .groupBy((Function<Row, String>) v1 -> v1.getAs(Constants.DT))
                .keys().collect();
        StringBuilder dtStr = new StringBuilder();
        int size = dtList.size();
        if (size == 1) 
            dtStr.append("sink.dt = '").append(dtList.get(0)).append("'");
         else 
            dtStr.append("( sink.dt = '").append(dtList.get(0)).append("'");
            for (int i = 1; i < size; i++) 
                dtStr.append(" or sink.dt = '").append(dtList.get(i)).append("'");
            
            dtStr.append(") ");
        

去重之后,我们为了使用 delta lake 提供的 snapshot isolation特性来加快delta lakemerge 操作,还对这些数据进行了分区聚合

最后进行 merge 操作

 DeltaTable deltaTable = getDeltaTable(rowDataSet);
        //持续写入避免冲突,绝大部分冲突都在delta内部解决了
        while (true) 
            try 
                //7
                deltaTable.as("sink")
                        .merge(latestChange.as("update"), dtStr.toString() + " and " + this.updateSql)
                        .whenMatched(String.format("update.%s > sink.%s", Constants.UPDATE_TIME, Constants.UPDATE_TIME))
                        .updateAll()
                        .whenNotMatched()
                        .insertAll()
                        .execute();
                break;
             catch (DeltaConcurrentModificationException e) 
                log.error("merge data failed:", e.getMessage());
            
        

至于为什么用 while 循环写入,是为了保证写入冲突时,程序不挂掉。
deltaTable表做为sink表,latestChange做为最终要更新的update数据,匹配条件是过滤的分区以及更新条件,当能够匹配时判断 update 的更新时间是否最新,如果最新则更新所有字段。当从sink表找不到数据,即无法匹配时,直接 insertAll 插入所有字段即可。


以上就是所有更新流程。当然日常我们其实也可能有其它问题,比如第一次的初始化数据怎么做,等等。

其它问题

1.如何初始化历史数据
最简单的方法当然是更新 mysql 的所有时间戳,把 binlog 重发一下。使用同步工具把mysql的数据同步到 kafka,然后在 spark 程序中把增量 kafka数据和历史 kafka 数据进行一个 union 写入到 delta lake.该种方式还有一些优点,可以在后续 binlog 如果解析数据异常丢失,可以继续同步历史到 kafka,重新消费,无需更新 mysql 数据
2.表的 schema 怎么定义的
由于我们 merge 操作使用了 updatAllinsertAll,所以我们开启delta lakeAutomatic schema evolution,该特性会自动同步我们 mysql 中新加的字段到 delta lake
3.小文件问题怎么解决
在创建 sparkSession 时设置spark.delta.merge.repartitionBeforeWrite=true,然后配置spark.sql.shuffle.partitions大小即可,但是spark.sql.shuffle.partitions影响着我们shuffling 操作的分区数,如果太小可能会导致任务执行过慢,太大又会导致小文件数过多,大家要酌情配置。
另外,我们可以通过其它方式合并文件数,比如我在写入程序里新增了一个kafka source

      Dataset<String> configStream = sparkSession.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", config.getDataKafkaServer())
                .option("failOnDataLoss", "false")
                .option("startingOffsets", "latest")
                .option("subscribe", Constants.CONFIG_TOPIC)
                .option("groupIdPrefix", "conf")
                .option("kafka.max.poll.interval.ms", Integer.MAX_VALUE)
                .load().selectExpr("cast(value as string)");
        //配置只需要一个并行度处理即可
        configStream.repartition(1)
                .writeStream()
                .foreachBatch(new CompactionFunction(config))
                .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
                .option("checkpointLocation", config.getCheckPointPath() + "/config")
                .queryName("config stream")
                .start();


然后通过在其它平台定期发送合并前一天分区的命令来合并历史分区文件数

  configSet.sparkSession().read()
                        .format("delta")
                        .load(config.getSinkPath())
                        .where(partition)
                        .repartition(num)
                        .write()
                        .option("dataChange", "false")
                        .format("delta")
                        .mode("overwrite")
                        .partitionBy(config.isHourPt() ? "dt,hour" : "dt")
                        .option("replaceWhere", partition)
                        .save(config.getSinkPath());

核心,大概就这么多。有问题大家可以留言评论


关注公众号获取最新文章

以上是关于深入剖析 Delta Lake: MySQL CDC 实战的主要内容,如果未能解决你的问题,请参考以下文章

深入剖析 Delta Lake: schema validation

Delta Lake 版本管理(13)

如何使用Delta Lake构建批流一体数据仓库

Delta Lake 学习

Pyspark Delta Lake 捕获表不是 delta 表异常

pyspark delta-lake 元存储