实时数据应用系统设计方法
Posted 咬定青松
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了实时数据应用系统设计方法相关的知识,希望对你有一定的参考价值。
传统的数据应用架构,根据离线和实时应用场景的不同选用不同的数据管道
如果是离线数据迁移,使用Sqoop或者Datax等工具可以轻易的实现数据源到数据仓库的转换,更重要的是通过定时T+1的迁移模式很容易规避迁移过程中的各种问题。然而这种迁移方式的最大问题就是T+1模式,数据可用性延迟太长,特别是跨机房数据传输,数据延迟问题会更严重。此外,迁移大量数据会影响数据库的性能,同时直连数据库的方式不是每个业务方都允许,尽管可以提供备库,但也增加了业务运维成本。
于是基于CDC(Change Data Capture)的方式应运而生,它通过采集业务系统实时产生的binlog日志数据,解析日志还原出原始数据、相应的Schema和操作类型等信息后发送到Kafka,下游再消费Kafka的数据存储到数据湖系统中。不幸的是,原来批量加载而规避的问题在这种方案中几乎都出现了,而且还引入了更高的复杂度,比如下图是本人早些年实践过的系统架构:
业务数据库binlog通过canal传输到kafka,下游应用定时或者实时拉取kafka数据写入s3临时文件,spark合并临时文件后将最终结果分区更新到hive元数据
其中涉及到的主要组件或者系统有:
canal server:实例部署的基本单元,对一个数据库实例可以根据要求的可用性级别部署一个或多个canal server,如果一个数据库实例对应有多个canal server实例,因为同一个mysql同步部署单元中只能有一个主canal实例处于Active状态,于是在多canal实例的情况下需要借助zookeeper选主。canal server依据通讯协议拉取mysql 服务器的binlog日志,完成解析并存储到消息系统kafka,这一步已经集成到canal,作为canal的基本功能之一存在
kafka:canal server将解析后的binlog数据发送到配置的kafka topic,canal server支持将同一个mysql实例的所有binlog发送到同一个topic、多个分表发送到一个topic以及每个表到独立的topic等配置功能
canal client:用户实现的客户端程序,用于消费kafka数据并写入s3。canal client可以作为spark streaming、flink应用程序的常驻服务,也可以作为客户端库被批处理任务定时调度
spark:将canal client写入的临时文件进行合并和去重,结果数据以分区形式加载到hive表
hive:最终结果数据以Hive数据仓库形式对外服务
首先来看看该方案可以解决什么问题及其优势:
1. 对数据库的侵入性大大降低,不再需要直连数据库,也不需要部署备份数据库,只需要开放相关权限即可。
2. 该方案能够根据数据所在的表名动态路由到配置的 Kafka topic,特别是当分库分表存在的情况下,能够将多个分表合并到同一个topic,相比离线多分区表同步有较大的便捷优势。
3.解决离线批处理带来的高延迟问题,该方案通过实时将binlog发送到kafka,然后下游应用再消费kafka的数据,从而为数据的实时处理和查询提供可能。
4.由于中间系统Kafka的引入,解耦了数据的生产和消费,消费端可以对接多种数据处理引擎,如Hive、Spark、Flink和Presto等。而且处理后的数据可以重新写回到新的kafka topic。
但同时该方案也带来了新的复杂度和问题,列举如下:
1. 需要部署binlog捕获系统,目前常用的是阿里开源的canal,canal本身可以单实例部署,也可以高可用部署,取决于应用系统的服务等级。如果是高可用部署,还需要依赖ZooKeeper。
2. 需要部署分布式Kafka集群,用于暂存解析后的binlog数据,大量中间数据的存在推高了Kafka的运维和存储成本。
3. 需要实现Kafka消费端程序,而消费端程序的调度取决于应用类型,如果是常驻服务,其数据处理能力和可靠性将成为系统瓶颈。而过多的常驻服务将极大占用集群的资源。
4. 过长的数据处理链路增大了系统出错的概率和维护成本,而且涉及到的开发任务增加了使用门槛。
5. 无法有效应对DDL导致的Schema变更,当数据表结构发送变更,需要重启canal-server,而下游数据表结构的变更,可能会因为FileFormat的不同而支持行为不一样,导致某些引擎无法使用,如Presto
6. 无法兼顾全量数据迁移和增量迁移。首次接入CDC,可能需要手动初始化全量数据。对于DDL变更,也可能涉及到重新初始化。
7. 定时消费kafka产生较多临时小文件,数据合并、去重导致额外的计算开销。而且小文件过多,对metaserver压力较大。
另外还有一些因为基础系统本身技术的限制带来的问题:
8. 无版本控制,无法回溯历史数据。
9. 基于HDFS或者S3的Hive不支持更新操作、HDFS和S3之间的数据难以转换存储介质,某些操作比如rename无法通用
10. 无法提供ACID事务保证。比如无法保证在Hive数仓变更的过程中不会对分析作业的读取造成影响。
11. 无法有效应对分区变更,重刷数据过程非常耗时。
12. Hive Metastore 仅维护 partition 级别的统计信息,非 partition 字段不能做 partition
上述方案中存在的一些问题并不是同等重要,比如对ddl和事务的支持不好并不是特别关键,完全可以通过上游应用合理规避。但数据处理链路过程和延迟较大的问题至少说明方案不是最佳方案,究其原因一是需要解决的问题客观存在(如数据的增量拉取、更新数据的合并和去重,中间系统的性能和可靠性等),推高了为解决问题而增加的复杂度,二是当前缺少相应的更合适技术。而导致上述问题的一个主要原因是目前缺少能够支持更新操作的分布式存储系统。在分布式实时数据处理领域,一个好的存储系统至少要能够满足以下基本条件:
1. 分布式、高可用、可扩展
2. 不仅支持高效读写,还支持高效更新,特别是面向OLAP场景的读写
3. 不仅支持批量读写而且支持基于row- level的读写
4. 能够跟现有的技术生态较好结合,尤其是Hadoop、Spark或者Flink等,如果能够不增加维护成本就更好了。
5. 支持事务和多版本并发控制
6. 支持标准的SQL,包括DQL、DML、DDL、DCL
这些要求也反映了大数据处理系统的一个发展趋势:早期的大数据技术希望借助机器的堆积和数据的冗余来解决效率和不可靠问题,但也留下了应用复杂的后遗症,而在实时数据处理领域迫切需要将数据库领域成熟的方案和经验应用到大数据的存储和查询上来。在大数据的存储和查询也希望能够有一个像数据库系统那样独立的系统来支持数据的事务性的实时读写。
上述要求可能无法全部满足,但可以作为接近的目标,可喜的是目前开源界的确出现了不少这样的技术,比如Apache kudu,百度开源的Apache Doris,俄罗斯的国产Click House、Greeplum、Tidb等,另外还有其他的商业版技术如AWS的Redshift、阿里云的AnalyticDB、Hologres等,虽然它们也都存在不少问题,但在不少领域和场景都得到广泛应用。
然而一类数据湖技术俘获了我们的“芳心”,其代表有Hudi、Delta Lake,Iceberg,其中Iceberg 的设计初衷更倾向于定义一个标准、开放且通用的数据组织格式,同时屏蔽底层数据存储格式上的差异,向上提供统一的操作 API,使得不同的引擎可以通过其提供的 API 接入;Hudi 的设计初衷更像是为了解决流式数据的快速落地,并能够通过 upsert 语义进行延迟数据修正;Delta Lake 作为 Databricks 开源的项目,更侧重于在 Spark 层面上解决 Parquet、ORC 等存储格式的固有问题,并带来更多的能力提升。
虽然这三个项目在设计初衷上稍有不同,但实现的思路和提供的能力却非常相似,他们都提供了 ACID 的能力,都基于乐观锁实现了冲突解决和提供线性一致性,同时相应地提供了 time travel(数据回溯) 的功能。
但是因为设计初衷的不同,三个项目当前的能力象限各有不同,Iceberg 在其格式定义和核心能力上最为完善,但是上游引擎的适配上稍显不足;Hudi 基于 Spark 打造了完整的流式数据落地方案,但是其核心抽象较弱,与 Spark 耦合较紧;Delta Lake 同样高度依赖于 Spark 生态圈,与其他引擎的适配尚需时日。但以社区的力量,可以预见的是三个项目最终会互相借鉴优化而趋同。
Apache Iceberg is an open table format for huge analytic datasets. Iceberg adds tables to Presto and Spark that use a high-performance format that works just like a SQL table.
Iceberg 的官方定义是一种表格式,可以简单理解为是基于计算层(Flink , Spark)和存储层(ORC,Parqurt,Avro)的一个中间层,用 Flink 或者 Spark 将数据写入 Iceberg,然后再通过其他方式来读取这个表,比如 Spark,Flink,Presto 等。从这个定义上来看,Iceberg是一个用于海量数据分析场景下的开源的表格式。如下图所示,Iceberg 基于分布式存储系统来保存数据文件和元文件,其中元文件按照快照方式存储,每个快照元文件记录数据文件所在的文件路径。
一个表的数据由数据目录和元数据目录组成,其中数据目录保存实际的数据文件,元数据目录保存元数据,元数据由代表快照信息的json格式的元数据文件、以snap前缀的快照元数据文件、以Avro后缀的清单文件组成
具体讲,每个快照文件(snapshot)存储一个清单列表文件(manifest list),清单列表文件记录1至多个清单文件(manifest file)的路径和相关文件的统计数量信息等,而清单文件记录了组成某个快照的数据文件(data file)列表。每行都是每个数据文件的详细描述,包括数据文件的状态、文件路径、分区信息、列级别的统计信息(比如每列的最大最小值、空值数等)、文件的大小以及文件里面数据的行数等信息。数据文件是 Apache Iceberg 表真实存储数据的文件,可以采用parquet、avro等格式。它们之间的关系如下图所示:
快照、清单文件、数据文件为一对多的层次关系,通过引用关系关联到最终的数据文件,依据快照很方便的回溯历史数据和增量拉取
基于Iceberg数据湖技术实现的实时应用架构图如下:
Flink通过check point机制和下游应用的幂等或容错机制等保证端到端的一致性
处理流程为业务库binlog通过Flink进入Kafka,再通过Flink消费Kafka数据写入Iceberg。因为Iceberg具备了数据的更新操作功能,因此相比上述canal的CDC方案,复杂度大为降低,而且功能大为增强,列举如下:
1. 数据可以批量或者流式写入或者更新,对批、流方式的读写支持以及兼顾了首次全量历史数据导入和增量数据摄取,真正做到了流批一体。这也是最大亮点之一,因为它降低了整个数据链路复杂度,将数据的处理限定在一个黑盒里,对用户透明。
2. 开放的表格式,支持不同的数据存储格式和数据处理引擎
3. 基于ACID语义的快照格式保证了数据的写入和读取互相隔离,而Flink的checkpoint机制和Iceberg的ACID事务特性保证了端到端的exactly once语义
4. 快照方式支持增量拉取和数据回溯,所谓增量拉取是指可以读取指定某个时间区间或者起始快照的文件数据,读取的最小粒度是文件。
5. 基于快照方式使schema变更和分区变更非常容易,仅影响当前快照,历史快照数据可以保持不变
6. 分布式存储的表格式提升了元数据存储的可扩展性以及降低了对Hive NameNode的压力
7. Iceberg元数据统计信息基于文件粒度,相比Hive 元数据基于分区粒度,具有更大的性能提升空间
8. 进一步提升了数据的时效性,T+1离线数仓进化为分钟级别的准实时数仓
9. 数据库binlog获取采用Debezium,支持更多类型的数据库类型,相比canal功能更为强大
最后值得一提的是Iceberg的引入只是作为依赖包的形式,几乎没有增加部署和维护成本,而社区基于Flink增加了Source和Sink,将该数据链路进一步简化,比如使用下图的系统设计方法来代替传统的设计方法:
最简单的Flink应用程序配置source和sink即可实现一个实时数据同步应用,而不需要借助部署canal、kafka等中间件
Flink Source Connector以嵌入式集成了Debezium,可以在应用程序端直接获取binlog,如果不追求at least once,可以摆脱对Kafka的依赖,再结合Flink Iceberg sink能够实现彻底的CDC方案,其关键在于 row-level upsert和delete的实现。Row-level update和delete通常有Copy-on-Write和Merge-on-Read两种方案。其中Copy-on-Write把生成新数据文件的压力集中于写入的时候,适合对读有较高要求的场景;而Merge-on-Read把合并最终结果的压力放在读取的时候,适合于快速写入的场景。
以上是关于实时数据应用系统设计方法的主要内容,如果未能解决你的问题,请参考以下文章