MySQL Series: Incremental Subscription and Consumption Based on binlog


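In some business scenarios we need to observe how data changes. Data analysis sometimes requires change data capture (CDC); data auditing often requires knowing how a record changed between one point in time and another; and real-time analytics sometimes requires watching a value change as it happens.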

To solve these problems, we can parse the MySQL binlog in real time. Two tools handle this well:

1. canal (an Alibaba open-source project, written in pure Java)

2. python-mysql-replication (written in Python)

 

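Use cases:

1. Synchronizing data from MySQL to NoSQL

2. Replicating from MySQL to a search engine

3. Invalidating cached data when the underlying data changes

4. Database auditing

5. Real-time data analytics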

 

This article focuses on python-mysql-replication.

 

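Introduction

python-mysql-replication is built on the MySQL replication mechanism: it presents itself to the server as a slave, continuously fetches the binlog from the MySQL database, and parses it.

The current version (0.15) supports:

1. MySQL 5.5/5.6/5.7

2. Python >= 2.6

3. Python 3.3/3.4/3.5/3.6 (3.2 is not supported)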

 

How MySQL replication works:

[Figure: MySQL replication]

 

How python-mysql-replication works:

[Figure: python-mysql-replication]

Configuration and installation

Install the package

# pip install mysql-replication

MySQL privileges

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'replicator'@'192.168.3.%' IDENTIFIED BY '123456';

The binlog must meet the following conditions

mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON    |
+---------------+-------+
1 row in set (0.01 sec)

mysql> show variables like 'binlog_format';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW   |
+---------------+-------+
1 row in set (0.00 sec)

mysql> show variables like 'binlog_row_image';
+------------------+-------+
| Variable_name    | Value |
+------------------+-------+
| binlog_row_image | FULL  |
+------------------+-------+
1 row in set (0.00 sec)
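The three checks above can also be scripted before a consumer starts. The following is a minimal sketch, not part of python-mysql-replication: `REQUIRED`, `check_binlog_settings`, and `fetch_variables` are hypothetical names, and `fetch_variables` assumes PyMySQL is installed and that the connection settings match the ones used elsewhere in this article. Note that `binlog_row_image` was introduced in MySQL 5.6, so a 5.5 server would be reported as not having it set.

```python
# Required server settings, taken from the checks above.
REQUIRED = {
    "log_bin": "ON",
    "binlog_format": "ROW",
    "binlog_row_image": "FULL",
}


def check_binlog_settings(variables):
    """Return a list of human-readable problems; an empty list means OK.

    `variables` maps variable names to values, for example the rows
    returned by SHOW VARIABLES. The comparison is case-insensitive.
    """
    problems = []
    for name, expected in sorted(REQUIRED.items()):
        actual = variables.get(name)
        if actual is None:
            problems.append("%s is not set (expected %s)" % (name, expected))
        elif str(actual).upper() != expected:
            problems.append("%s is %s (expected %s)" % (name, actual, expected))
    return problems


def fetch_variables(connection_settings):
    """Read the relevant variables from a live server (assumes PyMySQL)."""
    import pymysql  # imported here so the pure check above works without it

    conn = pymysql.connect(**connection_settings)
    try:
        with conn.cursor() as cur:
            cur.execute(
                "SHOW VARIABLES WHERE Variable_name IN "
                "('log_bin', 'binlog_format', 'binlog_row_image')"
            )
            # The default cursor returns (name, value) tuples.
            return dict(cur.fetchall())
    finally:
        conn.close()
```

A consumer could call `check_binlog_settings(fetch_variables(mysql_settings))` at startup and refuse to run if the returned list is not empty.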

 

Example code:

# cat mysql-replication.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import json
import sys

from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
    DeleteRowsEvent,
    UpdateRowsEvent,
    WriteRowsEvent,
)


def main():
    # Account created by the GRANT statement above.
    mysql_settings = {"host": "192.168.3.130",
                      "port": 3306, "user": "replicator", "passwd": "123456"}
    stream = BinLogStreamReader(
        connection_settings=mysql_settings,
        server_id=101,       # must not clash with any other server or replica
        blocking=True,       # wait for new events instead of stopping at the end
        only_schemas=["zow"],
        only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent],
        resume_stream=True,  # start from log_file/log_pos given below
        log_file="mysql-bin.000013", log_pos=6197)

    try:
        for binlogevent in stream:
            for row in binlogevent.rows:
                event = {"schema": binlogevent.schema,
                         "table": binlogevent.table,
                         "log_pos": binlogevent.packet.log_pos}
                if isinstance(binlogevent, DeleteRowsEvent):
                    event["action"] = "delete"
                    event["values"] = dict(row["values"].items())
                elif isinstance(binlogevent, UpdateRowsEvent):
                    event["action"] = "update"
                    event["before_values"] = dict(row["before_values"].items())
                    event["after_values"] = dict(row["after_values"].items())
                elif isinstance(binlogevent, WriteRowsEvent):
                    event["action"] = "insert"
                    event["values"] = dict(row["values"].items())
                # default=str keeps non-JSON types (datetime, Decimal) from raising.
                print(json.dumps(event, default=str))
                sys.stdout.flush()
    finally:
        # With blocking=True the loop only ends on an error or interrupt.
        stream.close()


if __name__ == "__main__":
    main()
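The example starts from a hard-coded log_file and log_pos, so every restart would replay from that same position. One way to resume where the consumer stopped is to persist the last processed position. The following is a minimal sketch for Python 3, assuming a local JSON file as the checkpoint store; `save_checkpoint`, `load_checkpoint`, and the file path are hypothetical and not part of python-mysql-replication.

```python
import json
import os
import tempfile


def save_checkpoint(path, log_file, log_pos):
    """Atomically write the last processed binlog position to `path`."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    with os.fdopen(fd, "w") as f:
        json.dump({"log_file": log_file, "log_pos": log_pos}, f)
    # Rename over the old file so a crash never leaves a half-written checkpoint.
    os.replace(tmp_path, path)


def load_checkpoint(path, default_file=None, default_pos=None):
    """Return (log_file, log_pos), or the defaults if no checkpoint exists."""
    try:
        with open(path) as f:
            data = json.load(f)
    except OSError:
        return default_file, default_pos
    return data["log_file"], data["log_pos"]
```

In the consumer, the loaded pair would be passed as log_file and log_pos at startup, and save_checkpoint would be called after each event has been handled, using binlogevent.packet.log_pos for the position. The current file name is assumed here to be available as stream.log_file; check this against the installed library version. Saving after handling gives at-least-once delivery, so downstream processing should tolerate a replayed event.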

 

Output:

# python mysql-replication.py
{"action": "insert", "table": "t2", "log_pos": 4622, "values": {"tname": "hh", "id": 7}, "schema": "zow"}
{"log_pos": 4904, "after_values": {"tname": "ii", "id": 7}, "action": "update", "table": "t2", "before_values": {"tname": "hh", "id": 7}, "schema": "zow"}
{"log_pos": 4904, "after_values": {"tname": "ii", "id": 7}, "action": "update", "table": "t2", "before_values": {"tname": "hh", "id": 7}, "schema": "zow"}
{"action": "delete", "table": "t2", "log_pos": 5169, "values": {"tname": "ii", "id": 7}, "schema": "zow"}
{"action": "delete", "table": "t2", "log_pos": 5169, "values": {"tname": "ii", "id": 7}, "schema": "zow"}
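For the auditing use case, each update line in this output carries both row images, so the changed columns can be derived by comparing them. The following is a minimal sketch for Python 3 that consumes one JSON line in the format printed above; `changed_columns` is a hypothetical helper name, not part of the library.

```python
import json


def changed_columns(event):
    """Map each changed column to a (before, after) pair for an update event.

    Returns an empty dict for inserts and deletes, which carry a single
    row image under "values".
    """
    if event.get("action") != "update":
        return {}
    before = event["before_values"]
    after = event["after_values"]
    return {
        col: (before.get(col), after.get(col))
        for col in set(before) | set(after)
        if before.get(col) != after.get(col)
    }


# One update line copied from the output above.
line = ('{"log_pos": 4904, "after_values": {"tname": "ii", "id": 7}, '
        '"action": "update", "table": "t2", '
        '"before_values": {"tname": "hh", "id": 7}, "schema": "zow"}')
print(changed_columns(json.loads(line)))  # {'tname': ('hh', 'ii')}
```

Only `tname` is reported because `id` is identical in both images. This comparison relies on binlog_row_image being FULL, so that unchanged columns appear in both images.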

 

More examples: https://github.com/noplay/python-mysql-replication/tree/master/examples

 
