ELK03-Logstash熟悉

Posted austyn-thq

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ELK03-Logstash熟悉相关的知识,希望对你有一定的参考价值。

官方定义:Logstash 是开源的服务器端数据处理管道,能够同时从多个来源采集数据,转换数据,然后将数据发送到您最喜欢的“存储库”中。

LogStash是重量级的,支持多数据获取机制,通过TCP/UDP协议、文件、syslog、windows Eventlogs及STDIN等;获取到数据后,它支持对数据执行过滤、修改等操作

Agent/Server 架构,在 产生日志的节点上 部署agent(ship),ship搜集日志发送给server端。

为了避免服务器被瞬间大量数据压垮的情况,会在agent和server中间放一个消息队列,比较常见的使用是redis(主要用它的发布订阅功能),一般也将这个部分称之为broker(掮客)。
Logstash从队列上一条一条的取数据。
Logstash对数据进行过滤修改(清晰操作),然后交给ElasticSearch。
对于Logstash而言Server端也是插件式工作模式。

INPUT PLUGIN #把数据搜集进来的插件
CODEC PLUGIN #搜集和发送可能要编码,不是必须的
FILTER PLUGIN #过滤掉我们不需要的数据
OUTPUT PLUGIN #将数据输出至ES,当然可以保存本地或者redis

 

查看直接用插件名定义 

input {
  stdin {} #标准输入获取数据,在屏幕上输入的
}

output {
  stdout {} #输出和输入一样
  codec => rubydebug
  }
}

 

数据类型

Array: [ item1.item2,...]
Boolean: true,false
Bytes:
Codec: 编码器
Hash: key => value
Number: 包含int和float
Password: 密码串不会记录到日志中,或者*代替
Path: 文件系统路径
String: 字符串

 

条件判断

==,!=,<,>,<=,>=
=~,!~ #=~正则匹配,!~正则不匹配
in,not in #包含于不包含
and,or,not
() #复合表达式用小括号

 

Logstash 的工作流程: input |filter| output,如果无需对数据进行任何处理,filter可省略

input{
  stdin {}
}

output {
  stdout {
  codec => rubydebug
  }
}

 

input插件:

  file input:
  start_position string -- 指明"beginning"表示开始处,默认为"ending"
  type -- 没有任何默认值,主要在输入输出时定义成哪种类型,一般都为"system"
  可以配合filter做处理

fileinput插件:

从指定的文件中读取事件流;其工作特性类似于tail -f -1
使用FileWatch(Ruby Gem库) 来监视文件是否发生变化
.sincedb #记录每个日志文件上一次读取的位置和时间,inode,major number,minor number,pos
所以即使停止了logstash不会影响到漏读
.sincedb 默认在执行客户端的用户的家目录
即使日志滚动也可以进行读取,文件名会发生改变,所以要定义路径

简单的定义file input:

vim /etc/logstash/fromfile.conf
input {
  file {
    path => ["/var/log/messages"] #单个文件直接写路径,多可文件使用数组
    type => "system"
    start_position => "beginning" #从文件开头读取,ending则表示下一次文件发生新增时读取新行
    }
}

output {
  stdout {
      codec => rubydebug
    }
}

 

/usr/share/logstash/bin/logstash -t -f /etc/logstash/conf.d/fromfile.conf  #测试配置文件,老版本的--configtest已被 --config.test_and_exit取代

/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/fromfile.conf  #用nohup或者screen运行

 

tcp input:

input {
  tcp {
    port => "8888"
    type => "tcplog"
  }
}

output {
  stdout {
    codec => rubydebug
  }
}

yum install -y nc

echo "test" | nc 192.168.2.131 8888  进行测试

 


udp input插件:

  collectd 服务配置,用于udp插件的测试,collectd是一款性能监控程序运行在udp,配置其通过network插件向外发送数据,当然nc也可以在这里测试
  yum install collectd -- epel源

  vim /etc/collectd.conf

  

Hostname "nodetest1"
LoadPlugin cpu
LoadPlugin df
LoadPlugin network

<Plugin network>
<server "192.168.2.131" "25826">  #指明Logstash监听的IP,端口和logstash文件保持一致
</server>
</Plugin>

tcpdump -i eth0 host 192.168.2.131 and port 25826 #可对logstash主机端口进行数据监测

vim /etc/logstash/conf.d/udp.conf

input {
  udp {
    port => 25826
    codec => collectd {}
    type => "collectd-demo"
  }
}

output {
    stdout {
    codec => rubydebug
   }
}

{
"plugin" => "cpu",
"collectd_type" => "cpu",
"value" => 832,
"@timestamp" => 2019-10-23T07:06:15.828Z,
"type" => "collectd-demo",
"plugin_instance" => "1",
"host" => "nodeteset2",
"@version" => "1",
"type_instance" => "softirq"
}
{
"plugin" => "df",
"collectd_type" => "df_complex",
"value" => 0.0,
"@timestamp" => 2019-10-23T07:06:05.828Z,
"type" => "collectd-demo",
"plugin_instance" => "run-user-0",
"host" => "nodeteset2",
"@version" => "1",
"type_instance" => "used"
}

 

 

