日志分析系统ELK之Logstash
Posted Tuki_a
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了日志分析系统ELK之Logstash相关的知识,希望对你有一定的参考价值。
Logstash
什么是ELK
一般我们需要进行日志分析场景:直接在日志文件中 grep、awk 就可以获得自己想要的信息。但在规模较大的场景中,此方法效率低下,面临问题包括日志量太大如何归档、文本搜索太慢怎么办、如何多维度查询。需要集中化的日志管理,所有服务器上的日志收集汇总。常见解决思路是建立集中式日志收集系统,将所有节点上的日志统一收集,管理,访问。
一般大型系统是一个分布式部署的架构,不同的服务模块部署在不同的服务器上,问题出现时,大部分情况需要根据问题暴露的关键信息,定位到具体的服务器和服务模块,构建一套集中式日志系统,可以提高定位问题的效率。
ELK提供了一整套解决方案,并且都是开源软件,之间互相配合使用,完美衔接,高效的满足了很多场合的应用。目前主流的一种日志系统。
ELK是三个开源软件的缩写,分别表示:Elasticsearch , Logstash, Kibana , 它们都是开源软件。
Logstash简介
Logstash 主要是用来日志的搜集、分析、过滤日志的工具,支持大量的数据获取方式。一般工作方式为c/s架构,client端安装在需要收集日志的主机上,server端负责将收到的各节点日志进行过滤、修改等操作在一并发往elasticsearch上去。
Logstash是一个开源的服务器端数据处理管道。
logstash拥有200多个插件,能够同时从多个来源采集数据,转换数据,然后将数据发送到 “存储库” 中。(大多都是Elasticsearch。)
官方文档:https://www.elastic.co/guide/en/logstash/current/introduction.html
Logstash组成
Logstash管道有两个必需的元素,输入和输出,以及一个可选元素过滤器。
1、输入
输入:采集各种样式、大小和来源的数据
Logstash 支持各种输入选择 ,同时从众多常用来源捕捉事件。
能够以连续的流式传输方式,轻松地从日志、指标、Web 应用、数据存储以及各种 AWS 服务采集数据。
2、过滤器(可选)
过滤器:实时解析和转换数据
数据从源传输到存储库的过程中,Logstash 过滤器能够解析各个事件,识别已命名的字段以构建结构,并将它们转换成通用格式,以便更轻松、更快速地分析和实现商业价值。
- 利用 Grok 从非结构化数据中派生出结构
- 从 IP 地址破译出地理坐标
- 将 PII 数据匿名化,完全排除敏感字段
- 简化整体处理,不受数据源、格式或架构的影响
3、输出
输出:选择存储库,导出数据
尽管 Elasticsearch 是我们的首选输出方向,能够为我们的搜索和分析带来无限可能,但它并非唯一选择。
Logstash 提供众多输出选择,可以将数据发送到指定的地方,并且能够灵活地解锁众多下游用例。
Logstash安装与配置
软件下载(注意要下和ElasticSearch版本一致的):https://elasticsearch.cn/download/,Elasticsearch 至少需要 Java 8
[root@server3 ~]# rpm -ivh jdk-8u171-linux-x64.rpm
[root@server3 ~]# rpm -ivh logstash-7.6.1.rpm
通过命令行运行Logstash
在命令行中设置的任何标志都会覆盖logstash.yml 中的相应设置,但文件本身不会更改。对于后续的 Logstash 运行,它保持原样。
执行二进制脚本来运行logstash,/usr/share/logstash/bin
下包括启动Logstash
和logstash-plugin
安装插件
官网:https://www.elastic.co/guide/en/logstash/current/running-logstash-command-line.html
参数-e
-e, --config.string CONFIG_STRING
使用给定的字符串作为配置数据。与配置文件的语法相同。
执行标准输入到标准输出,即从接收从终端收到的再从终端输出。
[root@server3 ~]# cd /usr/share/logstash/bin/
[root@server3 bin]# /usr/share/logstash/bin/logstash -e 'input { stdin { } } output { stdout {} }'
参数-f
-f, --path.config CONFIG_PATH
从特定文件或目录加载 Logstash 配置。如果给定目录,则该目录中的所有文件将按字典顺序连接,然后解析为单个配置文件。
日志输出到文件
编写一个logstash配置文件,运行 Logstash 并加载test.conf文件中定义的 Logstash 配置
[root@server3 bin]# cd /etc/logstash/conf.d/
[root@server3 conf.d]# ls
[root@server3 conf.d]# vim test.conf
input {
stdin { } #接收终端输入
}
output {
stdout {} #显示终端输出
file {
path => "/tmp/testfile" #输出到/tmp/testfile文件中,格式为custom format: {输入内容}
codec => line { format => "custom format: %{message}"}
}
}
[root@server3 conf.d]# /usr/share/logstash/bin/logstash -f test.conf
这边输入一些内容(相当于是logstash捕获的日志)测试
再开一个Terminal,查看文件存在且内容与输入的一致
日志上传到elasticsearch
编写logstash配置文件,运行 Logstash 并加载es.conf文件中定义的 Logstash 配置
[root@server3 conf.d]# vim es.conf
input {
stdin {}
}
output {
stdout {} #标准输出一份
elasticsearch { #给elasticsearch输出一份
hosts => ["192.168.122.11:9200"] #目标elasticsearch主机ip
index => "logstash-%{+yyyy.MM.dd}" #索引格式为logstash-年月日
}
}
[root@server3 conf.d]# /usr/share/logstash/bin/logstash -f es.conf
运行后输入内容测试
到es上查看,已自动创建索引和分片,采集到的内容和输入一致
如果想把日志文件作为输入,首先要把权限改为644,因为logstash读取时是logstash身份,所以必须开放读的权力。
[root@server3 conf.d]# vim es.conf
input {
file { #从文件/var/log/messages输入,从头开始输入
path => "/var/log/messages"
start_position => "beginning"
}
}
output {
stdout {} #标准输出
elasticsearch { #输出elasticsearch
hosts => ["192.168.122.11:9200"]
index => "logstash-%{+yyyy.MM.dd}"
}
}
执行完后到es端把创建的索引删除了,再次创建只会上传新产生的日志(可以使用logger “日志”
命令来产生日志),这是因为在/usr/share/logstash/data/plugins/inputs/file/
目录下,有一个.sincedb
的文件,它负责记录数据偏移量,已经上传过的数据,不会重复上传;删除相应的.sincedb
后,就可以重新全部上传所有日志了
logstash如何区分设备、文件名、文件的不同版本:
logstash会把进度保存到sincedb文件中。
sincedb文件一共6个字段,分别表示inode编号、文件系统的主要设备号、文件系统的次要设备号、文件中的当前字节偏移量、最后一个活动时间戳(浮点数)、与此记录匹配的最后一个已知路径
Logstash伪装为日志服务器
如果想收集日志要每台都部署Logstash,数量多了终归是不方便,所以让logstash伪装成日志服务器,每个节点服务器远程发送日志给logstash。
编写logstash配置文件
[root@server3 conf.d]# vim es.conf
input {
#file {
# path => "/var/log/messages"
# start_position => "beginning"
#}
syslog { #伪装syslog,开放端口514
port => 514
}
}
output {
stdout {}
elasticsearch {
hosts => ["192.168.122.11:9200"]
index => "syslog-%{+yyyy.MM.dd}" #索引为syslog-年月日
}
}
远程主机server1编辑/etc/rsyslog.conf文件,打开514端口,并在最后加入*.* @@192.168.122.13
意为所有的日志发送给192.168.122.13一份
被收集端重启rsyslog服务,logstash端就接收到输入并上传到es端了,到es端就可以方便的查看其他主机的日志信息了
grok过滤插件
我们平时查看日志,会条理清晰的显示各组信息,当我们只想看其中一组数据,比如只想得到ip信息,就需要logstash的切片这个功能。
分割命令行的信息输出到终端
编写一个简单的对终端接收的信息进行切片的logstash配置文件,运行 Logstash 并加载grank.conf文件中定义的 Logstash 配置
[root@server3 conf.d]# vim grank.conf
input {
stdin {}
}
filter {
grok {
match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" } #会对接收到的信息自动切片为这5个内容
}
}
output {
stdout {}
}
[root@server3 conf.d]# /usr/share/logstash/bin/logstash -f grank.conf
如下输入一组日志信息测试,分片成功
采集apache日志切片输出到es
下载apache并配置一个发布网页
[root@server3 conf.d]# yum install -y httpd
[root@server3 conf.d]# systemctl start httpd
[root@server3 conf.d]# cd /var/www/html/
[root@server3 html]# echo hello cool girl! > index.html
访问成功,apache配置成功
到apache的配置文件里查看日志的格式如下定义,但这并不能被我们直接使用
[root@server3 conf.d]# vim /etc/httpd/conf/httpd.conf
在下图目录下,有很多软件的日志的输出形式
看httpd的规定,如何写日志已经提前用变量的方法定义了,所以我们只需要按照这个规定切片就好了
把apache的日志作为grok的输入,日志文件需要给读的权限,日志文件的目录/var/log/httpd
需要给读和执行的权限755,读的时候是logstash的身份
[root@server3 conf.d]# vim grok.conf
input {
file {
path => "/var/log/httpd/access_log" #/var/log/httpd/access_log文件作为输入
start_position => "beginning" #从头开始输入
}
}
filter {
grok {
match => { "message" => "%{HTTPD_COMBINEDLOG}" } #按照默认的HTTPD_COMBINEDLOG方式切片,就是我们上面看的文件里已经定义好的变量
}
}
output {
stdout {}
elasticsearch {
hosts => ["192.168.122.11:9200"]
index => "apachelog-%{+yyyy.MM.dd}" #索引名字叫apachelog
}
}
执行可以看到按照默认定义好的模式切片
再到es端查看已经创建好索引
查看详细数据也没问题(之前curl访问apache产生的两个日志)
用curl访问产生的数据量太少,所以使用ab命令创建多个并发访问线程产生大量访问数据
[root@lucky mnt]# ab -n 122 -c1 http://192.168.122.13/index.html
到es端查看记录输出成功
以上是关于日志分析系统ELK之Logstash的主要内容,如果未能解决你的问题,请参考以下文章
ELK搭建实时日志分析平台之二Logstash和Kibana搭建