ELK--Logstash入门

Posted 干活的园丁

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ELK--Logstash入门相关的知识,希望对你有一定的参考价值。

ELK--Logstash入门

1.前言

Logstash是一款由ruby编写的开源数据采集工具。其支持对多种不同的数据源进行采集,并进行自定义处理,然后传输到指定位置。为后续的数据分析、可视化等应用提供支持。

 

1.2版本

近期,ELK版本迭代速度飞快。在本文中采用的版本为6.5.4

 

1.3安装与部署

https://www.elastic.co/downloads/logstash

下载tar.gz格式,直接解压(tar -zxf logstash.tar.gz

需要安装JDK 1.8以上的环境

2.Logstash

从数据源到存储端,Logstash的工作共分为三个过程:数据采集、数据处理(可选)、数据输出。对应的三个插件InputFilter(可选)Output来完成相应的工作。

 

2.1 Input介绍

Input插件能够从多种数据源中采集数据,例如:FileKafkaBeatsRedis等。

 

2.1.2 Input应用实例

1.file中采集数据

 

input {

     file {

       path=> [ "/root/example/*.log" ]

       start_position => "beginning"

       sincedb_path => "/dev/null"

       }

    }

注解:

1. path=> ["/root/example/*.log" ] 读取指定目录下,所有以.log结尾的文件。

2. start_position =>"beginning" 从文件头开始读取数据,默认为从文件结尾开始读取。

3. sincedb_path => "/dev/null"logstash会将每个文件的读取状态保存到sincedb_path指定的文件中,下次读取的时候会从上次中断的地方开始。这里将sincedb_path指向一个空目录,以便每次读取的时候都能从头开始。

 

2.kafka中采集数据

 

input {

 kafka{

   bootstrap_servers => "host1:9092,host2:9092"

   topics=>'example'

   group_id => 'example01'

   enable_auto_commit => false

   auto_offset_reset => 'earliest'

   consumer_threads => 6

   codec =>'json'

  }

}

注解:

1.enable_auto_commit => false 禁止logstash自动提交当前topic被消费的offset位置。

2.auto_offset_reset => 'earliest' topic最早的offset开始消费。

3.consumer_threads => 9 消费者的数量。

 

2.2.1 Filter介绍

Filter接受从Input传过来的数据,并对数据进行指定的处理。

 

2.2.2Filter应用实例

1.Grok正则解析

 

#message中,按照如下规则解析出datep1等字段

filter {

grok{          

match=>{"message"=>"%{DATESTAMP:date}|%{DATA:p1}|%{WORD:p2}|%{UUID:p3}|%{DATA:p4}|%{GREEDYDATA:p5}"}

   }

}

2.日期处理

 

#使用日志中date字段中的日期,替换@timestamp的值

filter{

 date{

    #按照指定格式匹配时间

   match => [ "date","yy-MM-dd HH:mm:ss" ]

    #match到的时间赋值给指定字段

   target => "@timestamp"

  }

}

3.Mutate数据修改

 

#timestamp中的时间精度保留到秒级(当前为毫秒级)

filter{

 grok{

match=>{"message"=>"%{DATESTAMP:date}|%{DATA:p1}|%{WORD:p2}|%{UUID:p3}|%{DATA:p4}|%{GREEDYDATA:p5}"}

  }

 mutate{

    #date中,以.为分隔符,进行字符串分割

   split=>{"date" => "."}

    #添加new_date字段,并用分割后的date[0]进行赋值

   add_field => {"new_date" => "%{[date][0]}"}

}

 date{

   match => [ "new_date","yy-MM-dd HH:mm:ss" ]

   target => "@timestamp"

  }

 mutate{

    #删除字段

   remove_field => [ "date","path","host"]

  }

}

4.Ruby

 

#timestamp中的时间精度保留到秒级(当前为毫秒级)

filter{

grok{

match=>{"message"=>"%{DATESTAMP:date}|%{NUMBER:g1}|%{NUMBER:g2}|%{DATA:p1}|%{WORD:p2}|%{UUID:p3}|%{DATA:p4}|%{GREEDYDATA:p5}"}

  }

#使用ruby中的时间函数对日志中的时间进行格式处理

 ruby{

   code =>'event.set("new_time",Time.parse(event.get("date")).strftime("%Y-%m-%d"))'

  }

 date{

   match => [ "new_date","yy-MM-dd" ]

   target => "@timestamp"

  }

}

2.3.1 Output介绍

Output能够将接受到的数据传输到多种不同的目的地。例如:KafkaFileElasticSearchHDFS等。

 

2.3.2 Output应用实例

1.输出到Kafka

 

output{

 kafka{

   bootstrap_servers => "host1:9092,host2:9092"

   topic_id => "example02"

   codec => json

  }

}

2.输出到File

 

output{

 file{

   path=>"/root/output/example/output.txt"

  }

}

3.输出到ElasticSearch

 

output{

 elasticsearch{

   hosts => ["host1:9200"]

   sniffing => true

   index => "example-index"

   flush_size => 100

   pool_max => 2000

   pool_max_per_route =>200

  }

}

注解:

2.index => "example-index" ES中建立的索引名称。

3.flush_size => 100 logstash积攒100event然后发送给ES

4.pool_max => 2000 logstashES的连接池中,最多可以创建2000个连接,超过这个数量时,会尝试重用连接池的连接。

5.pool_max_per_route 每个logstash进程能够创建最大的连接数量。

 

4.输出到HDFS

 

output {

 webhdfs {

   host => "host1"

   port => 50070

   path => "/example/dt=%{+YYYY-MM-dd}/logstash-%{+HH}.log"

   user => "example"

   compression => "snappy"

   flush_size => 10000

   idle_flush_time => 2

   standby_host => "host2"

   standby_port => 50070

  }

}                                             

原文链接:

https://blog.csdn.net/u010353408/article/details/78283309


以上是关于ELK--Logstash入门的主要内容,如果未能解决你的问题,请参考以下文章

Linux安装ELK--Logstash

linux安装logstash7.6.1及搭建简单ELK--logstash收集nginx日志

ELK - Logstash连接MySQL

ELK——Logstash

企业日志分析ELK(Logstash+Elasticsearch+Kibana)介绍及搭建

ELK logstash邮件报警