Using the MySQLmom program to incrementally sync MySQL data to ES

Posted by 运维之美



Note: This post demonstrates incremental sync of MySQL data to ES with mysqlsmom. Environment: Redis 3.2.8, Elasticsearch 5.0.0, MySQL 5.7.22.

Requirements for binlog-based incremental sync:

1. Make sure binlog is enabled on the MySQL instance you want to sync from, and that a Redis instance is available (it stores the name of the last binlog file read and the position within it; local file storage for this state may be supported in the future).
Note: the first run of the process does not sync data that already exists in MySQL; from the second run onward it resumes from where the previous sync stopped. Before starting incremental sync, run a full sync of the tables once, then switch on incremental sync.

2. Install a Redis service on the local machine.


[root@tidb05 conf]#  /usr/local/redis/bin/redis-server /usr/local/redis/conf/redis.conf 
[root@tidb05 conf]# ss -lntup|grep redis
tcp    LISTEN     0      8192   127.0.0.1:10201                 *:*                   users:(("redis-server",pid=10597,fd=4))
[root@tidb05 conf]# /usr/local/redis/bin/redis-cli -h 127.0.0.1 -p 10201 -a 'YHu222tuEq' info Memory
# Memory
used_memory:6179328
used_memory_human:5.89M
used_memory_rss:5095424
used_memory_rss_human:4.86M
used_memory_peak:6179328
used_memory_peak_human:5.89M
total_system_memory:16656146432
total_system_memory_human:15.51G
used_memory_lua:37888
used_memory_lua_human:37.00K
maxmemory:1000000000
maxmemory_human:953.67M
maxmemory_policy:noeviction
mem_fragmentation_ratio:0.82
mem_allocator:jemalloc-4.0.3
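
Since mysqlsmom reads and writes its sync state through this Redis instance, connectivity can also be sanity-checked from Python. A minimal sketch using the redis-py package (assumed to be installed; host, port, and password are the same values used in the config below):

# redis_check.py - sanity-check the Redis instance that will hold the sync state
import redis

r = redis.StrictRedis(host="127.0.0.1", port=10201, db=0, password="YHu222tuEq")
print(r.ping())                                   # True if connection and password are OK
print(r.info("memory")["used_memory_human"])      # same figure as "info Memory" above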

3. Create a new config file. This configuration only supports incremental sync of "insert" and "update" events:

[root@tidb05 mysqlsmom]# mom new test_mom/binlog_config.py -t binlog --force
new config at /data1/soft/mysqlsmom/test_mom/binlog_config.py

4. Edit test_mom/binlog_config.py and adjust the settings as described in the comments:


[root@tidb05 conf]# cat   /data1/soft/mysqlsmom/test_mom/binlog_config.py
# coding=utf-8
STREAM = "BINLOG"  # "BINLOG" or "INIT"
SERVER_ID = 172160197  # make sure every config file used for binlog sync has a distinct SERVER_ID
SLAVE_UUID = __name__

# Sync BULK_SIZE rows to Elasticsearch per batch; defaults to 1 if this option is not set
###BULK_SIZE = 10000  # only enable this when doing a full import; never enable it during incremental sync, or the incremental sync will fail

BINLOG_CONNECTION = {
    'host': '172.16.0.197',
    'port': 3306,
    'user': 'click_rep',
    'passwd': 'jwtest123456'
}
# Redis stores the position of the last sync and related state
REDIS = {
    "host": "127.0.0.1",
    "port": 10201,
    "db": 0,
    "password": "YHu222tuEq",  # comment out or remove this line if no password is needed
}

# Elasticsearch nodes
#NODES = [{"host": "127.0.0.1", "port": 9200}]
NODES = [{"host": "172.16.0.247", "port": 9999}]
TASKS = [
    {
        "stream": {
            "database": "stdb01",
            "table": "test01"
        },
        "jobs": [
            {
                "actions": ["insert", "update"],
                "pipeline": [
                    #{"only_fields": {"fields": ["id", "name", "age"]}},  # only sync the id, name and age fields to ES; comment this line out to sync all fields
                    {"set_id": {"field": "id"}}
                ],
                "dest": {
                    "es": {
                        "action": "upsert",
                        "index": "test01_index",
                        "type": "test01",
                        "nodes": NODES
                    }
                }
            }
        ]
    }
]

# CUSTOM_ROW_HANDLERS = "./my_handlers.py"
# CUSTOM_ROW_FILTERS = "./my_filters.py"
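
The pipeline in this config only copies the row's id column into the Elasticsearch _id (the commented-out only_fields step would additionally strip the document down to the listed columns). Conceptually it transforms each binlog row roughly like the sketch below; this is an illustration of the configured steps, not mysqlsmom's own code:

# Rough illustration of what the configured pipeline does to a single binlog row.
def only_fields(row, fields):
    # keep only the listed columns
    return {k: v for k, v in row.items() if k in fields}

def set_id(row, field):
    # copy the chosen column into _id, which ES uses as the document id
    row["_id"] = row[field]
    return row

row = {"id": 17, "username": "go", "password": "654", "create_time": "2021-07-29 00:13:46"}
print(set_id(row, "id"))   # {..., "id": 17, "_id": 17} - matches the second log line per event shown below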

5. Run

The process keeps running and syncs newly inserted and updated rows to Elasticsearch in real time.
Note: the first run of the process does not sync data that already exists in MySQL; from the second run onward it resumes from where the previous sync stopped. To sync existing data, see the article on doing a full sync of MySQL data to ES.

6. Verification:


Check the data already present in the MySQL test table stdb01.test01:
[root@tidb04 ~]# mysql -e "select * from stdb01.test01;"
+----+----------+------------+---------------------+
| id | username | password   | create_time         |
+----+----------+------------+---------------------+
|  1 | tomcat   | xiaohuahua | 2021-07-11 10:57:57 |
|  2 | java     | 123456     | 2021-07-11 10:57:57 |
|  3 | lua      | ssd123456  | 2021-07-11 10:57:57 |
|  4 | php      | seurw456   | 2021-07-11 10:57:57 |
|  5 | python   | seueurw456 | 2021-07-11 10:57:58 |
|  6 | java     | 123456     | 2021-07-11 16:59:47 |
|  7 | java     | 123456     | 2021-07-11 16:59:51 |
|  8 | java     | 123456     | 2021-07-11 16:59:58 |
|  9 | tomcat   | ceshi001   | 2021-07-28 00:24:41 |
| 10 | c++      | 558996     | 2021-07-28 00:33:01 |
| 11 | c++      | 558996     | 2021-07-11 16:59:58 |
| 12 | c++      | 558996     | 2021-07-11 16:59:58 |
| 13 | java     | 596        | 2021-07-28 00:41:14 |
| 14 | java     | 7890       | 2021-07-28 00:41:34 |
| 15 | php      | 7890       | 2021-07-28 00:41:51 |
| 16 | python   | 654321     | 2021-07-28 00:42:08 |
+----+----------+------------+---------------------+

Start the mysqlsmom service:

[root@tidb05 mysqlsmom]# mom run -c ./test_mom/binlog_config.py 

The binlog file name and position (pos) stored in Redis:

[root@tidb05 ~]# /usr/local/redis/bin/redis-cli -h 127.0.0.1 -p 10201 -a 'YHu222tuEq' 
127.0.0.1:10201> info keyspace
# Keyspace
db0:keys=2,expires=0,avg_ttl=0
127.0.0.1:10201> keys *
1) "binlog_config_log_pos"
2) "binlog_config_log_file"
127.0.0.1:10201> get binlog_config_log_pos
"2268"
127.0.0.1:10201> get binlog_config_log_file
"mysql-bin.000013"

Insert and then delete a record in MySQL:

root@tidb04 23:59:  [stdb01]>  INSERT INTO test01(username,password,create_time) values('go', '654',now());
root@tidb04 00:16:  [stdb01]> delete from test01 where id=17;
Query OK, 1 row affected (0.00 sec)

[root@tidb05 mysqlsmom]# mom run -c ./test_mom/binlog_config.py 

2573
2021-07-29 00:13:46,528 root         INFO     {"timestamp": "2021-07-29 00:13:46", "host": "172.16.0.197", "values": {"username": "go", "password": "654", "create_time": "2021-07-29 00:13:46", "id": 17}, "action": "insert", "table": "test01", "schema": "stdb01"}
2021-07-29 00:13:46,529 root         INFO     {"username": "go", "password": "654", "create_time": "2021-07-29 00:13:46", "id": 17, "_id": 17}
2870
2021-07-29 00:16:43,364 root         INFO     {"timestamp": "2021-07-29 00:16:43", "host": "172.16.0.197", "values": {"username": "go", "password": "654", "create_time": "2021-07-29 00:13:46", "id": 17}, "action": "delete", "table": "test01", "schema": "stdb01"}
**Although both events were logged, nothing was inserted into or deleted from ES.** (Note that this config only defines a job for insert and update, so the delete event is logged by the binlog reader but produces no ES action; step 9 below adds a delete job.)

root@tidb04 00:19:  [stdb01]> INSERT INTO test01(username,password,create_time) values('go', '654',now());
Query OK, 1 row affected (0.00 sec)

root@tidb04 00:19:  [stdb01]> INSERT INTO test01(username,password,create_time) values('C++', '654',now());
Query OK, 1 row affected (0.01 sec)

3330
2021-07-29 00:19:47,088 root         INFO     {"timestamp": "2021-07-29 00:19:47", "host": "172.16.0.197", "values": {"username": "go", "password": "654", "create_time": "2021-07-29 00:19:47", "id": 1}, "action": "insert", "table": "test01", "schema": "stdb01"}
2021-07-29 00:19:47,088 root         INFO     {"username": "go", "password": "654", "create_time": "2021-07-29 00:19:47", "id": 1, "_id": 1}

3636
2021-07-29 00:20:42,729 root         INFO     {"timestamp": "2021-07-29 00:20:42", "host": "172.16.0.197", "values": {"username": "C++", "password": "654", "create_time": "2021-07-29 00:20:42", "id": 2}, "action": "insert", "table": "test01", "schema": "stdb01"}
2021-07-29 00:20:42,729 root         INFO     {"username": "C++", "password": "654", "create_time": "2021-07-29 00:20:42", "id": 2, "_id": 2}

**I retried many times: each time the binlog incremental sync was started, the insert/delete events were logged, but logging into ES never showed any change in the data.**

**The position stored in Redis never changed:**
127.0.0.1:10201> get binlog_config_log_file
"mysql-bin.000013"
127.0.0.1:10201> get binlog_config_log_pos
"2268"

The cause was found later:
the BULK_SIZE parameter in binlog_config.py had been enabled by mistake. After commenting it out and restarting the binlog sync process, data finally started appearing in ES. (Presumably events are buffered until BULK_SIZE rows accumulate, so with only a handful of writes nothing was ever flushed and the saved position never advanced.)

7. Full sync of the stdb01.test01 table data to ES:


[root@tidb05 mysqlsmom]# time mom run -c ./test_mom/init_config.py 
2021-07-29 00:23:47,371 root         INFO     {"username": "go", "password": "654", "create_time": "2021-07-29 00:19:47", "id": 1, "_id": 1}
2021-07-29 00:23:47,371 root         INFO     {"username": "C++", "password": "654", "create_time": "2021-07-29 00:20:42", "id": 2, "_id": 2}
2021-07-29 00:23:48,501 elasticsearch INFO     POST http://172.16.0.247:9999/_bulk [status:200 request:1.130s]

real    0m1.768s
user    0m0.463s
sys 0m0.059s
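
After a full sync it is worth confirming that the Elasticsearch document count matches the MySQL row count. A hedged sketch using pymysql and requests (both assumed to be installed; connection details are the ones from the config above):

# count_check.py - compare the MySQL row count with the ES document count after a full sync
import pymysql
import requests

conn = pymysql.connect(host="172.16.0.197", port=3306, user="click_rep",
                       passwd="jwtest123456", db="stdb01")
with conn.cursor() as cur:
    cur.execute("SELECT COUNT(*) FROM test01")
    mysql_count = cur.fetchone()[0]

# allow a moment for the index refresh before counting
es_count = requests.get("http://172.16.0.247:9999/test01_index/_count").json()["count"]
print("mysql=%s es=%s match=%s" % (mysql_count, es_count, mysql_count == es_count))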

[es@tidb06 logs]$ curl 'http://172.16.0.247:9999/_cat/indices?v'
health status index        uuid                   pri rep docs.count docs.deleted store.size pri.store.size
yellow open   test01_index LMMynfXaThGktG_YEAO2QQ   5   1          2            0     10.1kb         10.1kb

[es@tidb06 logs]$ curl -s -XGET 'http://172.16.0.247:9999/_cat/indices/test01_index?v'
health status index        uuid                   pri rep docs.count docs.deleted store.size pri.store.size
yellow open   test01_index LMMynfXaThGktG_YEAO2QQ   5   1          2            0     10.1kb         10.1kb
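
Individual documents can also be fetched by _id to confirm what the upsert wrote (standard ES 5.x document GET API; index and type match the config):

# get_doc.py - fetch one synced document by its _id
import requests

resp = requests.get("http://172.16.0.247:9999/test01_index/test01/1")
doc = resp.json()
print(doc["found"])      # True if the row with id=1 was synced
print(doc["_source"])    # {"username": "go", "password": "654", ...}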

8. Start binlog_config.py again for incremental sync:


mom run -c ./test_mom/binlog_config.py

Insert two rows:
root@tidb04 00:44:  [stdb01]> INSERT INTO test01(username,password,create_time) values('php', '123',now());
Query OK, 1 row affected (0.01 sec)

root@tidb04 00:44:  [stdb01]> INSERT INTO test01(username,password,create_time) values('java', '321',now());
Query OK, 1 row affected (0.01 sec)

root@tidb04 00:44:  [stdb01]>  select * from test01;
+----+----------+----------+---------------------+
| id | username | password | create_time         |
+----+----------+----------+---------------------+
|  1 | go       | 654      | 2021-07-29 00:19:47 |
|  2 | C++      | 654      | 2021-07-29 00:20:42 |
|  3 | php      | 123      | 2021-07-29 00:44:39 |
|  4 | java     | 321      | 2021-07-29 00:44:49 |
+----+----------+----------+---------------------+
4 rows in set (0.00 sec)

**The position stored in Redis is now changing:**
127.0.0.1:10201> get binlog_config_log_file
"mysql-bin.000013"
127.0.0.1:10201> get binlog_config_log_pos
"3278"

**Logging into ES confirms that the inserted rows were written to ES as well.**
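
One way to check this from a script is an ids query against the index (standard ES 5.x search API; ids 3 and 4 are the rows just inserted):

# search_check.py - confirm the two new rows (id 3 and 4) reached ES
import requests

q = {"query": {"ids": {"values": ["3", "4"]}}}
resp = requests.post("http://172.16.0.247:9999/test01_index/_search", json=q).json()
print(resp["hits"]["total"])             # expect 2
for hit in resp["hits"]["hits"]:
    print(hit["_id"], hit["_source"])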

9. Config file for incremental sync of insert, update, and delete from MySQL to ES:


[root@tidb05 soft]# cat mysqlsmom/test_mom/binlog_config.py
# coding=utf-8
STREAM = "BINLOG"  # "BINLOG" or "INIT"
SERVER_ID = 172160197  # make sure every config file used for binlog sync has a distinct SERVER_ID
SLAVE_UUID = __name__

# Sync BULK_SIZE rows to Elasticsearch per batch; defaults to 1 if this option is not set
#BULK_SIZE = 10000   # only enable this when doing a full import; never enable it during incremental sync, or the incremental sync will fail

BINLOG_CONNECTION = {
    'host': '172.16.0.197',
    'port': 3306,
    'user': 'click_rep',
    'passwd': 'jwtest123456'
}
# Redis stores the position of the last sync and related state
REDIS = {
    "host": "127.0.0.1",
    "port": 10201,
    "db": 0,
    "password": "YHu222tuEq",  # comment out or remove this line if no password is needed
}

# Elasticsearch nodes
#NODES = [{"host": "127.0.0.1", "port": 9200}]
NODES = [{"host": "172.16.0.247", "port": 9999}]
TASKS = [
    {
        "stream": {
            "database": "stdb01",
            "table": "test01"
        },
        "jobs": [
            # job: sync inserts and updates to ES
            {
                "actions": ["insert", "update"],
                "pipeline": [
                    #{"only_fields": {"fields": ["id", "name", "age"]}},  # only sync the id, name and age fields to ES; comment this line out to sync all fields
                    {"set_id": {"field": "id"}}
                ],
                "dest": {
                    "es": {
                        "action": "upsert",
                        "index": "test01_index",
                        "type": "test01",
                        "nodes": NODES
                    }
                }
            },
            # job: replay deletes in ES
            {
                "actions": ["delete"],
                "pipeline": [
                    #{"only_fields": {"fields": ["id", "name", "age"]}},  # only sync the id, name and age fields to ES; comment this line out to sync all fields
                    {"set_id": {"field": "id"}}
                ],
                "dest": {
                    "es": {
                        "action": "delete",
                        "index": "test01_index",
                        "type": "test01",
                        "nodes": NODES
                    }
                }
            }

        ]
    }
]

# CUSTOM_ROW_HANDLERS = "./my_handlers.py"
# CUSTOM_ROW_FILTERS = "./my_filters.py"
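
With the delete job in place, deleting a row in MySQL should remove the matching document from ES. A quick hedged check (ES 5.x document GET API again; id 3 here is only an example of a row that was deleted):

# delete_check.py - after deleting a row in MySQL, the matching ES doc should be gone
import requests

resp = requests.get("http://172.16.0.247:9999/test01_index/test01/3")
print(resp.status_code)          # 404 once the delete has been replayed
print(resp.json().get("found"))  # False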

That wraps up this brief introduction to using MySQLmom to incrementally sync specific MySQL tables to ES. Keep following this blog for more articles to come.
