Greenplum 实时数据仓库实践——实时数据同步
Posted wzy0623
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Greenplum 实时数据仓库实践——实时数据同步相关的知识,希望对你有一定的参考价值。
目录
5.6 Canal + Kafka + ClientAdapter
构建实时数据仓库最大的挑战在于从操作型数据源实时抽取数据,即ETL过程中的Extract部分。我们要以全量加增量的方式,实时捕获源系统中所需的所有数据及其变化,而这一切都要在不影响对业务数据库正常操作的前提下进行,目标是要满足高负载、低延迟,难点正在于此,所以需要完全不同于批处理的技术加以实现。当操作型数据进入数据仓库过渡区或ODS以后,就可以利用数据仓库系统软件提供的功能特性进行后续处理,不论是Greenplum、Hive或是其他软件,这些处理往往只需要使用其中一种,相对来说简单一些。
Greenplum作为数据仓库的计算引擎,其数据来源多是业务数据,其中以mysql为主。本篇将介绍两种主要的从MySQL实时同步数据到Greenplum的解决方案,一是maxwell + Kafka + bireme、二是Canal + Kafka + ClientAdapter,这两个方案的共同点是都使用开源组件,不需要编写代码,只要进行适当配置便可运行。总体来说,两种方案都是基于MySQL binlog捕获数据变化,然后将binlog以数据流的形式传入Kafka消息队列,再以消费的方式将数据变化应用到Greenplum。但是,两者在实现上区别很大,尤其是消费端的不同实现方式使数据载入Greenplum的性能差别巨大。由于主要的MySQL变化数据捕获技术都是基于其复制协议,并以消息系统作为中间组件,所以先会介绍作为基础的MySQL数据复制和Kafka。
5.1 数据抽取方式
抽取数据是ETL处理过程的第一个步骤,也是数据仓库中最重要和最具有挑战性的部分,适当的数据抽取是成功建立数据仓库的关键。
从源抽取数据导入数据仓库或过渡区有两种方式,可以从源把数据抓取出来(拉),也可以请求源把数据发送(推)到数据仓库。影响选择数据抽取方式的一个重要因素是操作型系统的可用性和数据量,这是抽取整个数据集还是仅仅抽取自最后一次抽取以来的变化数据的基础。我们考虑以下两个问题:
- 需要抽取哪部分源数据加载到数据仓库?有两种可选方式,完全抽取和变化数据捕获。
- 数据抽取的方向是什么?有两种方式,拉模式,即数据仓库主动去源系统拉取数据;推模式,由源系统将自己的数据推送给数据仓库。
对于第二个问题来说,通常要改变或增加操作型业务系统的功能是非常困难的,这种困难不仅体现在技术上,还有来自于业务系统用户及其开发者的阻力。理论上讲,数据仓库不应该要求对源系统做任何改造,实际上也很少由源系统推数据给数据仓库。因此对这个问题的答案比较明确,大都采用拉数据模式。下面我们着重讨论第一个问题。
如果数据量很小并且易处理,一般来说采取完全源数据抽取,就是将所有的文件记录或所有的数据库表数据抽取至数据仓库。这种方式适合基础编码类型的源数据,比如邮政编码、学历、民族等。基础编码型源数据通常是维度表的数据来源。如果源数据量很大,抽取全部数据是不可行的,那么只能抽取变化的源数据,即最后一次抽取以来发生了变化的数据。这种数据抽取模式称为变化数据捕获,简称CDC(Change Data Capture),常被用于抽取操作型系统的事务数据,比如销售订单、用户注册,或各种类型的应用日志记录等。
CDC大体可以分为两种,一种是侵入式的,另一种是非侵入式的。所谓侵入式的是指CDC操作会给源系统带来性能的影响。只要CDC操作以任何一种方式对源库执行了SQL语句,就可以认为是侵入式的CDC。常用的四种CDC方法是:基于时间戳的CDC、基于触发器的CDC、基于快照的CDC、基于日志的CDC,其中前三种是侵入性的。表5-1总结了四种CDC方案的特点。
时间戳 | 触发器 | 快照 | 日志 | |
能区分插入/更新 | 否 | 是 | 是 | 是 |
周期内,检测到多次更新 | 否 | 是 | 否 | 是 |
能检测到删除 | 否 | 是 | 是 | 是 |
不具有侵入性 | 否 | 否 | 否 | 是 |
支持实时 | 否 | 是 | 否 | 是 |
不依赖数据库 | 是 | 否 | 是 | 否 |
表5-1 四种CDC方案比较
5.1.1 基于源数据的CDC
基于源数据的CDC要求源数据里有相关的属性列,抽取过程可以利用这些属性列来判断哪些数据是增量数据。最常见的属性列有以下两种。
- 时间戳:这种方法至少需要一个更新时间戳,但最好有两个,一个插入时间戳,表示记录何时创建,一个更新时间戳,表示记录最后一次更新的时间。
- 序列:大多数数据库系统都提供自增功能。如果数据库表列被定义成自增的,就可以很容易地根据该列识别出新插入的数据。
这种方法的实现较为简单,假设表t1中有一个时间戳字段last_inserted,t2表中有一个自增序列字段id,则下面SQL语句的查询结果就是新增的数据,其中last_load_time和last_load_id分别表示ETL系统中记录的最后一次数据装载时间和最大自增序列号。
select * from t1 where last_inserted > last_load_time;
select * from t2 where id > last_load_id;
通常需要建立一个额外的数据库表存储上一次更新时间或上一次抽取的最后一个序列号。在实践中,一般是在一个独立的模式下或在数据过渡区里创建这个参数表。基于时间戳和自增序列的方法是CDC最简单的实现方式,也是最常用的方法,但它的缺点也很明显:
- 不能区分插入和更新操作。只有当源系统包含了插入时间戳和更新时间戳两个字段,才能区别插入和更新,否则不能区分。
- 不能记录删除数据的操作。不能捕获到删除操作,除非是逻辑删除,即记录没有被真的删除,只是做了逻辑上的删除标志。
- 无法识别多次更新。如果在一次同步周期内,数据被更新了多次,只能同步最后一次更新操作,中间的更行操作都丢失了。
- 不具有实时能力。时间戳和基于序列的数据抽取一般适用于批量操作,不适合于实时场景下的数据抽取。
这种方法是具有侵入性的,如果操作型系统中没有时间戳或时间戳信息是不可用的,那么不得不通过修改源系统把时间戳包含进去,首先要求修改操作型系统的表包含一个新的时间戳列,然后建立一个触发器,在修改一行时更新时间戳列的值。在实施这些操作前必须被源系统的拥有者所接受,并且要仔细评估对源系统产生的影响。
有些方案通过高频率扫描递增列的方式实现准实时数据抽取。例如Flume的flume-ng-sql-source插件,缺省每5秒查询一次源表的主键以捕获新增数据,“利用Flume将MySQL表数据准实时抽取到HDFS”展示了一个具体示例。
5.1.2 基于触发器的CDC
当执行INSERT、UPDATE、DELETE这些SQL语句时,可以激活数据库里的触发器,并执行一些动作,就是说触发器可以用来捕获变更的数据并把数据保存到中间临时表里。然后这些变更的数据再从临时表中取出,被抽取到数据仓库的过渡区里。但在大多数场合下,不允许向操作型数据库里添加触发器(业务数据库的变动通常都异常慎重),而且这种方法会降低系统的性能,所以此方法用的并不是很多。
作为直接在源数据库上建立触发器的替代方案,可以使用源数据库的复制功能,把源数据库上的数据复制到从库上,在从库上建立触发器以提供CDC功能。尽管这种方法看上去过程冗余,且需要额外的存储空间,但实际上这种方法非常有效,而且没有侵入性。复制是大部分数据库系统的标准功能,如MySQL、Oracle和SQL Server等都有各自的数据复制方案。
5.1.3 基于快照的CDC
如果没有时间戳,也不允许使用触发器,就要使用快照表了。可以通过比较源表和快照表来获得数据变化。快照就是一次性抽取源系统中的全部数据,把这些数据装载到数据仓库的过渡区中。下次需要同步时,再从源系统中抽取全部数据,并把全部数据也放到数据仓库的过渡区中,作为这个表的第二个版本,然后再比较这两个版本的数据,从而找到变化。
有多个方法可以获得这两个版本数据的差异。假设表有两个列id和name,id是主键列。该表的第一、二个版本的快照表名为snapshot_1、snapshot_2。下面的SQL语句在主键id列上做全外链接,并根据主键比较的结果增加一个标志字段,I表示新增,U表示更新,D代表删除,N代表没有变化。外层查询过滤掉没有变化的记录。
select * from
(select case when t2.id is null then 'D'
when t1.id is null then 'I'
when t1.name <> t2.name then 'U'
else 'N'
end as flag,
case when t2.id is null then t1.id else t2.id end as id,
t2.name
from snapshot_1 t1 full outer join snapshot_2 t2 on t1.id = t2.id) a
where flag <> 'N';
当然,这样的SQL语句需要数据库支持全外链接,对于MySQL这样不支持全外链接的数据库,可以使用类似下面的SQL语句:
select 'U' as flag, t2.id as id, t2.name as name
from snapshot_1 t1 inner join snapshot_2 t2 on t1.id = t2.id
where t1.name != t2.name
union all
select 'D' as flag, t1.id as id, t1.name as name
from snapshot_1 t1 left join snapshot_2 t2 on t1.id = t2.id
where t2.id is null
union all
select 'I' as flag, t2.id as id, t2.name as name
from snapshot_2 t2 left join snapshot_1 t1 on t2.id = t1.id
where t1.id is null;
基于快照的CDC可以检测到插入、更新和删除的数据,这是相对于基于时间戳的CDC方案的优点。它的缺点是需要大量的存储空间来保存快照。另外,当表很大时,这种查询会有比较严重的性能问题。
5.1.4 基于日志的CDC
最复杂的和最没有侵入性的CDC方法是基于日志的方式。数据库会把每个插入、更新、删除操作记录到日志里。如使用MySQL数据库,只要在数据库服务器中启用二进制日志binlog(设置log_bin服务器系统变量),之后就可以实时从数据库日志中读取到所有数据库写操作,并使用这些操作来更新数据仓库中的数据。这种方式需要把二进制日志转为可以理解的格式,然后再把里面的操作按照顺序读取出来。
MySQL提供了一个叫做mysqlbinlog的日志读取工具。这个工具可以把二进制的日志格式转换为可读的格式,然后就可以把这种格式的输出保存到文本文件里,或者直接把这种格式的日志应用到MySQL客户端用于数据还原操作。mysqlbinlog工具有很多命令行参数,其中最重要的一组参数可以设置开始/截止时间戳,这样能够只从日志里截取一段时间的日志。另外,日志里的每个日志项都有一个序列号,也可以用来做偏移操作。MySQL的日志提供了上述两种方式来防止CDC过程发生重复或丢失数据的情况。下面是使用mysqlbinlog的两个例子。第一条命令将jbms_binlog.000002文件中从120偏移量以后的操作应用到一个MySQL数据库中。第二条命令将jbms_binlog.000002文件中一段时间的操作格式化输出到一个文本文件中。
mysqlbinlog --start-position=120 jbms_binlog.000002 | mysql -u root -p123456
mysqlbinlog --start-date="2011-02-27 13:10:12" --stop-date="2011-02-27 13:47:21" jbms_binlog.000002 > temp/002.txt
使用基于数据库的日志工具也有缺陷,即只能用来处理一种特定的数据库,如果要在异构的数据库环境下使用基于日志的CDC方法,就要使用GoldenGate之类的软件。本篇介绍的两种实时数据同步方案都是使用开源组件完成类似功能。
5.2 MySQL数据复制
Maxwell、Canal都可以实时读取MySQL二进制日志,本质上都是将自身伪装成一个从库,利用MySQL原生的主从复制协议获取并处理二进制日志。了解MySQL复制的基本原理有助于理解和使用这些组件。
简单说,复制就是将来自一个MySQL数据库服务器(主库)的数据复制到一个或多个MySQL数据库服务器(从库)。传统的MySQL复制提供了一种简单的Primary-Secondary复制方法,默认情况下,复制是单向异步的。MySQL支持两种复制方式:基于行的复制和基于语句的复制。这两种方式都是通过在主库上记录二进制日志(binlog)、在从库重放中继日志(relylog)的方式来实现异步的数据复制。二进制日志或中继日志中的记录被称为事件。所谓异步包含两层含义,一是主库的二进制日志写入与将其发送到从库是异步进行的,二是从库获取与重放日志事件是异步进行的。这意味着,在同一时间点从库上的数据更新可能落后于主库,并且无法保证主从之间的延迟间隔。
复制给主库增加的开销主要体现在启用二进制日志带来的I/O,但是增加并不大,MySQL官方文档中称开启二进制日志会产生1%的性能损耗。出于对历史事务备份以及从介质失败中恢复的目的,这点开销是非常必要的。除此之外,每个从库也会对主库产生一些负载,例如网络和I/O。当从库读取主库的二进制日志时,也会造成一定的I/O开销。如果从一个主库复制到多个从库,唤醒多个复制线程发送二进制日志内容的开销将会累加。但所有这些复制带来的额外开销相对于应用对MySQL服务器造成的高负载来说都微不足道。
5.2.1 复制的用途
复制的用途主要体现在以下五个方面:
1. 横向扩展
通过复制可以将读操作指向从库来获得更好的读扩展。所有写入和更新都在主库上进行,但读取可能发生在一个或多个从库上。在这种读写分离模型中,主库专用于更新,显然比同时进行读写操作会有更好的写性能。需要注意的是,对于写操作并不适合通过复制来扩展。在一主多从架构中,写操作会被执行多次,正如“木桶效应”,这时整个系统的写性能取决于写入最慢的那部分。
2. 负载均衡
通过MySQL复制可以将读操作分布到多个服务器上,实现对读密集型应用的优化。对于小规模的应用,可以简单地对机器名做硬编码或者使用DNS轮询(将一个机器名指向多个IP地址)。当然也可以使用复杂的方法,例如使用LVS网络负载均衡器等,能够很好地将负载分配到不同的MySQL服务器上。
3. 提高数据安全性
提高数据安全性可以从两方面来理解。其一,因为数据被复制到从库,并且从库可以暂停复制过程,所以可以在从库上执行备份操作而不会影响对应的主库。其二,当主库出现问题时,还有从库的数据可以被访问。但是,对备份来说,复制仅是一项有意义的技术补充,它既不是备份也不能够取代备份。例如,当用户误删除一个表,而且此操作已经在从库上被复制执行,这种情况下只能用备份来恢复。
4. 提高高可用性
复制可以帮助应用程序避免MySQL单点失败,一个包含复制的设计良好的故障切换系统能够显著缩短宕机时间。
5. 滚动升级
比较普遍的做法是,使用一个高版本MySQL作为从库,保证在升级全部实例前,查询能够在从库上按照预期执行。测试没有问题后,将高版本的MySQL切换为主库,并将应用连接至该主库,然后重新搭建高版本的从库。
后面介绍Maxwell和Canal方案时会看到,其架构正是利用了横向扩展中的级联主从拓扑结构,以及从库可以安全暂停复制的特点才得以实现。
5.2.2 二进制日志
MySQL复制依赖二进制日志(binlog),所以要理解复制如何工作,先要了解MySQL的二进制日志。
二进制日志包含描述数据库更改的事件,如建表操作或对表数据的更改等。开启二进制日志有两个重要目的:
- 用于复制。主库上的二进制日志提供要发送到从库的数据更改记录。主库将其二进制日志中包含的事件发送到从库,从库执行这些事件以对其本地数据进行相同的更改。
- 用于恢复。当出现介质错误,如磁盘故障时,数据恢复操作需要使用二进制日志。还原备份后,重新执行备份后记录的二进制日志中的事件,最大限度减少数据丢失。
不难看出,MySQL二进制日志所起的作用与Oracle的归档日志类似。二进制日志只记录更新数据的事件,不记录SELECT或SHOW等语句。通过设置log-bin系统变量开启二进制日志,不同版本MySQL的缺省配置可能不同,如MySQL 5.6的缺省为不开启,MySQL 8中缺省是开启的。
二进制日志有STATEMENT、ROW、MIXED三种格式,通过binlog-format系统变量设置:
- STATMENT格式,基于SQL语句的复制(statement-based replication,SBR)。每一条会修改数据的SQL语句会被记录到binlog中。这种格式的优点是不需要记录每行的数据变化,这样二进制日志会比较少,减少磁盘I/O,提高性能。缺点是在某些情况下会导致主库与从库中的数据不一致,例如last_insert_id()、now()等非确定性函数,以及用户自定义函数(user-defined functions,udf)等易出现问题。
- ROW格式,基于行的复制(row-based replication,RBR)。该格式不记录SQL语句的上下文信息,仅记录哪条数据被修改了,修改成了什么样子,能清楚记录每一行数据的修改细节。其优点是不会出现某些特定情况下的存储过程、函数或触发器的调用和触发无法被正确复制的问题。缺点是通常会产生大量的日志,尤其像大表上执行alter table操作时会让日志暴涨。
- MIXED格式,混合复制(mixed-based replication,MBR)。它是语句和行两种格式的混合体,默认使用STATEMENT模式保存二进制日志,对于STATEMENT模式无法正确复制的操作,会自动切换到基于行的格式,MySQL会根据执行的SQL语句选择日志保存方式。
不同版本MySQL的binlog-format参数的缺省值可能不同,如MySQL 5.6的缺省值为STATEMENT,MySQL 8缺省使用ROW格式。二进制日志的存放位置最好设置到与MySQL数据目录不同的磁盘分区,以降低磁盘I/O的竞争,提升性能,并且在数据磁盘故障的时候还可以利用备份和二进制日志恢复数据。
5.2.3 复制步骤
总的来说,MySQL复制有五个步骤:
- 在主库上把数据更改事件记录到二进制日志中。
- 从库上的I/O线程向主库询问二进制日志中的事件。
- 主库上的binlog dump线程向I/O线程发送二进制事件。
- 从库上的I/O线程将二进制日志事件复制到自己的中继日志中。
- 从库上的SQL线程读取中继日志中的事件,并将其重放到从库上。
图5-1更详细地描述了复制的细节。
图5-1 复制如何工作
第一步是在主库上记录二进制日志。每次准备提交事务完成数据更新前,主库将数据更新的事件记录到二进制日志中。MySQL会按事务提交的顺序而非每条语句的执行顺序来记录二进制日志。在记录二进制日志后,主库会告诉存储引擎可以提交事务了。
下一步,从库将主库的二进制日志复制到其本地的中继日志中。首先,从库会启动一个工作线程,称为I/O线程。I/O线程跟主库建立一个普通的客户端连接,然后在主库上启动一个特殊的二进制日志转储(binlog dump)线程,它会读取主库上二进制日志中的事件,但不会对事件进行轮询。如果该线程追赶上了主库,它将进入睡眠状态,直到主库发送信号通知其有新的事件时才会被唤醒,从库I/O线程会将接收到的事件记录到中继日志中。
从库的SQL线程执行最后一步,该线程从中继日志中读取事件并在从库上执行,从而实现从库数据的更新。当SQL线程追赶I/O线程时,中继日志通常已经在系统缓存中,所以读取中继日志的开销很低。SQL线程执行的事件也可以通过log_slave_updates系统变量来决定是否写入其自己的二进制日志中,这可以用于级联复制的场景。
这种复制架构实现了获取事件和重放事件的解耦,允许这两个过程异步进行。也就是说I/O线程能够独立于SQL线程之外工作。但这种架构也限制了复制的过程,其中最重要的一点是在主库上并发更新的查询在从库上通常只能串行化执行,因为缺省只有一个SQL线程来重放中继日志中的事件。在MySQL 5.6以后已经可以通过配置slave_parallel_workers等系统变量进行并行复制,相关细节参见“组提交与多线程复制”。
5.3 使用Kafka
从MySQL复制中从库的角度看,实际上是实现了一个消息队列的功能。消息就是二进制日志中的事件,持久化存储在中继日志文件里。I/O线程是消息的生产者,向中继日志写数据,SQL线程是消息的消费者,从中继日志读取数据并在目标库上重放。队列是一种先进先出的数据结构,这个简单定义就决定了队列中的数据一定是有序的。在数据复制场景中这种有序性极为重要,如果不能保证事件重放与产生同序,主从库的数据将会不一致,也就失去了数据复制的意义。
中继日志、I/O线程、SQL线程是MySQL内部的实现。在本专题讨论的异构环境中,源是MySQL,目标是Greenplum。作为一种不严格的类比,Maxwell、Canal实现的是类似I/O线程的功能,bireme、Canal的ClientAdapter组件实现的是类似SQL线程的功能。那中继日志呢,是Kafka登场的时候了,当然Kafka比中继日志或消息队列要复杂得多,它是一个完整的消息系统。严格来说,在本实时数据同步场景中,Kafka并不是必须的。比如Canal的TCP服务器模式,就是直接将网络数据包直接发送给消费者进行消费,消息数据只存在于内存中,并不做持久化。这种实现方式用于生产环境很不合适,既有丢失数据的风险,也缺乏必要的管理和监控,引入Kafka正好可以物尽其用。
5.3.1 Kafka基本概念
Kafka是一款基于发布与订阅的分布式消息系统,其数据按照一定顺序持久化保存,并可以按需读取。此外,Kafka的数据分布在整个集群中,具备数据故障保护和性能伸缩能力。
1. 消息和批次
Kafka的数据单元被称为消息,它可以被看作是数据库里的一条记录。消息由字节数组组成,所以对于Kafka来说,消息里的数据没有特别的格式或含义。消息可以有一个可选的元数据,也就是键。与消息一样,键也是一个字节数组,对于Kafka来说也没有特殊的含义。当消息以一种可控的方式写入不同分区时会用到键。最简单的例子就是为键生成一个一致性哈希值,然后使用哈希值对主题分区进行取模,为消息选取分区。这样可以保证具有相同键的消息总是被写到相同的分区上。对数据库来说,通常将表的主键作为消息的键,这是Kafka保证消费顺序的关键所在,后面将详细说明。
为了提高效率,消息被分批次写入Kafka。批次就是一组消息,这些消息属于同一个主题和分区。把消息分批次传输可以减少网络开销。不过,这要在延迟时间和吞吐量之间做出权衡:批次越大,单位时间处理的消息就越多,单个消息的传输时间就越长。批次数据会被压缩,这样可以提升数据的传输和存储能力,但要做更多的计算处理。
2. 主题与分区
Kafka的消息通过主题(topic)进行分类。主题就好比数据库的表,或者文件系统的目录。主题可以被分为若干个分区(partition),一个分区就是一个提交日志。消息以追加的方式写入分区,然后以先进先出的顺序读取。注意,由于一个主题一般包含几个分区,因此无法在整个主题范围内保证消息的顺序,但可以保证消息在单个分区内的顺序。如果需要所有消息都是有序的,那么最好只用一个分区。图5-2所示的主题有4个分区,消息被追加写入每个分区的尾部。Kafka通过分区来实现数据冗余和伸缩性。分区可以分布在不同的服务器上,也就是说,一个主题可以横跨多个服务器,以此来提供更强大的性能。
图5-2 包含多个分区的主题
3. 生产者和消费者
Kafka的客户端就是Kafka系统的用户,它们被分为两种基本类型:生产者和消费者。除此之外,还有其他两个客户端API——用于数据集成的Kafka Connect API和用于流式处理的Kafka Streams。这些客户端API使用生产者和消费者作为内部组件,提供了高级的功能。
生产者(producer)创建消息。一般情况下,一个消息会被发布到一个特定的主题上。生产者在默认情况下把消息均匀分布到主题的所有分区上,而并不关心特定消息会被写到哪个分区。在某些情况下,生产者会把消息直接写到指定的分区。这通常是通过消息键和分区器来实现的,分区器为键生成一个哈希值,并将其映射到指定分区。这样可以保证包含同一个键的消息会被写到同一个分区上。生产者也可以使用自定义的分区器,根据不同业务规则将消息映射到分区。
消费者(consumer)读取数据。消费者订阅一个或多个主题,并按消息生成的顺序读取它们。消费者通过检查消息的偏移量来区分已经读取的消息。偏移量(offset)是另一种元数据,它是一个不断递增的整数值,在消息创建时,Kafka会把它添加到消息里。在给定分区里,每个消息的偏移量都是唯一的。消费者把每个分区最后读取的消息偏移量保存在Zookeeper或Kafka中,如果消费者关闭或重启,它的读取状态不会丢失。
消费者是消费者组(consumer group)的一部分,也就是说,会有一个或多个消费者共同读取一个主题。组保证每个分区只能被同组中的一个消费者使用。图5-3所示的组中,有3个消费者同时读取一个主题。其中两个消费者各自读取一个分区,另一个消费者读取其他两个分区。消费者和分区之间的映射通常被称为消费者对分区的所有权关系(ownership)。
通过这种方式,消费者可以消费包含大量消息的主题。而且如果一个消费者失效,组里的其他消费者可以接管失效消费者的工作。
图5-3 消费者组从主题读取消息
4. broker和集群
一个独立的Kafka服务器被称为broker。broker接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。broker为消费者提供服务,对读取分区的请求作出响应,返回已经提交到磁盘上的消息。根据特定的硬件机器性能特征,单个broker可以轻松处理数千个分区以及每秒百万级的消息量。
broker是集群的组成部分,每个集群都有一个broker同时充当了集群控制器(controller)的角色,它被自动从集群的活跃成员中选举出来,负责将分区分配给broker和监控broker等管理工作。在集群中,一个分区从属于一个broker,该broker被称为分区的首领(leader)。一个分区可以分配给多个broker,这个时候发生分区复制,如图5-4所示。这种复制机制为分区提供了消息冗余,如果有一个broker失效,其他broker可以接管领导权。不过,相关的消费者和生产者都要重新连接到新的首领。
图5-4 集群里的分区复制
消息保存期限(retention)是Kafka的一个重要特性。Kafka broker默认的消息保留策略是这样的:要么保留一段时间(比如7天),要么保留到消息到达一定大小的字节数(比如1GB)。当消息数量达到这些上限时,旧消息就会过期并被删除,所以在任何时刻,可用消息的总量都不会超过配置参数所指定的大小。主题可以配置自己的保留策略,能将消息保留到不再使用它们为止。例如,用于跟踪用户活动的数据可能需要保留几天,而应用程序的度量指标可能只需要保留几小时。可以通过配置把主题当做紧凑型日志(log compacted),只有最后一个带有特定键的消息会被保留下来。这种情况对于变更日志类型的数据比较适用,因为人们只关心最后时刻发生的那个变更。
5.3.2 Kafka消费者与分区
通常消息的生成速度比消费速度快,显然此时有必要对消费者进行横向扩展。就像多个生产者可以向相同的主题写入消息一样,我们也可以使用多个消费者从同一主题读取消息,对消息进行分流。
Kafka消费者从属于消费者组,一个组里的消费者订阅的是同一个主题,每个消费者接收主题一部分分区的消息。假设主题T1有4个分区,我们创建了消费者C1,它是组G1里唯一的消费者,我们用它订阅主题T1。消费者C1将收到主题T1全部4个分区的消息,如图5-5所示。
图5-5 1个消费者接收4个分区的消息
如果在组G1里新增一个消费者C2,那么每个消费者将分别从两个分区接收消息。我们假设消费者C1接收分区0和分区2的消息,消费者C2接收分区1和分区3的消息,如图5-6所示。
图5-6 2个消费者接收4个分区的消息
如果组G1有4个消费者,那么每个消费者可以分配到一个分区,如图5-7所示。
图5-7 4个消费者接收4个分区的消息
如果我们往组里添加更多的消费者,超过主题的分区数量,那么有一部分消费者就会被闲置,不会接收任何消息,如图5-8所示。
图5-8 5个消费者接收4个分区的消息
往群组里增加消费者是横向扩展消费能力的主要方式。Kafka消费者经常会做一些高延迟的操作,比如把数据写到数据库或HDFS,或者使用数据进行比较耗时的计算。在这些情况下,单个消费者无法跟上数据生成的速度,所以可以增加更多的消费者,让它们分担负载,每个消费者只处理部分分区的消息,这就是横向扩展的主要手段。我们有必要为主题创建大量的分区,在负载增长时可以加入更多的消费者。不过要注意,不要让消费者数量超过主题分区的数量,多余的消费者只会被闲置。
除了通过增加消费者来横向扩展单个应用程序外,还经常出现多个应用程序从同一个主题读取数据的情况。实际上,Kafka设计的主要目标之一,就是要让主题里的数据能够满足企业各种应用场景的需求。在这些场景里,每个应用程序可以获取到所有的消息,而不只是其中的一部分。只要保证每个应用程序有自己的消费者组,就可以让它们获取到主题的所有消息。横向扩展Kafka消费者或消费者组并不会对性能造成负面影响。
在上面的例子里,如果新增一个只包含一个消费者的组G2,那么这个消费者将从主题T1上接收所有消息,与组G1之间互不影响。组G2可以增加更多的消费者,每个消费者可以消费若干个分区,就像组G1那样,如图5-9所示。总的来说,组G2还是会接收所有消息,不管有没有其他组存在。
简而言之,为每一个需要获取一个或多个主题全部消息的应用程序创建一个消费者组,然后往组里添加消费者来扩展读取能力和扩展能力,组里的每个消费者只处理一部分消息。
图5-9 两个消费者组对应一个主题
5.4 选择主题分区数
5.4.1 使用单分区
上一节提到,Kafka只能保证单个分区中消息的顺序,因此如果要求与数据库保持强一致性,最好只使用一个分区。那么,单分区的吞吐量能否满足负载需求呢?下面就在现有环境上做一个测试,以得出有根据的量化的结论。
1. 测量MySQL binlog日志量
测试方法为使用tpcc-mysql工具,执行一段时间的压测,然后查看这段时间产生的binlog文件大小,得出binlog吞吐量。TPC-C是专门针对联机交易处理系统(OLTP系统)的规范,而tpcc-mysql则是percona公司基于TPC-C衍生出来的产品,专用于MySQL基准测试,下载地址为https://github.com/Percona-Lab/tpcc-mysql。关于tpcc-mysql的安装和使用,参见“测试规划”。
(1)从库重置binlog
reset master;
show master status;
初始binlog文件名和偏移量分别是mysql-bin.000001和120。
(2)主库执行tpcc测试
# 10仓库,32并发线程,预热10秒,执行300秒
~/tpcc-mysql-master/tpcc_start -h172.16.1.126 -d tpcc_test -u root -p "123456" -w 10 -c 32 -r 10 -l 300
得到的每分钟事务数为:5543.600 TpmC
(3)压测执行结束后,在从库查询binlog日志量
show binary logs;
此时binlog文件名和偏移量分别是mysql-bin.000001和406396209。预热10秒,执行300秒,binlog产生速度为:(406396209-120)/1024/1024/310 ≈ 1.25MB/S。
2. 测量kafka单分区生产者吞吐量
(1)创建topic
# 创建topic
kafka-topics.sh --create --topic test --bootstrap-server 172.16.1.124:9092 --partitions 1 --replication-factor 3
# 查看topic
kafka-topics.sh --describe --topic test --bootstrap-server 172.16.1.124:9092
创建了一个单分区三副本的topic:
Topic: test Partition: 0 Leader: 339 Replicas: 339,330,340 Isr: 339,330,340
(2)执行测试
kafka-producer-perf-test.sh --topic test --num-records 500000 --record-size 2048 --throughput -1 --producer-props bootstrap.servers=172.16.1.124:9092 acks=1
kafka-producer-perf-test.sh是Kafka提供的生产者性能测试命令行工具,这里所使用的选项说明:
- num-records:指定发送的消息总数。
- record-size:指定每条消息的字节数,这里假设约为一个binlog event的大小。在MySQL中可用show binlog events命令查看每个event的大小。
- throughput指定每秒发送的消息数,-1为不限制。
- acks:指定生产者的应答方式,有效值为0、1、all。0表示生产者在成功写入消息之前不会等待任何来自服务器的响应,吞吐量最高,但最可能丢失消息。1表示只要首领节点收到消息,生产者就会收到一个来自服务器的成功响应。all表示只有所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应,最安全但延迟最高。
测试结果为:
500000 records sent, 10989.010989 records/sec (21.46 MB/sec), 1267.54 ms avg latency, 1714.00 ms max latency, 1388 ms 50th, 1475 ms 95th, 1496 ms 99th, 1693 ms 99.9th.
可以看到单分区平均吞吐量约21.46 MB/S,平均每秒发送10989条2KB的消息。两相比较,Kafka单分区生产者的消息吞吐量大约是压测binlog吞吐量的17倍。实际生产环境的硬件配置会比本实验环境高得多,单分区吞吐量通常可达100 MB/S。通过这个粗略测试得出的结论是单分区可以承载一般的生产数据库负载。
3. 测量kafka单分区消费者吞吐量
单分区只能有一个消费者(一个消费组中),但可以利用多个线程提高消费性能。
kafka-consumer-perf-test.sh --broker-list 172.16.1.124:9092 --topic test --messages 500000 --threads 1
--threads指定消费线程数,1、3、6、12时的测试结果如下:
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
# 1线程
2021-12-09 10:57:19:198, 2021-12-09 10:57:28:921, 976.6543, 100.4478, 500047, 51429.2914, 3034, 6689, 146.0090, 74756.6153
# 3线程
2021-12-09 10:57:52:134, 2021-12-09 10:58:00:280, 976.6543, 119.8937, 500047, 61385.5880, 3039, 5107, 191.2384, 97914.0396
# 6线程
2021-12-09 10:58:58:345, 2021-12-09 10:59:06:495, 976.6543, 119.8349, 500047, 61355.4601, 3031, 5119, 190.7901, 97684.5087
# 12线程
2021-12-09 10:59:16:028, 2021-12-09 10:59:24:093, 976.6543, 121.0979, 500047, 62002.1079, 3031, 5034, 194.0116, 99333.9293
5.4.2 如何选定分区数量
严格说只要涉及多分区,一定会有消费顺序问题。在非强一致性场景中,可以通过选择表的主键作为分区键,以适当避免消费乱序带来的数据一致性问题,同时利用多分区保持Kafka的扩展性。在选择分区数量时,需要考虑如下几个因素。
- 主题需要达到多大的吞吐量?例如是每秒写入100KB还是1GB?
- 从单个分区读取数据的最大吞吐量是多少?每个分区一般都会有一个消费者,如果知道消费者写入数据库的速度不会超过每秒50MB,那么从一个分区读取数据的吞吐量也不需要超过每秒50MB。
- 可以通过类似的方法估算生产者向单个分区写入数据的吞吐量,不过生产者的速度一般比消费者快得多,所以最好为生产者多估算一些吞吐量。
- 每个broker包含的分区个数、可用的磁盘空间和网络带宽。
- 如果消息是按不同键写入分区的,那么为已有主题新增分区会很困难。
- 单个broker对分区个数是有限制的,因为分区越多,占用内存越多,完成首领选举需要的时间也越长。
如果估算出主题的吞吐量和消费者吞吐量,可以用主题吞吐量除以消费者吞吐量算出分区个数。如果不知道这些信息,根据经验,把分区大小限制在25GB以内可以得到比较理想的效果。
5.5 maxwell + Kafka + bireme
本节介绍的方法是采用 maxwell + Kafka + bireme,将MySQL数据实时同步至Greenplum。maxwell实时解析MySQL的binlog,并将输出的JSON格式数据发送到Kafka,Kafka在此方案中主要用于消息中转,bireme负责读取Kafka的消息,并应用于Greenplum数据库以增量同步数据。方法实施的主要流程为如下三步:
- 搭建Kafka服务。
- 搭建maxwell服务,修改配置使其能够连接MySQL并能向Kafka写入数据。
- 搭建bireme服务,修改配置使其能读取Kafka的数据并能向Greenplum写入数据。
5.5.1 总体架构
本方案的总体架构如图5-10所示。
图5-10 maxwell + Kafka + bireme 架构
图中的maswell从MySQL复制的从库中级联获取binlog,这样做的原因将在5.5.4小节“实时CDC”中详细说明。maxwell是一个能实时读取MySQL二进制日志binlog,并生成JSON 格式的消息,作为生产者发送给Kafka、Kinesis、RabbitMQ、Redis、Google Cloud Pub/Sub、文件或其他平台的应用程序,其中Kafka是maxwell支持最完善的一个消息系统。它的常见应用场景有ETL、维护缓存、收集表级别的dml指标、增量到搜索引擎、数据分区迁移、切库binlog回滚方案等。maswell在GitHub上具有较高的活跃度,官网地址为地址为https://github.com/zendesk/maxwell。
maxwell主要提供了下列功能:
- 支持 SELECT * FROM table 方式进行全量数据初始化。
- 支持GTID,当MySQL发生failover后,自动恢复binlog位置。
- 可以对数据进行分区,解决数据倾斜问题,发送到Kafka的数据支持database、table、column等级别的数据分区。
- 工作方式是伪装为MySQL Slave,在主库上创建dump线程连接,接收binlog事件,然后根据schemas信息拼装成JSON字符串,可以接受ddl、xid、row等各种事件。
bireme是一个Greenplum数据仓库的增量同步工具,目前支持MySQL、PostgreSQL和MongoDB数据源,maxwell + Kafka 是一种支持的数据源类型。bireme作为Kafka的消费者,采用 DELETE + COPY 的方式,将数据源的修改记录同步到Greenplum,相较于INSERT、UPDATE、DELETE方式,COPY方式速度更快,性能更优。bireme的主要特性是采用小批量加载方式(默认加载延迟时间为10秒钟)提升数据同步的性能,但要求所有同步表在源和目标数据库中都必须有主键。bireme官网地址为https://github.com/HashDataInc/bireme/。
以上是关于Greenplum 实时数据仓库实践——实时数据同步的主要内容,如果未能解决你的问题,请参考以下文章
Greenplum 实时数据仓库实践——Greenplum与数据仓库