基于 MySQL Binlog 的 Elasticsearch 数据同步实践

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了基于 MySQL Binlog 的 Elasticsearch 数据同步实践相关的知识,希望对你有一定的参考价值。

参考技术A

一、背景

随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 mysql 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。

使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。

二、现有方法及问题

对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张MySQL 表中,这张中间表对应了业务需要的 Elasticsearch 索引,每一列对应索引中的一个Mapping 字段。通过脚本以 Crontab 的方式,读取 MySQL 中间表中 UTime 大于上一次读取时间的所有数据,即该段时间内的增量,写入Elasticsearch。

所以,一旦业务逻辑中有相应字段的数据变更,需要同时顾及 MySQL 中间表的变更;如果需要 Elasticsearch 中的数据即时性较高,还需要同时写入 Elasticsearch。

随着业务数据越来越多,MySQL 中间表的数据量越来越大。当需要在 Elasticsearch 的索引中新增 Mapping 字段时,相应的 MySQL 中间表也需要新增列,在数据量庞大的表中,扩展列的耗时是难以忍受的。

而且 Elasticsearch 索引中的 Mapping 字段随着业务发展增多,需要由业务方增加相应的写入 MySQL 中间表方法,这也带来一部分开发成本。

三、方案设计

1. 整体思路

现有的一些开源数据同步工具,如阿里的 DataX 等,主要是基于查询来获取数据源,这会存在如何确定增量(比如使用utime字段解决等)和轮询频率的问题,而我们一些业务场景对于数据同步的实时性要求比较高。为了解决上述问题,我们提出了一种基于 MySQL Binlog 来进行 MySQL 数据同步到 Elasticsearch 的思路。Binlog 是 MySQL 通过 Replication 协议用来做主从数据同步的数据,所以它有我们需要写入 Elasticsearch 的数据,并符合对数据同步时效性的要求。

使用 Binlog 数据同步 Elasticsearch,业务方就可以专注于业务逻辑对 MySQL 的操作,不用再关心数据向 Elasticsearch 同步的问题,减少了不必要的同步代码,避免了扩展中间表列的长耗时问题。

经过调研后,我们采用开源项目 go-mysql-elasticsearch 实现数据同步,并针对马蜂窝技术栈和实际的业务环境进行了一些定制化开发。

2. 数据同步正确性保证

公司的所有表的 Binlog 数据属于机密数据,不能直接获取,为了满足各业务线的使用需求,采用接入 Kafka 的形式提供给使用方,并且需要使用方申请相应的 Binlog 数据使用权限。获取使用权限后,使用方以 Consumer Group 的形式读取。

这种方式保证了 Binglog 数据的安全性,但是对保证数据同步的正确性带来了挑战。因此我们设计了一些机制,来保证数据源的获取有序、完整。

1). 顺序性

通过 Kafka 获取 Binlog 数据,首先需要保证获取数据的顺序性。严格说,Kafka 是无法保证全局消息有序的,只能局部有序,所以无法保证所有 Binlog 数据都可以有序到达 Consumer。

但是每个 Partition 上的数据是有序的。为了可以按顺序拿到每一行 MySQL 记录的 Binglog,我们把每条 Binlog 按照其 Primary Key,Hash 到各个 Partition 上,保证同一条 MySQL 记录的所有 Binlog 数据都发送到同一个 Partition。

如果是多 Consumer 的情况,一个 Partition 只会分配给一个 Consumer,同样可以保证 Partition 内的数据可以有序的 Update 到 Elasticsearch 中。

2). 完整性

考虑到同步程序可能面临各种正常或异常的退出,以及 Consumer 数量变化时的 Rebalance,我们需要保证在任何情况下不能丢失 Binlog 数据。

利用 Kafka 的 Offset 机制,在确认一条 Message 数据成功写入 Elasticsearch 后,才 Commit 该条 Message 的 Offset,这样就保证了数据的完整性。而对于数据同步的使用场景,在保证了数据顺序性和完整性的情况下,重复消费是不会有影响的。

四、技术实现

1. 功能模块

配置解析模块

负责解析配置文件(toml 或 json 格式),或在配置中心(Skipper)配置的 json 字符串。包括 Kafka 集群配置、Elasticsearch 地址配置、日志记录方式配置、MySQL 库表及字段与 Elasticsearch 的 Index 和 Mapping 对应关系配置等。

规则模块

规则模块决定了一条 Binlog 数据应该写入到哪个 Elasticsearch 索引、文档_id 对应的 MySQL 字段、Binlog 中的各个 MySQL 字段与索引 Mapping 的对应关系和写入类型等。

在本地化过程中,根据我们的业务场景,增加了对 MySQL 表各字段的 where 条件判断,来过滤掉不需要的 Binlog 数据。

Kafka 相关模块

该模块负责连接 Kafka 集群,获取 Binlog 数据。

Binlog 数据解析模块

原项目中的 Binlog 数据解析针对的是原始的 Binlog 数据,包含了解析 Replication 协议的实现。在我们的使用场景中,Binlog 数据已经是由 canal 解析成的 json 字符串,所以对该模块的功能进行了简化。

