ELK如何收集并用grok格式化MySQL的slow日志?

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ELK如何收集并用grok格式化MySQL的slow日志?相关的知识,希望对你有一定的参考价值。

前言

需求说明

用ELK收集mysql的慢查询日志信息,并且使用grok插件将日志信息格式化为json格式。

部署安排

test101——10.0.0.101——部署MySQL、filebeat
test102——10.0.0.102——部署elasticsearch、kibana
test103——10.0.0.103——部署logstash

实验环境版本

系统版本:centos7.3
MySQL版本:

本次测试两个版本MySQL
1、yum安装的mariadb(Server version: 5.5.60-MariaDB MariaDB Server)
2、源码安装的MySQL5.7.20版本

ELK版本:6.5.3

操作步骤(MariaDB5.5.60为例)

1、安装mariadb,准备数据库环境

1.1 安装服务

[[email protected] ~]# yum -y install mariadb*     #这样安装默认是5.5.60版本  

1.2 修改配置文件,开启慢查询日志

在/etc/my.conf文件加入下面三行内容,开启慢查询日志:

slow_query_log
long_query_time = 2      #阙值设置成2秒,只要超过2秒的操作,就会写入slow-log文件
slow_query_log_file = "/var/lib/mysql/test101-slow.log"    #设置慢查询日志

重启MySQL

[[email protected] ~]# systemctl restart mysql 

1.3 创建测试库,导入数据

1)先创建一个大数据量的文件:

[[email protected] ~]# seq 1 19999999 > /tmp/big

2)登录数据库创建测试数据库:

[[email protected] ~]# mysql
MariaDB [(none)]> create database test101;     #创建测试库
MariaDB [(none)]> use test101;
MariaDB [test101]> create table table1(id int(10)not null)engine=innodb;    #创建测试表

3)将刚刚创建好的大数据文件/tmp/big导入到MySQL的测试数据库中,生成测试数据表:

MariaDB [test101]> show tables;
+----------------------+
| Tables_in_test101 |
+----------------------+
| table1                   |
+-----------------------+
1 row in set (0.00 sec)

MariaDB [test101]> load data local infile ‘/tmp/big‘ into table table1;   #注意:local不能少,少了local可能会报错“ERROR 13 (HY000): Can‘t get stat of ‘/tmp/big‘ (Errcode: 2)”
Query OK, 19999999 rows affected (1 min 47.64 sec)   
Records: 19999999  Deleted: 0  Skipped: 0  Warnings: 0

MariaDB [test101]> 

测试数据导入成功

1.4 生成慢查询日志

在数据库查询一条数据:

MariaDB [test101]> select * from table1 where id=258;
+-----+
| id  |
+-----+
| 258 |
+-----+
1 row in set (11.76 sec)

MariaDB [test101]> 

然后查看慢查询日志文件,已经生成了相应的日志。数据库准备完成:

[[email protected] mysql]# tailf /var/lib/mysql/test101-slow.log 

# Time: 181217 15:23:39
# [email protected]: root[root] @ localhost []
# Thread_id: 2  Schema: test101  QC_hit: No
# Query_time: 11.758867  Lock_time: 0.000106  Rows_sent: 1  Rows_examined: 19999999
SET timestamp=1545031419;
select * from table1 where id=258;

2、ELK工具部署

2.1 在test101安装好filebeat、test102安装elasticsearch/kibana

(安装和基础配置省略)

2.2 在test103服务器安装好logstash

(安装步骤和基础配置省略)

1)创建logstash采集日志文件:

[[email protected] conf.d]# cat /etc/logstash/conf.d/logstash-syslog.conf 
# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.

input {
  beats {
    port => 5044
  }
}

output {
  elasticsearch {
    hosts => ["http://10.0.0.102:9200"]
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
    #user => "elastic"
    #password => "changeme"
  }
  stdout {
    codec => rubydebug
  }

}

2.3 启动服务查看日志采集

启动101服务器的filebeat,102服务器的elasticsearch、前台启动103服务器的logstash,前台查看日志采集

1)在test101服务器的MySQL运行查询语句(注意,运行查询语句之前要保证logstash前台启动已经完成):

MariaDB [test101]> select * from table1 where id=358;

+-----+
| id  |
+-----+
| 358 |
+-----+
1 row in set (13.47 sec)

MariaDB [test101]> 

2)查看test101服务器上慢查询日志已经生成:

[[email protected] mysql]# tailf test101-slow.log 

# Time: 181218  8:58:30
# [email protected]: root[root] @ localhost []
# Thread_id: 3  Schema: test101  QC_hit: No
# Query_time: 13.405630  Lock_time: 0.000271  Rows_sent: 1  Rows_examined: 19999999
SET timestamp=1545094710;
select * from table1 where id=358;

3)再查看logstash,已经采集到相关信息(但是上面的6句日志信息,是被分成了6段messages采集的):

[[email protected] logstash]# /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/logstash-syslog.conf
.......#省略若干行启动信息
[INFO ] 2018-12-18 08:51:48.669 [Api Webserver] agent - Successfully started Logstash API endpoint {:port=>9600}  
#就是出现这个提示的时候才运行上面的mysql查询语句

{
       "message" => "# Time: 181218  8:58:30",
    "@timestamp" => 2018-12-18T00:58:33.396Z,
    "prospector" => {
        "type" => "log"
    },
         "input" => {
        "type" => "log"
    },
      "@version" => "1",
          "host" => {
                   "os" => {
            "codename" => "Core",
            "platform" => "centos",
             "version" => "7 (Core)",
              "family" => "redhat"
        },
                 "name" => "test101",
        "containerized" => true,
         "architecture" => "x86_64",
                   "id" => "9fcb0c7f28e449aabad922d1ba1f0afc"
    },
          "beat" => {
            "name" => "test101",
        "hostname" => "test101",
         "version" => "6.5.3"
    },
          "tags" => [
        [0] "beats_input_codec_plain_applied"
    ],
        "offset" => 252,
        "source" => "/var/lib/mysql/test101-slow.log"
}
{
       "message" => "# [email protected]: root[root] @ localhost []",
    "@timestamp" => 2018-12-18T00:58:33.398Z,
    "prospector" => {
        "type" => "log"
    },
         "input" => {
        "type" => "log"
    },
      "@version" => "1",
          "host" => {
        "containerized" => true,
                 "name" => "test101",
         "architecture" => "x86_64",
                   "os" => {
            "codename" => "Core",
            "platform" => "centos",
             "version" => "7 (Core)",
              "family" => "redhat"
        },
                   "id" => "9fcb0c7f28e449aabad922d1ba1f0afc"
    },
          "beat" => {
            "name" => "test101",
         "version" => "6.5.3",
        "hostname" => "test101"
    },
          "tags" => [
        [0] "beats_input_codec_plain_applied"
    ],
        "source" => "/var/lib/mysql/test101-slow.log",
        "offset" => 276
}
{
       "message" => "# Thread_id: 3  Schema: test101  QC_hit: No",
    "@timestamp" => 2018-12-18T00:58:33.398Z,
    "prospector" => {
        "type" => "log"
    },
         "input" => {
        "type" => "log"
    },
      "@version" => "1",
          "host" => {
                   "os" => {
              "family" => "redhat",
            "platform" => "centos",
             "version" => "7 (Core)",
            "codename" => "Core"
        },
                 "name" => "test101",
        "containerized" => true,
         "architecture" => "x86_64",
                   "id" => "9fcb0c7f28e449aabad922d1ba1f0afc"
    },
          "beat" => {
            "name" => "test101",
        "hostname" => "test101",
         "version" => "6.5.3"
    },
          "tags" => [
        [0] "beats_input_codec_plain_applied"
    ],
        "source" => "/var/lib/mysql/test101-slow.log",
        "offset" => 315
}
{
       "message" => "# Query_time: 13.405630  Lock_time: 0.000271  Rows_sent: 1  Rows_examined: 19999999",
    "@timestamp" => 2018-12-18T00:58:33.398Z,
    "prospector" => {
        "type" => "log"
    },
         "input" => {
        "type" => "log"
    },
      "@version" => "1",
          "host" => {
                 "name" => "test101",
         "architecture" => "x86_64",
                   "os" => {
            "codename" => "Core",
            "platform" => "centos",
             "version" => "7 (Core)",
              "family" => "redhat"
        },
        "containerized" => true,
                   "id" => "9fcb0c7f28e449aabad922d1ba1f0afc"
    },
          "beat" => {
            "name" => "test101",
        "hostname" => "test101",
         "version" => "6.5.3"
    },
          "tags" => [
        [0] "beats_input_codec_plain_applied"
    ],
        "source" => "/var/lib/mysql/test101-slow.log",
        "offset" => 359
}
{
       "message" => "SET timestamp=1545094710;",
    "@timestamp" => 2018-12-18T00:58:33.398Z,
    "prospector" => {
        "type" => "log"
    },
         "input" => {
        "type" => "log"
    },
      "@version" => "1",
          "host" => {
        "containerized" => true,
                 "name" => "test101",
         "architecture" => "x86_64",
                   "os" => {
            "codename" => "Core",
            "platform" => "centos",
             "version" => "7 (Core)",
              "family" => "redhat"
        },
                   "id" => "9fcb0c7f28e449aabad922d1ba1f0afc"
    },
          "beat" => {
            "name" => "test101",
        "hostname" => "test101",
         "version" => "6.5.3"
    },
          "tags" => [
        [0] "beats_input_codec_plain_applied"
    ],
        "source" => "/var/lib/mysql/test101-slow.log",
        "offset" => 443
}
{
       "message" => "select * from table1 where id=358;",
    "@timestamp" => 2018-12-18T00:58:33.398Z,
    "prospector" => {
        "type" => "log"
    },
         "input" => {
        "type" => "log"
    },
      "@version" => "1",
          "host" => {
         "architecture" => "x86_64",
                   "os" => {
            "codename" => "Core",
            "platform" => "centos",
             "version" => "7 (Core)",
              "family" => "redhat"
        },
                 "name" => "test101",
        "containerized" => true,
                   "id" => "9fcb0c7f28e449aabad922d1ba1f0afc"
    },
          "beat" => {
            "name" => "test101",
         "version" => "6.5.3",
        "hostname" => "test101"
    },
          "tags" => [
        [0] "beats_input_codec_plain_applied"
    ],
        "source" => "/var/lib/mysql/test101-slow.log",
        "offset" => 469
}

3、日志采集优化

因为慢日志里面,一条信息被分成了6段,不便于日志的分析,因此需要将上面采集到的6个messages优化合并成一个

3.1 修改filebeat配置,将上面的6段messages合并成一句日志信息

修改filebeat配置文件,在日志路径后面加入下面三行,然后重启filebeat:

  paths:
    - /var/lib/mysql/test101-slow.log
    #- c:programdataelasticsearchlogs*
    #加入下面三行
  multiline.pattern: "^# [email protected]:"
  multiline.negate: true
  multiline.match: after

说明:

multiline.pattern:正则表达式,去匹配指定的一行,这里去匹配的以“# [email protected]:”开头的那一行;
multiline.negate:true 或 false;默认是false,就是将multiline.pattern匹配到的那一行合并到上一行;如果配置是true,就是将除了multiline.pattern匹的那一行的其他所有行合并到其上一行;
multiline.match:after 或 before,就是指定将要合并到上一行的内容,合并到上一行的末尾或开头;

3.2 检查修改后的采集日志格式

1)再在MySQL执行一次查询语句:

MariaDB [test101]> select * from table1 where id=368;
+-----+
| id  |
+-----+
| 368 |
+-----+
1 row in set (12.54 sec)

MariaDB [test101]> 

2)查看看查询日志,已经生成日志:

[[email protected] mysql]# tailf test101-slow.log 

# Time: 181218  9:17:42
# [email protected]: root[root] @ localhost []
# Thread_id: 3  Schema: test101  QC_hit: No
# Query_time: 12.541603  Lock_time: 0.000493  Rows_sent: 1  Rows_examined: 19999999
SET timestamp=1545095862;
select * from table1 where id=368;

3)查看logstash的实时输出,采集到的日志就变成了一行:

[[email protected] logstash]# /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/logstash-syslog.conf
.......#省略若干行启动信息
{
       "message" => # [email protected]: root[root] @ localhost []
# Thread_id: 3  Schema: test101  QC_hit: No
# Query_time: 12.541603  Lock_time: 0.000493  Rows_sent: 1  Rows_examined: 19999999
SET timestamp=1545095862;
select * from table1 where id=368;",   
                   #这一行就是在slow-log中的信息,已经被合并成了一行,慢日志的“"# Time: 181218  9:17:42” 单独成行,在前面没有贴出来
    "prospector" => {
        "type" => "log"
    },
         "input" => {
        "type" => "log"
    },
          "host" => {
         "architecture" => "x86_64",
                   "os" => {
            "codename" => "Core",
            "platform" => "centos",
             "version" => "7 (Core)",
              "family" => "redhat"
        },
        "containerized" => true,
                 "name" => "test101",
                   "id" => "9fcb0c7f28e449aabad922d1ba1f0afc"
    },
           "log" => {
        "flags" => [
            [0] "multiline"
        ]
    },
    "@timestamp" => 2018-12-18T01:17:45.336Z,
      "@version" => "1",
          "beat" => {
            "name" => "test101",
        "hostname" => "test101",
         "version" => "6.5.3"
    },
          "tags" => [
        [0] "beats_input_codec_plain_applied"
    ],
        "source" => "/var/lib/mysql/test101-slow.log",
        "offset" => 504
}

4、logstash配置grok插件,格式化分析日志

在实现了上一步骤,即将日志的messages合并之后,就需要采用grok插件将messages格式化成json格式了。

4.1 编辑logstash-syslog.conf文件配置grok插件

编辑logstash-syslog.conf文件,在input和output中间,加入整个filter模块,配置如下:

[[email protected] conf.d]# cat  logstash-syslog.conf
# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.

input {
  beats {
    port => 5044
  }
}

filter {

   #这一步格式化messages为json格式
    grok {
       match => [ "message", "(?m)^# [email protected]: %{USER:query_user}[[^]]+] @ (?:(?<query_host>S*) )?[(?:%{IP:query_ip})?]s# Thread_id:s+%{NUMBER:thread_id:int}s+Schema: %{USER:schema}s+QC_hit: %{WORD:QC_hit}s*# Query_time: %{NUMBER:query_time:float}s+Lock_time: %{NUMBER:lock_time:float}s+Rows_sent: %{NUMBER:rows_sent:int}s+Rows_examined: %{NUMBER:rows_examined:int}s*(?:use %{DATA:database};s*)?SET timestamp=%{NUMBER:timestamp};s*(?<query>(?<action>w+)s+.*)" ]

       }
#这一步是将日志中的时间那一行(如:# Time: 181218  9:17:42)加上一个“drop”的tag
    grok {
        match => { "message" => "# Time: " }
        add_tag => [ "drop" ]
        tag_on_failure => []
    }
#删除标签中含有drop的行。也就是要删除慢日志里面的“# Time: 181218  9:17:42”这样的内容
    if "drop" in [tags] {
        drop {}
    }

#   时间转换
    date {
        match => ["mysql.slowlog.timestamp", "UNIX", "YYYY-MM-dd HH:mm:ss"]
        target => "@timestamp"
        timezone => "Asia/Shanghai"
    }

    ruby {
        code => "event.set(‘[@metadata][today]‘, Time.at(event.get(‘@timestamp‘).to_i).localtime.strftime(‘%Y.%m.%d‘))"
    }

#删除字段message
    mutate {
        remove_field => [ "message" ]
    }
}

output {
  elasticsearch {
    hosts => ["http://10.0.0.102:9200"]
    #index =>  "%{tags[0]}" 
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
    #user => "elastic"
    #password => "changeme"
  }
  stdout {
    codec => rubydebug
  }

}

4.2 检查格式化后的日志采集

1)清除elasticsearch的旧索引,保证索引干净:

[[email protected] filebeat]# curl 10.0.0.102:9200/_cat/indices
green open .kibana_1 udOUvbprSnKWUJISwD0r_g 1 0 8 0 74.4kb 74.4kb
[[email protected] filebeat]# 

2)重启filebeat,重新启动logstash前台输出,在mariadb执行查询语句:

MariaDB [test101]> select * from table1 where id=588;
+-----+
| id  |
+-----+
| 588 |
+-----+
1 row in set (13.00 sec)

MariaDB [test101]> 

3)查看慢查询日志已经生成:

[[email protected] mysql]# tailf test101-slow.log 

# Time: 181218 14:39:38
# [email protected]: root[root] @ localhost []
# Thread_id: 4  Schema: test101  QC_hit: No
# Query_time: 12.999487  Lock_time: 0.000303  Rows_sent: 1  Rows_examined: 19999999
SET timestamp=1545115178;
select * from table1 where id=588;

4)查看logstash的实时采集输出,已经是json格式了:

[INFO ] 2018-12-18 14:25:02.802 [Api Webserver] agent - Successfully started Logstash API endpoint {:port=>9600}

{
           "QC_hit" => "No",
        "rows_sent" => 1,
           "action" => "select",
            "input" => {
        "type" => "log"
    },
           "offset" => 1032,
         "@version" => "1",
       "query_host" => "localhost",
             "beat" => {
            "name" => "test101",
        "hostname" => "test101",
         "version" => "6.5.3"
    },
            "query" => "select * from table1 where id=428;",
       "@timestamp" => 2018-12-18T06:25:34.017Z,
       "prospector" => {
        "type" => "log"
    },
        "lock_time" => 0.000775,
       "query_user" => "root",
              "log" => {
        "flags" => [
            [0] "multiline"
        ]
    },
           "schema" => "test101",
        "timestamp" => "1545114330",
        "thread_id" => 4,
       "query_time" => 14.155133,
             "tags" => [
        [0] "beats_input_codec_plain_applied"
    ],
             "host" => {
                 "name" => "test101",
                   "id" => "9fcb0c7f28e449aabad922d1ba1f0afc",
        "containerized" => true,
                   "os" => {
            "codename" => "Core",
              "family" => "redhat",
             "version" => "7 (Core)",
            "platform" => "centos"
        },
         "architecture" => "x86_64"
    },
    "rows_examined" => 19999999,
           "source" => "/var/lib/mysql/test101-slow.log"
{
           "QC_hit" => "No",
        "rows_sent" => 1,
           "action" => "select",
            "input" => {
        "type" => "log"
    },
           "offset" => 1284,
         "@version" => "1",
       "query_host" => "localhost",
             "beat" => {
            "name" => "test101",
        "hostname" => "test101",
         "version" => "6.5.3"
    },
            "query" => "select * from table1 where id=588;",
       "@timestamp" => 2018-12-18T06:39:44.082Z,
       "prospector" => {
        "type" => "log"
    },
        "lock_time" => 0.000303,
       "query_user" => "root",
              "log" => {
        "flags" => [
            [0] "multiline"
        ]
    },
           "schema" => "test101",
        "timestamp" => "1545115178",
        "thread_id" => 4,
       "query_time" => 12.999487,
             "tags" => [
        [0] "beats_input_codec_plain_applied"
    ],
             "host" => {
                 "name" => "test101",
                   "id" => "9fcb0c7f28e449aabad922d1ba1f0afc",
        "containerized" => true,
                   "os" => {
            "codename" => "Core",
              "family" => "redhat",
             "version" => "7 (Core)",
            "platform" => "centos"
        },
         "architecture" => "x86_64"
    },
    "rows_examined" => 19999999,
           "source" => "/var/lib/mysql/test101-slow.log"
}

再打开kibana界面,建立索引,然后查看kibana的界面,日志OK:
技术分享图片

技术分享图片

其他版本MySQL慢日志及grok语句

1、MySQL5.7.20版本

慢查询日志语句:
mysql5.7.20版本慢日志:

# Time: 2018-12-18T08:43:24.828892Z
# [email protected]: root[root] @ localhost []  Id:     7
# Query_time: 15.819314  Lock_time: 0.000174 Rows_sent: 1  Rows_examined: 19999999
SET timestamp=1545122604;
select * from table1 where id=258;

grok语法:

match => [ "message", "(?m)^# [email protected]: %{USER:query_user}[[^]]+] @ (?:(?<query_host>S*) )?[(?:%{IP:query_ip})?]s+Id:s+%{NUMBER:id:int}s# Query_time: %{NUMBER:query_time:float}s+Lock_time: %{NUMBER:lock_time:float}s+Rows_sent: %{NUMBER:rows_sent:int}s+Rows_examined: %{NUMBER:rows_examined:int}s*(?:use %{DATA:database};s*)?SET timestamp=%{NUMBER:timestamp};s*(?<query>(?<action>w+)s+.*)" ]

logstash采集查询结果:

{
    "rows_examined" => 19999999,
            "input" => {
        "type" => "log"
    },
             "tags" => [
        [0] "beats_input_codec_plain_applied"
    ],
       "query_user" => "root",
       "query_time" => 15.461892,
       "query_host" => "localhost",
         "@version" => "1",
            "query" => "select * from table1 where id=258;",
             "host" => {
        "containerized" => true,
                   "os" => {
            "platform" => "centos",
             "version" => "7 (Core)",
              "family" => "redhat",
            "codename" => "Core"
        },
                   "id" => "9fcb0c7f28e449aabad922d1ba1f0afc",
                 "name" => "test101",
         "architecture" => "x86_64"
    },
           "source" => "/home/data/test101-mysql5.7.20-slow.log",
           "action" => "select",
        "lock_time" => 0.000181,
       "prospector" => {
        "type" => "log"
    },
           "offset" => 1416,
              "log" => {
        "flags" => [
            [0] "multiline"
        ]
    },
        "rows_sent" => 1,
        "timestamp" => "1545122934",
             "beat" => {
        "hostname" => "test101",
         "version" => "6.5.3",
            "name" => "test101"
    },
       "@timestamp" => 2018-12-18T08:48:58.531Z,
               "id" => 7
}

以上是关于ELK如何收集并用grok格式化MySQL的slow日志?的主要内容,如果未能解决你的问题,请参考以下文章

ELK-logstash grok自定义正则

ELK日志处理之使用Grok解析日志

使用logstash收集javanginx系统等常见日志

elk笔记4--grok正则解析

ELK 构建 MySQL 慢日志收集平台

ELK Logstash Grok入门指南