Collecting MySQL Slow Logs with ELK
Posted by _雪辉_
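1. Architecture for collecting MySQL slow logs with ELK
- Filebeat is installed on the MySQL server as an agent to collect the slow log.
- Filebeat reads the MySQL slow log file, applies simple filtering, and sends the events to the Kafka cluster.
- Logstash reads the data from the Kafka cluster, splits it into fields, converts it to JSON, and stores it in the ES cluster.
- Kibana reads the data from the ES cluster and displays it on a web page.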
2. Filebeat
Filebeat collects the MySQL slow log in real time and pushes it to Kafka.
filebeat.inputs:
# slowlog
- type: log
  enabled: true
  paths:
    - /data/mysql_tmp/slow.log
  # Put the custom fields at the top level of the event
  fields_under_root: true
  fields:
    clustername: test
    type: mysql_slowlog
    log_topic: mysql-slowlog
  close_inactive: 5m
  scan_frequency: 1m
  tail_files: true
  # Every line that does not start with "# Time: " is appended to the previous event
  multiline:
    pattern: "^# Time: "
    negate: true
    match: after

output.kafka:
  version: x.x.x.x
  hosts: ["xxx:9092","xxx:9092"]
  # Take the topic name from the log_topic field set on the input
  topic: "%{[log_topic]}"
  partition.round_robin:
    reachable_only: false
  worker: 1
  required_acks: 1
  compression: gzip
  max_message_bytes: 10485760
  keep_alive: 10s
  client_id: "filebeat"

max_procs: 1
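To make the effect of the multiline settings concrete, here is a minimal Python sketch of the grouping that `negate: true` with `match: after` describes: a line matching `^# Time: ` starts a new event, and every other line is appended to the event before it. This is not Filebeat code; the function name `group_multiline` and the sample lines are made up for illustration.

```python
import re


def group_multiline(lines, pattern=r"^# Time: "):
    """Group raw lines into events: a matching line opens a new event,
    non-matching lines are appended to the current one."""
    start = re.compile(pattern)
    events, current = [], []
    for line in lines:
        if start.search(line) and current:
            # A new "# Time: " header closes the event collected so far.
            events.append("\n".join(current))
            current = []
        current.append(line)
    if current:
        events.append("\n".join(current))
    return events


# Hypothetical slow log lines, for illustration only.
SAMPLE_LINES = [
    "# Time: 2023-01-01T00:00:01.000000Z",
    "# User@Host: app[app] @  [10.0.0.1]  Id:    12",
    "# Query_time: 2.500000  Lock_time: 0.000100 Rows_sent: 1  Rows_examined: 100000",
    "SET timestamp=1672531201;",
    "SELECT * FROM orders WHERE status = 1;",
    "# Time: 2023-01-01T00:00:05.000000Z",
    "# User@Host: app[app] @  [10.0.0.1]  Id:    13",
    "# Query_time: 1.200000  Lock_time: 0.000000 Rows_sent: 0  Rows_examined: 500",
    "SET timestamp=1672531205;",
    "DELETE FROM cart WHERE id = 7;",
]

for event in group_multiline(SAMPLE_LINES):
    print(event)
    print("-" * 20)
```

Each slow log entry, including its multi-line SQL statement, therefore reaches Kafka as a single message rather than one message per line.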
3. Logstash
Logstash reads the messages from Kafka, splits them into fields, converts them to JSON, and stores them in the ES cluster.
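input {
  kafka {
    bootstrap_servers => "xxx:9092"
    topics => ["mysql-slowlog"]
    # Assumption: Filebeat's Kafka output publishes events as JSON (its default),
    # so decode them here to expose fields such as message and type
    codec => "json"
  }
}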
filter {
  grok {
    # With Id, with use
    match => [ "message", "^#\s+User@Host:\s+%{USER:user}\[[^\]]+\]\s+@\s+(?:(?<clienthost>\S*) )?\[(?:%{IP:clientip})?\]\s+Id:\s+%{NUMBER:id}\n# Query_time: %{NUMBER:query_time}\s+Lock_time: %{NUMBER:lock_time}\s+Rows_sent: %{NUMBER:rows_sent}\s+Rows_examined: %{NUMBER:rows_examined}\nuse\s(?<dbname>\w+);\nSET\s+timestamp=%{NUMBER:timestamp_mysql};\n(?<query>[\s\S]*)" ]
    # With Id, without use
    match => [ "message", "^#\s+User@Host:\s+%{USER:user}\[[^\]]+\]\s+@\s+(?:(?<clienthost>\S*) )?\[(?:%{IP:clientip})?\]\s+Id:\s+%{NUMBER:id}\n# Query_time: %{NUMBER:query_time}\s+Lock_time: %{NUMBER:lock_time}\s+Rows_sent: %{NUMBER:rows_sent}\s+Rows_examined: %{NUMBER:rows_examined}\nSET\s+timestamp=%{NUMBER:timestamp_mysql};\n(?<query>[\s\S]*)" ]
    # Without Id, with use
    match => [ "message", "^#\s+User@Host:\s+%{USER:user}\[[^\]]+\]\s+@\s+(?:(?<clienthost>\S*) )?\[(?:%{IP:clientip})?\]\n# Query_time: %{NUMBER:query_time}\s+Lock_time: %{NUMBER:lock_time}\s+Rows_sent: %{NUMBER:rows_sent}\s+Rows_examined: %{NUMBER:rows_examined}\nuse\s(?<dbname>\w+);\nSET\s+timestamp=%{NUMBER:timestamp_mysql};\n(?<query>[\s\S]*)" ]
    # Without Id, without use
    match => [ "message", "^#\s+User@Host:\s+%{USER:user}\[[^\]]+\]\s+@\s+(?:(?<clienthost>\S*) )?\[(?:%{IP:clientip})?\]\n# Query_time: %{NUMBER:query_time}\s+Lock_time: %{NUMBER:lock_time}\s+Rows_sent: %{NUMBER:rows_sent}\s+Rows_examined: %{NUMBER:rows_examined}\nSET\s+timestamp=%{NUMBER:timestamp_mysql};\n(?<query>[\s\S]*)" ]
  }
  # Use the timestamp recorded by MySQL as the event time
  date {
    match => ["timestamp_mysql","UNIX"]
    target => "@timestamp"
  }
  # Drop the raw message and helper fields once they have been parsed
  mutate {
    remove_field => ["@version","message","timestamp_mysql"]
  }
}
output {
  elasticsearch {
    hosts => ["xxx.xxx.xxx.xxx:9200"]
    # One index per event type and day
    index => "%{type}-%{+YYYY.MM.dd}"
  }
}
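The filter and output stages above can be approximated in Python to see which fields a slow log entry yields. The sketch below is only a stand-in for grok, not Logstash itself: it folds the four patterns into one regular expression with optional `Id:` and `use` parts, uses `[\w.-]+` as a rough substitute for the `USER` pattern, and takes the event type `mysql_slowlog` from the Filebeat config. The function name `parse_slowlog` and the sample entry are hypothetical.

```python
import re
from datetime import datetime, timezone

# One regex covering the four grok variants (Id and use are both optional).
SLOWLOG_RE = re.compile(
    r"^#\s+User@Host:\s+(?P<user>[\w.-]+)\[[^\]]+\]\s+@\s+"
    r"(?:(?P<clienthost>\S*) )?\[(?P<clientip>[^\]]*)\]"
    r"(?:\s+Id:\s+(?P<id>\d+))?\n"
    r"# Query_time: (?P<query_time>[\d.]+)\s+Lock_time: (?P<lock_time>[\d.]+)\s+"
    r"Rows_sent: (?P<rows_sent>\d+)\s+Rows_examined: (?P<rows_examined>\d+)\n"
    r"(?:use\s(?P<dbname>\w+);\n)?"
    r"SET\s+timestamp=(?P<timestamp_mysql>\d+);\n"
    r"(?P<query>[\s\S]*)",
    re.MULTILINE,  # let ^ match after the leading "# Time: ..." line
)


def parse_slowlog(message, event_type="mysql_slowlog"):
    """Return (index_name, document) for one multiline entry, or None."""
    m = SLOWLOG_RE.search(message)
    if m is None:
        return None
    # Keep only the fields that were actually present in the entry.
    doc = {k: v for k, v in m.groupdict().items() if v}
    # Like the date filter: the MySQL UNIX timestamp becomes the event time.
    ts = datetime.fromtimestamp(int(doc.pop("timestamp_mysql")), tz=timezone.utc)
    doc["@timestamp"] = ts.isoformat()
    doc["type"] = event_type
    # Like the output: one index per type and day.
    index = f"{event_type}-{ts:%Y.%m.%d}"
    return index, doc


# Hypothetical entry, for illustration only.
SAMPLE = (
    "# Time: 2023-01-01T00:00:01.000000Z\n"
    "# User@Host: app[app] @  [10.0.0.1]  Id:    12\n"
    "# Query_time: 2.500000  Lock_time: 0.000100 Rows_sent: 1  Rows_examined: 100000\n"
    "use shop;\n"
    "SET timestamp=1672531201;\n"
    "SELECT * FROM orders WHERE status = 1;"
)

index, doc = parse_slowlog(SAMPLE)
print(index)
print(doc)
```

As with grok captures that carry no type suffix, all extracted values are strings here; numeric typing is left to the Elasticsearch mapping.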
4. ES + Kibana
Kibana reads the data from the ES cluster and displays it on a web page.
Index template for the MySQL slow log in ES:
{
  "order": 0,
  "template": "mysqld-slow-*",
  "settings": {
    "index": {
      "refresh_interval": "5s"
    }
  },
  "mappings": {
    "mysqld-slow": {
      "numeric_detection": true,
      "properties": {
        "@timestamp": {
          "type": "date",
          "format": "strict_date_optional_time||epoch_millis"
        },
        "@version": { "type": "string" },
        "query_time": { "type": "double" },
        "rows_sent": { "type": "string" },
        "rows_examined": { "type": "string" },
        "clientip": { "type": "string" },
        "clienthost": { "type": "string" },
        "id": { "type": "integer" },
        "lock_time": { "type": "string" },
        "dbname": { "type": "keyword" },
        "user": { "type": "keyword" },
        "query": { "type": "string", "index": "not_analyzed" },
        "tags": { "type": "string" },
        "timestamp": { "type": "string" },
        "type": { "type": "string" }
      }
    }
  },
  "aliases": {}
}
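An index template only takes effect for indices whose names match its `template` pattern, so it is worth checking that pattern against the index names the Logstash output produces. The sketch below uses Python's `fnmatchcase` as an approximation of Elasticsearch's `*` wildcard matching (an assumption that holds for simple patterns like this one); the helper name `template_applies` and the dates are made up for illustration.

```python
from fnmatch import fnmatchcase


def template_applies(index_name, template_pattern):
    """Return True if a simple `*` wildcard pattern covers the index name."""
    return fnmatchcase(index_name, template_pattern)


# An index named after the template pattern is covered.
print(template_applies("mysqld-slow-2023.01.01", "mysqld-slow-*"))    # True
# An index built from type "mysql_slowlog" is not covered by this pattern.
print(template_applies("mysql_slowlog-2023.01.01", "mysqld-slow-*"))  # False
```

With `type: mysql_slowlog` in the Filebeat config and an index name built from the `type` field, the daily indices would be named `mysql_slowlog-*`, so either the template pattern or the `type` value may need to be adjusted for the mappings to apply.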