logstash使用template提前设置好maping同步mysql数据到Elasticsearch5.5.2

Posted jstarseven

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了logstash使用template提前设置好maping同步mysql数据到Elasticsearch5.5.2相关的知识,希望对你有一定的参考价值。

上篇blog说到采用logstash-input-jdbcmysql数据同步到ES(http://www.cnblogs.com/jstarseven/p/7704893.html),但是这里有一个问题,即假如我不需要logstash自动对mysql数据提供的mapping模板怎么办,毕竟我的数据需要ik分词,同义词解析等。。。

这时候就需要用到logstash的template功能了 ,如果现在还不到logstash和logstash-input-jdbc的安装使用方式的建议先看上一篇文章。--------jstarseven

转载请注明原文出处:http://www.cnblogs.com/jstarseven/p/7707499.html 

好的,首先看一下之前简单使用logstash-input-jdbc导入es的配置文件mysql.conf(一会配置template时候需要修改):

input {
    stdin {
    }
    jdbc {
      # 数据库
      jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/test"
      # 用户名密码
      jdbc_user => "root"
      jdbc_password => "123456"
      # jar包的位置
      jdbc_driver_library => "/usr/local/logstash-5.5.2/bin/config-mysql/mysql-connector-java-5.1.31.jar"
      # mysql的Driver
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      jdbc_paging_enabled => "true"
      jdbc_page_size => "50000"
      #statement_filepath => "config-mysql/test02.sql"
      statement => "select * from my_into_es "
      schedule => "* * * * *"
      #索引的类型
      type => "my_into_es_type"
    }
}

filter {
    json {
        source => "message"
        remove_field => ["message"]
    }
}

output {
    elasticsearch {
        hosts => "127.0.0.1:9200"
        # index名
        index => "my_into_es_index"
        # 需要关联的数据库中有有一个id字段,对应索引的id号
        document_id => "%{id}"
    }
    stdout {
        codec => json_lines
    }
}

  现在,我们来看template模板怎么用:

第一种采用我个人将它称为动态模板:dynamic_templates 可以做到对某种类型字段进行匹配mapping

1. 切换路径  cd  /usr/local/logstash-5.5.2 目录下

2. 新建template目录 mkdir template

3. cd template

4. 新建文件 logstash-ik.json   

5. 编辑文件内容:

{
    "template": "*",
    "version": 50001,
    "settings": {
        "index.refresh_interval": "5s"
    },
    "mappings": {
        "_default_": {
            "_all": {
                "enabled": true,
                "norms": false
            },
            "dynamic_templates": [
                {
                    "message_field": {
                        "path_match": "message",
                        "match_mapping_type": "string",
                        "mapping": {
                            "type": "text",
                            "norms": false
                        }
                    }
                },
                {
                    "string_fields": {
                        "match": "*",
                        "match_mapping_type": "string",
                        "mapping": {
                            "type": "text",
                            "norms": false,
                            "analyzer": "ik_max_word",
                            "fields": {
                                "keyword": {
                                    "type": "keyword"
                                }
                            }
                        }
                    }
                }
            ],
            "properties": {
                "@timestamp": {
                    "type": "date",
                    "include_in_all": false
                },
                "@version": {
                    "type": "keyword",
                    "include_in_all": false
                }
            }
        }
    }
}
~ 

6. cd /usr/local/logstash-5.5.2/bin/config-mysql

7.新建文件 mkdir mysql-ik-define.conf

文件内容:

input {
    stdin {
    }
    jdbc {
      # 数据库
      jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/test"
      # 用户名密码
      jdbc_user => "root"
      jdbc_password => "123456"
      # jar包的位置
      jdbc_driver_library => "/usr/local/logstash-5.5.2/bin/config-mysql/mysql-connector-java-5.1.31.jar"
      # mysql的Driver
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      jdbc_paging_enabled => "true"
      jdbc_page_size => "50000"
      #statement_filepath => "config-mysql/test02.sql"
      statement => "select * from my_into_es_define"
      schedule => "* * * * *"
      #索引的类型
      type => "into_es_type_define_ik"
    }
}

filter {
    json {
        source => "message"
        remove_field => ["message"]
    }
}

output {
    elasticsearch {
        hosts => "127.0.0.1:9200"
        # index名
        index => "into_es_index_define_ik"
        # 需要关联的数据库中有有一个id字段,对应索引的id号
        document_id => "%{id}"
        template_overwrite => true
        template => "/usr/local/logstash-5.5.2/template/logstash-ik.json"
    }
    stdout {
        codec => json_lines
    }
}

注释:上面标颜色的就是template的配置,其他基本不变

8. cd /usr/local/logstash-5.5.2/bin

9. 执行命令:./logstash -f config-mysql/mysql-ik-define.conf 

观察日志:

10.我们拿ElasticSearch-head插件看一下新建好的mapping:

和我们预料的一样没有问题,数据也成功导入:

总结:这种配置方式个人觉得比较灵活可以对字段按类区分做mapping

第二种采用我个人将它称为静态模板(其实和上面的基本一致),就是template文件不一样,mapping针对每个字段写死就好:

1.在之前的template目录下新建logstash-ik-define.json文件:

{
    "template": "*",
    "version": 50001,
    "settings": {
        "index.refresh_interval": "5s"
     },
    "mappings": {
        "into_es_type_define" :{
         "properties": {
               "ct": {
                    "type": "date"
                },
                "@timestamp": {
                   "include_in_all": false,
                   "type": "date"
                },
               "@version": {
                  "include_in_all": false,
                  "type": "keyword"
                },
               "name": {
                  "norms": false,
                  "analyzer": "ik_max_word",
                  "type": "text",
                  "fields": {
                     "keyword": {
                        "type": "keyword"
                         }
                    }
                },
                "id": {
                    "type": "long"
                },
                "type": {
                     "norms": false,
                     "analyzer": "ik_max_word",
                     "type": "text",
                     "fields": {
                     "keyword": {
                         "type": "keyword"
                          }
                    }
                },
                "age": {
                   "type": "long"
                },
                "desc": {
                    "norms": false,
                    "analyzer": "ik_max_word",
                    "type": "text",
                    "fields": {
                    "keyword": {
                         "type": "keyword"
                         }
                     }
                },
               "ut": {
                      "type": "date"
                 }    
            }
        }
    }
}

2.修改上述 mysql-ik-define.conf文件里面的index,type,和template部分应用模板文件即可

3.执行命令:./logstash -f config-mysql/mysql-ik-define.conf

4.查看head里面的mapping新建情况和template文件中mapping保持一致:

 

5.数据也成功同步:

 

总结:template模板使用

1.静态模板 :
                     适合索引字段数据固定的场景,一旦配置完成,不能向里面加入多余的字段,否则会报错
                     优点:scheam已知,业务场景明确,不容易出现因字段随便映射从而造成元数据撑爆es内存,从而导致es集群全部宕机
                     缺点:字段数多的情况下配置稍繁琐

1.动态模板 :
      适合字段数不明确,大量字段的配置类型相同的场景,多加字段不会报错
                     优点:可动态添加任意字段,无须改动scheaml,
                     缺点:如果添加的字段非常多,有可能造成es集群宕机

定制索引模板,是搜索业务中一项比较重要的步骤,需要注意的地方有很多,比如:
       (1)字段数固定吗
       (2)字段类型是什么
       (3)分不分词
       (4)索引不索引
       (5)存储不存储
       (6)排不排序
       (7)是否加权
除了这些还有其他的一些因素,比如,词库的维护改动,搜索架构的变化等等。
如果前提没有充分的规划好,后期改变的话,改动其中任何一项,都需要重建索引,这个代价是非常大和耗时的,尤其是在一些数据量大的场景中

 

 


 -END-

以上是关于logstash使用template提前设置好maping同步mysql数据到Elasticsearch5.5.2的主要内容,如果未能解决你的问题,请参考以下文章

Logstash中配置默认索引映射

使用canal获取mysql的binlog传输给kafka,并交由logstash获取实验步骤

logstash收集nginx日志

使用 Logstash 配置文件添加小时数

logstash grok使用案例

MacOS用户你只要设置好这个2个快捷键,每天都可以提前下班!