The Logstash multiline plugin: matching multi-line logs

Posted by 运维工匠实战 (if you spot any mistakes, please send me the correct approach)


When processing logs, besides access logs we also have to deal with runtime logs, which are mostly written by application code, for example via log4j. The biggest difference from access logs is that runtime logs are multi-line: several consecutive lines together make up a single record.
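For example, a single log4j record that includes an exception spans several lines: only the first line carries the timestamp, while the stack-trace lines under it belong to the same record. A made-up excerpt (the class names are purely illustrative):

[16-04-12 03:39:58 ERROR] service.OrderService:- failed to save order
java.lang.NullPointerException: customer is null
    at com.example.service.OrderService.save(OrderService.java:42)
    at com.example.web.OrderController.submit(OrderController.java:17)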

In the filter section, add code along these lines (pattern and what must be set, as explained below):

filter {
  multiline {
    pattern => "^\s"       # example: continuation lines start with whitespace...
    what    => "previous"  # ...and are appended to the previous event
  }
}

Once the lines can be handled as one multi-line event, splitting them into fields is easy (see the grok sketch at the end of this post).

Field attributes:

For the multiline plugin, three settings matter most: negate, pattern, and what.

negate: boolean, default false. When set to true, the rule is applied to lines that do NOT match the pattern.

pattern: required, no default; a string holding the regular expression that each line is matched against.

what: required, no default; either previous or next, indicating whether a matching line belongs to the previous event or to the next one.
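To see how the three settings work together, here is a minimal sketch, close to the example in the multiline codec documentation, for a hypothetical format where a trailing backslash marks a continuation; a matching line is glued to the line that follows it:

input {
  stdin {
    codec => multiline {
      pattern => "\\$"      # a line ending with a backslash...
      negate  => false      # ...that matches the pattern (negate left at its default)...
      what    => "next"     # ...is joined with the line that comes after it
    }
  }
}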

Now let's walk through a complete example:

# cat logstash_multiline_shipper.conf
input {
    file {
        path => "/apps/logstash/conf/test/c.out"
        type => "runtimelog"
        codec => multiline {
            pattern => "^\["
            negate => true
            what => "previous"
        }
        start_position => "beginning"
        sincedb_path => "/apps/logstash/logs/sincedb-access"
        ignore_older => 0
    }
}
output {
    stdout {
        codec => rubydebug
    }
}

Explanation: the pattern matches lines that begin with "["; any line that does not is assumed to belong to the previous line.
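Note that the example above applies multiline as a codec on the file input rather than as a filter. A rough filter-form equivalent of the same rule is sketched below; the standalone multiline filter takes the same pattern/negate/what options, although it has since been deprecated in favour of the codec:

filter {
  multiline {
    pattern => "^\["       # a line starting with "[" begins a new event
    negate  => true        # so apply the rule to lines that do NOT start with "["
    what    => "previous"  # and append them to the previous event
  }
}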

The test data is as follows:

[16-04-12 03:40:01 DEBUG] model.MappingNode:- [/store/shopclass] matched over.

[16-04-12 03:40:02 DEBUG] impl.JdbcEntityInserter:- from product_category product_category

where product_category.PARENT_ID is null and product_category.STATUS = ? and product_category.DEALER_ID is null

order by product_category.ORDERS asc

[16-04-12 03:40:03 DEBUG] model.MappingNode:- [/store/shopclass] matched over.

[16-04-12 03:40:04 DEBUG] model.MappingNode:- [/store/shopclass] matched over.

[16-04-12 03:40:05 DEBUG] impl.JdbcEntityInserter:- from product_category product_category

where product_category.PARENT_ID is null and product_category.STATUS = ? and product_category.DEALER_ID is null

order by product_category.ORDERS desc

[16-04-12 03:40:06 DEBUG] impl.JdbcEntityInserter:- from product_category product_category

where product_category.PARENT_ID is null and product_category.STATUS = ? and product_category.DEALER_ID is null

order by product_category.ORDERS asc

[16-04-12 03:40:07 DEBUG] model.MappingNode:- [/store/shopclass] matched over.

Start Logstash:

# ./../bin/logstash -f logstash_multiline_shipper.conf 
Sending Logstash's logs to /apps/logstash/logs which is now configured via log4j2.properties
[2016-12-09T15:16:59,173][INFO ][logstash.pipeline        ] Starting pipeline {"id"=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>500}
[2016-12-09T15:16:59,192][INFO ][logstash.pipeline        ] Pipeline main started
[2016-12-09T15:16:59,263][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9601}

After appending the test data to the monitored log file, check the output:

# ./../bin/logstash -f logstash_multiline_shipper.conf 
Sending Logstash's logs to /apps/logstash/logs which is now configured via log4j2.properties
[2016-12-09T15:16:59,173][INFO ][logstash.pipeline        ] Starting pipeline {"id"=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>500}
[2016-12-09T15:16:59,192][INFO ][logstash.pipeline        ] Pipeline main started
[2016-12-09T15:16:59,263][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9601}
{
          "path" => "/apps/logstash/conf/test/c.out",
    "@timestamp" => 2016-12-09T07:21:15.403Z,
      "@version" => "1",
          "host" => "ofs1",
       "message" => "# ./../bin/logstash -f logstash_multiline_shipper.conf \nSending Logstash‘s logs to /apps/logstash/logs which is now configured via log4j2.properties",
          "type" => "runtimelog",
          "tags" => [
        [0] "multiline"
    ]
}
{
          "path" => "/apps/logstash/conf/test/c.out",
    "@timestamp" => 2016-12-09T07:21:15.409Z,
      "@version" => "1",
          "host" => "ofs1",
       "message" => "[2016-12-09T15:16:59,173][INFO ][logstash.pipeline        ] Starting pipeline {\"id\"=>\"main\", \"pipeline.workers\"=>4, \"pipeline.batch.size\"=>125, \"pipeline.batch.delay\"=>5, \"pipeline.max_inflight\"=>500}",
          "type" => "runtimelog",
          "tags" => []
}
{
          "path" => "/apps/logstash/conf/test/c.out",
    "@timestamp" => 2016-12-09T07:21:15.410Z,
      "@version" => "1",
          "host" => "ofs1",
       "message" => "[2016-12-09T15:16:59,192][INFO ][logstash.pipeline        ] Pipeline main started",
          "type" => "runtimelog",
          "tags" => []
}
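As mentioned at the start of the post, once consecutive lines are merged into one event, splitting the message into fields is straightforward. Below is a minimal grok sketch for the sample entries; the field names log_time, level, class and msg are arbitrary names chosen for illustration:

filter {
  grok {
    # Parses lines such as:
    #   [16-04-12 03:40:02 DEBUG] impl.JdbcEntityInserter:- from product_category ...
    # (.|\n)* is used for msg so that the merged continuation lines are captured too.
    match => {
      "message" => "\[(?<log_time>\d{2}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) %{LOGLEVEL:level}\] %{NOTSPACE:class}:- (?<msg>(.|\n)*)"
    }
  }
}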

 