binlog json字符串示例

上面是一个简化的 binlog json 字符串,通过该条 binlog 的 database 和 table 可以命中一条配置规则,根据该配置规则,把 Data 中的 key-value 构造成一个与对应 Elasticsearch 索引相匹配的 key-value map,同时包括一些数据类型的转换:

Elasticsearch相关模块

Binlog 数据解析模块生成的 key-value map,由该模块拼装成请求_bulk 接口的 update payload,写入 Elasticsearch。考虑到 MySQL 频繁更新时对 Elasticsearch 的写入压力,key-value map 会暂存到一个 slice 中,每 200ms 或 slice 长度达到一定长度时(可以通过配置调整),才会调用 Elasticsearch 的_bulk 接口,写入数据。

2. 定制化开发

1). 适应业务需求

upsert

业务中使用的索引数据可能是来自多个不同的表,同一个文档的数据来自不同表的时候,先到的数据是一条 index,后到的数据是一条 update,在我们无法控制先后顺序时,需要实现 upsert 功能。在_bulk 参数中加入

Filter

实际业务场景中,可能业务需要的数据只是某张表中的部分数据,比如用 type 字段标识该条数据来源,只需要把 type=1或2的数据同步到 Elasticsearch 中。我们扩展了规则配置,可以支持对 Binlog 指定字段的过滤需求,类似:

2)快速增量

数据同步一般分为全量和增量。接入一个业务时,首先需要把业务现有的 历史 MySQL 数据导入到 Elasticsearch 中,这部分为全量同步。在全量同步过程中以及后续增加的数据为增量数据。

在全量数据同步完成后,如果从最旧开始消费 Kafka,队列数据量很大的情况下,需要很长时间增量数据才能追上当前进度。为了更快的拿到所需的增量 Binlog,在 Consumer Group 消费 Kafka 之前,先获取各个 Topic 的 Partition 在指定时间的 offset 值,并 commit 这些 offset,这样在 Consumer Group 连接 Kafka 集群时,会从刚才提交的 offset 开始消费,可以立即拿到所需的增量 Binlog。

3). 微服务和配置中心

项目使用马蜂窝微服务部署,为新接入业务提供了快速上线支持,并且在业务 Binlog 数据突增时可以方便快速的扩容 Consumer。

马蜂窝配置中心支持了各个接入业务的配置管理,相比于开源项目中的 toml 格式配置文件,使用配置中心可以更方便的管理不同业务不同环境的配置。

五、日志与监控

从上图中可以看出,订单各个表的数据同步延时平均在 1s 左右。把延时数据接入 ElastAlert,在延时数据过多时发送报警通知。

另一个监控指标是心跳检测,单独建立一张独立于业务的表,crontab 脚本每分钟修改一次该表,同时检查上一次修改是否同步到了指定的索引,如果没有,则发送报警通知。该心跳检测,监控了整个流程上的 Kafka、微服务和 ES,任何一个会导致数据不同步的环节出问题,都会第一个接到通知。

六、结语

目前接入的最重要业务方是电商的订单索引,数据同步延时稳定在 1s 左右。这次的开源项目本地化实践,希望能为一些有 Elasticsearch 数据同步需求的业务场景提供帮助。

如何查看MySQL的binlog数据

binlog,即二进制日志,它记录了数据库上的所有改变.
改变数据库的SQL语句执行结束时,将在binlog的末尾写入一条记录,同时通知语句解析器,语句执行完毕.
binlog格式
基于语句,无法保证所有语句都在从库执行成功,比如update ... limit 1;
基于行,将每一次改动记为binlog中的一行.在执行一个特别复杂的update或者delete操作时,基于行的格式会有优势.
参考技术A 当启动Binlog后,事务会产生Binlog Event,这些Event被看做事务数据的一部分。因此要保证事务的Binlog Event和InnoDB引擎中的数据的一致性。所以带Binlog的CrashSafe要求MySQL宕机重启后能够保证:

- 所有已经提交的事务的数据仍然存在。

- 所有没有提交的事务的数据自动回滚。

- 所有已经提交了的事务的Binlog Event也仍然存在。

- 所有没有提交事务没有记录Binlog Event。

这些要求很好理解,如果重启后数据还在,但是Binlog Event没有了,就没办法复制到其他节点上了。如果重启后,数据没了,但是Binlog Event还在,那么不存在的数据就会被复制到其他节点上,从而导致主从的不一致。

为了保证带Binlog的CrashSafe,MySQL内部使用的两阶段提交(Two Phase Commit)。

2 - MySQL的Two Phase Commit(2PC)

在开启Binlog后,MySQL内部会自动将普通事务当做一个XA事务来处理:
- 自动为每个事务分配一个唯一的ID
- COMMIT会被自动的分成Prepare和Commit两个阶段。
- Binlog会被当做事务协调者(Transaction Coordinator),Binlog Event会被当做协调者日志。
想了解2PC,可以参考文档:【https://en.wikipedia.org/wiki/Two-phase_commit_protocol。】

