计算事件之间的时间

Posted

技术标签:

【中文标题】计算事件之间的时间【英文标题】:Calculating time between events 【发布时间】:2016-09-18 02:42:49 【问题描述】:

我有一条消息流经多个系统,每个系统都使用时间戳和 uuid messageId 记录消息进入和退出。我正在通过以下方式摄取所有日志:

filebeat --> logstash --> elastic search --> kibana

因此,我现在有这些事件:

@timestamp                      messageId                               event 
May 19th 2016, 02:55:29.003     00e02f2f-32d5-9509-870a-f80e54dc8775    system1Enter
May 19th 2016, 02:55:29.200     00e02f2f-32d5-9509-870a-f80e54dc8775    system1Exit
May 19th 2016, 02:55:29.205     00e02f2f-32d5-9509-870a-f80e54dc8775    system2Enter
May 19th 2016, 02:55:29.453     00e02f2f-32d5-9509-870a-f80e54dc8775    system2Exit

我想生成一份报告(最好是堆叠的条形图或柱形图),说明在每个系统中花费的时间:

messageId                               in1:1->2:in2
00e02f2f-32d5-9509-870a-f80e54dc8775    197:5:248

最好的方法是什么? Logstash 过滤器? kibana 计算字段?

【问题讨论】:

logstash aggregate filter中有一个很好的例子 @Val,嗯,该示例需要经过时间才能出现在日志行中 - 不从两个单独的日志行计算它。也许我可以以某种方式将它与Elapsed 插件一起使用。 【参考方案1】:

您可以仅使用 Logstash aggregate filter 来实现这一点,但是,您必须大幅重新实现 elapsed filter 已经完成的功能,所以这很遗憾,对吧?

然后让我们混合使用 Logstash aggregate filter 和 elapsed filter。后者用于测量每个阶段的时间,前者用于将所有时间信息聚合到最后一个事件中。

旁注:您可能需要重新考虑您的时间戳格式,使其更符合解析标准。我已将它们转换为 ISO 8601 以使其更易于解析,但您可以随意推出自己的正则表达式。

所以我从以下日志开始:

2016-05-19T02:55:29.003 00e02f2f-32d5-9509-870a-f80e54dc8775 system1Enter
2016-05-19T02:55:29.200 00e02f2f-32d5-9509-870a-f80e54dc8775 system1Exit
2016-05-19T02:55:29.205 00e02f2f-32d5-9509-870a-f80e54dc8775 system2Enter
2016-05-19T02:55:29.453 00e02f2f-32d5-9509-870a-f80e54dc8775 system2Exit

首先我使用三个elapsed 过滤器(每个阶段一个in11->2in2),然后使用三个聚合过滤器来收集所有时间信息。它看起来像这样:

filter 
  grok 
    match => ["message", "%TIMESTAMP_ISO8601:timestamp %UUID:messageId %WORD:event"]
    add_tag => [ "%event" ]
  
  date 
    match => [ "timestamp", "ISO8601"]
  
  # Measures the execution time of system1
  elapsed 
    unique_id_field => "messageId"
    start_tag => "system1Enter"
    end_tag => "system1Exit"
    new_event_on_match => true
    add_tag => ["in1"]
  
  # Measures the execution time of system2
  elapsed 
    unique_id_field => "messageId"
    start_tag => "system2Enter"
    end_tag => "system2Exit"
    new_event_on_match => true
    add_tag => ["in2"]
  
  # Measures the time between system1 and system2
  elapsed 
    unique_id_field => "messageId"
    start_tag => "system1Exit"
    end_tag => "system2Enter"
    new_event_on_match => true
    add_tag => ["1->2"]
  
  # Records the execution time of system1
  if "in1" in [tags] and "elapsed" in [tags] 
    aggregate 
      task_id => "%messageId"
      code => "map['report'] = [(event['elapsed_time']*1000).to_i]"
      map_action => "create"
    
  
  # Records the time between system1 and system2
  if "1->2" in [tags] and "elapsed" in [tags] 
    aggregate 
      task_id => "%messageId"
      code => "map['report'] << (event['elapsed_time']*1000).to_i"
      map_action => "update"
    
  
  # Records the execution time of system2
  if "in2" in [tags] and "elapsed" in [tags] 
    aggregate 
      task_id => "%messageId"
      code => "map['report'] << (event['elapsed_time']*1000).to_i; event['report'] = map['report'].join(':')"
      map_action => "update"
      end_of_task => true
    
  

在前两个事件之后,你会得到一个像这样的新事件,表明system1已经花费了197ms:


                 "@timestamp" => "2016-05-21T04:20:51.731Z",
                       "tags" => [ "elapsed", "elapsed_match", "in1" ],
               "elapsed_time" => 0.197,
                  "messageId" => "00e02f2f-32d5-9509-870a-f80e54dc8775",
    "elapsed_timestamp_start" => "2016-05-19T00:55:29.003Z"

