如何从logstash中的日志消息中获取数字?

Posted

技术标签:

【中文标题】如何从logstash中的日志消息中获取数字?【英文标题】:How to get numbers from a log message in logstash? 【发布时间】:2014-02-19 11:44:15 【问题描述】:

我对 Logstash 很陌生。 我可以运行 logstash jar 文件并查看 kibana 网页。 很酷~~

现在,我想将下一行(系统日志消息)更改为下一行。

Feb 19 18:45:29 SD550 Jack: REG,0x1000,4,10,20,30,40
==>
 'timestamp': 'Feb 19 18:45:29', 
  'host': 'SD550', 0x1000:10, 0x1001:20, 0x1002:30, 0x1003:40 

在日志消息中,'0x1000' 是起始寄存器地址,'4' 是寄存器值的数量,下一个值就是值。所以,这意味着 0x1000:10、0x1001:20、0x1002:30、0x1003:40。 重要的一点是寄存器值的数量能够改变。因此,日志消息的长度可以是可变的。即使它有任何长度,我也想得到一个正确的结果。 (例如,0x2000,2,12,22 ==> 0x2000:12, 0x2001:22)

这是我不完整的 logstash 配置文件。我发现了一些过滤器,例如 grok、mutate 和 extractnumbers。但是,我不知道该怎么做。

input  
  file  
        path => "/var/log/syslog"
        type => "syslog"
   


filter 
   ???


output 
  elasticsearch  

我知道我想要很多,对不起,伙计们。另外,我的最终目标是为kibana中的特定寄存器绘制TIME(x)/VALUE(y)图表。是否可以?我可以给你一些建议吗?

谢谢你, 金英敏

【问题讨论】:

【参考方案1】:

您会想使用 grok 来匹配各个字段,有许多内置的 grok 模式可以帮助您解决这个问题。 %SYSLOGBASE 将为您获取时间戳和主机,然后可以使用 %NUMBER 等模式以及https://github.com/logstash/logstash/blob/v1.3.3/patterns/grok-patterns

中的其他模式获取其余部分

由于您的可变日志长度,您的模式可能会变得有点复杂,但是我认为您只需匹配所有数字并将它们存储在一个数组中,然后在您的 mutate 中,您就可以将它们映射到寄存器价值。

就在 kibana 中生成图表而言,一旦您的数据格式正确,这将不会很困难。有一个易于填充的内置时间序列图类型。

【讨论】:

【参考方案2】:

2 月 19 日 18:45:29 SD550 插孔:REG,0x1000,4,10,20,30,40

如果您对与上述类似的数据使用以下配置文件,并打开 kibana,则它可以工作。它将字段拆分为您可以搜索的不同类别。我对这一切都很陌生,但这就是我的做法。下面还有一个简单的时间饼图的屏幕截图在我将上面的大约 8 行放入不同的时间和地址值之后

input 
  tcp  
    type => "test"
    port => 3333
   



filter 
    grok 
        match => ["message", "%MONTH:month %DAY:day %TIME:time %WORD:sd550 %WORD:name: %WORD:asmThing,%WORD:address,%NUMBER:firstno%NUMBER:2no%NUMBER:3no%NUMBER:4no%NUMBER:5no"]

 
output 
  elasticsearch 
    # Setting 'embedded' will run  a real elasticsearch server inside logstash.
    # This option below saves you from having to run a separate process just
    # for ElasticSearch, so you can get started quicker!
    embedded => true
  

【讨论】:

寄存器文件长度可变。但是你的 grok 过滤器是固定的!当寄存器超过 4 个时,您的过滤器将失败。 我没有意识到它改变了。我的过滤器无法用于更改,您是对的。【参考方案3】:

我有一个想法。要处理具有多个寄存器地址的可变日志长度:值, 您可以先使用grok 过滤器过滤消息。然后使用csv过滤器将每个寄存器值分开。

过滤器:

filter 
    grok 
            match => ["message", "%MONTH:month %NUMBER:day %TIME:time %WORD:host %WORD:user: %WORD:unit,%WORD:address,%NUMBER:regNumber,%GREEDYDATA:regValue"]
            add_field => ["logdate","%month %day %time"]
            remove_field => ["month","day", "time"]
    

    csv 
            source => "regValue"
            remove_field => ["regValue"]
    

输出:


   "message" => "Feb 19 18:45:29 SD550 Jack: REG,0x1000,4,10,20,30,40",
  "@version" => "1",
"@timestamp" => "2014-02-20T02:05:53.608Z",
      "host" => "SD550"
      "user" => "Jack",
      "unit" => "REG",
   "address" => "0x1000",
 "regNumber" => "4",
   "logdate" => "Feb 19 18:45:29",
   "column1" => "10",
   "column2" => "20",
   "column3" => "30",
   "column4" => "40"

