logstash收集MySQL慢查询日志

Posted 运维小兵

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了logstash收集MySQL慢查询日志相关的知识,希望对你有一定的参考价值。

# Collect MySQL slow-query logs. Every event is parsed the same way; the
# per-file branches in the filter section attach file-specific field values.
input {
  file {
    # One file input can tail several logs; `path` accepts an array.
    path => [
      "/data/order-slave-slow.log",
      "/data/other-slave-slow.log",
      "/data/order-master-slow.log",
      "/data/other-master-slow.log"
    ]
    type => "mysql-slow-log"
    start_position => "beginning"
    # A slow-log entry spans several lines. Every line that does NOT start a
    # new "# User@Host:" header is appended to the previous event.
    # ("User@Host" had been mangled to "[email protected]" by e-mail
    # obfuscation when the article was scraped, so the pattern never matched.)
    codec => multiline {
      pattern => "^# User@Host:"
      negate => true
      what => "previous"
    }
  }
}

filter {
  # NOTE(review): with ECS compatibility enabled the file input may store the
  # source path under [log][file][path] instead of [path] - confirm for your
  # Logstash version.
  if [path] =~ "(order|other)-(slave|master)-slow" {
    # Parse the slow-log entry: user, client IP, connection id, timings, row
    # counts, optional "use <db>;" line, SET timestamp and the SQL statement.
    # The raw message is dropped only when the pattern matches.
    grok {
      match => { "message" => "(?m)^#\s+User@Host:\s+%{USER:user}\[[^\]]+\]\s+@\s+(?:(?<clientip>\S*) )?\[(?:%{IPV4:clientip})?\]\s+Id:\s+%{NUMBER:row_id:int}\n#\s+Query_time:\s+%{NUMBER:Query_time:float}\s+Lock_time:\s+%{NUMBER:lock_time:float}\s+Rows_sent:\s+%{NUMBER:Row_sent:int}\s+Rows_examined:\s+%{NUMBER:Rows_examined:int}\n\s*(?:use %{DATA:database};\s*\n)?SET\s+timestamp=%{NUMBER:timestamp};\n\s*(?<sql>(?<action>\w+)\b.*;)\s*(?:\n#\s+Time)?.*$" }
      remove_field => [ "message" ]
    }

    # Strip a trailing "# Time: ..." line that the multiline codec glued onto
    # the end of the captured SQL text.
    mutate {
      gsub => [ "sql", "\n# Time: \d+\s+\d+:\d+:\d+", "" ]
    }
  }

  # Per-file fields. The "%{...}" values are kept exactly as published.
  # NOTE(review): they look like placeholders to be filled in per log file
  # (namespace, environment, MySQL role) - confirm and substitute real values.
  if [path] =~ "order-slave-slow" {
    mutate {
      # Overwrite the existing host field.
      replace => { "host" => "%{host}" }
      # Add three new fields.
      add_field => {
        "nsCode" => "%{nsCode}"
        "envCode" => "%{envCode}"
        "mysqlType" => "%{mysqlType}"
      }
    }
  }

  if [path] =~ "other-slave-slow" {
    mutate {
      replace => { "host" => "%{host}" }
      add_field => {
        "nsCode" => "%{nsCode}"
        "envCode" => "%{envCode}"
        "mysqlType" => "%{mysqlType}"
      }
    }
  }

  if [path] =~ "order-master-slow" {
    mutate {
      replace => { "host" => "%{host}" }
      add_field => {
        "nsCode" => "%{nsCode}"
        "envCode" => "%{envCode}"
        "mysqlType" => "%{mysqlType}"
      }
    }
  }

  if [path] =~ "other-master-slow" {
    mutate {
      replace => { "host" => "%{host}" }
      add_field => {
        "nsCode" => "%{nsCode}"
        "envCode" => "%{envCode}"
        "mysqlType" => "%{mysqlType}"
      }
    }
  }
}
# Ship the parsed slow-query events to a Redis list.
# (Logstash only accepts "#" comments; the original "//" comments were a
# syntax error.)
output {
    if [type] == "mysql-slow-log" {
        redis {
            # Address of the Redis server.
            # NOTE(review): "%{ES_SEVER}" looks like a placeholder to be
            # replaced with the real Redis host before deploying - confirm.
            host => "%{ES_SEVER}"
            data_type => "list"
            key => "mysql-slow-log"
        }
    }
}

 









以上是关于logstash收集MySQL慢查询日志的主要内容。如果未能解决你的问题,请参考以下文章:

利用filebeat推送mysql慢查询日志

Logstash分析MySQL慢查询日志

LOGSTASH+ELASTICSEARCH 处理 MYSQL 慢查询日志

ELK logstash 处理MySQL慢查询日志(初步)

logstash解析mysql慢日志

ELK采集MySQL慢日志实现