ELK03-Logstash熟悉
Posted austyn-thq
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ELK03-Logstash熟悉相关的知识,希望对你有一定的参考价值。
官方定义:Logstash 是开源的服务器端数据处理管道,能够同时从多个来源采集数据,转换数据,然后将数据发送到您最喜欢的“存储库”中。
LogStash是重量级的,支持多数据获取机制,通过TCP/UDP协议、文件、syslog、windows Eventlogs及STDIN等;获取到数据后,它支持对数据执行过滤、修改等操作
Agent/Server 架构,在 产生日志的节点上 部署agent(ship),ship搜集日志发送给server端。
为了避免服务器被瞬间大量数据压垮的情况,会在agent和server中间放一个消息队列,比较常见的使用是redis(主要用它的发布订阅功能),一般也将这个部分称之为broker(掮客)。
Logstash从队列上一条一条的取数据。
Logstash对数据进行过滤修改(清晰操作),然后交给ElasticSearch。
对于Logstash而言Server端也是插件式工作模式。
INPUT PLUGIN #把数据搜集进来的插件
CODEC PLUGIN #搜集和发送可能要编码,不是必须的
FILTER PLUGIN #过滤掉我们不需要的数据
OUTPUT PLUGIN #将数据输出至ES,当然可以保存本地或者redis
查看直接用插件名定义
input {
stdin {} #标准输入获取数据,在屏幕上输入的
}
output {
stdout {} #输出和输入一样
codec => rubydebug
}
}
数据类型
Array: [ item1.item2,...]
Boolean: true,false
Bytes:
Codec: 编码器
Hash: key => value
Number: 包含int和float
Password: 密码串不会记录到日志中,或者*代替
Path: 文件系统路径
String: 字符串
条件判断
==,!=,<,>,<=,>=
=~,!~ #=~正则匹配,!~正则不匹配
in,not in #包含于不包含
and,or,not
() #复合表达式用小括号
Logstash 的工作流程: input |filter| output,如果无需对数据进行任何处理,filter可省略
input{
stdin {}
}
output {
stdout {
codec => rubydebug
}
}
input插件:
file input:
start_position string -- 指明"beginning"表示开始处,默认为"ending"
type -- 没有任何默认值,主要在输入输出时定义成哪种类型,一般都为"system"
可以配合filter做处理
fileinput插件:
从指定的文件中读取事件流;其工作特性类似于tail -f -1
使用FileWatch(Ruby Gem库) 来监视文件是否发生变化
.sincedb #记录每个日志文件上一次读取的位置和时间,inode,major number,minor number,pos
所以即使停止了logstash不会影响到漏读
.sincedb 默认在执行客户端的用户的家目录
即使日志滚动也可以进行读取,文件名会发生改变,所以要定义路径简单的定义file input:
vim /etc/logstash/fromfile.conf
input {
file {
path => ["/var/log/messages"] #单个文件直接写路径,多可文件使用数组
type => "system"
start_position => "beginning" #从文件开头读取,ending则表示下一次文件发生新增时读取新行
}
}output {
stdout {
codec => rubydebug
}
}
/usr/share/logstash/bin/logstash -t -f /etc/logstash/conf.d/fromfile.conf #测试配置文件,老版本的--configtest已被 --config.test_and_exit取代
/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/fromfile.conf #用nohup或者screen运行
tcp input:
input {
tcp {
port => "8888"
type => "tcplog"
}
}
output {
stdout {
codec => rubydebug
}
}
yum install -y nc
echo "test" | nc 192.168.2.131 8888 进行测试
udp input插件:
collectd 服务配置,用于udp插件的测试,collectd是一款性能监控程序运行在udp,配置其通过network插件向外发送数据,当然nc也可以在这里测试
yum install collectd -- epel源
vim /etc/collectd.conf
Hostname "nodetest1"
LoadPlugin cpu
LoadPlugin df
LoadPlugin network
<Plugin network>
<server "192.168.2.131" "25826"> #指明Logstash监听的IP,端口和logstash文件保持一致
</server>
</Plugin>
tcpdump -i eth0 host 192.168.2.131 and port 25826 #可对logstash主机端口进行数据监测
vim /etc/logstash/conf.d/udp.conf
input {
udp {
port => 25826
codec => collectd {}
type => "collectd-demo"
}
}
output {
stdout {
codec => rubydebug
}
}
{
"plugin" => "cpu",
"collectd_type" => "cpu",
"value" => 832,
"@timestamp" => 2019-10-23T07:06:15.828Z,
"type" => "collectd-demo",
"plugin_instance" => "1",
"host" => "nodeteset2",
"@version" => "1",
"type_instance" => "softirq"
}
{
"plugin" => "df",
"collectd_type" => "df_complex",
"value" => 0.0,
"@timestamp" => 2019-10-23T07:06:05.828Z,
"type" => "collectd-demo",
"plugin_instance" => "run-user-0",
"host" => "nodeteset2",
"@version" => "1",
"type_instance" => "used"
}
redis input插件:
从redis读取数据,支持redis channel(发布订阅频道)和lists(支持列表)两种方式
使用版本比较新的的 redis,后面如果用得到,再做实验
filter 插件 - 用于在将event通过output之前对其实现某些处理功能
grok filter插件 - 用于分析并结构化文本数据;目前是logstash中将非结构化数据转化为结构化数据的不二之选
用于分析syslog、nginx、httpd日志
yum install -y httpd
如果日志格式没有修改,默认的结构(大概)
%ip %user %group [%datetime] %method %url %version %status %size %referer %user_agent
rpm -ql logstash | grep "patterns$" #查看定义相关日志的日志定义,apache 的日志是专门的一项
vim /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-patterns-core-4.1.2/patterns/grok-patterns 查看grok的patterns文件,里面有相关关于变量的定义
示例:
语法格式:
%{SYNIAX:SEMANTIC}
SYNIAX: 预定义模式名称
SEMANTIC: 匹配到的文本的定义标识符
举例:
1.1.1.1 GET /index.html 3.0 200
%{IP:client_ip} %{WORD:method} %{URIPATHRARAM:request_URL} %{NUMBER:version} %{NUMBER:status} #IP、WORD....是文件中预定义后面是起个名字
vim /etc/logstash/conf.d/groksample.conf
input {
stdin {}
}
filter {
grok {
#patterns_dir => "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-patterns-core-4.1.2/patterns/grok-patterns" #指根据哪个patterns文件来定义模式,如果不定义则去默认文件匹配
match => { "message" => "%{IP:client_ip} %{WORD:method} %{URIPATHPARAM:request_URL} %{NUMBER:version} %{NUMBER:status}" }
}
}
output {
stdout {
codec => rubydebug
}
}
/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/groksample.conf
等运行完成看到直接输入 1.1.1.1 GET /index.html 3.0 200
{
"@timestamp" => 2019-10-24T04:54:03.144Z,
"version" => "3.0",
"status" => "200",
"host" => "austyn-test-002.novalocal",
"method" => "GET",
"message" => "1.1.1.1 GET /index.html 3.0 200",
"client_ip" => "1.1.1.1",
"@version" => "1",
"request_URL" => "/index.html"
}
以实际httpd服务为例,启动httpd服务
vim /etc/logstash/conf.d/httpdsample.conf
input {
file {
path => ["/var/log/httpd/access_log"]
type => "apachelog"
start_position => "beginning"
}
}
filter {
grok {
match => { "message" => "%[COMBINEDAPACHELOG]"} #预定义的
}
}
output {
stdout {
codec => rubydebug
}
}
/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/httpdsample.conf #会发现"_grokparsefailure"错误,这主要是accesslog和 预定义的匹配不上
预定义文件:/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-patterns-core-4.1.2/patterns/httpd,可能httpd服务的日志定义或者pattern预定义的配置文件需要修改。
nginx参照
nginx log的匹配方式:
将如下信息添加至 /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-patterns-core-4.1.2/patterns/grok-patterns文件的尾部:
NGUSERNAME [a-zA-Z.@-+_%]+
NGUSER %{NGUSERNAME}
NGINXACCESS %{IPORHOST:clientip} - %{NOTSPACE:remote_user} [%{HTTPDATE:timestamp}] "(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})" %{NUMBER:response} (?:%{NUMBER:bytes}|-) %{QS:referrer} %{QS:agent} %{NOTSPACE:http_x_forwarded_for}
grok支持正则
output插件
这里以ES和redis举例
ES:
output {
elasticsearch {
hosts => ["192.168.2.131:9200"]
index=> "apache_access-%{+YYYY-MM}"
}
}
Redis:
output {
redis {
port => "6379"
host => ["192.168.2.200"]
data_type => "list"
key => "logstash-%{type}" #因为有data_type 所以key要指明,没有则可以默认
}
}
以上是关于ELK03-Logstash熟悉的主要内容,如果未能解决你的问题,请参考以下文章
ELK教程3:logstash的部署SpringBoot整合ELK+Filebeat