2021年大数据ELK(二十二):采集Apache Web服务器日志

Posted Lansonli

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了2021年大数据ELK(二十二):采集Apache Web服务器日志相关的知识,希望对你有一定的参考价值。

全网最详细的大数据ELK文章系列,强烈建议收藏加关注! 

新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点。

目录

采集Apache Web服务器日志

一、需求

二、准备日志数据

三、使用FileBeats将日志发送到Logstash

四、配置Logstash接收FileBeat数据并打印

五、Logstash输出数据到Elasticsearch

1、重新拷贝一份配置文件

2、将output修改为Elasticsearch

3、重新启动Logstash

4、追加一条日志到监控的文件中,并查看Elasticsearch中的索引、文档

六、Logstash过滤器

1、查看Logstash已经安装的插件

2、Grok插件

3、Grok语法

4、用法

七、匹配日志中的IP、日期并打印

1、配置Logstash

2、启动Logstash

八、解析所有字段

1、修改Logstash配置文件

2、测试并启动Logstash

九、将数据输出到Elasticsearch

1、过滤出来需要的字段

2、转换日期格式

3、输出到Elasticsearch指定索引


采集Apache Web服务器日志

一、需求

Apache的Web Server会产生大量日志,当我们想要对这些日志检索分析。就需要先把这些日志导入到Elasticsearch中。此处,我们就可以使用Logstash来实现日志的采集

打开这个文件,如下图所示。我们发现,是一个纯文本格式的日志。如下图所示:

 这个日志其实由一个个的字段拼接而成,参考以下表格

字段名

说明

client IP

浏览器端IP

timestamp

请求的时间戳

method

请求方式(GET/POST)

uri

请求的链接地址

status

服务器端响应状态

length

响应的数据长度

reference

从哪个URL跳转而来

browser

浏览器

因为最终我们需要将这些日志数据存储在Elasticsearch中,而Elasticsearch是有模式(schema)的,而不是一个大文本存储所有的消息,而是需要将字段一个个的保存在Elasticsearch中。所以,我们需要在Logstash中,提前将数据解析好,将日志文本行解析成一个个的字段,然后再将字段保存到Elasticsearch中

二、准备日志数据

将Apache服务器日志上传到 /export/server/es/data/apache/ 目录

mkdir -p /export/server/es/data/apache/

三、使用FileBeats将日志发送到Logstash

在使用Logstash进行数据解析之前,我们需要使用FileBeat将采集到的数据发送到Logstash。之前,我们使用的FileBeat是通过FileBeat的Harvester组件监控日志文件,然后将日志以一定的格式保存到Elasticsearch中,而现在我们需要配置FileBeats将数据发送到Logstash。FileBeat这一端配置以下即可:

#----------------------------- Logstash output ---------------------------------
#output.logstash:
  # Boolean flag to enable or disable the output module.
  #enabled: true

  # The Logstash hosts
  #hosts: ["localhost:5044"]

hosts配置的是Logstash监听的IP地址/机器名以及端口号。

准备FileBeat配置文件

cd /export/server/es/filebeat-7.6.1-linux-x86_64

vim filebeat-logstash.yml

因为Apache的web log日志都是以IP地址开头的,所以我们需要修改下匹配字段

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/apache/log/access.*
  multiline.pattern: '^\\d+\\.\\d+\\.\\d+\\.\\d+ '
  multiline.negate: true
  multiline.match: after

output.logstash:
  enabled: true
  hosts: ["node1:5044"]

启动FileBeat,并指定使用新的配置文件

./filebeat -e -c filebeat-logstash.yml

FileBeat将尝试建立与Logstash监听的IP和端口号进行连接。但此时,我们并没有开启并配置Logstash,所以FileBeat是无法连接到Logstash的。

