Elasticsearch:在 Elasticsearch 中计算摄取延迟并存储摄取时间以提高可观察性

Posted Elastic 中国社区官方博客

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Elasticsearch:在 Elasticsearch 中计算摄取延迟并存储摄取时间以提高可观察性相关的知识,希望对你有一定的参考价值。

使用 Elasticsearch 查看和分析数据时,经常会看到使用在远程/受监控系统上生成的时间戳的可视化、监控和警报解决方案。 但是,使用远程生成的时间戳可能存在风险。

如果远程事件的发生与事件到达 Elasticsearch 之间存在延迟,或者如果远程系统上的时间设置不正确,那么重要事件可能会被忽视。 因此,在将文档摄取到 Elasticsearch 时,存储每个文档的摄取时间以及监控每个事件到达 Elasticsearch 集群所需的时间通常很有帮助。 大于正常摄取延迟可能表示摄取过程存在问题或远程系统上的时间设置存在问题。

在此博客中,我们将展示如何使用带有 set processor 的摄取节点(ingest node)在文档到达 Elasticsearch 集群时向其添加摄取时间戳。 此时间戳可用于可视化、监控和警报。

此外,我们将展示如何使用脚本处理器(script processor)来计算摄入滞后。 这种滞后是远程/受监控系统上发生事件的时间戳与相应文档到达 Elasticsearch 集群的时间之间的差异。 这可用于确保摄取过程不会花费太长时间,并用于检测远程时间戳是否设置不正确。

添加摄入量时间戳并计算摄入量滞后

下面我们给出一个摄入管道的例子,它添加了一个名为 ingest_time 的摄入时间戳。 它还计算远程事件时间戳与事件到达 Elasticsearch 的时间之间的延迟,并将其存储在名为 lag_in_seconds 的字段中。

ingest_time 字段有两个用途:

  1. 它可以用作 Kibana 可视化中的时间字段以及用于监控和警报
  2. 它用于延迟计算

请注意,我们假设每个文档都有一个名为 event_timestamp 的字段,该字段对应于每个事件在远程/受监控系统上发生的时间。 你的事件时间戳字段的名称对于你的数据可能会有所不同,因此应进行相应修改。

我们将管道写入 Elasticsearch,如下所示:

PUT _ingest/pipeline/calculate_lag

  "description": "Add an ingest timestamp and calculate ingest lag",
  "processors": [
    
      "set": 
        "field": "_source.ingest_time",
        "value": "_ingest.timestamp"
      
    ,
    
      "script": 
        "lang": "painless",
        "source": """ 
            if(ctx.containsKey("ingest_time") && ctx.containsKey("event_timestamp"))  
              ctx['lag_in_seconds'] = ChronoUnit.MILLIS.between(ZonedDateTime.parse(ctx['event_timestamp']), ZonedDateTime.parse(ctx['ingest_time']))/1000; 
             
        """
      
    
  ]

然后我们可以使用 simulate API 测试管道:

POST _ingest/pipeline/calculate_lag/_simulate 
 
  "docs": [ 
     
      "_source":  
        "event_timestamp": "2019-11-07T20:39:00.000Z" 
       
     
  ] 

它应该响应类似于以下内容,其中包括 lag_in_seconds 和 ingest_time 字段:


  "docs": [
    
      "doc": 
        "_index": "_index",
        "_id": "_id",
        "_version": "-3",
        "_source": 
          "event_timestamp": "2019-11-07T20:39:00.000Z",
          "lag_in_seconds": 104913743,
          "ingest_time": "2023-03-06T03:21:23.412526720Z"
        ,
        "_ingest": 
          "timestamp": "2023-03-06T03:21:23.41252672Z"
        
      
    
  ]

最后,我们可以使用管道将真实文档写入 Elasticsearch:

PUT test_index/_doc/1?pipeline=calculate_lag 
 
  "event_timestamp": "2023-03-06T00:03:55.000Z", 
  "other_field": "whatever" 

我们可以检索文档:

GET test_index/_doc/1
PUT test_index/_doc/1?pipeline=calculate_lag 
 
  "event_timestamp": "2023-03-06T00:03:55.000Z", 
  "other_field": "whatever" 

在索引设置中指定管道

在生产部署中使用摄取管道时,最好将管道应用于索引设置,而不是在 PUT URL 中指定管道。 这可以通过将 index.default_pipeline 添加到索引设置来完成,如下所示:

PUT test_index/_settings 
 
  "index.default_pipeline": "calculate_lag" 

现在,任何发送到 test_index 的文档都将通过 calculate_lag 管道,而无需在 URL 中使用 ?pipeline=calculate_lag。 我们可以使用以下 PUT 命令验证这是否有效:

PUT test_index/_doc/2 
 
  "event_timestamp": "2023-03-06T00:03:57.000Z", 
  "other_field": "This is a new doc" 

执行以下命令以查看我们刚刚摄取的文档:

GET test_index/_doc/2

这应该返回一个如下所示的丰富文档:


  "_index": "test_index",
  "_id": "2",
  "_version": 1,
  "_seq_no": 3,
  "_primary_term": 1,
  "found": true,
  "_source": 
    "lag_in_seconds": 14147,
    "ingest_time": "2023-03-06T03:59:44.405306003Z",
    "event_timestamp": "2023-03-06T00:03:57.000Z",
    "other_field": "This is a new doc"
  

如何使用摄入滞后

大于预期的摄入滞后可能表明摄入过程存在问题。 因此,如果延迟超过某个阈值,则可以使用延迟来触发警报。 或者可以将延迟输入到机器学习作业中,以检测摄取处理时间中的意外偏差。

或者,可以执行分析以检测任何主机是否比其他主机有更大的滞后,这可能表明它的时钟设置可能有问题。 此外,可以创建 Kibana 仪表板来显示历史滞后值与当前滞后值的图形表示。

如果延迟大于预期,则应调查延迟的原因。 如果是由于远程系统时钟设置不正确引起的,则应正确设置远程时钟。 如果滞后是由缓慢的摄取过程引起的,则应调查摄取过程并对其进行调整以使其达到预期效果。 如何使用滞后的可能性仅受您的需求限制。

使用摄入时间戳

在 Kibana 中查看可视化或观察异常时,我们通常会考虑发生在最后一天或上周的事件。 但是,如果我们依赖于远程生成的事件时间戳而不是摄取时间戳,那么摄取过程中的任何滞后都可能导致某些文档永远不会被查看或监视。 例如,如果一个事件发生在昨天但今天才到达集群,它不会显示在今天事件的仪表板中,因为它的远程生成的时间戳是从昨天开始的。 此外,当我们昨天查看仪表板时它还不可用,因为它尚未存储在 Elasticsearch 中(因为它今天才到达)。

另一方面,如果我们可视化数据并使用摄取时间戳设置警报,我们可以保证我们正在考虑最新的事件到达 Elasticsearch,而不管事件发生的时间。 这将确保即使临时备份摄取过程也不会错过事件。

使用摄取时间戳的另一个优势与事件时间戳可能被恶意或无意中错误设置的事实有关。 例如,假设我们正在监控一个远程系统,黑客将其时钟设置为 1980 年代的某个日期。 如果我们依赖远程生成的事件时间戳,我们可能会错过恶意用户在该系统上执行的所有活动 —— 除非我们专门查找存储为 1980 年代发生的事件。 另一方面,如果我们依赖于摄取时间戳,我们可以保证我们将考虑最近到达 Elasticsearch 集群的所有事件,而不管远程系统为每个事件提供的时间戳。

结论

在此博客中,我们介绍了如何使用摄取处理器来存储摄取时间和计算摄取过程的延迟,以及你可能想要这样做的原因。 此外,我们概述了使用摄取时间进行监控和可视化的优势,以及使用远程事件时间戳的风险。

如果你还没有自己的 Elasticsearch 集群,你可以在几分钟内免费试用 Elasticsearch Service,或者下载 Elastic Stack 并在本地运行它。 此外,请查看 Elastic Observability,了解有关如何将日志、指标和 APM 跟踪大规模地集中在一个堆栈中的更多信息,以便您可以监控和响应环境中任何地方发生的事件。

以上是关于Elasticsearch:在 Elasticsearch 中计算摄取延迟并存储摄取时间以提高可观察性的主要内容,如果未能解决你的问题,请参考以下文章

elasticsearch的安装部署

Docker 安装 Elasticsearch

Elasticsearch date 类型详解

kibana连接elasticsearch集群做负载均衡

ES1:Windows下安装ElasticSearch

docker安装elasticsearch