mysqlsmom程序全量同步MySQL表数据到ES
Posted 运维之美
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MySQLmom程序全量同步MySQL表数据到ES相关的知识,希望对你有一定的参考价值。
特殊提示:
本次演示的ES的版本为公司内部定制的elasticsearch-5.0.0的版本测试的,而且ES是单节点安装。本次演示从一个全量同步mysql表数据到elasticsearch-5.0.0开始
一、创建全量同步配置文件
[root@tidb05 mysqlsmom]# mom new test_mom/init_config.py -t init --force
/usr/lib/python2.7/site-packages/requests/__init__.py:91: RequestsDependencyWarning: urllib3 (1.23) or chardet (2.2.1) doesn't match a supported version!
RequestsDependencyWarning)
new config at /data1/soft/mysqlsmom/test_mom/init_config.py
[root@tidb05 mysqlsmom]# echo $?
0
说明:test_mom是可以指定名称的。
解决其中一个warning警告,其实对本次演示没有任何影响,但是对于有洁癖的我,看着总觉得不爽。具体warning警告内容和解决办法如下:
[root@tidb05 mysqlsmom]# mom new 197_testdb_mom/init_config.py -t init --force
/usr/lib/python2.7/site-packages/requests/__init__.py:91: RequestsDependencyWarning: urllib3 (1.23) or chardet (2.2.1) doesn't match a supported version!
RequestsDependencyWarning)
new config at /data1/soft/mysqlsmom/197_testdb_mom/init_config.py
[root@tidb05 mysqlsmom]# ll /data1/soft/mysqlsmom/197_testdb_mom/init_config.py
-rw-r--r-- 1 root root 1298 Jul 11 10:46 /data1/soft/mysqlsmom/197_testdb_mom/init_config.py
上面的/usr/lib/python2.7/site-packages/requests/__init__.py:91: RequestsDependencyWarning: urllib3 (1.23) or chardet (2.2.1) doesn't match a supported version!
**解决办法:**
pip uninstall urllib3 -y
pip uninstall chardet -y
pip install requests
二、案例配置演示
案例一:全量同步MySQL某张表全部字段数据到es
创建测试库和测试表,并写入测试数据到test01测试表:
create database stdb01;
CREATE TABLE `test01` (
`id` int(8) NOT NULL AUTO_INCREMENT,
`username` varchar(20) COLLATE utf8_unicode_ci NOT NULL,
`password` varchar(20) COLLATE utf8_unicode_ci NOT NULL,
`create_time` varchar(20) COLLATE utf8_unicode_ci NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;
INSERT INTO test01(username,password,create_time) values('tomcat', 'xiaohuahua',now());
INSERT INTO test01(username,password,create_time) values('java', '123456',now());
INSERT INTO test01(username,password,create_time) values('lua', 'ssd123456',now());
INSERT INTO test01(username,password,create_time) values('php', 'seurw456',now());
INSERT INTO test01(username,password,create_time) values('python', 'seueurw456',now());
root@tidb04 10:57: [stdb01]> select * from test01;
+----+----------+------------+---------------------+
| id | username | password | create_time |
+----+----------+------------+---------------------+
| 1 | tomcat | xiaohuahua | 2021-07-11 10:57:57 |
| 2 | java | 123456 | 2021-07-11 10:57:57 |
| 3 | lua | ssd123456 | 2021-07-11 10:57:57 |
| 4 | php | seurw456 | 2021-07-11 10:57:57 |
| 5 | python | seueurw456 | 2021-07-11 10:57:58 |
+----+----------+------------+---------------------+
5 rows in set (0.00 sec)
创建连接库的账户:(这个账户的权限其实可以再小一点,例如:grant replication slave on *.*)
GRANT ALL PRIVILEGES ON *.* TO 'click_rep'@'172.16.0.246' identified by 'jwtest123456';flush privileges;
全量同步stdb01.test01表数据的配置文件内容如下:
[root@tidb05 mysqlsmom]# cat /data1/soft/mysqlsmom/test_mom/init_config.py
# coding=utf-8
STREAM = "INIT"
# 修改数据库连接
CONNECTION = {
'host': '172.16.0.197',
'port': 3306,
'user': 'click_rep',
'passwd': 'jwtest123456'
}
# 一次同步 BULK_SIZE 条数据到elasticsearch,不设置该配置项默认为1
BULK_SIZE = 1
# 修改elasticsearch节点
#NODES = [{"host": "127.0.0.1", "port": 9200}]
NODES = [{"host": "172.16.0.247", "port": 9999}]
TASKS = [
{
"stream": {
"database": "stdb01", # 在此数据库执行sql语句
"sql": "select * from test01", # 将该sql语句选中的数据同步到 elasticsearch;
# "pk": {"field": "id", "type": "char"} # 当主键id的类型是字符串时
},
"jobs": [
{
"actions": ["insert", "update"],
"pipeline": [
{"set_id": {"field": "id"}} # 默认设置 id字段的值 为elasticsearch中的文档id;此处可以指定具体同步哪些字段的数据到es.不指定的话默认同步表全部的数据到es。
],
"dest": {
"es": {
"action": "upsert",
"index": "test01_index", # 设置 index
"type": "test01", # 设置 type
"nodes": NODES
}
}
}
]
}
]
# CUSTOM_ROW_HANDLERS = "./my_handlers.py"
# CUSTOM_ROW_FILTERS = "./my_filters.py"
执行全量同步命令:
[root@tidb05 mysqlsmom]# pwd
/data1/soft/mysqlsmom
[root@tidb05 mysqlsmom]# ls
197_testdb_mom docs mysqlsmom README.md README_OLD.md setup.py test_mom
[root@tidb05 mysqlsmom]# time mom run -c ./test_mom/init_config.py
/usr/lib/python2.7/site-packages/requests/__init__.py:91: RequestsDependencyWarning: urllib3 (1.23) or chardet (2.2.1) doesn't match a supported version!
RequestsDependencyWarning)
2021-07-11 11:09:44,500 root INFO {"username": "tomcat", "password": "xiaohuahua", "create_time": "2021-07-11 10:57:57", "id": 1, "_id": 1}
2021-07-11 11:09:44,623 elasticsearch INFO POST http://172.16.0.247:9999/_bulk [status:200 request:0.123s]
2021-07-11 11:09:44,624 root INFO {"username": "java", "password": "123456", "create_time": "2021-07-11 10:57:57", "id": 2, "_id": 2}
2021-07-11 11:09:44,630 elasticsearch INFO POST http://172.16.0.247:9999/_bulk [status:200 request:0.006s]
2021-07-11 11:09:44,630 root INFO {"username": "lua", "password": "ssd123456", "create_time": "2021-07-11 10:57:57", "id": 3, "_id": 3}
2021-07-11 11:09:44,639 elasticsearch INFO POST http://172.16.0.247:9999/_bulk [status:200 request:0.009s]
2021-07-11 11:09:44,640 root INFO {"username": "php", "password": "seurw456", "create_time": "2021-07-11 10:57:57", "id": 4, "_id": 4}
2021-07-11 11:09:44,644 elasticsearch INFO POST http://172.16.0.247:9999/_bulk [status:200 request:0.004s]
2021-07-11 11:09:44,645 root INFO {"username": "python", "password": "seueurw456", "create_time": "2021-07-11 10:57:58", "id": 5, "_id": 5}
2021-07-11 11:09:44,650 elasticsearch INFO POST http://172.16.0.247:9999/_bulk [status:200 request:0.005s]
real 0m0.640s
user 0m0.444s
sys 0m0.051s
具体ES里面的数据此处就不再截图演示了。
案例二:全量同步MySQL某张表部分字段数据到es
创建测试表test02和写入测试数据:
CREATE TABLE `test02` (
`id` int(8) NOT NULL AUTO_INCREMENT,
`username` varchar(20) COLLATE utf8_unicode_ci NOT NULL,
`password` varchar(20) COLLATE utf8_unicode_ci NOT NULL,
`create_time` varchar(20) COLLATE utf8_unicode_ci NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;
INSERT INTO test02(username,password,create_time) values('tomcat', 'xiaohuahua',now());
INSERT INTO test02(username,password,create_time) values('java', '123456',now());
INSERT INTO test02(username,password,create_time) values('lua', 'ssd123456',now());
INSERT INTO test02(username,password,create_time) values('php', 'seurw456',now());
INSERT INTO test02(username,password,create_time) values('python', 'seueurw456',now());
运行同步命令:
[root@tidb05 mysqlsmom]# time mom run -c ./test_mom/init_config.py
/usr/lib/python2.7/site-packages/requests/__init__.py:91: RequestsDependencyWarning: urllib3 (1.23) or chardet (2.2.1) doesn't match a supported version!
RequestsDependencyWarning)
2021-07-11 11:25:53,126 root INFO {"username": "tomcat", "_id": 1, "id": 1}
2021-07-11 11:25:53,217 elasticsearch INFO POST http://172.16.0.247:9999/_bulk [status:200 request:0.091s]
2021-07-11 11:25:53,218 root INFO {"username": "java", "_id": 2, "id": 2}
2021-07-11 11:25:53,223 elasticsearch INFO POST http://172.16.0.247:9999/_bulk [status:200 request:0.005s]
2021-07-11 11:25:53,223 root INFO {"username": "lua", "_id": 3, "id": 3}
2021-07-11 11:25:53,228 elasticsearch INFO POST http://172.16.0.247:9999/_bulk [status:200 request:0.005s]
2021-07-11 11:25:53,229 root INFO {"username": "php", "_id": 4, "id": 4}
2021-07-11 11:25:53,235 elasticsearch INFO POST http://172.16.0.247:9999/_bulk [status:200 request:0.006s]
2021-07-11 11:25:53,235 root INFO {"username": "python", "_id": 5, "id": 5}
2021-07-11 11:25:53,241 elasticsearch INFO POST http://172.16.0.247:9999/_bulk [status:200 request:0.006s]
real 0m0.597s
user 0m0.440s
sys 0m0.047s
具体ES里面的数据此处就不再截图演示了。
此案例演示的具体配置文件如下:
[root@tidb05 mysqlsmom]# cat /data1/soft/mysqlsmom/test_mom/init_config.py
# coding=utf-8
STREAM = "INIT"
# 修改数据库连接
CONNECTION = {
'host': '172.16.0.197',
'port': 3306,
'user': 'click_rep',
'passwd': 'jwtest123456'
}
# 一次同步 BULK_SIZE 条数据到elasticsearch,不设置该配置项默认为1
BULK_SIZE = 1
# 修改elasticsearch节点
#NODES = [{"host": "127.0.0.1", "port": 9200}]
NODES = [{"host": "172.16.0.247", "port": 9999}]
TASKS = [
{
"stream": {
"database": "stdb01", # 在此数据库执行sql语句
"sql": "select * from test02", # 将该sql语句选中的数据同步到 elasticsearch
# "pk": {"field": "id", "type": "char"} # 当主键id的类型是字符串时
},
"jobs": [
{
"actions": ["insert", "update"],
"pipeline": [
{"only_fields": {"fields": ["id", "username"]}}, # 只同步 id 和 username字段
{"set_id": {"field": "id"}} # 默认设置 id字段的值 为elasticsearch中的文档id
],
"dest": {
"es": {
"action": "upsert",
"index": "test02_index", # 设置 index
"type": "test02", # 设置 type
"nodes": NODES
}
}
}
]
}
]
# CUSTOM_ROW_HANDLERS = "./my_handlers.py"
# CUSTOM_ROW_FILTERS = "./my_filters.py"
案例三:全量同步MySQL多张表数据到es
创建测试表test03,test04,写入测试数据:
CREATE TABLE `test03` (
`id` int(8) NOT NULL AUTO_INCREMENT,
`username` varchar(20) COLLATE utf8_unicode_ci NOT NULL,
`password` varchar(20) COLLATE utf8_unicode_ci NOT NULL,
`create_time` varchar(20) COLLATE utf8_unicode_ci NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;
INSERT INTO test03(username,password,create_time) values('tomcat', 'xiaohuahua',now());
INSERT INTO test03(username,password,create_time) values('java', '123456',now());
CREATE TABLE `test04` (
`id` int(8) NOT NULL AUTO_INCREMENT,
`username` varchar(20) COLLATE utf8_unicode_ci NOT NULL,
`password` varchar(20) COLLATE utf8_unicode_ci NOT NULL,
`create_time` varchar(20) COLLATE utf8_unicode_ci NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;
INSERT INTO test04(username,password,create_time) values('tomcat', 'xiaohuahua',now());
INSERT INTO test04(username,password,create_time) values('java', '123456',now());
root@tidb04 12:59: [stdb01]> select * from test04;
+----+----------+------------+---------------------+
| id | username | password | create_time |
+----+----------+------------+---------------------+
| 1 | tomcat | xiaohuahua | 2021-07-11 12:59:01 |
| 2 | java | 123456 | 2021-07-11 12:59:01 |
+----+----------+------------+---------------------+
2 rows in set (0.00 sec)
root@tidb04 12:59: [stdb01]> select * from test03;
+----+----------+------------+---------------------+
| id | username | password | create_time |
+----+----------+------------+---------------------+
| 1 | tomcat | xiaohuahua | 2021-07-11 12:58:53 |
| 2 | java | 123456 | 2021-07-11 12:58:54 |
+----+----------+------------+---------------------+
2 rows in set (0.00 sec)
此案例配置文件内容如下:
[root@tidb05 test_mom]# cat init_config.py
# coding=utf-8
STREAM = "INIT"
# 修改数据库连接
CONNECTION = {
'host': '172.16.0.197',
'port': 3306,
'user': 'click_rep',
'passwd': 'jwtest123456'
}
# 一次同步 BULK_SIZE 条数据到elasticsearch,不设置该配置项默认为1
BULK_SIZE = 1
# 修改elasticsearch节点
#NODES = [{"host": "127.0.0.1", "port": 9200}]
NODES = [{"host": "172.16.0.247", "port": 9999}]
TASKS = [
# 同步stdb01.test03到es:
{
"stream": {
"database": "stdb01", # 在此数据库执行sql语句
"sql": "select * from test03", # 将该sql语句选中的数据同步到 elasticsearch
# "pk": {"field": "id", "type": "char"} # 当主键id的类型是字符串时
},
"jobs": [
{
"actions": ["insert", "update"],
"pipeline": [
{"set_id": {"field": "id"}} # 默认设置 id字段的值 为elasticsearch中的文档id
],
"dest": {
"es": {
"action": "upsert",
"index": "test03_index", # 设置 index
"type": "test03", # 设置 type
"nodes": NODES
}
}
}
]
},
# 同步stdb01.test04到es:
{
"stream": {
"database": "stdb01", # 在此数据库执行sql语句
"sql": "select * from test04", # 将该sql语句选中的数据同步到 elasticsearch
# "pk": {"field": "id", "type": "char"} # 当主键id的类型是字符串时
},
"jobs": [
{
"actions": ["insert", "update"],
"pipeline": [
{"only_fields": {"fields": ["id", "username"]}}, # 只同步 id 和 username字段
{"set_id": {"field": "id"}} # 默认设置 id字段的值 为elasticsearch中的文档id
],
"dest": {
"es": {
"action": "upsert",
"index": "test04_index", # 设置 index
"type": "test04", # 设置 type
"nodes": NODES
}
}
}
]
}
]
# CUSTOM_ROW_HANDLERS = "./my_handlers.py"
# CUSTOM_ROW_FILTERS = "./my_filters.py"
启动运行命令:
[root@tidb05 mysqlsmom]# time mom run -c ./test_mom/init_config.py
/usr/lib/python2.7/site-packages/requests/__init__.py:91: RequestsDependencyWarning: urllib3 (1.23) or chardet (2.2.1) doesn't match a supported version!
RequestsDependencyWarning)
2021-07-11 13:01:09,473 root INFO {"username": "tomcat", "password": "xiaohuahua", "create_time": "2021-07-11 12:58:53", "id": 1, "_id": 1}
2021-07-11 13:01:09,555 elasticsearch INFO POST http://172.16.0.247:9999/_bulk [status:200 request:0.082s]
2021-07-11 13:01:09,556 root INFO {"username": "java", "password": "123456", "create_time": "2021-07-11 12:58:54", "id": 2, "_id": 2}
2021-07-11 13:01:09,561 elasticsearch INFO POST http://172.16.0.247:9999/_bulk [status:200 request:0.005s]
2021-07-11 13:01:09,564 root INFO {"username": "tomcat", "_id": 1, "id": 1}
2021-07-11 13:01:09,636 elasticsearch INFO POST http://172.16.0.247:9999/_bulk [status:200 request:0.072s]
2021-07-11 13:01:09,636 root INFO {"username": "java", "_id": 2, "id": 2}
2021-07-11 13:01:09,642 elasticsearch INFO POST http://172.16.0.247:9999/_bulk [status:200 request:0.005s]
real 0m0.629s
user 0m0.411s
sys 0m0.055s
具体ES里面的数据此处就不再截图演示了。
案例四、同步同MySQL实例下不同的库不同的表数据到ES
MySQL表数据如下:
root@tidb04 10:58: [test_db]> select * from stdb01.test01;
+----+----------+----------+---------------------+
| id | username | password | create_time |
+----+----------+----------+---------------------+
| 30 | fox | 556 | 2021-07-30 08:19:37 |
| 31 | fox | 556 | 2021-07-30 08:19:38 |
+----+----------+----------+---------------------+
2 rows in set (0.00 sec)
root@tidb04 10:58: [test_db]> select * from test_db.test01;
+----+----------+------------+---------------------+
| id | username | password | create_time |
+----+----------+------------+---------------------+
| 1 | tomcat | xiaohuahua | 2021-07-03 23:51:17 |
| 2 | php | xiao | 2021-07-03 23:53:36 |
| 3 | fix | xiao | 2021-07-03 23:53:49 |
| 4 | java | bai | 2021-07-03 23:54:01 |
+----+----------+------------+---------------------+
4 rows in set (0.00 sec)
全量同步MySQL数据到ES:
[root@tidb05 mysqlsmom]# time mom run -c ./test_mom/init_config.py
2021-08-01 10:58:49,966 root INFO {"username": "fox", "password": "556", "create_time": "2021-07-30 08:19:37", "id": 30, "_id": 30}
2021-08-01 10:58:49,967 root INFO {"username": "fox", "password": "556", "create_time": "2021-07-30 08:19:38", "id": 31, "_id": 31}
2021-08-01 10:58:50,115 elasticsearch INFO POST http://172.16.0.247:9999/_bulk [status:200 request:0.148s]
2021-08-01 10:58:50,119 root INFO {"username": "tomcat", "_id": 1, "id": 1}
2021-08-01 10:58:50,119 root INFO {"username": "php", "_id": 2, "id": 2}
2021-08-01 10:58:50,119 root INFO {"username": "fix", "_id": 3, "id": 3}
2021-08-01 10:58:50,119 root INFO {"username": "java", "_id": 4, "id": 4}
2021-08-01 10:58:50,259 elasticsearch INFO POST http://172.16.0.247:9999/_bulk [status:200 request:0.139s]
real 0m0.873s
user 0m0.505s
sys 0m0.080s
ES数据验证:
全量同步的配置文件内容如下:
[root@tidb05 mysqlsmom]# cat ./test_mom/init_config.py
# coding=utf-8
STREAM = "INIT"
# 修改数据库连接
CONNECTION = {
'host': '172.16.0.197',
'port': 3306,
'user': 'click_rep',
'passwd': 'jwtest123456'
}
# 一次同步 BULK_SIZE 条数据到elasticsearch,不设置该配置项默认为1
BULK_SIZE = 50000
# 修改elasticsearch节点
#NODES = [{"host": "127.0.0.1", "port": 9200}]
NODES = [{"host": "172.16.0.247", "port": 9999}]
TASKS = [
# 同步stdb01.test03到es:
{
"stream": {
"database": "stdb01", # 在此数据库执行sql语句
"sql": "select * from test01", # 将该sql语句选中的数据同步到 elasticsearch
# "pk": {"field": "id", "type": "char"} # 当主键id的类型是字符串时
},
"jobs": [
{
"actions": ["insert", "update"],
"pipeline": [
{"set_id": {"field": "id"}} # 默认设置 id字段的值 为elasticsearch中的文档id
],
"dest": {
"es": {
"action": "upsert",
"index": "stdb01.test01_index", # 设置 index
"type": "user_table", # 设置 type
"nodes": NODES
}
}
}
]
},
##同步test_db.test01到es:
{
"stream": {
"database": "test_db", # 在此数据库执行sql语句
"sql": "select * from test01", # 将该sql语句选中的数据同步到 elasticsearch
# "pk": {"field": "id", "type": "char"} # 当主键id的类型是字符串时
},
"jobs": [
{
"actions": ["insert", "update"],
"pipeline": [
{"only_fields": {"fields": ["id", "username"]}}, # 只同步 id 和 username字段
{"set_id": {"field": "id"}} # 默认设置 id字段的值 为elasticsearch中的文档id
],
"dest": {
"es": {
"action": "upsert",
"index": "test_db.test01_index", # 设置 index
"type": "user_table", # 设置 type
"nodes": NODES
}
}
}
]
}
]
# CUSTOM_ROW_HANDLERS = "./my_handlers.py"
# CUSTOM_ROW_FILTERS = "./my_filters.py"
案例五、设置参数BULK_SIZE =1和=10000进行测试全量同步时间测试
设置BULK_SIZE =10000进行全量同步表t_work_order_follow数据到es测试
[root@tidb04 ~]# mysql -e "select count(*) from stdb01.t_work_order_follow;"
+----------+
| count(*) |
+----------+
| 3975925 |
+----------+
[root@tidb05 mysqlsmom]# time mom run -c ./test_mom/init_config.py
real 30m7.618s
user 23m51.398s
sys 0m58.087s
设置参数BULK_SIZE =1 进行全量同步表t_work_order_follow数据到es测试,说实话花费的时间非常的长了:
root@tidb04 08:07: [test_db]> select count(*) from t_work_order_follow;
+----------+
| count(*) |
+----------+
| 3975925 |
+----------+
1 row in set (0.62 sec)
[root@tidb06 mysqlsmom]# time mom run -c ./test_mom/init_config.py
real 237m59.067s
user 53m49.099s
sys 3m25.431s
后面测试把设置BULK_SIZE =50000 测试结果和设置BULK_SIZE =10000消耗的时间基本差不多。
全量同步演示到此介绍完毕。后面会分享下增量同步的配置方法,敬请期待。
以上是关于mysqlsmom程序全量同步MySQL表数据到ES的主要内容,如果未能解决你的问题,请参考以下文章
Elasticsearch-jdbc批量同步mysql数据失败