ELK-logstash
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ELK-logstash相关的知识,希望对你有一定的参考价值。
logstash简介:
logstash日志分析的配置和使用
logstash是一个数据分析软件,主要目的是分析log日志。整一套软件可以当作一个MVC模型,logstash是controller层,Elasticsearch是一个model层,kibana是view层。
首先将数据传给logstash,它将数据进行过滤和格式化(转成JSON格式),然后传给Elasticsearch进行存储、建搜索的索引,kibana提供前端的页面再进行搜索和图表可视化,它是调用Elasticsearch的接口返回的数据进行可视化。logstash和Elasticsearch是用Java写的,kibana使用node.js框架。
官网地址:https://www.elastic.co/downloads/logstash
安装:
yum安装
rpm --import http://packages.elasticsearch.org/GPG-KEY-elasticsearch
cat > /etc/yum.repos.d/logstash.repo <<EOF
[logstash-5.0]
name=logstash repository for 5.0.x packages
baseurl=http://packages.elasticsearch.org/logstash/5.0/centos
gpgcheck=1
gpgkey=http://packages.elasticsearch.org/GPG-KEY-elasticsearch
enabled=1
EOF
yum clean all
yum install logstash
或者官网下载安装
tar zxf logstash.tar.gz
语法:
Logstash 设计了自己的 DSL —— 有点像 Puppet 的 DSL,或许因为都是用 Ruby语言写的吧 —— 包括有区域,注释,数据类型(布尔值,字符串,数值,数组,哈希),条件判断,字段引用等。
区段(section)
Logstash 用 {} 来定义区域。区域内可以包括插件区域定义,你可以在一个区域内定义多个插件。插件区域内则可以定义键值对设置。
数据类型
Logstash 支持少量的数据值类型:
bool #布尔值
debug => true
string #字符串
host => "hostname"
number #数值
port => 514
array #数组
match => ["datetime", "UNIX", "ISO8601"]
hash #哈希
options => {
key1 => "value1",
key2 => "value2"
}
字段引用(field reference)
如果你想在 Logstash 配置中使用字段的值,只需要把字段的名字写在中括号 []里就行了,这就叫字段引用。
条件判断(condition)
Logstash从 1.3.0 版开始支持条件判断和表达式。
表达式支持下面这些操作符:
== (等于), != (不等于), < (小于), > (大于), <= (小于等于), >= (大于等
于)
=~ (匹配正则), !~ (不匹配正则)
in (包含), not in (不包含)
and (与), or (或), nand(非与), xor(非或)
() (复合表达式), !() (对复合表达式结果取反)
logstash命令行参数:
-e:意在执行。直接运行 bin/logstash -e ” 达到相同效果。这个参数的默认值是下面这样:
input {
stdin { }
}
output {
stdout { }
}
–config 或 -f
意即文件。真实运用中,我们会写很长的配置,甚至可能超过 shell 所能支持的1024 个字符长度。所以我们必把配置固化到文件里,然后通过 bin/logstash -f agent.conf 这样的形式来运行。 此外,logstash 还提供一个方便我们规划和书写配置的小功能。你可以直接用
bin/logstash -f /etc/logstash.d/
来运行。logstash 会自动读取
/etc/logstash.d/ 目录下所有 *.conf 的文本文件,然后在自己内存里拼接成一个完整的大配置文件,再去执行。
注意:
logstash 列出目录下所有文件时,是字母排序的。而 logstash 配置段的 filter 和output 都是顺序执行,所以顺序非常重要。采用多文件管理的用户,推荐采用数字编号方式命名配置文件,同时在配置中,严谨采用 if 判断限定不同日志的动作。
–configtest 或 -t
意即测试。用来测试 Logstash 读取到的配置文件语法是否能正常解析。Logstash配置语法是用 grammar.treetop 定义的。尤其是使用了上一条提到的读取目录方式的读者,尤其要提前测试。
–log 或 -l
意即日志。Logstash 默认输出日志到标准错误。生产环境下你可以通过bin/logstash -l logs/logstash.log 命令来统一存储日志。
–pipeline-workers 或 -w
运行 filter 和 output 的 pipeline 线程数量。默认是 CPU 核数。
–pipeline-batch-size 或 -b
每个 Logstash pipeline 线程,在执行具体的 filter 和 output 函数之前,最多能累积的日志条数。默认是 125 条。越大性能越好,同样也会消耗越多的 JVM 内存。
–pipeline-batch-delay 或 -u
每个 Logstash pipeline 线程,在打包批量日志的时候,最多等待几毫秒。默认是 5ms。
–pluginpath 或 -P
可以写自己的插件,然后用
bin/logstash --pluginpath /path/to/own/plugins 加载它们。
–verbose
输出一定的调试日志。
–debug
输出更多的调试日志。
plugin的安装
plugin 用法说明
Usage:
bin/logstash-plugin [OPTIONS] SUBCOMMAND [ARG] ...
Parameters:
SUBCOMMAND subcommand
[ARG] ... subcommand arguments
Subcommands:
install Install a plugin
uninstall Uninstall a plugin
update Install a plugin
list List all installed plugins
Options:
-h, --help print help
首先,你可以通过 bin/logstash-plugin list 查看本机现在有多少插件可用。(其实就在 vendor/bundle/jruby/1.9/gems/ 目录下)
本地插件安装
bin/logstash-plugin 不单可以通过 rubygems 平台安装插件,还可以读取本地路径的 gem 文件。这对自定义插件或者无外接网络的环境都非常有效。例:
bin/logstash-plugin install /path/to/logstash-filter-crash.gem
然后,假如你看到 https://github.com/logstash-plugins/ 下新发布了一个logstash-output-webhdfs 模块(当然目前还没有)。打算试试,就只需要运行:
bin/logstash-plugin install logstash-output-webhdfs
同样,假如是升级,只需要运行:
bin/logstash-plugin update logstash-input-tcp
运行:
# bin/logstash -e ‘input{stdin{}}output{stdout{codec=>rubydebug}}‘
hello world
结果:
{
"message" => "Hello World",
"@version" => "1",
"@timestamp" => "2014-08-07T10:30:59.937Z",
"host" => "raochenlindeMacBook-Air.local",
}
长期运行:nohup bin/logstash -f /path/conf.d/logstash.conf &
input配置:
读取文件:
Logstash 使用一个名叫 FileWatch 的 Ruby Gem 库来监听文件变化。这个库支持glob 展开文件路径,而且会记录一个叫 .sincedb 的数据库文件来跟踪被监听的日
志文件的当前读取位置。所以,不要担心 logstash 会漏过你的数据。
input {
file {
path => ["/var/log/*.log", "/var/log/message"]
type => "system"
start_position => "beginning"
}
}
解释
有一些比较有用的配置项,可以用来指定 FileWatch 库的行为:
discover_interval
logstash 每隔多久去检查一次被监听的 path 下是否有新文件。默认值是 15秒。
exclude
不想被监听的文件可以排除出去,这里跟 path 一样支持 glob 展开。
close_older
一个已经监听中的文件,如果超过这个值的时间内没有更新内容,就关闭监听它的文件句柄。默认是 3600 秒,即一小时。
ignore_older
在每次检查文件列表的时候,如果一个文件的最后修改时间超过这个值,就忽略这个文件。默认是 86400 秒,即一天。
sincedb_path
如果你不想用默认的 $HOME/.sincedb (Windows 平台上在C:\Windows\System32\config\systemprofile\.sincedb ),可以通过这个配置定义 sincedb 文件到其他位置。
sincedb_write_interval
logstash 每隔多久写一次 sincedb 文件,默认是 15 秒。
stat_interval
logstash 每隔多久检查一次被监听文件状态(是否有更新),默认是 1 秒。
start_position
logstash 从什么位置开始读取文件数据,默认是结束位置,也就是说 logstash 进程会以类似 tail -F 的形式运行。如果你是要导入原有数据,把这个设定改成"beginning",logstash 进程就从头开始读取,类似 less +F 的形式运行。
注意
1. 通常你要导入原有数据进 Elasticsearch 的话,你还需要 filter/date 插件来修改默认的"@timestamp" 字段值。
2. FileWatch 只支持文件的绝对路径,而且会不自动递归目录。所以有需要的话,请用数组方式都写明具体哪些文件。
3. LogStash::Inputs::File 只是在进程运行的注册阶段初始化一个 FileWatch 对象。所以它不能支持类似 fluentd 那样的 path => "/path/to/% {+yyyy/MM/dd/hh}.log" 写法。达到相同目的,你只能写成 path => "/path/to/*/*/*/*.log" 。FileWatch 模块提供了一个稍微简单一点的写法: /path/to/**/*.log ,用 ** 来缩写表示递归全部子目录。
4. 在单个 input/file 中监听的文件数量太多的话,每次启动扫描构建监听队列会消耗较多的时间。给使用者的感觉好像读取不到一样,这是正常现象。
5. start_position 仅在该文件从未被监听过的时候起作用。如果 sincedb 文件中已经有这个文件的 inode 记录了,那么 logstash 依然会从记录过的 pos开始读取数据。所以重复测试的时候每回需要删除 sincedb 文件(官方博客上提供了另一个巧妙的思路:将 sincedb_path 定义为 /dev/null ,则每次重启自动从头开始读)。
6. 因为 windows 平台上没有 inode 的概念,Logstash 某些版本在 windows 平台上监听文件不是很靠谱。windows 平台上,推荐考虑使用 nxlog 作为收集端
标准输入(stdin):
配置示例
input {
stdin {
add_field => {"key" => "value"}
codec => "plain"
tags => ["add"]
type => "std"
}
}
运行结果
{
"message" => "hello world",
"@version" => "1",
"@timestamp" => "2014-08-08T06:48:47.789Z",
"type" => "std",
"tags" => [
[0] "add"
],
"key" => "value",
"host" => "raochenlindeMacBook-Air.local"
}
解释
type 和 tags 是 logstash 事件中两个特殊的字段。通常来说我们会在输入区段中通过 type 来标记事件类型 —— 我们肯定是提前能知道这个事件属于什么类型的。而tags 则是在数据处理过程中,由具体的插件来添加或者删除的。
举例:
input {
stdin {
type => "web"
}
}
filter {
if [type] == "web" {
grok {
match => ["message", %{COMBINEDAPACHELOG}]
}
}
}
output {
if "_grokparsefailure" in [tags] {
nagios_nsca {
nagios_status => "1"
}
} else {
elasticsearch {
}
}
}
合并多行数据(Multiline)
有些时候,应用程序调试日志会包含非常丰富的内容,为一个事件打印出很多行内容。这种日志通常都很难通过命令行解析的方式做分析。而 logstash 正为此准备好了 codec/multiline 插件!
小贴士:multiline 插件也可以用于其他类似的堆栈式信息,比如 linux 的内核日志。
配置示例
input {
stdin {
codec => multiline {
pattern => "^\["
negate => true
what => "previous"
}
}
}
运行结果
运行 logstash 进程,然后在等待输入的终端中输入如下几行数据:
[Aug/08/08 14:54:03] hello world
[Aug/08/09 14:54:04] hello logstash
hello best practice
hello raochenlin
[Aug/08/10 14:54:05] the end
你会发现 logstash 输出下面这样的返回:codec配置
{
"@timestamp" => "2014-08-09T13:32:03.368Z",
"message" => "[Aug/08/08 14:54:03] hello world\n",
"@version" => "1",
"host" => "raochenlindeMacBook-Air.local"
}
{
"@timestamp" => "2014-08-09T13:32:24.359Z",
"message" => "[Aug/08/09 14:54:04] hello logstash\n\n
hello best practice\n\n hello raochenlin\n",
"@version" => "1",
"tags" => [
[0] "multiline"
],
"host" => "raochenlindeMacBook-Air.local"
}
你看,后面这个事件,在 "message" 字段里存储了三行数据!
你可能注意到输出的事件中都没有最后的"the end"字符串。这是因为你最后输入的回车符 \n 并不匹配设定的 ^\[ 正则表达式,logstash 还得等下一行数据直到匹配成功后才会输出这个事件。
解释
其实这个插件的原理很简单,就是把当前行的数据添加到前面一行后面,,直到新进的当前行匹配 ^\[ 正则为止。这个正则还可以用 grok 表达式。
以上是关于ELK-logstash的主要内容,如果未能解决你的问题,请参考以下文章