Logstash + Elasticsearch: Processing MySQL Slow Query Logs
Posted by SOHU-DBA
I ran into a requirement to look up the slow query logs for certain services, but the slow query logs provided by the DBA platform could not cover the actual business scenario (the reported fields were incomplete). So I rolled up my sleeves and did it myself.

I referenced an existing write-up, but made quite a few changes to fit my own needs.

Let's get started.
1. Locate the log file
First, confirm that the slow query log is enabled, then find the location of the log file:
> show variables like '%slow%';
+---------------------+-------------------------------------+
| Variable_name       | Value                               |
+---------------------+-------------------------------------+
| log_slow_queries    | ON                                  |
| slow_launch_time    | 2                                   |
| slow_query_log      | ON                                  |
| slow_query_log_file | /data/mysqllog/20000/slow-query.log |
+---------------------+-------------------------------------+
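If slow_query_log happens to be OFF, it can be switched on at runtime before collecting anything. A minimal sketch, assuming the pymysql package and an account with enough privilege (the connection parameters here are placeholders, not from the post):

import pymysql

# Hypothetical helper: enable the slow log and lower the threshold so that
# slow statements actually get recorded, then print the current settings.
conn = pymysql.connect(host="127.0.0.1", port=20000, user="dba", password="***")
try:
    with conn.cursor() as cur:
        cur.execute("SET GLOBAL slow_query_log = 'ON'")
        cur.execute("SET GLOBAL long_query_time = 1")  # seconds
        cur.execute("SHOW VARIABLES LIKE '%slow%'")
        for name, value in cur.fetchall():
            print(name, "=", value)
finally:
    conn.close()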
2. The slow query log format
The format is basically as follows. If your format differs, small adjustments will be needed to match it.
# Time: 160524  5:12:29
# User@Host: user_a[xxxx] @  [10.166.140.109]
# Query_time: 1.711086  Lock_time: 0.000040  Rows_sent: 385489  Rows_examined: 385489
use dbname;
SET timestamp=1464037949;
SELECT 1 from dbname;
3. Collecting with Logstash
Collection is basically a matter of using the multiline codec to parse the multi-line entries. Two things still need handling, though.

First, strip out the useless information.

Second, the same slow SQL shows up again and again, so the number of executions becomes a very important metric. What we want to do is denoise: strip out the parameters (quoted strings plus numbers), keep only the core SQL, and compute a hash over it, so that at query time we can aggregate on that field. This uses the mutate filter's gsub together with the checksum filter:
# calculate unique hash
mutate {
    add_field => { "sql_for_hash" => "%{sql}" }
}
mutate {
    gsub => [
        "sql_for_hash", "'.+?'", "",
        "sql_for_hash", "-?\d*\.{0,1}\d+", ""
    ]
}
checksum {
    algorithm => "md5"
    keys => ["sql_for_hash"]
}
The resulting MD5 is stored in the logstash_checksum field.
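To make the denoising concrete, here is a small Python sketch (an illustration of the idea, not code from the post) that applies the same two substitutions and hashes the result. The hex digest will not necessarily match the checksum filter's output byte for byte; the point is that statements differing only in their literal parameters collapse to one fingerprint.

import hashlib
import re

def sql_fingerprint(sql):
    # Same substitutions as the gsub above: strip quoted strings, then numbers.
    normalized = re.sub(r"'.+?'", "", sql)
    normalized = re.sub(r"-?\d*\.{0,1}\d+", "", normalized)
    return hashlib.md5(normalized.encode("utf-8")).hexdigest()

a = "SELECT * FROM orders WHERE user_id = 42 AND status = 'PAID'"
b = "SELECT * FROM orders WHERE user_id = 99 AND status = 'REFUND'"
print(sql_fingerprint(a) == sql_fingerprint(b))  # True: same query shape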
The complete Logstash configuration file follows (you may need to make small adjustments for your own log format). Note that it relies on a custom grok pattern ALLWORD, defined as [\s\S]*.
input {
    file {
        path => ["/data/mysqllog/20000/slow-query.log"]
        sincedb_path => "/data/LogNew/logstash/sincedb/mysql.sincedb"
        type => "mysql-slow-log"
        add_field => ["env", "PRODUCT"]
        codec => multiline {
            pattern => "^# User@Host:"
            negate => true
            what => previous
        }
    }
}

filter {
    # NOTE: ALLWORD is the custom pattern mentioned above: ALLWORD [\s\S]*
    grok {
        # User@Host: logstash[logstash] @ localhost [127.0.0.1]
        # User@Host: logstash[logstash] @ [127.0.0.1]
        match => [ "message", "^# User@Host: %{ALLWORD:user}\[%{ALLWORD}\] @ %{ALLWORD:dbhost}? \[%{IP:ip}\]" ]
    }
    grok {
        # Query_time: 102.413328  Lock_time: 0.000167 Rows_sent: 0  Rows_examined: 1970
        match => [ "message", "^# Query_time: %{NUMBER:duration:float}%{SPACE}Lock_time: %{NUMBER:lock_wait:float}%{SPACE}Rows_sent: %{NUMBER:results:int}%{SPACE}Rows_examined:%{SPACE}%{NUMBER:scanned:int}%{ALLWORD:sql}" ]
    }
    # remove useless data
    mutate {
        gsub => [
            "sql", "\nSET timestamp=\d+?;\n", "",
            "sql", "\nuse [a-zA-Z0-9\-\_]+?;", "",
            "sql", "\n# Time: \d+\s+\d+:\d+:\d+", "",
            "sql", "\n/usr/local/mysql/bin/mysqld.+$", "",
            "sql", "\nTcp port:.+$", "",
            "sql", "\nTime .+$", ""
        ]
    }
    # Capture the time the query happened
    grok {
        match => [ "message", "^SET timestamp=%{NUMBER:timestamp};" ]
    }
    date {
        match => [ "timestamp", "UNIX" ]
    }
    # calculate unique hash
    mutate {
        add_field => { "sql_for_hash" => "%{sql}" }
    }
    mutate {
        gsub => [
            "sql_for_hash", "'.+?'", "",
            "sql_for_hash", "-?\d*\.{0,1}\d+", ""
        ]
    }
    checksum {
        algorithm => "md5"
        keys => ["sql_for_hash"]
    }
    # Drop the captured timestamp field since it has been moved to the time of the event
    mutate {
        # TODO: remove the message field
        remove_field => ["timestamp", "message", "sql_for_hash"]
    }
}

output {
    #stdout {
    #    codec => rubydebug
    #}
    #if ("_grokparsefailure" not in [tags]) {
    #    stdout {
    #        codec => rubydebug
    #    }
    #}
    if ("_grokparsefailure" not in [tags]) {
        elasticsearch {
            hosts => ["192.168.1.1:9200"]
            index => "logstash-slowlog"
        }
    }
}
What the ingested event looks like:
{ "@timestamp" => "2016-05-23T21:12:59.000Z", "@version" => "1", "tags" => [ [0] "multiline" ], "path" => "/Users/ken/tx/elk/logstash/data/slow_sql.log", "host" => "Luna-mac-2.local", "type" => "mysql-slow", "env" => "PRODUCT", "user" => "dba_bak_all_sel", "ip" => "10.166.140.109", "duration" => 28.812601, "lock_wait" => 0.000132, "results" => 749414, "scanned" => 749414, "sql" => "SELECT /*!40001 SQL_NO_CACHE */ * FROM `xxxxx`;", "logstash_checksum" => "3e3ccb89ee792de882a57e2bef6c5371" }
4. Writing the query
For the query we need to aggregate by logstash_checksum, show the results in descending order of occurrence count, and for each logstash_checksum display one concrete SQL statement as a sample.

Elasticsearch's top_hits aggregation, nested inside a terms aggregation, covers this query requirement perfectly.
The query body:
body = {
    "from": 0,
    "size": 0,
    "query": {
        "filtered": {
            "query": {
                "match": { "user": "test" }
            },
            "filter": {
                "range": {
                    "@timestamp": {
                        "gte": "now-1d",
                        "lte": "now"
                    }
                }
            }
        }
    },
    "aggs": {
        "top_errors": {
            "terms": {
                "field": "logstash_checksum",
                "size": 20
            },
            "aggs": {
                "top_error_hits": {
                    "top_hits": {
                        "sort": [
                            { "@timestamp": { "order": "desc" } }
                        ],
                        "_source": {
                            "include": [
                                "user", "sql", "logstash_checksum", "@timestamp",
                                "duration", "lock_wait", "results", "scanned"
                            ]
                        },
                        "size": 1
                    }
                }
            }
        }
    }
}
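For reference, a minimal sketch of running this aggregation from Python with an elasticsearch-py client from the same era as the post (the filtered query above is ES 1.x/2.x syntax); the host and index name are taken from the Logstash output section:

from elasticsearch import Elasticsearch

es = Elasticsearch(["192.168.1.1:9200"])
resp = es.search(index="logstash-slowlog", body=body)

# Each bucket is one query shape; doc_count is how often it was seen in the window.
for bucket in resp["aggregations"]["top_errors"]["buckets"]:
    sample = bucket["top_error_hits"]["hits"]["hits"][0]["_source"]
    print(bucket["doc_count"], sample["logstash_checksum"], sample["sql"][:80])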
5. Rendering the page
On the Python backend, the sqlparse package is used to format the SQL (line breaks / indentation / keyword case) before passing it to the frontend.
>>> sql = 'select * from foo where id in (select id from bar);'
>>> print sqlparse.format(sql, reindent=True, keyword_case='upper')
SELECT *
FROM foo
WHERE id IN
    (SELECT id
     FROM bar);
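Putting steps 4 and 5 together, the backend can format each bucket's sample SQL before handing it to the template. A sketch reusing resp from the aggregation example above:

import sqlparse

rows = []
for bucket in resp["aggregations"]["top_errors"]["buckets"]:
    sample = bucket["top_error_hits"]["hits"]["hits"][0]["_source"]
    rows.append({
        "count": bucket["doc_count"],
        "duration": sample["duration"],
        "sql": sqlparse.format(sample["sql"], reindent=True, keyword_case="upper"),
    })
# rows is then handed to the frontend for rendering.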
Then, on the page, JS is used for syntax highlighting.
Copyright: free to repost - non-commercial - no derivatives - keep attribution |
If this article or project has helped you, a small donation of ¥0.99 / ¥2.99 / ¥4.99 / ¥9.9 towards site maintenance is welcome :)
http://www.wklken.me/posts/2016/05/24/elk-mysql-slolog.html