分布式日志收集之Logstash 笔记
Posted 我是攻城师
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了分布式日志收集之Logstash 笔记相关的知识,希望对你有一定的参考价值。
Java代码
INFO - 2015-11-03 06:10:53.106; [ company] org.apache.solr.core.SolrCore; [company] webapp=/solr path=/select params={mm=100%25&sort=score+desc,regCapital+desc,foundDate+asc,cpyNamePy+asc&start=0&q=+cpyName:兰州顶津食品有限公司++OR+sname:"兰州顶津食品有限公司"^10+OR+oldName:兰州顶津食品有限公司
+&wt=javabin&fq=&version=2&rows=10&defType=edismax} hits=0 status=0 QTime=2
INFO - 2015-11-03 06:10:53.106; [ company] org.apache.solr.core.SolrCore; [company] webapp=/solr path=/select params={mm=100%25&sort=score+desc,regCapital+desc,foundDate+asc,cpyNamePy+asc&start=0&q=+cpyName:兰州顶津食品有限公司++OR+sname:"兰州顶津食品有限公司"^10+OR+oldName:兰州顶津食品有限公司 +&wt=javabin&fq=&version=2&rows=10&defType=edismax} hits=0 status=0 QTime=2
使用gork正则语法+内置正则,能够提取出,log级别,UTC时间,查询关键词等等内容
Java代码
log级别:
%{LOGLEVEL:loglevel}
查询时间:
%{TIMESTAMP_ISO8601:time};
查询关键词 :
cpyName:(?<kw>.*)++
命中数量 :
hits=(?<hits>d+)
查询时间
QTime=(?<qtime>d+)
查询偏移开始:
start=(?<start>d+)
返回数量:
rows=(?<rows>d+)
log级别: %{LOGLEVEL:loglevel} 查询时间: %{TIMESTAMP_ISO8601:time}; 查询关键词 : cpyName:(?<kw>.*)++ 命中数量 : hits=(?<hits>d+) 查询时间 QTime=(?<qtime>d+) 查询偏移开始: start=(?<start>d+) 返回数量: rows=(?<rows>d+)
案例(二)使用filter-date插件提取日志文件里面的时间,覆盖logstash自己默认生成log时的时间
官网介绍:https://www.elastic.co/guide/en/logstash/current/plugins-filters-date.html
这个案例也比较常见,因为我们需要的时间,肯定是log日志里面记录的时间,而不是logstash接入时的那个时间,保证时间正确,才能在
Kibana里面正确的展示,关于UTC时间的+8的问题,这里就不说了,搜索时自己可以控制搜到正确的内容即可
内容如下截图,写了一个固定的时间:
结果如下图,已经被转换成UTC时间
案例(三)使用ruby内嵌代码,将一个yyyy-MM-dd HH:mm:ss格式的日期,转换成long时间戳
接着案例二的代码,再其filter里面再加入如下一段代码:
运行后,再看结果:
写个程序验证是否正确,发现没有问题:
如果会点JRuby或者Ruby语法,来使用Logstash则可以做更多的自定义的处理任务
案例(四)使用codec+multiline来处理跨行的日志
什么场景下,需要使用multiline插件呢? 最常见的就是我们的log4j里面的记录的java程序发生异常时,经常
会抛出一大堆异常,如下:
注意这么多行日志,从业务角度来讲,它是一行的,如果默认我们不做任何处理,logstash就会把它解析成多个事件,这样以来
基本上偏离了我们预期的设想,那么该如何处理呢?
方法(1):
在input阶段的编码过程中,加入正则判断:
方法(2):
还是在input阶段,但是使用的触发模式,直到遇到下一行日志前,把这一行与下一行之间的所有内容都当成是一行,这种做法比较简单,优点时在性能与准确度上可能会比上一个方法好,但是缺点是,必须有下一行日志产生,当前的这一行日志,才能被收集完毕 !
参考文章:http://blog.sematext.com/2015/05/26/handling-stack-traces-with-logstash/
案例(五)使用mutate+gsub来去除一些字段里面的换行符
Java代码
mutate {
gsub => [ "message", " ", "" ]
}
mutate { gsub => [ "message", " ", "" ] }
gork过滤例子:
Java代码
input { file { path => "/var/log/http.log" } }
filter {
grok {
match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
}
}
以上是关于分布式日志收集之Logstash 笔记的主要内容,如果未能解决你的问题,请参考以下文章
ELK 之数据收集传输过滤 Filebeat+Logstash 部署