在第三个事件之后,你会得到这样一个事件,显示system1和system2之间花费了多少时间,即5ms:


                 "@timestamp" => "2016-05-21T04:20:51.734Z",
                       "tags" => [ "elapsed", "elapsed_match", "1->2" ],
               "elapsed_time" => 0.005,
                  "messageId" => "00e02f2f-32d5-9509-870a-f80e54dc8775",
    "elapsed_timestamp_start" => "2016-05-19T00:55:29.200Z"

在第四个事件之后,你会得到一个像这样的新事件,它显示了在 system2 中花费了多少时间,即 248 毫秒。该事件还包含一个 report 字段,其中包含消息的所有时间信息


                 "@timestamp" => "2016-05-21T04:20:51.736Z",
                       "tags" => [ "elapsed", "elapsed_match", "in2" ],
               "elapsed_time" => 0.248,
                  "messageId" => "00e02f2f-32d5-9509-870a-f80e54dc8775",
    "elapsed_timestamp_start" => "2016-05-19T00:55:29.205Z"
                     "report" => "197:5:248"

【讨论】:

谢谢 val,我正从同一个兔子洞里走下去,这有助于我更快地到达那里。我注意到关于已用过滤器的一点是,由于我的每条日志行只匹配几个已用过滤器中的一个,所有不匹配的过滤器都会添加一个_grokparsefailure 标签。我在经过的过滤器中添加了tag_on_failure =&gt; [ ] 试图找出我在to_i 中看到的错误,显然地图行中的输入是一个数组而不是字符串。 Aggregate exception occurred. Error: undefined method to_i' for [197.0]:Array` 对不起,我的错,我在第一个聚合的代码中有错字(方括号在错误的地方),我已经更新了答案,请重试。 啊,这就是你在 rb 中创建地图的方式。我通过完全删除方括号来纠正您的错字,并且在连接时遇到了问题。现在一切都好。感谢您的帮助! 您知道是否有办法更改已过去或聚合创建的事件的日期?【参考方案2】:

我不得不在 logstash 5.4 中对这项工作进行一些调整,这是修改后的代码。

filter 
  grok 
    match => ["message", "%TIMESTAMP_ISO8601:timestamp %UUID:messageId %WORD:event"]
    add_tag => [ "%event" ]
  
  date 
    match => [ "timestamp", "ISO8601"]
  
  # Measures the execution time of system1
  elapsed 
    unique_id_field => "messageId"
    start_tag => "system1Enter"
    end_tag => "system1Exit"
    new_event_on_match => true
    add_tag => ["in1"]
  
  # Measures the time between system1 and system2
  elapsed 
    unique_id_field => "messageId"
    start_tag => "system1Exit"
    end_tag => "system2Enter"
    new_event_on_match => true
    add_tag => ["1->2"]
  
  # Measures the execution time of system2
  elapsed 
    unique_id_field => "messageId"
    start_tag => "system2Enter"
    end_tag => "system2Exit"
    new_event_on_match => true
    add_tag => ["in2"]
  
  # Records the execution time of system1
  if "in1" in [tags] and "elapsed" in [tags] 
    aggregate 
      task_id => "%messageId"
      code => "map['report'] = (event.get('elapsed_time')*1000).to_i"
      map_action => "create"
    
   
  # Records the time between system1 and system2
  if "1->2" in [tags] and "elapsed" in [tags] 
    aggregate 
      task_id => "%messageId"
      code => "map['report'] << (event.get('elapsed_time')*1000).to_i"
      map_action => "update"
    
  
  # Records the execution time of system2
  if "in2" in [tags] and "elapsed" in [tags] 
    aggregate 
      task_id => "%messageId"
      code => "map['report'] << (event.get('elapsed_time')*1000).to_i"
      map_action => "update"
      end_of_task => true
    
  

【讨论】:

使用多个 logstash 工作人员时,已用过滤器和聚合过滤器将无法正常工作。在那种情况下,我们如何解决同样的问题?谢谢。 你强制它使用单一的工作。这可能是一个瓶颈,但其他项目不必遵循相同的路径

以上是关于计算事件之间的时间的主要内容,如果未能解决你的问题,请参考以下文章

iOS开发:计算两个事件之间经过的秒数的简单方法是啥?

使用 pySpark 计算用户事件之间的平均时间

计算 Pyspark 中发生条件时两个事件之间的月数

MySQL - PHP:计算一天中多个事件之间的总小时数

Pandas - 根据条件计算相关事件

Big Query Compute 两个自定义事件之间的平均时间