Implementing MySQL Slow Log Collection with ELK

Posted by _雪辉_


Contents

1. Architecture for collecting MySQL slow logs with ELK

  • MySQL 服务器安装 Filebeat 作为 agent 收集 slowLog
  • Filebeat 读取 MySQL 慢日志文件做简单过滤传给 Kafka 集群
  • Logstash 读取 Kafka 集群数据并按字段拆分后转成 JSON 格式存入 ES 集群
  • Kibana reads the data from the ES cluster and displays it on a web page

2. Filebeat

  Use Filebeat to collect the MySQL slow log in real time and push it to Kafka.

filebeat.inputs:

#slowlog
- type: log
  enabled: true
  paths:
    - /data/mysql_tmp/slow.log
  fields_under_root: true
  fields:
    clustername: test
    type: mysql_slowlog
    log_topic: mysql-slowlog
  close_inactive: 5m
  scan_frequency: 1m
  tail_files: true
  multiline:
    pattern: "^# Time: "
    negate: true
    match: after

output.kafka:
  version: x.x.x.x
  hosts: ["xxx:9092","xxx:9092"]
  topic: "%{[log_topic]}"
  partition.round_robin:
    reachable_only: false
  worker: 1
  required_acks: 1
  compression: gzip
  max_message_bytes: 10485760
  keep_alive: 10s
  client_id: "filebeat"

max_procs: 1
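The `multiline` settings above stitch physical lines into one event per slow-log entry: each line matching `^# Time: ` starts a new event, and non-matching lines (`negate: true`, `match: after`) are appended to the preceding one. A minimal Python sketch of that grouping logic (the sample log lines are illustrative, not from a real server):

```python
import re

# Boundary pattern from the Filebeat config: a line that starts a new event.
EVENT_START = re.compile(r"^# Time: ")

def group_events(lines):
    """Group slow-log lines into events, mimicking Filebeat's
    multiline settings (negate: true, match: after)."""
    events, current = [], []
    for line in lines:
        if EVENT_START.match(line):   # boundary line -> flush and start new event
            if current:
                events.append("\n".join(current))
            current = [line]
        else:                         # continuation -> append to current event
            current.append(line)
    if current:
        events.append("\n".join(current))
    return events

sample = [
    "# Time: 2023-01-01T00:00:01.000000Z",
    "# User@Host: app[app] @  [10.0.0.1]  Id: 42",
    "# Query_time: 2.500000  Lock_time: 0.100000 Rows_sent: 1  Rows_examined: 100",
    "SET timestamp=1672531201;",
    "select * from t;",
    "# Time: 2023-01-01T00:00:05.000000Z",
    "SET timestamp=1672531205;",
    "select 1;",
]
print(len(group_events(sample)))  # 2
```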
  

3. Logstash

  Use Logstash to split the messages from Kafka into fields and store them in the ES cluster as JSON.

input {
    kafka {
        bootstrap_servers => "xxx:9092"
        topics => "mysql-slowlog"
    }
}

filter {
    grok {
        # with Id, with use
        match => [ "message", "^#\s+User@Host:\s+%{USER:user}\[[^\]]+\]\s+@\s+(?:(?<clienthost>\S*) )?\[(?:%{IP:clientip})?\]\s+Id:\s+%{NUMBER:id}\n# Query_time: %{NUMBER:query_time}\s+Lock_time: %{NUMBER:lock_time}\s+Rows_sent: %{NUMBER:rows_sent}\s+Rows_examined: %{NUMBER:rows_examined}\nuse\s(?<dbname>\w+);\nSET\s+timestamp=%{NUMBER:timestamp_mysql};\n(?<query>[\s\S]*)" ]

        # with Id, without use
        match => [ "message", "^#\s+User@Host:\s+%{USER:user}\[[^\]]+\]\s+@\s+(?:(?<clienthost>\S*) )?\[(?:%{IP:clientip})?\]\s+Id:\s+%{NUMBER:id}\n# Query_time: %{NUMBER:query_time}\s+Lock_time: %{NUMBER:lock_time}\s+Rows_sent: %{NUMBER:rows_sent}\s+Rows_examined: %{NUMBER:rows_examined}\nSET\s+timestamp=%{NUMBER:timestamp_mysql};\n(?<query>[\s\S]*)" ]

        # without Id, with use
        match => [ "message", "^#\s+User@Host:\s+%{USER:user}\[[^\]]+\]\s+@\s+(?:(?<clienthost>\S*) )?\[(?:%{IP:clientip})?\]\n# Query_time: %{NUMBER:query_time}\s+Lock_time: %{NUMBER:lock_time}\s+Rows_sent: %{NUMBER:rows_sent}\s+Rows_examined: %{NUMBER:rows_examined}\nuse\s(?<dbname>\w+);\nSET\s+timestamp=%{NUMBER:timestamp_mysql};\n(?<query>[\s\S]*)" ]

        # without Id, without use
        match => [ "message", "^#\s+User@Host:\s+%{USER:user}\[[^\]]+\]\s+@\s+(?:(?<clienthost>\S*) )?\[(?:%{IP:clientip})?\]\n# Query_time: %{NUMBER:query_time}\s+Lock_time: %{NUMBER:lock_time}\s+Rows_sent: %{NUMBER:rows_sent}\s+Rows_examined: %{NUMBER:rows_examined}\nSET\s+timestamp=%{NUMBER:timestamp_mysql};\n(?<query>[\s\S]*)" ]
    }

    date {
        match => ["timestamp_mysql", "UNIX"]
        target => "@timestamp"
    }

    mutate {
        remove_field => ["@version", "message", "timestamp_mysql"]
    }
}

output {
    elasticsearch {
        hosts => ["xxx.xxx.xxx.xxx:9200"]
        index => "%{type}-%{+YYYY.MM.dd}"
    }
}
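The four grok patterns differ only in whether the event carries an `Id:` field and a `use <db>;` line. As a cross-check, here is a minimal Python sketch of the equivalent parsing for the "with Id, without use" case; the regex mirrors the grok captures, and the sample event is illustrative:

```python
import re

# Regex equivalent of the grok pattern ("with Id, without use" case);
# named groups mirror the grok capture names.
SLOW_RE = re.compile(
    r"^#\s+User@Host:\s+(?P<user>\S+)\[[^\]]+\]\s+@\s+"
    r"(?:(?P<clienthost>\S*) )?\[(?P<clientip>[\d.]*)\]\s+Id:\s+(?P<id>\d+)\n"
    r"# Query_time: (?P<query_time>[\d.]+)\s+Lock_time: (?P<lock_time>[\d.]+)"
    r"\s+Rows_sent: (?P<rows_sent>\d+)\s+Rows_examined: (?P<rows_examined>\d+)\n"
    r"SET\s+timestamp=(?P<timestamp_mysql>\d+);\n(?P<query>[\s\S]*)"
)

event = (
    "# User@Host: app[app] @  [10.0.0.1]  Id: 42\n"
    "# Query_time: 2.500000  Lock_time: 0.100000 Rows_sent: 1  Rows_examined: 100\n"
    "SET timestamp=1672531201;\n"
    "select * from t;"
)
m = SLOW_RE.match(event)
print(m.group("user"), m.group("query_time"), m.group("query"))
# app 2.500000 select * from t;
```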
            
        

4. ES + Kibana

  Kibana reads the data from the ES cluster and displays it on a web page.
The ES index template for the MySQL slow log:

{
  "order": 0,
  "template": "mysqld-slow-*",
  "settings": {
    "index": {
      "refresh_interval": "5s"
    }
  },
  "mappings": {
    "mysqld-slow": {
      "numeric_detection": true,
      "properties": {
        "@timestamp": {
          "type": "date",
          "format": "strict_date_optional_time||epoch_millis"
        },
        "@version": {
          "type": "string"
        },
        "query_time": {
          "type": "double"
        },
        "row_sent": {
          "type": "string"
        },
        "rows_examined": {
          "type": "string"
        },
        "clientip": {
          "type": "string"
        },
        "clienthost": {
          "type": "string"
        },
        "id": {
          "type": "integer"
        },
        "lock_time": {
          "type": "string"
        },
        "dbname": {
          "type": "keyword"
        },
        "user": {
          "type": "keyword"
        },
        "query": {
          "type": "string",
          "index": "not_analyzed"
        },
        "tags": {
          "type": "string"
        },
        "timestamp": {
          "type": "string"
        },
        "type": {
          "type": "string"
        }
      }
    }
  },
  "aliases": {}
}
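For this template to apply, its `template` pattern (`mysqld-slow-*`) must match the index names Logstash actually creates. A small Python sketch of how the `%{type}-%{+YYYY.MM.dd}` sprintf in the Logstash output expands into a daily index name (the timestamp value is illustrative):

```python
from datetime import datetime, timezone

# Logstash's "%{type}-%{+YYYY.MM.dd}" formats @timestamp (UTC) into a
# daily index name; "type" comes from the Filebeat fields.
def index_name(event_type, ts):
    return "%s-%s" % (event_type, ts.strftime("%Y.%m.%d"))

ts = datetime.fromtimestamp(1672531201, tz=timezone.utc)  # SET timestamp=... value
print(index_name("mysql_slowlog", ts))  # mysql_slowlog-2023.01.01
```

Note that with `type: mysql_slowlog` from the Filebeat config, the generated name is `mysql_slowlog-2023.01.01`, which `mysqld-slow-*` would not match; make sure the template pattern and the index naming agree in your deployment.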