- 分布式事务ID(XID)

使用2PC时,MySQL会自动的为每一个事务分配一个ID,叫XID。XID是唯一的,每个事务的XID都不相同。XID会分别被Binlog和InnoDB记入日志中,供恢复时使用。MySQ内部的XID由三部分组成:

- 前缀部分

前缀部分是字符串"MySQLXid"

- Server ID部分

当前MySQL的server_id
- query_id部分

为了保证XID的的唯一性,数字部分使用了query_id。MySQL内部会自动的为每一个语句分配一个query_id,全局唯一。

参考代码:sql/xa。h的struct xid_t结构。

- 事务的协调者Binlog

Binlog在2PC中充当了事务的协调者(Transaction Coordinator)。由Binlog来通知InnoDB引擎来执行prepare,commit或者rollback的步骤。事务提交的整个过程如下:

1. 协调者准备阶段(Prepare Phase)

告诉引擎做Prepare,InnoDB更改事务状态,并将Redo Log刷入磁盘。

2. 协调者提交阶段(Commit Phase)

2.1 记录协调者日志,即Binlog日志。

2.2 告诉引擎做commit。
注意:记录Binlog是在InnoDB引擎Prepare(即Redo Log写入磁盘)之后,这点至关重要。

在MySQ的代码中将协调者叫做tc_log。在MySQL启动时,tc_log将被初始化为mysql_bin_log对象。参考sql/binlog.cc中的init_server_components():
if (opt_bin_log) tc_log= &mysql_bin_log;

而在事务提交时,会依次执行:
tc_log->prepare();
tc_log->commit();
参考代码:sql/binlog.cc中的ha_commit_trans()。当mysql_bin_log是tc_log时,prepare和commit的代码在sql/binlog.cc中:

MYSQL_BIN_LOG::prepare();
MYSQL_BIN_LOG::commit();

-协调者日志Xid_log_event
作为协调者,Binlog需要将事务的XID记入日志,供恢复时使用。Xid_log_event有以下几个特点:
- 仅记录query_id
因为前缀部分不变,server_id已经记录在Event Header中,Xid_log_event中只记录query_id部分。
- 标志事务的结束

在Binlog中相当于一个事务的COMMIT语句。

一个事务在Binlog中看起来时这样的:
Query_log_event("BEGIN");DML产生的events; Xid_log_event;

- DDL没有BEGIN,也没有Xid_log_event 。
- 仅InnoDB的DML会产生Xid_log_event
因为MyISAM不支持2PC所以不能用Xid_log_event ,但会有COMMIT Event。
Query_log_event("BEGIN");DML产生的events;Query_log_event("COMMIT");

问题:Query_log_event("COMMIT")和Xid_log_event 有不同的影响吗?

- Xid_log_event 中的Xid可以帮助master实现CrashSafe。
- Slave的CrashSafe不依赖Xid_log_event
事务在Slave上重做时,会重新产生XID。所以Slave服务器的CrashSafe并不依赖于Xid_log_event 。Xid_log_event 和Query_log_event("COMMIT"),只是作为事务的结尾,告诉Slave Applier去提交这个事务。因此二者在Slave上的影响是一样的。

3 - 恢复(Recovery)
这个机制是如何保证MySQL的CrashSafe的呢,我们来分析一下。这里我们假设用户设置了以下参数来保证可靠性:

- 恢复前事务的状态
在恢复开始前事务有以下几种状态:
- InnoDB中已经提交
根据前面2PC的过程,可知Binlog中也一定记录了该事务的的Events。所以这种事务是一致的不需要处理。
- InnoDB中是prepared状态,Binlog中有该事务的Events。
需要通知InnoDB提交这些事务。
- InnoDB中是prepared状态,Binlog中没有该事务的Events。
因为Binlog还没记录,需要通知InnoDB回滚这些事务。
- Before InnoDB Prepare
事务可能还没执行完,因此InnoDB中的状态还没有prepare。根据2PC的过程,Binlog中也没有该事务的events。 需要通知InnoDB回滚这些事务。

- 恢复过程
从上面的事务状态可以看出:恢复时事务要提交还是回滚,是由Binlog来决定的。
- 事务的Xid_log_event 存在,就要提交。
- 事务的Xid_log_event 不存在,就要回滚。

恢复的过程非常简单:
- 从Binlog中读出所有的Xid_log_event
- 告诉InnoDB提交这些XID的事务
- InnoDB回滚其它的事务

以上是关于基于 MySQL Binlog 的 Elasticsearch 数据同步实践的主要内容,如果未能解决你的问题,请参考以下文章

大厂如何基于binlog解决多机房同步mysql数据?

如何查看MySQL的binlog数据

(4.11)mysql备份还原——mysql闪回技术(基于binlog)

MySQL的binlog数据如何查看

MySQL 基于语句的复制:binlog 是不是包含在 master 上执行的确切查询?

mysql 对应 binlog 查看