2021-12-05T11:28:47.585+0800    ERROR   pipeline/output.go:100  Failed to connect to backoff(async(tcp://node1.itcast.cn:5044)): dial tcp 192.168.88.100:5044: connect: connection refused

四、​​​​​​​配置Logstash接收FileBeat数据并打印

Logstash的配置文件和FileBeat类似,它也需要有一个input、和output。基本格式如下:

# #号表示添加注释
# input表示要接收的数据
input 


# file表示对接收到的数据进行过滤处理
filter 



# output表示将数据输出到其他位置
output 

配置从FileBeat接收数据

cd /export/server/es/logstash-7.6.1/config
vim filebeat-print.conf
input 
  beats 
    port => 5044
  


output 
  stdout 
    codec => rubydebug
  

 测试logstash配置是否正确

bin/logstash -f config/filebeat-print.conf --config.test_and_exit
[2021-12-05T11:46:33,940][INFO ][logstash.runner          ] Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash

启动logstash

bin/logstash -f config/filebeat-print.conf --config.reload.automatic

 reload.automatic:修改配置文件时自动重新加载

测试

创建一个access.log.1文件,使用cat test >> access.log.1往日志文件中追加内容。

test文件中只保存一条日志:

[root@node1 log]# cat test 
235.9.200.242 - - [15/Apr/2015:00:27:19 +0849] "POST /it.cn/bigdata.html 200 45 "www.baidu.com" "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.4549.400 QQBrowser/9.7.12900 144.180.122.249

当我们启动Logstash之后,就可以发现Logstash会打印出来从FileBeat接收到的数据:


           "log" => 
          "file" => 
            "path" => "/var/apache/log/access.log.1"
        ,
        "offset" => 825
    ,
         "input" => 
        "type" => "log"
    ,
         "agent" => 
        "ephemeral_id" => "d4c3b652-4533-4ebf-81f9-a0b78c0d4b05",
             "version" => "7.6.1",
                "type" => "filebeat",
                  "id" => "b4c5c4dc-03c3-4ba4-9400-dc6afcb36d64",
            "hostname" => "node1.itcast.cn"
    ,
    "@timestamp" => 2021-12-05T09:07:55.236Z,
           "ecs" => 
        "version" => "1.4.0"
    ,
          "host" => 
        "name" => "node1"
    ,
          "tags" => [
        [0] "beats_input_codec_plain_applied"
    ],
       "message" => "235.9.200.242 - - [15/Apr/2015:00:27:19 +0849] \\"POST /it.cn/bigdata.html 200 45 \\"www.baidu.com\\" \\"Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.4549.400 QQBrowser/9.7.12900 144.180.122.249",
      "@version" => "1"

 

五、Logstash输出数据到Elasticsearch

通过控制台,我们发现Logstash input接收到的数据没有经过任何处理就发送给了output组件。而其实我们需要将数据输出到Elasticsearch。所以,我们修改Logstash的output配置。配置输出Elasticsearch只需要配置以下就可以了:

output 
    elasticsearch 
        hosts => [ "localhost:9200" ]
    

操作步骤:

1、重新拷贝一份配置文件

cp filebeat-print.conf filebeat-es.conf

2、将output修改为Elasticsearch

input 
  beats 
    port => 5044
  


output 
 elasticsearch 
   hosts => [ "node1:9200","node2:9200","node3:9200"]
 

3、重新启动Logstash

bin/logstash -f config/filebeat-es.conf --config.reload.automatic

4、追加一条日志到监控的文件中,并查看Elasticsearch中的索引、文档

cat test >> access.log.1 

// 查看索引数据

GET /_cat/indices?v

我们在Elasticsearch中发现一个以logstash开头的索引

    
        "health": "green",
        "status": "open",
        "index": "logstash-2021.12.05-000001",
        "uuid": "147Uwl1LRb-HMFERUyNEBw",
        "pri": "1",
        "rep": "1",
        "docs.count": "2",
        "docs.deleted": "0",
        "store.size": "44.8kb",
        "pri.store.size": "22.4kb"
    

// 查看索引库的数据

GET /logstash-2021.12.05-000001/_search?format=txt



    "from": 0,

    "size": 1

我们可以获取到以下数据:

                    "@timestamp": "2021-12-05T09:38:00.402Z",
                    "tags": [
                        "beats_input_codec_plain_applied"
                    ],
                    "host": 
                        "name": "node1"
                    ,
                    "@version": "1",
                    "log": 
                        "file": 
                            "path": "/var/apache/log/access.log.1"
                        ,
                        "offset": 1343
                    ,
                    "agent": 
                        "version": "7.6.1",
                        "ephemeral_id": "d4c3b652-4533-4ebf-81f9-a0b78c0d4b05",
                        "id": "b4c5c4dc-03c3-4ba4-9400-dc6afcb36d64",
                        "hostname": "node1",
                        "type": "filebeat"
                    ,
                    "input": 
                        "type": "log"
                    ,
                    "message": "235.9.200.242 - - [15/Apr/2015:00:27:19 +0849] \\"POST /it.cn/bigdata.html 200 45 \\"www.baidu.com\\" \\"Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.4549.400 QQBrowser/9.7.12900 144.180.122.249",
                    "ecs": 
                        "version": "1.4.0"
                    

从输出返回结果,我们可以看到,日志确实已经保存到了Elasticsearch中,而且我们看到消息数据是封装在名为message中的,其他的数据也封装在一个个的字段中。我们其实更想要把消息解析成一个个的字段。例如:IP字段、时间、请求方式、请求URL、响应结果,这样

六、Logstash过滤器

在Logstash中可以配置过滤器Filter对采集到的数据进行中间处理,在Logstash中,有大量的插件供我们使用。参考官网:

Filter plugins | Logstash Reference [7.6] | Elastic

此处,我们重点来讲解Grok插件。

1、查看Logstash已经安装的插件

bin/logstash-plugin list

2、​​​​​​​Grok插件

Grok是一种将非结构化日志解析为结构化的插件。这个工具非常适合用来解析系统日志、Web服务器日志、mysql或者是任意其他的日志格式。

Grok官网:Grok filter plugin | Logstash Reference [7.6] | Elastic

3、​​​​​​​Grok语法

Grok是通过模式匹配的方式来识别日志中的数据,可以把Grok插件简单理解为升级版本的正则表达式。它拥有更多的模式,默认,Logstash拥有120个模式。如果这些模式不满足我们解析日志的需求,我们可以直接使用正则表达式来进行匹配。

官网:https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/grok-patterns

grok模式的语法是:%SYNTAX:SEMANTIC

SYNTAX指的是Grok模式名称,SEMANTIC是给模式匹配到的文本字段名。例如:

%NUMBER:duration %IP:client

duration表示:匹配一个数字,client表示匹配一个IP地址

默认在Grok中,所有匹配到的的数据类型都是字符串,如果要转换成int类型(目前只支持int和float),可以这样:%NUMBER:duration:int %IP:client

以下是常用的Grok模式:

NUMBER

匹配数字(包含:小数)

INT

匹配整形数字

POSINT

匹配正整数

WORD

匹配单词

DATA

匹配所有字符

IP

匹配IP地址

PATH

匹配路径

4、​​​​​​​用法

    filter 
      grok 
        match =>  "message" => "%IP:client %WORD:method %URIPATHPARAM:request %NUMBER:bytes %NUMBER:duration" 
      
    

 

​​​​​​​七、匹配日志中的IP、日期并打印

235.9.200.242 - - [15/Apr/2015:00:27:19 +0849] "POST /it.cn/bigdata.html 200 45 "www.baidu.com" "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.4549.400 QQBrowser/9.7.12900 144.180.122.249

我们使用IP就可以把前面的IP字段匹配出来,使用HTTPDATE可以将后面的日期匹配出来

配置Grok过滤插件

1、配置Logstash

input 
    beats 
        port => 5044
    


filter 
    grok 
        match =>  
            "message" => "%IP:ip - - \\[%HTTPDATE:date\\]" 
        
       


output 
    stdout 
        codec => rubydebug
    

 

2、启动Logstash

bin/logstash -f config/filebeat-filter-print.conf --config.reload.automatic

           "log" => 
        "offset" => 1861,
          "file" => 
            "path" => "/var/apache/log/access.log.1"
        
    ,
         "input" => 
        "type" => "log"
    ,
          "tags" => [
        [0] "beats_input_codec_plain_applied"
    ],
          "date" => "15/Apr/2015:00:27:19 +0849",
           "ecs" => 
        "version" => "1.4.0"
    ,
    "@timestamp" => 2021-12-05T11:02:05.809Z,
       "message" => "235.9.200.242 - - [15/Apr/2015:00:27:19 +0849] \\"POST /it.cn/bigdata.html 200 45 \\"www.baidu.com\\" \\"Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.4549.400 QQBrowser/9.7.12900 144.180.122.249",
          "host" => 
        "name" => "node1"
    ,
            "ip" => "235.9.200.242",
         "agent" => 
            "hostname" => "node1",
             "version" => "7.6.1",
        "ephemeral_id" => "d4c3b652-4533-4ebf-81f9-a0b78c0d4b05",
                  "id" => "b4c5c4dc-03c3-4ba4-9400-dc6afcb36d64",
                "type" => "filebeat"
    ,
      "@version" => "1"

我们看到,经过Grok过滤器插件处理之后,我们已经获取到了ip和date两个字段。接下来,我们就可以继续解析其他的字段

八、​​​​​​​解析所有字段

将日志解析成以下字段:

字段名

说明

client IP

浏览器端IP

timestamp

请求的时间戳

method

请求方式(GET/POST)

uri

请求的链接地址

status

服务器端响应状态

length

响应的数据长度

reference

从哪个URL跳转而来

browser

浏览器

1、修改Logstash配置文件

input 
    beats 
        port => 5044
    


filter 
    grok 
        match =>  
            "message" => "%IP:ip - - \\[%HTTPDATE:date\\] \\"%WORD:method %PATH:uri %DATA\\" %INT:status %INT:length \\"%DATA:reference\\" \\"%DATA:browser\\"" 
        
       


output 
    stdout 
        codec => rubydebug
    

 

2、测试并启动Logstash

我们可以看到,8个字段都已经成功解析。


     "reference" => "www.baidu.com",
      "@version" => "1",
           "ecs" => 
        "version" => "1.4.0"
    ,
    "@timestamp" => 2021-12-05T03:30:10.048Z,
            "ip" => "235.9.200.241",
        "method" => "POST",
           "uri" => "/it.cn/bigdata.html",
         "agent" => 
                  "id" => "b4c5c4dc-03c3-4ba4-9400-dc6afcb36d64",
        "ephemeral_id" => "734ae9d8-bcdc-4be6-8f97-34387fcde972",
             "version" => "7.6.1",
            "hostname" => "node1",
                "type" => "filebeat"
    ,
        "length" => "45",
        "status" => "200",
           "log" => 
          "file" => 
            "path" => "/var/apache/log/access.log"
        ,
        "offset" => 1
    ,
         "input" => 
        "type" => "log"
    ,
          "host" => 
        "name" => "node1"
    ,
          "tags" => [
        [0] "beats_input_codec_plain_applied"
    ],
       "browser" => "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.4549.400 QQBrowser/9.7.12900",
          "date" => "15/Apr/2015:00:27:19 +0849",
       "message" => "235.9.200.241 - - [15/Apr/2015:00:27:19 +0849] \\"POST /it.cn/bigdata.html HTTP/1.1\\" 200 45 \\"www.baidu.com\\" \\"Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.4549.400 QQBrowser/9.7.12900\\""

九、​​​​​​​将数据输出到Elasticsearch

到目前为止,我们已经通过了Grok Filter可以将日志消息解析成一个一个的字段,那现在我们需要将这些字段保存到Elasticsearch中。我们看到了Logstash的输出中,有大量的字段,但如果我们只需要保存我们需要的8个,该如何处理呢?而且,如果我们需要将日期的格式进行转换,我们又该如何处理呢?

1、​​​​​​​过滤出来需要的字段

要过滤出来我们需要的字段。我们需要使用mutate插件。mutate插件主要是作用在字段上,例如:它可以对字段进行重命名、删除、替换或者修改结构。

官方文档:Mutate filter plugin | Logstash Reference [7.6] | Elastic

例如,mutate插件可以支持以下常用操作

配置:

注意:此处为了方便进行类型的处理,将status、length指定为int类型

input 
    beats 
        port => 5044
    


filter 
    grok 
        match =>  
            "message" => "%IP:ip - - \\[%HTTPDATE:date\\] \\"%WORD:method %PATH:uri %DATA\\" %INT:status:int %INT:length:int \\"%DATA:reference\\" \\"%DATA:browser\\"" 
        
    
    mutate 
        enable_metric => "false"
        remove_field => ["message", "log", "tags", "@timestamp", "input", "agent", "host", "ecs", "@version"]
    


output 
    stdout 
        codec => rubydebug
    

 

2、​​​​​​​转换日期格式

要将日期格式进行转换,我们可以使用Date插件来实现。该插件专门用来解析字段中的日期,官方说明文档:Date filter plugin | Logstash Reference [7.6] | Elastic

用法如下:

将date字段转换为「年月日 时分秒」格式。默认字段经过date插件处理后,会输出到@timestamp字段,所以,我们可以通过修改target属性来重新定义输出字段。

Logstash配置修改为如下:

input 
    beats 
        port => 5044
    


filter 
    grok 
        match =>  
            "message" => "%IP:ip - - \\[%HTTPDATE:date\\] \\"%WORD:method %PATH:uri %DATA\\" %INT:status:int %INT:length:int \\"%DATA:reference\\" \\"%DATA:browser\\"" 
        
    
    mutate 
        enable_metric => "false"
        remove_field => ["message", "log", "tags", "@timestamp", "input", "agent", "host", "ecs", "@version"]
    
    date 
        match => ["date","dd/MMM/yyyy:HH:mm:ss Z","yyyy-MM-dd HH:mm:ss"]
        target => "date"
    


output 
    stdout 
        codec => rubydebug
    

启动Logstash:

bin/logstash -f config/filebeat-filter-print.conf --config.reload.automatic

       "status" => "200",
    "reference" => "www.baidu.com",
       "method" => "POST",
      "browser" => "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.4549.400 QQBrowser/9.7.12900",
           "ip" => "235.9.200.241",
       "length" => "45",
          "uri" => "/it.cn/bigdata.html",
         "date" => 2015-04-14T15:38:19.000Z

3、​​​​​​​输出到Elasticsearch指定索引

我们可以通过    

elasticsearch 

        hosts => ["node1:9200" ,"node2:9200" ,"node3:9200"]

        index => "xxx"

index来指定索引名称,默认输出的index名称为:logstash-%+yyyy.MM.dd。但注意,要在index中使用时间格式化,filter的输出必须包含 @timestamp字段,否则将无法解析日期。

input 
    beats 
        port => 5044
    


filter 
    grok 
        match =>  
            "message" => "%IP:ip - - \\[%HTTPDATE:date\\] \\"%WORD:method %PATH:uri %DATA\\" %INT:status:int %INT:length:int \\"%DATA:reference\\" \\"%DATA:browser\\"" 
        
    
    mutate 
        enable_metric => "false"
        remove_field => ["message", "log", "tags", "input", "agent", "host", "ecs", "@version"]
    
    date 
        match => ["date","dd/MMM/yyyy:HH:mm:ss Z","yyyy-MM-dd HH:mm:ss"]
        target => "date"
    


output 
    stdout 
        codec => rubydebug
    
    elasticsearch 
        hosts => ["node1:9200" ,"node2:9200" ,"node3:9200"]
        index => "apache_web_log_%+YYYY-MM"
    

启动Logstash

bin/logstash -f config/filebeat-apache-weblog.conf --config.test_and_exit

bin/logstash -f config/filebeat-apache-weblog.conf --config.reload.automatic

注意:

  • index名称中,不能出现大写字符

 


  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢大数据系列文章会每天更新,停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨

以上是关于2021年大数据ELK(二十二):采集Apache Web服务器日志的主要内容,如果未能解决你的问题,请参考以下文章

2021年大数据ELK(二十四):安装Kibana

2021年大数据ELK(二十五):添加Elasticsearch数据源

2021年大数据ELK(二十三):Kibana简介

2021年大数据ELK(二十八):制作Dashboard

2021年大数据ELK(二十):FileBeat是如何工作的

2021年大数据ELK(二十一):Logstash简介和安装