logstash的浅析

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了logstash的浅析相关的知识,希望对你有一定的参考价值。

一、logstash浅析

logstash就是一个过滤器,收集器。他是一个重量级的日志收集工具。使用logstash会消耗大量的服务器资源,所以通常需求高一点的配置。

过滤器:它可以统一过滤来自不同源的数据,并按照开发者的制定的规范输出到目的地。

收集器:它可以采集指定地址的日志,并传输到elasticsearch、redis等目的地。

二、logstash的处理展示:

 技术分享图片

三、插件

  1. input(输入插件):日志来源。

  a:读取指定地址的日志

    input { 

       file {

          path => "/var/log/messages"

          type => "system"

           start_position => "beginning"

        }

     }

   b:接收filebeat的日志   

      input {

  beats {

port => 5044

}

      }

   c:读取redis中的数据     

      input {

          redis {

           batch_count => 1 

         # EVAL命令返回的事件数目,设置为5表示一次请求返回5条日志信息

          data_type => "list" 

           # logstash redis插件工作方式

           key => "logstash-test" 

           # 监听的键值

            host => "127.0.0.1" 

            # redis端口号

            password => "123456" 

            # 如果有安全认证,此项为认证密码

            db => 0 

            # 如果应用使用了不同的数据库,此为redis数据库的编号,默认为0。

            threads => 1 

           # 启用线程数量

         }

       }

   d:读取syslog、rsyslog的数据

     input { 

        syslog { 

            port => "514"

            }

          }

   e:读取网络数据tcp (旧数据的处理)

     input { 

         tcp {

            port => 8888

            mode => "server" 

           ssl_enable => false 

             }

         }

    通常与nc命令配合使用

     # nc 127.0.0.1 8888 < olddata

 f:标准输入stdin   

      input {

         stdin {

          add_field => {"key" => "value"}

         codec => "plain"

          tags => ["add"]

         type => "std"

          }

         }

    #  bin/logstash  -f stdin.conf     后面输入字段,就会有输出(不常用)  

 2.filter 

  我的理解filter模块就是对日志的一个处理过程。简单介绍我使用过的几个插件。

   a:grok 插件  :拆分日志,筛选需要的值,并进行排序。配合这个字段 remove_field => ["message"]  可以删除不需要的值。nginx日志为例:    

    grok {

match => { "message" => "\[%{TIMESTAMP_ISO8601:time_iso8601}\] \[%{NUMBER:pid}\] \[%{IPORHOST:remote_addr}\] \[%{IPORHOST:http_host}(:%{NUMBER:http_host_port})?\] \[%{IPORHOST:upstream_addr}(:%{NUMBER:upstream_port})?\] \[%{WORD:verb} %{URIPATH:request_uri}(?:%{URIPARAM:request_parameter})? HTTP/%{NUMBER:httpversion}\] \[%{NUMBER:status}\] \[%{BASE10NUM:upstream_response_time} s\] \[%{BASE10NUM:request_time} s\] \[(?:%{NUMBER:bytes_sent}|-) bytes\] \[(?:%{NUMBER:body_bytes_sent}|-) bytes\] \[%{GREEDYDATA:http_user_agent}\]" }

remove_field => ["message"]    #一个不想要的字段

        }

   
   b:mutate插件:处理数据的格式,特别是时间

      mutate {

          convert => {

         "upstream_response_time" => "float"

           "request_time" => "float"

          "status" => "integer"

          }

       }

   c:ruby插件  处理事件event,可以使用各种ruby语法

        if [logtype] == "Feed" {

     ruby {

        code => '

     request_uri = event.get("request_uri")

              if request_uri.nil?

return

end

          }

        解释一下(定义request_uri字段,假如字段为空,结束操作)

   d:date插件 定义时间

        date {

    locale => "en"

    match => ["time_iso8601", "ISO8601"]

           }

       解释一下(当前日志的时间是美国时间,输出为北京时间)

    f:drop的使用

         if [message] !~ "^error" {

drop { }

}

        解释一下(假如信息匹配到error,就过滤掉)

3.output 输出到

  a:elasticsearch  最常用的插件

    output {

elasticsearch {

hosts => "localhost:9200"

manage_template => false

index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}" 

document_type => "%{[@metadata][type]}" 

}

          }

   b:当然也可以输入到redis中  进行进一步处理(参数选择性添加)

    output {

     redis{ 

         batch => true    

         batch_events => 50     

         batch_timeout => 5       

         codec => plain  

         congestion_interval => 1         

         congestion_threshold => 5  

         data_type => list 

         db => 0

         host => ["127.0.0.1:6379"]

         key => xxx

         # list或channel的名字

         password => xxx

         # redis的密码,默认不使用

         port => 6379

     }

  }


以上是关于logstash的浅析的主要内容,如果未能解决你的问题,请参考以下文章

Logstash:Logstash-to-Logstash 通信

Logstash:Logstash-to-Logstash 通信

logstash好用不

es 无日志,logstash 报错

logstash配置文件

Logstash之四:logstash接收kafka数据