Elastic Stack Logstash基础用法

Posted 17岁boy想当攻城狮

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Elastic Stack Logstash基础用法相关的知识,希望对你有一定的参考价值。

Grok

前言

在Logstash里为我们提供了一个非常强大的Grok日志分析工具,它可以将杂乱无序的日志变的井然有序。

语法与内置正则

Grok属于Filter的插件,同时Grok内置了许多正则,方便我们解析日志,Grok语法如下:

filter 
            grok 
                    match => 
                                "message" => "this is regular"
                    
            

在logstash里使用数学里的"=>"推导符号来表示关系,message是filebeat发送过来的一个字段内容

grok里的语法格式如下:

%正则:变量

grok内置了多个正则,帮助我们来快速解析日志内容:

模式

正则

作用

USERNAME

[a-zA-Z0-9._-]+

提取用户名

USER

%USERNAME

提取用户, USERNAME的别名

INT

(?:[+-]?(?:[0-9]+))

提取整数

BASE10NUM

(?<![0-9.+-])(?>[+-]?(?:(?:[0-9]+(?:\\.[0-9]+)?)|(?:\\.[0-9]+)))

提取十进制的数

NUMBER

(?:%BASE10NUM)

提取数字

BASE16NUM

(?<![0-9A-Fa-f])(?:[+-]?(?:0x)?(?:[0-9A-Fa-f]+))

提取十六进制的数

BASE16FLOAT

\\b(?<![0-9A-Fa-f.])(?:[+-]?(?:0x)?(?:(?:[0-9A-Fa-f]+(?:\\.[0-9A-Fa-f]*)?)|(?:\\.[0-9A-Fa-f]+)))\\b

提取十六进制的浮点数

POSINT

\\b(?:[1-9][0-9]*)\\b

提取不以0开头的数,如:01 22 33 04

只提取22 33以0开头的数字不会提取

NONNEGINT

\\b(?:[0-9]+)\\b

全部匹配,以0开头依然会匹配

WORD

\\b\\w+\\b

匹配单词

NOTSPACE

\\S+

匹配非空白字符

SPACE

\\s*

匹配空格

DATA

.*?

满足条件字符匹配一次

GREEDYDATA

.*

贪婪匹配

QUOTEDSTRING

(?>(?<!\\\\)(?>"(?>\\\\.|[^\\\\"]+)+"|""|(?>'(?>\\\\.|[^\\\\']+)+')|''|(?>`(?>\\\\.|[^\\\\`]+)+`)|``))

匹配引用字符串

UUID

[A-Fa-f0-9]8-(?:[A-Fa-f0-9]4-)3[A-Fa-f0-9]12

匹配UUID

MAC

(?:%CISCOMAC|%WINDOWSMAC|%COMMONMAC)

匹配MAC地址

CISCOMAC

(?:(?:[A-Fa-f0-9]4\\.)2[A-Fa-f0-9]4)

匹配 CISCO MAC地址

WINDOWSMAC

(?:(?:[A-Fa-f0-9]2-)5[A-Fa-f0-9]2)

匹配Windows MAC地址

COMMONMAC

(?:(?:[A-Fa-f0-9]2:)5[A-Fa-f0-9]2)

匹配COMMON MAC地址

IPV6

((([0-9A-Fa-f]1,4:)7([0-9A-Fa-f]1,4|:))|(([0-9A-Fa-f]1,4:)6(:[0-9A-Fa-f]1,4|((25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d)(\\.(25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d))3)|:))|(([0-9A-Fa-f]1,4:)5(((:[0-9A-Fa-f]1,4)1,2)|:((25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d)(\\.(25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d))3)|:))|(([0-9A-Fa-f]1,4:)4(((:[0-9A-Fa-f]1,4)1,3)|((:[0-9A-Fa-f]1,4)?:((25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d)(\\.(25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d))3))|:))|(([0-9A-Fa-f]1,4:)3(((:[0-9A-Fa-f]1,4)1,4)|((:[0-9A-Fa-f]1,4)0,2:((25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d)(\\.(25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d))3))|:))|(([0-9A-Fa-f]1,4:)2(((:[0-9A-Fa-f]1,4)1,5)|((:[0-9A-Fa-f]1,4)0,3:((25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d)(\\.(25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d))3))|:))|(([0-9A-Fa-f]1,4:)1(((:[0-9A-Fa-f]1,4)1,6)|((:[0-9A-Fa-f]1,4)0,4:((25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d)(\\.(25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d))3))|:))|(:(((:[0-9A-Fa-f]1,4)1,7)|((:[0-9A-Fa-f]1,4)0,5:((25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d)(\\.(25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d))3))|:)))(%.+)?

匹配IPV6

IPV4

(?<![0-9])(?:(?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]1,2)[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]1,2)[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]1,2)[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]1,2))(?![0-9])

匹配IPV4

IP

(?:%IPV6|%IPV4)