但是,地址字段名称是由 csv 过滤器给出的(您不能通过CSV filter column 给出字段名称,因为字段的数量是可变的)。如果你想满足你的要求,你需要修改csv filter。

【讨论】:

【参考方案4】:

谢谢所有回答我问题的人.. 尤其是 Ben Lim。

在你的帮助下,我得到了这个结果。


      "@version" => "1",
    "@timestamp" => "2014-02-20T11:07:28.125Z",
          "type" => "syslog",
          "host" => "ymkim-SD550",
          "path" => "/var/log/syslog",
            "ts" => "Feb 20 21:07:27",
          "user" => "ymkim",
          "func" => "REG",
          "8192" => 16,
          "8193" => 32,
          "8194" => 17,
          "8195" => 109

来自$ logger REG,2000,4,10,20,11,6d

这是我的配置文件。

input  
  file  
        path => "/var/log/syslog"
        type => "syslog"
   


filter 
  grok 
        match => ["message", "%SYSLOGTIMESTAMP:ts %SYSLOGHOST:hostname %WORD:user: %WORD:func,%WORD:address,%NUMBER:regNumber,%GREEDYDATA:regValue"]
  

  if [func] == "REG"   
      modbus_csv 
          start_address => "address"
          num_register => "regNumber"
          source => "regValue"
          remove_field => ["regValue", "hostname", "message", 
                "address", "regNumber"]
      
  



output 
    stdout  debug => true 
  elasticsearch  

并修改了 csv 过滤器,命名为 modbus_csv.rb。

# encoding: utf-8
require "logstash/filters/base"
require "logstash/namespace"

require "csv"

# CSV filter. Takes an event field containing CSV data, parses it,
# and stores it as individual fields (can optionally specify the names).
class LogStash::Filters::MODBUS_CSV < LogStash::Filters::Base
  config_name "modbus_csv"
  milestone 2

  # The CSV data in the value of the source field will be expanded into a
  # datastructure.
  config :source, :validate => :string, :default => "message"

  # Define a list of column names (in the order they appear in the CSV,
  # as if it were a header line). If this is not specified or there
  # are not enough columns specified, the default column name is "columnX"
  # (where X is the field number, starting from 1).
  config :columns, :validate => :array, :default => []
  config :start_address, :validate => :string, :default => "0"
  config :num_register, :validate => :string, :default => "0"

  # Define the column separator value. If this is not specified the default
  # is a comma ','.
  # Optional.
  config :separator, :validate => :string, :default => ","

  # Define the character used to quote CSV fields. If this is not specified
  # the default is a double quote '"'.
  # Optional.
  config :quote_char, :validate => :string, :default => '"'

  # Define target for placing the data.
  # Defaults to writing to the root of the event.
  config :target, :validate => :string

  public
  def register

    # Nothing to do here

  end # def register

  public
  def filter(event)
    return unless filter?(event)

    @logger.debug("Running modbus_csv filter", :event => event)

    matches = 0

    @logger.debug(event[@num_register].hex)
    for i in 0..(event[@num_register].hex)
        @columns[i] = event[@start_address].hex + i
    end
    if event[@source]
      if event[@source].is_a?(String)
        event[@source] = [event[@source]]
      end

      if event[@source].length > 1
        @logger.warn("modbus_csv filter only works on fields of length 1",
                     :source => @source, :value => event[@source],
                     :event => event)
        return
      end

      raw = event[@source].first
      begin
        values = CSV.parse_line(raw, :col_sep => @separator, :quote_char => @quote_char)

        if @target.nil?
          # Default is to write to the root of the event.
          dest = event
        else
          dest = event[@target] ||= 
        end

        values.each_index do |i|
          field_name = @columns[i].to_s || "column#i+1"
          dest[field_name] = values[i].hex
        end

        filter_matched(event)
      rescue => e
        event.tag "_modbus_csvparsefailure"
        @logger.warn("Trouble parsing modbus_csv", :source => @source, :raw => raw,
                      :exception => e)
        return
      end # begin
    end # if event

    @logger.debug("Event after modbus_csv filter", :event => event)

  end # def filter

end # class LogStash::Filters::Csv

最后,我得到了我想要的图表。 (*func = REG (13) 4096 mean per 10m | (13 hits))

【讨论】:

以上是关于如何从logstash中的日志消息中获取数字?的主要内容,如果未能解决你的问题,请参考以下文章

Logstash 如何与 Syslog 集成?

从 iOS 设备获取统一系统日志

Logstash 应该只记录 grok 解析的消息

请教logstash的多行日志问题,最后一行的日志总是不能获取

使用logstash结合logback收集微服务日志

filebeat+logstash+elasticsearch收集haproxy日志