Logstash 监听文件变动,并将内容同步到elasticsearch
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Logstash 监听文件变动,并将内容同步到elasticsearch相关的知识,希望对你有一定的参考价值。
参考技术A logstash.yml 文件添加:Logstash 安装x-pack 插件:
配置文件 test.conf:
可以通过kibana 查看相关内容,并监测相关修改频率。
记录ELK之logstash同步mysql数据到Elasticsearch ,配置文件详解
本文出处:https://my.oschina.net/xiaowangqiongyou/blog/1812708#comments 截取部分内容以便学习
input { jdbc { # mysql相关jdbc配置 jdbc_connection_string => "jdbc:mysql://10.112.76.30:3306/jack_test?useUnicode=true&characterEncoding=utf-8&useSSL=false" jdbc_user => "root" jdbc_password => "123456" # jdbc连接mysql驱动的文件目录,可去官网下载:https://dev.mysql.com/downloads/connector/j/ jdbc_driver_library => "./config/mysql-connector-java-5.1.46.jar" # the name of the driver class for mysql jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_paging_enabled => true jdbc_page_size => "50000" jdbc_default_timezone =>"Asia/Shanghai" # mysql文件, 也可以直接写SQL语句在此处,如下: # statement => "select * from t_order where update_time >= :sql_last_value;" statement_filepath => "./config/jdbc.sql" # 这里类似crontab,可以定制定时操作,比如每分钟执行一次同步(分 时 天 月 年) schedule => "* * * * *" #type => "jdbc" # 是否记录上次执行结果, 如果为真,将会把上次执行到的 tracking_column 字段的值记录下来,保存到 last_run_metadata_path 指定的文件中 #record_last_run => true # 是否需要记录某个column 的值,如果record_last_run为真,可以自定义我们需要 track 的 column 名称,此时该参数就要为 true. 否则默认 track 的是 timestamp 的值. use_column_value => true # 如果 use_column_value 为真,需配置此参数. track 的数据库 column 名,该 column 必须是递增的. 一般是mysql主键 tracking_column => "update_time" tracking_column_type => "timestamp" last_run_metadata_path => "./logstash_capital_bill_last_id" # 是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录 clean_run => false #是否将 字段(column) 名称转小写 lowercase_column_names => false } } output { elasticsearch { hosts => "10.112.76.31:9200" index => "mysql_order" document_id => "%{id}" template_overwrite => true } # 这里输出调试,正式运行时可以注释掉 stdout { codec => json_lines } }
这里有几个注意点:
(1)jdbc_driver_library
mysql-connector-java-5.1.46.jar的存放目录,这个一定要配置正确,支持全路径和相对路径。如果配置不对,将会报“can ”错误。
(2)sql_last_value
标志目前logstash同步的位置信息(类似offset)。比如id、updatetime。logstash通过这个标志,可以判断目前同步到哪一条数据。
(3)statement、statement_filepath
statement:执行同步的sql语句,可以同步部分数据。
statement_filepath:存储执行同步的sql语句。不和statement同时使用。
(4)schedule
定时器,表示每隔多长时间同步一次数据。格式类似crontab。
(5)tracking_column、tracking_column_type
tracking_column:表示表中哪一列用于判断logstash同步的位置信息。与sql_last_value比较判断是否需要同步这条数据。
tracking_column_type:racking_column指定列的类型。支持两种类型:numeric(默认)、timestamp。注意:如果列是时间字段(比如updateTime),一定要指定这个类型为timestamp。我就踩了这个大坑。。。一直同步不成功!!!
(6)last_run_metadata_path
存储sql_last_value值的文件名称及位置。
(7)document_id
生成elasticsearch的文档值,尽量使用同步的数据中已有的唯一标识。比如同步订单数据,可以使用订单号。
启动命令
bin/logstash -f config/logstash-mysql-es.conf > logs/logstash.out &
以上是关于Logstash 监听文件变动,并将内容同步到elasticsearch的主要内容,如果未能解决你的问题,请参考以下文章
《果然新鲜》电商项目(45)- 利用Logstash自动同步数据库内容到ES(多文件方式)
Logstash+ Kafka基于AOP 实时同步日志到es