redis input插件:
  从redis读取数据,支持redis channel(发布订阅频道)和lists(支持列表)两种方式
  使用版本比较新的的 redis,后面如果用得到,再做实验

 

filter 插件 - 用于在将event通过output之前对其实现某些处理功能

grok filter插件 - 用于分析并结构化文本数据;目前是logstash中将非结构化数据转化为结构化数据的不二之选

  用于分析syslog、nginx、httpd日志

yum install -y httpd

如果日志格式没有修改,默认的结构(大概)
%ip %user %group [%datetime] %method %url %version %status %size %referer %user_agent

rpm -ql logstash | grep "patterns$" #查看定义相关日志的日志定义,apache 的日志是专门的一项

vim /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-patterns-core-4.1.2/patterns/grok-patterns 查看grok的patterns文件,里面有相关关于变量的定义

示例:

语法格式:
%{SYNIAX:SEMANTIC}
SYNIAX: 预定义模式名称
SEMANTIC: 匹配到的文本的定义标识符
举例:
1.1.1.1 GET /index.html 3.0 200
%{IP:client_ip} %{WORD:method} %{URIPATHRARAM:request_URL} %{NUMBER:version} %{NUMBER:status} #IP、WORD....是文件中预定义后面是起个名字

vim /etc/logstash/conf.d/groksample.conf

input {
  stdin {}
}

filter {
  grok {
  #patterns_dir => "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-patterns-core-4.1.2/patterns/grok-patterns" #指根据哪个patterns文件来定义模式,如果不定义则去默认文件匹配
  match => { "message" => "%{IP:client_ip} %{WORD:method} %{URIPATHPARAM:request_URL} %{NUMBER:version} %{NUMBER:status}" }
  }
}

 

output {
  stdout {
    codec => rubydebug
  }
}

/usr/share/logstash/bin/logstash  -f /etc/logstash/conf.d/groksample.conf 

等运行完成看到直接输入 1.1.1.1 GET /index.html 3.0 200

{
"@timestamp" => 2019-10-24T04:54:03.144Z,
"version" => "3.0",
"status" => "200",
"host" => "austyn-test-002.novalocal",
"method" => "GET",
"message" => "1.1.1.1 GET /index.html 3.0 200",
"client_ip" => "1.1.1.1",
"@version" => "1",
"request_URL" => "/index.html"
}

以实际httpd服务为例,启动httpd服务

vim /etc/logstash/conf.d/httpdsample.conf

input {
  file {
    path => ["/var/log/httpd/access_log"]
    type => "apachelog"
    start_position => "beginning"
  }
}

filter {
  grok {
    match => { "message" => "%[COMBINEDAPACHELOG]"} #预定义的
  }
}

output {
    stdout {
        codec => rubydebug
    }
}

/usr/share/logstash/bin/logstash  -f /etc/logstash/conf.d/httpdsample.conf #会发现"_grokparsefailure"错误,这主要是accesslog和 预定义的匹配不上

预定义文件:/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-patterns-core-4.1.2/patterns/httpd,可能httpd服务的日志定义或者pattern预定义的配置文件需要修改。

 

nginx参照
nginx log的匹配方式:
将如下信息添加至 /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-patterns-core-4.1.2/patterns/grok-patterns文件的尾部:

NGUSERNAME [a-zA-Z.@-+_%]+
NGUSER %{NGUSERNAME}
NGINXACCESS %{IPORHOST:clientip} - %{NOTSPACE:remote_user} [%{HTTPDATE:timestamp}] "(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})" %{NUMBER:response} (?:%{NUMBER:bytes}|-) %{QS:referrer} %{QS:agent} %{NOTSPACE:http_x_forwarded_for}

grok支持正则

output插件

  这里以ES和redis举例

  ES:

  output {
      elasticsearch {
              hosts => ["192.168.2.131:9200"]
              index=> "apache_access-%{+YYYY-MM}"
      }
     }

 

   Redis:

output {
    redis {
    port => "6379"
    host => ["192.168.2.200"]
    data_type => "list"
    key => "logstash-%{type}" #因为有data_type 所以key要指明,没有则可以默认
  }
}

以上是关于ELK03-Logstash熟悉的主要内容,如果未能解决你的问题,请参考以下文章

初探ELK-logstash使用小结

ELK教程3:logstash的部署SpringBoot整合ELK+Filebeat

ELK教程3:logstash的部署SpringBoot整合ELK+Filebeat

elk的logstash怎么过滤出报错的模块

ELK--Logstash入门

[转] ELK 之 Logstash