如何将MySql数据导入Elasticsearch中
Posted ShuSheng007
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何将MySql数据导入Elasticsearch中相关的知识,希望对你有一定的参考价值。
[版权申明] 非商业目的注明出处可自由转载
出自:shusheng007
文章目录
概述
最近工作中有一个将mysql数据导入ES的需求,于是网上搜索了一下,发现不尽如人意。看了好几篇文章,还是不知道怎么做,于是我准备记录一下自己成功的经验,为自己也为同行…
方案
采用Logstash从MySql中抽取数据,然后导入ES中,如下图所示
环境准备
安装Mysql/MariaDb
已经安装则跳过,此处采用docker安装
- 创建本地卷
~/software/mariadb/data
cd ~
mkdir -p software/mariadb/data
- 写docker-compose文件
mariadb-dc.yaml
version: '3'
services:
mariadb:
image: mariadb:10.6.5
ports:
- 3306:3306
volumes:
- ~/software/mariadb/data:/var/lib/mysql
environment:
- MYSQL_ROOT_PASSWORD=root
- MYSQL_USER=ss007
- MYSQL_PASSWORD=ss007
- MYSQL_DATABASE=ss007_bd
其中有几点需要说明:
- MYSQL_USER=ss007
- MYSQL_PASSWORD=ss007
创建了一个叫ss007的用户
- 执行
在mariadb-dc.yaml
目录下执行
docker-compose -f mariadb-dc.yaml up -d
- 准备数据
数据库已经安装好,使用数据库管理工具,例如Navicat连接数据库并建表导入数据。
当然如果你习惯命令行,进入docker使用mysql命令行客户端操作也是可以的
-- ----------------------------
-- Table structure for student
-- ----------------------------
DROP TABLE IF EXISTS `student`;
CREATE TABLE `student` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`stu_name` varchar(32) NOT NULL DEFAULT '' COMMENT '姓名',
`age` int(11) NOT NULL DEFAULT 18 COMMENT '年龄',
`create_time` datetime DEFAULT current_timestamp() ON UPDATE current_timestamp() COMMENT '创建时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8mb4 COMMENT='学生表';
-- ----------------------------
-- Records of student
-- ----------------------------
INSERT INTO `student` VALUES (1, '王二狗', 30, '2022-02-19 10:57:16');
INSERT INTO `student` VALUES (2, '牛翠华', 25, '2022-02-19 10:57:16');
INSERT INTO `student` VALUES (3, '上官无雪', 18, '2022-02-19 10:57:16');
INSERT INTO `student` VALUES (4, '王婆', 50, '2022-02-19 10:57:16');
按装Elasticsearch与Kibana(可选)
已经安装则跳过,没安装请参考docker之如何使用docker来安装ELK(elasticsearch,logstash,kibana)
安装Logstash
没有采用docker安装,直接从官网下载了7.17.0的压缩包,解压即可
写logstash配置文件
我们在config文件里新建一个名为my-demo-logstash.conf
的文件,内容如下
input
jdbc
jdbc_driver_library => "./mysql-connector-java-5.1.49.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/ss007_db?characterEncoding=UTF-8&useSSL=false"
jdbc_user => root
jdbc_password => root
# jdbc_paging_enabled => "true" #是否进行分页
# jdbc_page_size => "50000"
tracking_column => "create_time"
use_column_value => true
# statement_filepath => "sql文件路径,与下面的执行语句二选1"
statement => "SELECT id,stu_name AS name,age,create_time FROM ss007_db.student WHERE create_time > :sql_last_value;"
# 设置监听间隔 各字段含义(由左至右)秒、分、时、天、月、年,全部为*默认含义为每分钟都更新
schedule => " 10 * * * * *"
output
elasticsearch
document_id => "%id"
# document_type => ""
index => "ss007_index"
hosts => ["localhost:9200"]
stdout
codec => rubydebug
文件分为input 和output两部分,其实还可以有个filter。input 部分从mysql读取数据,output部分向ES插入数据,关键字段解释如下
input:
- jdbc_driver_library:mysql-connector-java.jar 连接文件路径,我放到了与bin同一级别,所以是
./
- tracking_column: mysql中的某一列,logstash用它来跟踪数据的变化,例如此处我用的是create_time列,那么当新增加一条记录,或者修改一下这个时间的话,logstash就会知道,从而更新数据
- use_column_value :是否使用
tracking_column
的值作为:sql_last_value
的值。设置为true时,:sql_last_value = tracking_column
。设置为false,:sql_last_value= 上次查询时间
。 - statement:sql查询语句,其中
:sql_last_value
已经在上面介绍过了 - schedule:执行计划,可以通过它设置查询的执行计划,我这里设置每隔10秒执行一次
output:
如果ES中没有index,会自动创建
- document_id => “%id” : ES中document的唯一id,这里使用student的主键
- index => “ss007_index” : ES 的索引
7.0以后不用设置document_type,会默认doc
运行
导航到logstash路径下运行以下命令,注意7.x不能进入bin文件执行
sudo bin/logstash -f config/my-demo-logstash.conf
输出如下类似结果说明成功了
...
[2022-02-20T16:22:10,869][INFO ][logstash.inputs.jdbc ][main][4a56def37a98a3ccef919df855d4d915c4fa56a1212a5e6c99a82aa2a6e70df9] (0.009233s) SELECT id,stu_name AS name,age,create_time FROM ss007_db.student WHERE create_time > 0;
"name" => "王婆",
"@timestamp" => 2022-02-20T08:22:10.938Z,
"create_time" => 2022-02-19T02:57:16.000Z,
"id" => 4,
"@version" => "1",
"age" => 50
"name" => "王二狗",
"@timestamp" => 2022-02-20T08:22:10.927Z,
"create_time" => 2022-02-19T02:57:16.000Z,
"id" => 1,
"@version" => "1",
"age" => 30
...
查看结果
从Kibana上查看ss007_index是否创建并导入了数据
更新数据
因为我们设置了每隔10秒执行一次,所以当我们插入新数据,或者更新数据导致create_time
列的日期增大,logstash就会感受到,然后更新ES中的数据
- 插入一条新数据输出
"name" => "张妈",
"@timestamp" => 2022-02-20T08:27:10.160Z,
"create_time" => 2022-02-19T21:57:16.000Z,
"id" => 5,
"@version" => "1",
"age" => 50
- 修改已有数据输出
"name" => "牛翠华",
"@timestamp" => 2022-02-20T08:22:10.937Z,
"create_time" => 2022-02-19T02:57:16.000Z,
"id" => 2,
"@version" => "1",
"age" => 25
总结
为了搞这个我也是花了一天时间的,理论都知道,一动手就各种问题,如果它对你有帮助就点个赞再走
参考文章:https://dzone.com/articles/migrating-mysql-data-to-elasticsearch-using-logsta
以上是关于如何将MySql数据导入Elasticsearch中的主要内容,如果未能解决你的问题,请参考以下文章
Logstash:把MySQL数据导入到Elasticsearch中
使用Logstash把MySQL数据导入到Elasticsearch中
使用Logstash把MySQL数据导入到Elasticsearch中
LogStash如何通过jdbc 从mysql导入elasticsearch