How to get numbers from a log message in logstash?
Posted: 2014-02-19 11:44:15

【Question】: I'm very new to Logstash. I can run the logstash jar file and view the Kibana web page. Cool~~
Now, I want to turn the first line below (a syslog message) into the second:
Feb 19 18:45:29 SD550 Jack: REG,0x1000,4,10,20,30,40
==>
'timestamp': 'Feb 19 18:45:29',
'host': 'SD550', 0x1000:10, 0x1001:20, 0x1002:30, 0x1003:40
In the log message, '0x1000' is the starting register address, '4' is the number of register values, and the following fields are the values themselves. So it means 0x1000:10, 0x1001:20, 0x1002:30, 0x1003:40. The important point is that the number of register values can change, so the length of the log message is variable. I want to get a correct result for any length. (For example, 0x2000,2,12,22 ==> 0x2000:12, 0x2001:22.)
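The expansion rule can be pinned down with a small plain-Ruby sketch (a hypothetical helper, not part of Logstash; it assumes the REG,<start>,<count>,<values...> layout above and decimal values):

```ruby
# Expand "REG,0x1000,4,10,20,30,40" into { "0x1000" => 10, "0x1001" => 20, ... }.
# The count field says how many values follow; values are taken as decimal here.
def expand_registers(message)
  fields = message.split(",")
  start = Integer(fields[1], 16)    # "0x1000" -> 4096
  count = Integer(fields[2])        # "4"
  values = fields[3, count].map { |v| Integer(v) }
  count.times.map { |i| [format("0x%04X", start + i), values[i]] }.to_h
end

expand_registers("REG,0x2000,2,12,22")   # 0x2000 => 12, 0x2001 => 22
```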
Here is my incomplete logstash config file. I found some filters such as grok, mutate and extractnumbers, but I don't know what to do with them.
input {
  file {
    path => "/var/log/syslog"
    type => "syslog"
  }
}
filter {
  # ???
}
output {
  elasticsearch { }
}
I know I'm asking for a lot, sorry guys. Also, my final goal is to draw a TIME(x)/VALUE(y) chart in Kibana for a specific register. Is that possible? Could you give me some advice?

Thank you, Youngmin Kim
【Comments】:
【Answer 1】: You'll want to use grok to match the individual fields; there are many built-in grok patterns that will help you with this. %{SYSLOGBASE} will get you the timestamp and host, and then the rest can be captured with patterns such as %{NUMBER} and the others found in https://github.com/logstash/logstash/blob/v1.3.3/patterns/grok-patterns. Because of your variable log length, your pattern may get a bit complicated; however, I think you can just match all the numbers, store them in an array, and then map them to register values in your mutate.

As far as generating a chart in Kibana goes, once your data is formatted properly, that won't be difficult. There is a built-in time-series chart type that is easy to populate.
【Discussion】:
【Answer 2】:

Feb 19 18:45:29 SD550 Jack: REG,0x1000,4,10,20,30,40

If you use the following config file with data similar to the line above and open Kibana, it works. It splits the fields into different categories that you can search. I'm very new to all of this, but this is how I did it. (The original answer also included a screenshot of a simple pie chart over time, taken after feeding in about eight of the lines above with different time and address values.)
input {
  tcp {
    type => "test"
    port => 3333
  }
}
filter {
  grok {
    match => ["message", "%{MONTH:month} %{MONTHDAY:day} %{TIME:time} %{WORD:sd550} %{WORD:name}: %{WORD:asmThing},%{WORD:address},%{NUMBER:firstno},%{NUMBER:2no},%{NUMBER:3no},%{NUMBER:4no},%{NUMBER:5no}"]
  }
}
output {
  elasticsearch {
    # Setting 'embedded' will run a real elasticsearch server inside logstash.
    # This option below saves you from having to run a separate process just
    # for ElasticSearch, so you can get started quicker!
    embedded => true
  }
}
【Discussion】:
The register log length is variable, but your grok filter is fixed! Your filter will fail when there are more than 4 registers. — I didn't realize it changed. You're right, my filter can't handle the variation.

【Answer 3】: I have an idea. To handle a variable-length log with several register address:value pairs, you can first parse the message with the grok filter, then use the csv filter to split out each register value.
The filter:
filter {
  grok {
    match => ["message", "%{MONTH:month} %{NUMBER:day} %{TIME:time} %{WORD:host} %{WORD:user}: %{WORD:unit},%{WORD:address},%{NUMBER:regNumber},%{GREEDYDATA:regValue}"]
    add_field => ["logdate", "%{month} %{day} %{time}"]
    remove_field => ["month", "day", "time"]
  }
  csv {
    source => "regValue"
    remove_field => ["regValue"]
  }
}
The output:
"message" => "Feb 19 18:45:29 SD550 Jack: REG,0x1000,4,10,20,30,40",
"@version" => "1",
"@timestamp" => "2014-02-20T02:05:53.608Z",
"host" => "SD550"
"user" => "Jack",
"unit" => "REG",
"address" => "0x1000",
"regNumber" => "4",
"logdate" => "Feb 19 18:45:29",
"column1" => "10",
"column2" => "20",
"column3" => "30",
"column4" => "40"
However, the field names for the addresses are assigned by the csv filter (you cannot set the field names through the csv filter's columns option, because the number of fields is variable). If you want to meet your requirement, you need to modify the csv filter.
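The limitation can be seen in miniature in plain Ruby (an illustrative sketch, not the filter's actual code): the stock csv filter must know its column names up front, while the register addresses can only be derived per event.

```ruby
require "csv"

row = CSV.parse_line("10,20,30,40")

# Fixed naming, as the stock csv filter does by default ("columnX"):
fixed = row.each_with_index.map { |v, i| ["column#{i + 1}", v] }.to_h

# Per-event naming derived from the start address (what a modified filter must do):
start = "0x1000".hex
dynamic = row.each_with_index.map { |v, i| [(start + i).to_s, v.to_i] }.to_h
```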
【Discussion】:
【Answer 4】: Thanks to everyone who answered my question, especially Ben Lim.

With your help, I got this result:
"@version" => "1",
"@timestamp" => "2014-02-20T11:07:28.125Z",
"type" => "syslog",
"host" => "ymkim-SD550",
"path" => "/var/log/syslog",
"ts" => "Feb 20 21:07:27",
"user" => "ymkim",
"func" => "REG",
"8192" => 16,
"8193" => 32,
"8194" => 17,
"8195" => 109
from $ logger REG,2000,4,10,20,11,6d
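One detail worth noticing: the values in the result (16, 32, 17, 109) are the hexadecimal interpretations of the logged 10, 20, 11, 6d, because the modified filter parses each register value with Ruby's String#hex. A standalone check:

```ruby
# String#hex reads each value as a hexadecimal string,
# which is what the modified filter does with every register value.
values = %w[10 20 11 6d].map(&:hex)
# 10 -> 16, 20 -> 32, 11 -> 17, 6d -> 109
```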
Here is my config file:
input {
  file {
    path => "/var/log/syslog"
    type => "syslog"
  }
}
filter {
  grok {
    match => ["message", "%{SYSLOGTIMESTAMP:ts} %{SYSLOGHOST:hostname} %{WORD:user}: %{WORD:func},%{WORD:address},%{NUMBER:regNumber},%{GREEDYDATA:regValue}"]
  }
  if [func] == "REG" {
    modbus_csv {
      start_address => "address"
      num_register => "regNumber"
      source => "regValue"
      remove_field => ["regValue", "hostname", "message",
                       "address", "regNumber"]
    }
  }
}
output {
  stdout { debug => true }
  elasticsearch { }
}
And I modified the csv filter, naming it modbus_csv.rb:
# encoding: utf-8
require "logstash/filters/base"
require "logstash/namespace"
require "csv"

# CSV filter. Takes an event field containing CSV data, parses it,
# and stores it as individual fields (can optionally specify the names).
class LogStash::Filters::MODBUS_CSV < LogStash::Filters::Base
  config_name "modbus_csv"
  milestone 2

  # The CSV data in the value of the source field will be expanded into a
  # datastructure.
  config :source, :validate => :string, :default => "message"

  # Define a list of column names (in the order they appear in the CSV,
  # as if it were a header line). If this is not specified or there
  # are not enough columns specified, the default column name is "columnX"
  # (where X is the field number, starting from 1).
  config :columns, :validate => :array, :default => []

  config :start_address, :validate => :string, :default => "0"
  config :num_register, :validate => :string, :default => "0"

  # Define the column separator value. If this is not specified the default
  # is a comma ','.
  # Optional.
  config :separator, :validate => :string, :default => ","

  # Define the character used to quote CSV fields. If this is not specified
  # the default is a double quote '"'.
  # Optional.
  config :quote_char, :validate => :string, :default => '"'

  # Define target for placing the data.
  # Defaults to writing to the root of the event.
  config :target, :validate => :string

  public
  def register
    # Nothing to do here
  end # def register

  public
  def filter(event)
    return unless filter?(event)

    @logger.debug("Running modbus_csv filter", :event => event)
    @logger.debug(event[@num_register].hex)

    # Name each column after its decimal register address.
    (0...event[@num_register].hex).each do |i|
      @columns[i] = event[@start_address].hex + i
    end

    if event[@source]
      if event[@source].is_a?(String)
        event[@source] = [event[@source]]
      end

      if event[@source].length > 1
        @logger.warn("modbus_csv filter only works on fields of length 1",
                     :source => @source, :value => event[@source],
                     :event => event)
        return
      end

      raw = event[@source].first
      begin
        values = CSV.parse_line(raw, :col_sep => @separator, :quote_char => @quote_char)
        if @target.nil?
          # Default is to write to the root of the event.
          dest = event
        else
          dest = event[@target] ||= {}
        end

        values.each_index do |i|
          field_name = @columns[i].to_s || "column#{i + 1}"
          dest[field_name] = values[i].hex
        end

        filter_matched(event)
      rescue => e
        event.tag "_modbus_csvparsefailure"
        @logger.warn("Trouble parsing modbus_csv", :source => @source, :raw => raw,
                     :exception => e)
        return
      end # begin
    end # if event
    @logger.debug("Event after modbus_csv filter", :event => event)
  end # def filter
end # class LogStash::Filters::Csv
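Outside Logstash, the filter's core mapping can be exercised as plain Ruby (a simplified sketch of the same logic, not the filter itself; start address and register count arrive as strings, values as a CSV payload):

```ruby
require "csv"

# Simplified re-implementation of the filter's mapping step:
# derive one decimal-address field name per register, parse the CSV
# payload, and read each value as hexadecimal.
def modbus_map(start_address, num_register, payload)
  columns = []
  (0...num_register.hex).each { |i| columns[i] = start_address.hex + i }
  values = CSV.parse_line(payload)
  dest = {}
  values.each_index { |i| dest[columns[i].to_s] = values[i].hex }
  dest
end

modbus_map("2000", "4", "10,20,11,6d")
# address 2000 (hex) = 8192 (decimal) onward: 8192=>16, 8193=>32, 8194=>17, 8195=>109
```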
Finally, I got the chart I wanted. (*func = REG (13) 4096 mean per 10m | (13 hits))
【Discussion】: