如何将MySql数据导入Elasticsearch中

Posted ShuSheng007

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何将MySql数据导入Elasticsearch中相关的知识,希望对你有一定的参考价值。

[版权申明] 非商业目的注明出处可自由转载
出自:shusheng007

文章目录

概述

最近工作中有一个将mysql数据导入ES的需求,于是网上搜索了一下,发现不尽如人意。看了好几篇文章,还是不知道怎么做,于是我准备记录一下自己成功的经验,为自己也为同行…

方案

采用Logstash从MySql中抽取数据,然后导入ES中,如下图所示

环境准备

安装Mysql/MariaDb

已经安装则跳过,此处采用docker安装

  • 创建本地卷

~/software/mariadb/data

cd ~
mkdir -p software/mariadb/data
  • 写docker-compose文件

mariadb-dc.yaml

version: '3'

services:
  mariadb:
    image: mariadb:10.6.5
    ports:
      - 3306:3306
    volumes:
      - ~/software/mariadb/data:/var/lib/mysql
    environment:
      - MYSQL_ROOT_PASSWORD=root
      - MYSQL_USER=ss007
      - MYSQL_PASSWORD=ss007
      - MYSQL_DATABASE=ss007_bd

其中有几点需要说明:

- MYSQL_USER=ss007
- MYSQL_PASSWORD=ss007

创建了一个叫ss007的用户

  • 执行

mariadb-dc.yaml目录下执行

docker-compose -f mariadb-dc.yaml up -d
  • 准备数据

数据库已经安装好,使用数据库管理工具,例如Navicat连接数据库并建表导入数据。

当然如果你习惯命令行,进入docker使用mysql命令行客户端操作也是可以的

-- ----------------------------
-- Table structure for student
-- ----------------------------
DROP TABLE IF EXISTS `student`;
CREATE TABLE `student` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `stu_name` varchar(32) NOT NULL DEFAULT '' COMMENT '姓名',
  `age` int(11) NOT NULL DEFAULT 18 COMMENT '年龄',
  `create_time` datetime DEFAULT current_timestamp() ON UPDATE current_timestamp() COMMENT '创建时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8mb4 COMMENT='学生表';

-- ----------------------------
-- Records of student
-- ----------------------------
INSERT INTO `student` VALUES (1, '王二狗', 30, '2022-02-19 10:57:16');
INSERT INTO `student` VALUES (2, '牛翠华', 25, '2022-02-19 10:57:16');
INSERT INTO `student` VALUES (3, '上官无雪', 18, '2022-02-19 10:57:16');
INSERT INTO `student` VALUES (4, '王婆', 50, '2022-02-19 10:57:16');

按装Elasticsearch与Kibana(可选)

已经安装则跳过,没安装请参考docker之如何使用docker来安装ELK(elasticsearch,logstash,kibana)

安装Logstash

没有采用docker安装,直接从官网下载了7.17.0的压缩包,解压即可

写logstash配置文件

我们在config文件里新建一个名为my-demo-logstash.conf的文件,内容如下

input 
  jdbc 
    jdbc_driver_library => "./mysql-connector-java-5.1.49.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/ss007_db?characterEncoding=UTF-8&useSSL=false"
    jdbc_user => root
    jdbc_password => root
    # jdbc_paging_enabled => "true" #是否进行分页
    # jdbc_page_size => "50000"
    tracking_column => "create_time"
    use_column_value => true
    # statement_filepath => "sql文件路径,与下面的执行语句二选1"
    statement => "SELECT id,stu_name AS name,age,create_time FROM ss007_db.student WHERE create_time > :sql_last_value;"
    # 设置监听间隔  各字段含义(由左至右)秒、分、时、天、月、年,全部为*默认含义为每分钟都更新
    schedule => " 10 * * * * *"
  

output 
  elasticsearch 
    document_id => "%id"
    # document_type => ""
    index => "ss007_index"
    hosts => ["localhost:9200"]
  
  stdout
    codec => rubydebug
  

文件分为input 和output两部分,其实还可以有个filter。input 部分从mysql读取数据,output部分向ES插入数据,关键字段解释如下

input:

  • jdbc_driver_library:mysql-connector-java.jar 连接文件路径,我放到了与bin同一级别,所以是./
  • tracking_column: mysql中的某一列,logstash用它来跟踪数据的变化,例如此处我用的是create_time列,那么当新增加一条记录,或者修改一下这个时间的话,logstash就会知道,从而更新数据
  • use_column_value :是否使用tracking_column的值作为:sql_last_value的值。设置为true时,:sql_last_value = tracking_column。设置为false,:sql_last_value= 上次查询时间
  • statement:sql查询语句,其中:sql_last_value 已经在上面介绍过了
  • schedule:执行计划,可以通过它设置查询的执行计划,我这里设置每隔10秒执行一次

output:

如果ES中没有index,会自动创建

  • document_id => “%id” : ES中document的唯一id,这里使用student的主键
  • index => “ss007_index” : ES 的索引

7.0以后不用设置document_type,会默认doc

运行

导航到logstash路径下运行以下命令,注意7.x不能进入bin文件执行

 sudo  bin/logstash -f config/my-demo-logstash.conf

输出如下类似结果说明成功了

...
[2022-02-20T16:22:10,869][INFO ][logstash.inputs.jdbc     ][main][4a56def37a98a3ccef919df855d4d915c4fa56a1212a5e6c99a82aa2a6e70df9] (0.009233s) SELECT id,stu_name AS name,age,create_time FROM ss007_db.student WHERE create_time > 0;

           "name" => "王婆",
     "@timestamp" => 2022-02-20T08:22:10.938Z,
    "create_time" => 2022-02-19T02:57:16.000Z,
             "id" => 4,
       "@version" => "1",
            "age" => 50


           "name" => "王二狗",
     "@timestamp" => 2022-02-20T08:22:10.927Z,
    "create_time" => 2022-02-19T02:57:16.000Z,
             "id" => 1,
       "@version" => "1",
            "age" => 30

...

查看结果

从Kibana上查看ss007_index是否创建并导入了数据

更新数据

因为我们设置了每隔10秒执行一次,所以当我们插入新数据,或者更新数据导致create_time列的日期增大,logstash就会感受到,然后更新ES中的数据

  • 插入一条新数据输出

           "name" => "张妈",
     "@timestamp" => 2022-02-20T08:27:10.160Z,
    "create_time" => 2022-02-19T21:57:16.000Z,
             "id" => 5,
       "@version" => "1",
            "age" => 50

  • 修改已有数据输出

           "name" => "牛翠华",
     "@timestamp" => 2022-02-20T08:22:10.937Z,
    "create_time" => 2022-02-19T02:57:16.000Z,
             "id" => 2,
       "@version" => "1",
            "age" => 25

总结

为了搞这个我也是花了一天时间的,理论都知道,一动手就各种问题,如果它对你有帮助就点个赞再走

参考文章:https://dzone.com/articles/migrating-mysql-data-to-elasticsearch-using-logsta

以上是关于如何将MySql数据导入Elasticsearch中的主要内容,如果未能解决你的问题,请参考以下文章

Logstash:把MySQL数据导入到Elasticsearch中

使用Logstash把MySQL数据导入到Elasticsearch中

使用Logstash把MySQL数据导入到Elasticsearch中

LogStash如何通过jdbc 从mysql导入elasticsearch

MySQL数据同步至ElasticSearch的相关实现方法

Elasticsearch:如何将 Strava 数据导入 Elastic Stack