匹配IP6或IP4

HOSTNAME

\\b(?:[0-9A-Za-z][0-9A-Za-z-]0,62)(?:\\.(?:[0-9A-Za-z][0-9A-Za-z-]0,62))*(\\.?|\\b)

匹配主机名

HOST

%HOSTNAME

HOSTNAME别名

IPORHOST

(?:%HOSTNAME|%IP)

匹配IPORHOST

HOSTPORT

%IPORHOST:%POSINT

匹配HOSTPORT

PATH

(?:%UNIXPATH|%WINPATH)

匹配路径

UNIXPATH

(?>/(?>[\\w_%!$@:.,-]+|\\\\.)*)+

unix路径

TTY

(?:/dev/(pts|tty([pq])?)(\\w+)?/?(?:[0-9]+))

tty路径

WINPATH

(?>[A-Za-z]+:|\\\\)(?:\\\\[^\\\\?*]*)+

windows路径

URIPROTO

[A-Za-z]+(\\+[A-Za-z+]+)?

匹配URIPROTO

URIHOST

%IPORHOST(?::%POSINT:port)?

匹配URIHOST

URIPATH

(?:/[A-Za-z0-9$.+!*'(),~:;=@#%_\\-]*)+

匹配URIPATH

URIPARAM

\\?[A-Za-z0-9$.+!*'|(),~@#%&/=:;_?\\-\\[\\]]*

匹配URIPARAM

URIPATHPARAM

%URIPATH(?:%URIPARAM)?

匹配URIPATHPARAM

URI

%URIPROTO://(?:%USER(?::[^@]*)?@)?(?:%URIHOST)?(?:%URIPATHPARAM)?

匹配URI

MONTH

\\b(?:Jan(?:uary)?|Feb(?:ruary)?|Mar(?:ch)?|Apr(?:il)?|May|Jun(?:e)?|Jul(?:y)?|Aug(?:ust)?|Sep(?:tember)?|Oct(?:ober)?|Nov(?:ember)?|Dec(?:ember)?)\\b

匹配月

MONTHNUM

(?:0?[1-9]|1[0-2])

匹配月数

MONTHNUM2

(?:0[1-9]|1[0-2])

匹配月数写法2

MONTHDAY

(?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9])

匹配日

DAY

(?:Mon(?:day)?|Tue(?:sday)?|Wed(?:nesday)?|Thu(?:rsday)?|Fri(?:day)?|Sat(?:urday)?|Sun(?:day)?)

匹配星期

YEAR

(?>\\d\\d)1,2

匹配年

HOUR

(?:2[0123]|[01]?[0-9])

匹配小时

MINUTE

(?:[0-5][0-9])

匹配分钟

SECOND

(?:(?:[0-5]?[0-9]|60)(?:[:.,][0-9]+)?)

匹配秒

TIME

(?!<[0-9])%HOUR:%MINUTE(?::%SECOND)(?![0-9])

匹配时间

DATE_US

%MONTHNUM[/-]%MONTHDAY[/-]%YEAR

匹配美国时间格式

DATE_EU

%MONTHDAY[./-]%MONTHNUM[./-]%YEAR

匹配欧洲时间格式

ISO8601_TIMEZONE

(?:Z|[+-]%HOUR(?::?%MINUTE))

匹配ISO8601时区

ISO8601_SECOND

(?:%SECOND|60)

匹配ISO8601秒

TIMESTAMP_ISO8601

%YEAR-%MONTHNUM-%MONTHDAY[T ]%HOUR:?%MINUTE(?::?%SECOND)?%ISO8601_TIMEZONE?

匹配ISO8601时间

DATE

%DATE_US|%DATE_EU

匹配时间

DATESTAMP

%DATE[- ]%TIME

匹配时间戳

TZ

(?:[PMCE][SD]T|UTC)

匹配TZ时间

DATESTAMP_RFC822

%DAY %MONTH %MONTHDAY %YEAR %TIME %TZ

匹配RFC822时间格式

DATESTAMP_RFC2822

%DAY, %MONTHDAY %MONTH %YEAR %TIME %ISO8601_TIMEZONE

匹配RFC2822时间格式

DATESTAMP_OTHER

%DAY %MONTH %MONTHDAY %TIME %TZ %YEAR

匹配OTHER时间格式

DATESTAMP_EVENTLOG

%YEAR%MONTHNUM2%MONTHDAY%HOUR%MINUTE%SECOND

匹配EVENTLOG 时间格式

SYSLOGTIMESTAMP

%MONTH +%MONTHDAY %TIME

匹配syslog时间戳

PROG

(?:[\\w._/%-]+)

匹配PROG

SYSLOGPROG

%PROG:program(?:\\[%POSINT:pid\\])?

匹配系统日志 PID

SYSLOGHOST

%IPORHOST

匹配系统日志主机

SYSLOGFACILITY

<%NONNEGINT:facility.%NONNEGINT:priority>

匹配系统日志设施

HTTPDATE

%MONTHDAY/%MONTH/%YEAR:%TIME %INT

匹配http日期

SYSLOGBASE

%SYSLOGTIMESTAMP:timestamp (?:%SYSLOGFACILITY )?%SYSLOGHOST:logsource %SYSLOGPROG:

匹配系统日志库

COMMONAPACHELOG

%IPORHOST:clientip %USER:ident %USER:auth \\[%HTTPDATE:timestamp\\] "(?:%WORD:verb %NOTSPACE:request(?: HTTP/%NUMBER:httpversion)?|%DATA:rawrequest)" %NUMBER:response (?:%NUMBER:bytes|-)

匹配COMMONAPACHELOG

COMBINEDAPACHELOG

%COMMONAPACHELOG %QS:referrer %QS:agent

匹配COMBINEDAPACHELOG

LOGLEVEL

([Aa]lert|ALERT|[Tt]race|TRACE|[Dd]ebug|DEBUG|[Nn]otice|NOTICE|[Ii]nfo|INFO|[Ww]arn?(?:ing)?|WARN?(?:ING)?|[Ee]rr?(?:or)?|ERR?(?:OR)?|[Cc]rit?(?:ical)?|CRIT?(?:ICAL)?|[Ff]atal|FATAL|[Ss]evere|SEVERE|EMERG(?:ENCY)?|[Ee]merg(?:ency)?)

匹配日志等级

示例

下面演示如何将一行日志的时间解析出来:

2023-2-14 10:50:15 hello word

使用内置正则:

filter 
            grok 
                    match => 
                                "message" => "%DATE:date %TIME:time"
                    
            

输出:


    data:2023-2-14
    time:10:50:15

除此之外我们还可以使用自定义正则,自定义正则以()作为开始,如果定义变量使用?<>来定义,下面是自定义正则解析时间:

filter 
            grok 
                    match => 
                                "message" => "(?<date>\\d4-\\d2-\\d2)\\s(?<time>\\d2:\\d2:\\d2)"
                    
            

grok还支持pattern文件,pattern文件名字只能是:postfix,类似c语言里面的宏定义,你可以将你的正则写进去并定义一个别名:

my_time \\d4-\\d2-\\d2

然后就可以在里面使用它了:

filter 
            grok 
                    patterns_dir => ["/ElasticStack/logstash-8.5.2/config/patterns"]
                    match => 
                                "message" => "%my_time:date"
                    
            

Mutate

前言

Mutate是过滤器,用于将字段进行二次过滤,它位于grok后一层

示例

grok解析出来的字段默认类型是keyboard,键值,属于txt文本类型,filter提供mutate工具来帮我们定义类型,其格式如下:

mutate 
        convert => 
                    "Test" => "integer"
                  

mutate提供了如下几种类型:

integer:整形

string: 字符类型

keyboard: 键类型

float: 浮点数类型

除此之外还提供对变量的其它操作

字段重命名:

mutate 
        rename => "name" => "name3"

去除字段空格:

mutate 
        strip => ["name"]

更新字段值 update

mutate 
        update => "name" => "li"

增加字段 add_field

mutate 

	add_field => "testField1" => "0"

	add_field => "testField2" => "%name" #引用name中的值

移除字段 remove_field

mutate 

	remove_field => ["name"]

大小写转换 lowercase&uppercase

mutate 

	#lowercase => [ "name" ]

	uppercase => [ "name" ]

正则表达式替换 gsub

这里只针对string类型字段,如下把name字段中的“o”替换为“p”

mutate 
	gsub => ["name","o","p"]

复制字段 copy

复制一个已存在的字段到另外一个字段,已存在的字段会被重写到一个新的字段,新的字段不需要单独添加

mutate 
	copy => "name" => "name2"

if语句

在logstash里是支持if语句的例如可以通过filebeat传递过来的字段进行判断从而进行不同的操作:

filter 
        if "hello" in [fields][type] 
                grok 

                                match => 
                                                "message" => "xxx"
                        
                
            
        

上面code判断filebeat里fields下的type字段是否为hello,如果是hello则进行解析

当然也可以运用在output上:

output 
        if "hello" in [fields][type] 
                elasticsearch 
                        hosts => ["localhost:9200"]
                        index => "test"
                
        

        stdout  codec => rubydebug 

以上是关于Elastic Stack Logstash基础用法的主要内容,如果未能解决你的问题,请参考以下文章

ES 集中式日志分析平台 Elastic Stack(介绍)

Logstash:如何运用 Elastic Stack 结合 RSS feeds 告知可能性

Logstash:如何运用 Elastic Stack 结合 RSS feeds 告知可能性

浅尝 Elastic Stack Logstash + Beats + Kafka

浅尝 Elastic Stack Logstash + Beats + Kafka

集中式日志分析平台 Elastic Stack(介绍)