elfk 搭建系列 -- logstash 的搭建

Posted 苦逼小码农

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了elfk 搭建系列 -- logstash 的搭建相关的知识,希望对你有一定的参考价值。

Logstash 安装

a. 安装

在三台机器命令行中执行如下命令。

 yum -y install logstash

然后安装需要用到的插件,执行如下的命令。

 sudo /usr/share/logstash/bin/logstash-plugin install logstash-output-exec logstash-output-email logstash-filter-metrics logstash-filter-ruby logstash-input-beats logstash-filter-multiline
 source "https://ruby.taobao.org"

然后再执行上面的安装命令即可。

b. 修改配置文件,配置文件的地址(/etc/logstash)

其中,默认的配置文件可以不做更改,里边可以更改 logstash 数据和日志的路径。

 path.data: /var/lib/logstash
 path.logs: /var/log/logstash

其中最主要的配置文件就在当前目录的 conf.d 中,包括其中的一些插件的使用,比如说对日志的过滤,发送邮件,执行命令行等。

input-监听 Beats 的连接端口

使用命令 sudo vim /etc/logstash/conf.d/input-beats.yml,监听 beats 的端口 5044。

 input {
  beats {
    port => 5044
    codec => plain {
          charset => "UTF-8"
     }
   }
 }

grok-过滤日志信息

这里只做最简单的解析演示。日志的格式如下图所示。

 -2019-10-30 18:12:15.215 - DEBUG -

解析如下,其中正则文件位于 /etc/logstash/pattern 下,使用命令 mkdir /etc/logstash/pattern && vim /etc/logstash/pattern/postfix ,创建文件夹并创建正则文件。(注意,前面的名称只能为大写字母和数字)

 TEST1 (?:\S+\s*\S+)|-

然后解析文件如下,使用命令 sudo vim /etc/logstash/conf.d/grok-loginfo.yaml ,配置日志解析。

 filter {
  if [fields][logname] == "all" {  # 在 beats 配置时打上的标签,然后对不同的日志进行对应的解析
    grok{
        patterns_dir => "/etc/logstash/pattern" # 引用自定义的正则文件
        match => { "message" => "-%{TIMESTAMP_ISO8601:timestamp} -%{LOGLEVEL:loglevel}\s" } # 进行日志解析,其中"%{}",表示引用正则,前面的 "TIMESTAMP_ISO8601" 和 "LOGLEVEL" 为 elk 自带的正则解析,":" 后边的是你解析后命名的字段,用作 kibana 的查看和过滤条件。想要引用自己定义的正则,则为 "%{TEST1:test}",意思为引用 TEST1 的正则解析,将解析的字段命名为 test
     }
 
    date {
        match => [ "timestamp" , "yyyy-MM-dd HH:mm:ss,S", "ISO8601" ] # 将解析出来的日志时间作为事件的时间戳
     }
   }
 }

output-将结果输出到 elasticsearch,或根据结果进行 email 或 命令行输出

output 的配置文件如下,先使用命令 vim /etc/logstash/conf.d/output.yaml

 output {
  if [@metadata][beat] == "filebeat" {
    if [fields][logname] == "all" { # 判断条件,来创建不同的索引
      elasticsearch {
        hosts => ["elk1.kbxmn.com", "elk2.kbxmn.com", "elk3.kbxmn.com"] # 输出 elasticsearch 集群
        index => "%{[@metadata][beat]}-%{[fields][logname]}-%{[fields][hostname]}-%{+YYYY.MM.dd}" # 创建索引的规则
       }
     }
    else if [fields][logname] == "access" {
      elasticsearch {
        hosts => ["elk1.kbxmn.com", "elk2.kbxmn.com", "elk3.kbxmn.com"]
        index => "%{[@metadata][beat]}-%{[fields][logname]}-%{[fields][hostname]}-%{+YYYY.MM.dd}"
       }
     }
   }
   
  if [loglevel] == "ERROR" {
    email {
        port           => "25" # 邮件服务器的端口
        address       => "smtp.exmail.qq.com" # 邮件服务器的地址,这里为腾讯云的邮件服务器地址
        username       => "xxx@xxx.xxx" # 邮件服务器的用户名
        password       => "********" # 邮件服务器的密码
        authentication => "plain" # 默认 "plain" 即可
        use_tls       => false # 不适用 ssl 发送邮件,如果使用则设置为 true ,那么端口就不是25,一般为 456 或 987
        from           => "xxxxx@xxx.xxx" # 与邮件服务器的用户名一致即可
        subject       => "Warning: you have an error!" # 标题,可以引用解析的日志参数,也是 "%{字段名}",如 "%{test}"
        to             => "xxxxx@xxx.xxx" # 邮件发送到的邮箱,收到报警的人的邮箱
        via           => "smtp" # 通过 smtp 服务器进行发送
        body           => "ERROR_MESSAGE:\n%{message}" # 邮件正文,这里输出日志正文的信息
     }
   }
 
  if [@metadata][beat] == "heartbeat" and [http][response][status_code] != 200 { # beats 系列中的 heartbeat ,这条是判断只要是状态不是 200 的
      exec {
  command => "curl --connect-timeout 1 -m 1 http://www.xxxx.com&" # 满足条件状态不为 500 ,则使用 curl 命令推送(根据个人的情况而定),注意一定要想到如果对方不能收到的情况,设置超时时间或者使用 & ,不影响后续的进程,否则的话 logstash 容易卡死,无法向 elasticsearch 发送日志数据。
       }
   }
 }

到这里 logstash 的基本配置就算完成了,然后使用命令 sudo systemctl start logstash && sudo systemctl enable logstash 进行启动 logstash ,并设置开机自启。

想检验 logstash 是否启动成功,可以查看启动日志,以默认的为例 :sudo tailf /var/log/logstash/logstash-plain.log ,启动成功后,日志会显示如下的信息。


[2019-10-29T16:14:09,044][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}


到这里 logstash 就安装完成了。

以上是关于elfk 搭建系列 -- logstash 的搭建的主要内容,如果未能解决你的问题,请参考以下文章

ELK+Filebeat+Kafka分布式日志管理平台搭建

yum搭建ELFK日志采集系统

linux 搭建ELFK6.8.0集群

ELK+kafka+filebeat搭建生产ELFK集群 --wukong编辑器

从ELK到ELFK的转变,满足企业PB级日志系统的实践之路

在ELFK架构中加入kafka