MySQLmom程序增量同步MySQL数据到ES
Posted 运维之美
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MySQLmom程序增量同步MySQL数据到ES相关的知识,希望对你有一定的参考价值。
说明:演示mysqlmom增量同步数据到ES。环境:redis版本3.2.8,ES-5.0.0,mysql5.7.22
分析 binlog 的增量同步的要求:
1.确保要增量同步的MySql数据库开启binlog,且开启redis(为了存储最后一次读到的binlog文件名及读到的位置。未来可能支持本地文件存储该信息。)
注意:第一次运行该进程时不会同步MySql中已存在的数据,从第二次运行开始,将接着上次同步停止时的位置继续同步;启动增量同步前,首先要对所要增量同步的表进行一次全量同步,然后再开启增量同步。
2.需要在本地安装redis服务
/usr/local/redis/bin/redis-server /usr/local/redis/conf/redis.conf
[root@tidb05 conf]# /usr/local/redis/bin/redis-server /usr/local/redis/conf/redis.conf
[root@tidb05 conf]# ss -lntup|grep redis
tcp LISTEN 0 8192 127.0.0.1:10201 *:* users:(("redis-server",pid=10597,fd=4))
[root@tidb05 conf]# /usr/local/redis/bin/redis-cli -h 127.0.0.1 -p 10201 -a 'YHu222tuEq' info Memory
# Memory
used_memory:6179328
used_memory_human:5.89M
used_memory_rss:5095424
used_memory_rss_human:4.86M
used_memory_peak:6179328
used_memory_peak_human:5.89M
total_system_memory:16656146432
total_system_memory_human:15.51G
used_memory_lua:37888
used_memory_lua_human:37.00K
maxmemory:1000000000
maxmemory_human:953.67M
maxmemory_policy:noeviction
mem_fragmentation_ratio:0.82
mem_allocator:jemalloc-4.0.3
3.新建配置文件,只支持"insert", "update"的增量同步:
mom new test_mom/binlog_config.py -t binlog --force
[root@tidb05 mysqlsmom]# mom new test_mom/binlog_config.py -t binlog --force
new config at /data1/soft/mysqlsmom/test_mom/binlog_config.py
4.编辑 test_mom/binlog_config.py,按注释提示修改配置:
[root@tidb05 conf]# cat /data1/soft/mysqlsmom/test_mom/binlog_config.py
# coding=utf-8
STREAM = "BINLOG" # "BINLOG" or "INIT"
SERVER_ID = 172160197 ### 确保每个用于binlog同步的配置文件的SERVER_ID不同
SLAVE_UUID = __name__
# 一次同步 BULK_SIZE 条数据到elasticsearch,不设置该配置项默认为1
###BULK_SIZE = 10000 此参数只能是在全量导数据时开启,在增量同步数据时禁止开启,否则导致增量同步失败
BINLOG_CONNECTION = {
'host': '172.16.0.197',
'port': 3306,
'user': 'click_rep',
'passwd': 'jwtest123456'
}
# redis存储上次同步位置等信息
REDIS = {
"host": "127.0.0.1",
"port": 10201,
"db": 0,
"password": "YHu222tuEq", # 不需要密码则注释或删掉该行
}
# 配置es节点
#NODES = [{"host": "127.0.0.1", "port": 9200}]
NODES = [{"host": "172.16.0.247", "port": 9999}]
TASKS = [
{
"stream": {
"database": "stdb01",
"table": "test01"
},
"jobs": [
{
"actions": ["insert", "update"],
"pipeline": [
#{"only_fields": {"fields": ["id", "name", "age"]}}, ##只同步 id,name和age字段数据到es;注释掉该行则同步全部字段的值到es
{"set_id": {"field": "id"}}
],
"dest": {
"es": {
"action": "upsert",
"index": "test01_index",
"type": "test01",
"nodes": NODES
}
}
}
]
}
]
# CUSTOM_ROW_HANDLERS = "./my_handlers.py"
# CUSTOM_ROW_FILTERS = "./my_filters.py"
5.运行:
mom run -c ./test_mom/binlog_config.py
该进程会一直运行,实时同步新增和更改后的数据到elasticsearch;
注意:第一次运行该进程时不会同步MySql中已存在的数据,从第二次运行开始,将接着上次同步停止时的位置继续同步;同步旧数据请看全量同步MySql数据到es;
6.验证测试:
验证MySQL测试表stdb01.test01已经存在的数据:
[root@tidb04 ~]# mysql -e "select * from stdb01.test01;"
+----+----------+------------+---------------------+
| id | username | password | create_time |
+----+----------+------------+---------------------+
| 1 | tomcat | xiaohuahua | 2021-07-11 10:57:57 |
| 2 | java | 123456 | 2021-07-11 10:57:57 |
| 3 | lua | ssd123456 | 2021-07-11 10:57:57 |
| 4 | php | seurw456 | 2021-07-11 10:57:57 |
| 5 | python | seueurw456 | 2021-07-11 10:57:58 |
| 6 | java | 123456 | 2021-07-11 16:59:47 |
| 7 | java | 123456 | 2021-07-11 16:59:51 |
| 8 | java | 123456 | 2021-07-11 16:59:58 |
| 9 | tomcat | ceshi001 | 2021-07-28 00:24:41 |
| 10 | c++ | 558996 | 2021-07-28 00:33:01 |
| 11 | c++ | 558996 | 2021-07-11 16:59:58 |
| 12 | c++ | 558996 | 2021-07-11 16:59:58 |
| 13 | java | 596 | 2021-07-28 00:41:14 |
| 14 | java | 7890 | 2021-07-28 00:41:34 |
| 15 | php | 7890 | 2021-07-28 00:41:51 |
| 16 | python | 654321 | 2021-07-28 00:42:08 |
+----+----------+------------+---------------------+
启动mysqlmom服务:
[root@tidb05 mysqlsmom]# mom run -c ./test_mom/binlog_config.py
binlog文件和pos位置点:
[root@tidb05 ~]# /usr/local/redis/bin/redis-cli -h 127.0.0.1 -p 10201 -a 'YHu222tuEq'
127.0.0.1:10201> info keyspace
# Keyspace
db0:keys=2,expires=0,avg_ttl=0
127.0.0.1:10201> keys *
1) "binlog_config_log_pos"
2) "binlog_config_log_file"
127.0.0.1:10201> get binlog_config_log_pos
"2268"
127.0.0.1:10201> get binlog_config_log_file
"mysql-bin.000013"
MySQL新增和删除记录:
root@tidb04 23:59: [stdb01]> INSERT INTO test01(username,password,create_time) values('go', '654',now());
root@tidb04 00:16: [stdb01]> delete from test01 where id=17;
Query OK, 1 row affected (0.00 sec)
[root@tidb05 mysqlsmom]# mom run -c ./test_mom/binlog_config.py
2573
2021-07-29 00:13:46,528 root INFO {"timestamp": "2021-07-29 00:13:46", "host": "172.16.0.197", "values": {"username": "go", "password": "654", "create_time": "2021-07-29 00:13:46", "id": 17}, "action": "insert", "table": "test01", "schema": "stdb01"}
2021-07-29 00:13:46,529 root INFO {"username": "go", "password": "654", "create_time": "2021-07-29 00:13:46", "id": 17, "_id": 17}
2870
2021-07-29 00:16:43,364 root INFO {"timestamp": "2021-07-29 00:16:43", "host": "172.16.0.197", "values": {"username": "go", "password": "654", "create_time": "2021-07-29 00:13:46", "id": 17}, "action": "delete", "table": "test01", "schema": "stdb01"}
**虽然记录了日志,但是ES里面没有任何数据新增或删除的变化**
root@tidb04 00:19: [stdb01]> INSERT INTO test01(username,password,create_time) values('go', '654',now());
Query OK, 1 row affected (0.00 sec)
root@tidb04 00:19: [stdb01]> INSERT INTO test01(username,password,create_time) values('C++', '654',now());
Query OK, 1 row affected (0.01 sec)
3330
2021-07-29 00:19:47,088 root INFO {"timestamp": "2021-07-29 00:19:47", "host": "172.16.0.197", "values": {"username": "go", "password": "654", "create_time": "2021-07-29 00:19:47", "id": 1}, "action": "insert", "table": "test01", "schema": "stdb01"}
2021-07-29 00:19:47,088 root INFO {"username": "go", "password": "654", "create_time": "2021-07-29 00:19:47", "id": 1, "_id": 1}
3636
2021-07-29 00:20:42,729 root INFO {"timestamp": "2021-07-29 00:20:42", "host": "172.16.0.197", "values": {"username": "C++", "password": "654", "create_time": "2021-07-29 00:20:42", "id": 2}, "action": "insert", "table": "test01", "schema": "stdb01"}
2021-07-29 00:20:42,729 root INFO {"username": "C++", "password": "654", "create_time": "2021-07-29 00:20:42", "id": 2, "_id": 2}
**重复尝试了好多次,虽然在启动binlog增量同步数据时,有增删数据日志的输出,但是登录ES,始终没看到ES中数据的变化**
**存放在redis中的pos位置点一直没变化:**
127.0.0.1:10201> get binlog_config_log_file
"mysql-bin.000013"
127.0.0.1:10201> get binlog_config_log_pos
"2268"
后面发现问题原因:
主要原因是错误地把配置文件binlog_config.py中的BULK_SIZE这个参数开启了。果断将其注释掉,重启binlog同步程序,终于可以看到有数据写入到ES了。
7、全量同步stdb01.test01表数据到ES:
[root@tidb05 mysqlsmom]# time mom run -c ./test_mom/init_config.py
2021-07-29 00:23:47,371 root INFO {"username": "go", "password": "654", "create_time": "2021-07-29 00:19:47", "id": 1, "_id": 1}
2021-07-29 00:23:47,371 root INFO {"username": "C++", "password": "654", "create_time": "2021-07-29 00:20:42", "id": 2, "_id": 2}
2021-07-29 00:23:48,501 elasticsearch INFO POST http://172.16.0.247:9999/_bulk [status:200 request:1.130s]
real 0m1.768s
user 0m0.463s
sys 0m0.059s
[es@tidb06 logs]$ curl 'http://172.16.0.247:9999/_cat/indices?v'
health status index uuid pri rep docs.count docs.deleted store.size pri.store.size
yellow open test01_index LMMynfXaThGktG_YEAO2QQ 5 1 2 0 10.1kb 10.1kb
[es@tidb06 logs]$ curl -s -XGET 'http://172.16.0.247:9999/_cat/indices/test01_index?v'
health status index uuid pri rep docs.count docs.deleted store.size pri.store.size
yellow open test01_index LMMynfXaThGktG_YEAO2QQ 5 1 2 0 10.1kb 10.1kb
8.再次启动binlog_config.py 进行增量同步:
mom run -c ./test_mom/binlog_config.py
写入2条sql:
root@tidb04 00:44: [stdb01]> INSERT INTO test01(username,password,create_time) values('php', '123',now());
Query OK, 1 row affected (0.01 sec)
root@tidb04 00:44: [stdb01]> INSERT INTO test01(username,password,create_time) values('java', '321',now());
Query OK, 1 row affected (0.01 sec)
root@tidb04 00:44: [stdb01]> select * from test01;
+----+----------+----------+---------------------+
| id | username | password | create_time |
+----+----------+----------+---------------------+
| 1 | go | 654 | 2021-07-29 00:19:47 |
| 2 | C++ | 654 | 2021-07-29 00:20:42 |
| 3 | php | 123 | 2021-07-29 00:44:39 |
| 4 | java | 321 | 2021-07-29 00:44:49 |
+----+----------+----------+---------------------+
4 rows in set (0.00 sec)
**存放在redis中的pos位置点开始变化了:**
127.0.0.1:10201> get binlog_config_log_file
"mysql-bin.000013"
127.0.0.1:10201> get binlog_config_log_pos
"3278"
**登录ES,看到insert数据也写入到ES了**
9、insert,update,delete的增量同步MySQL数据到ES的配置文件:
[root@tidb05 soft]# cat mysqlsmom/test_mom/binlog_config.py
# coding=utf-8
STREAM = "BINLOG" # "BINLOG" or "INIT"
SERVER_ID = 172160197 ### 确保每个用于binlog同步的配置文件的SERVER_ID不同
SLAVE_UUID = __name__
# 一次同步 BULK_SIZE 条数据到elasticsearch,不设置该配置项默认为1
#BULK_SIZE = 10000 此参数只能是在全量导数据时开启,在增量同步数据时禁止开启,否则导致增量同步失败
BINLOG_CONNECTION = {
'host': '172.16.0.197',
'port': 3306,
'user': 'click_rep',
'passwd': 'jwtest123456'
}
# redis存储上次同步位置等信息
REDIS = {
"host": "127.0.0.1",
"port": 10201,
"db": 0,
"password": "YHu222tuEq", # 不需要密码则注释或删掉该行
}
# 配置es节点
#NODES = [{"host": "127.0.0.1", "port": 9200}]
NODES = [{"host": "172.16.0.247", "port": 9999}]
TASKS = [
{
"stream": {
"database": "stdb01",
"table": "test01"
},
"jobs": [
# 同步插入、更新es数据
{
"actions": ["insert", "update"],
"pipeline": [
#{"only_fields": {"fields": ["id", "name", "age"]}}, ##只同步 id,name和age字段数据到es;注释掉该行则同步全部字段的值到es
{"set_id": {"field": "id"}}
],
"dest": {
"es": {
"action": "upsert",
"index": "test01_index",
"type": "test01",
"nodes": NODES
}
}
},
#delete 同步删除es数据
{
"actions": ["delete"],
"pipeline": [
#{"only_fields": {"fields": ["id", "name", "age"]}}, ##只同步 id,name和age字段数据到es;注释掉该行则同步全部字段的值到es
{"set_id": {"field": "id"}}
],
"dest": {
"es": {
"action": "delete",
"index": "test01_index",
"type": "test01",
"nodes": NODES
}
}
}
]
}
]
# CUSTOM_ROW_HANDLERS = "./my_handlers.py"
# CUSTOM_ROW_FILTERS = "./my_filters.py"
关于MySQLmom增量同步指定MySQL表数据到ES简单介绍完毕,请继续关注博主,后面还会有更精彩的文章继续分享。
以上是关于MySQLmom程序增量同步MySQL数据到ES的主要内容,如果未能解决你的问题,请参考以下文章