Elasticsearch:计算多个状态更新的总持续时间 - transform 应用案例

Posted 中国社区官方博客

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Elasticsearch:计算多个状态更新的总持续时间 - transform 应用案例相关的知识,希望对你有一定的参考价值。

在我之前的文章中,我使用了 Kibana 所提供的 UI 来对所感兴趣的数据进行 transform:

在今天的文章中,我将使用 API 的形式来展示如何使用 transform 来对数据进行转换。

Elasticsearch Transforms 让你

这种以实体(entity)为中心的视图对于由多个文档(如用户行为或会话)组成的各种数据很有帮助。 例如,分布式系统中的会话或请求的持续时间是常见的场景。 以下帖子基于 StackOverflow 问题,该问题以微小的变化反复出现 - 将其用作蓝图。

样本数据

共有三个不同的实体,其唯一 ID A、B 和 C。每个实体都可以通过 eventStart.timestamp 或 eventStop.timestamp 进行多个状态更新:

POST test/_bulk
{ "index" : { "_id" : "1" } }
{ "uniqueID" : "A",   "eventStart": {"timestamp": "2020-07-01T13:50:55.000Z"} }
{ "index" : { "_id" : "2" } }
{ "uniqueID" : "A",   "eventStop": {"timestamp": "2020-07-01T13:51:00.000Z"} }
{ "index" : { "_id" : "3" } }
{ "uniqueID" : "B",   "eventStart": {"timestamp": "2020-07-01T13:52:25.000Z"} }
{ "index" : { "_id" : "4" } }
{ "uniqueID" : "B",   "eventStop": {"timestamp": "2020-07-01T13:53:00.000Z"} }
{ "index" : { "_id" : "5" } }
{ "uniqueID" : "A",   "eventStop": {"timestamp": "2020-07-01T13:54:55.000Z"} }
{ "index" : { "_id" : "6" } }
{ "uniqueID" : "C",   "eventStart": {"timestamp": "2020-07-01T13:54:55.000Z"} }

我们运行上面的命令把数据导入到 Elasticsearch 中去。

依靠默认映射,两个日期和关键字字段是相关的,用于计算不同的持续时间:

# Request
GET test/_mapping

# Response
{
  "test" : {
    "mappings" : {
      "properties" : {
        "eventStart" : {
          "properties" : {
            "timestamp" : {
              "type" : "date"
            }
          }
        },
        "eventStop" : {
          "properties" : {
            "timestamp" : {
              "type" : "date"
            }
          }
        },
        "uniqueID" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        }
      }
    }
  }
}

Transforms API

计算方法如下:

  1. 按 uniqueID 分组。
  2. 获取第一个 eventStart 和最后一个 eventStop 时间戳
  3. 计算时差(以秒为单位)。

虽然 Kibana 在 Elasticsearch Transform API 之上提供了一个 UI 来完成 transform,但此示例坚持使用 Elasticsearch API,它更易于遵循和重现。 一个方便的 API 是使用 POST _transform/_preview 进行 preview

从分组的第一步开始,由于聚合部分是强制性的,计算状态更新的数量:

# Request
POST _transform/_preview
{
  "source": {
    "index": "test"
  },
  "dest": {
    "index": "test_transformed"
  },
  "pivot": {
    "group_by": {
      "id": {
        "terms": {
          "field": "uniqueID.keyword"
        }
      }
    },
    "aggregations": {
      "event_count": {
        "value_count": {
          "field": "_id"
        }
      }
    }
  }
}

# Response
{
  "preview" : [
    {
      "event_count" : 3,
      "id" : "A"
    },
    {
      "event_count" : 2,
      "id" : "B"
    },
    {
      "event_count" : 1,
      "id" : "C"
    }
  ],
  "generated_dest_index" : {
    "mappings" : {
      "_meta" : {
        "_transform" : {
          "transform" : "transform-preview",
          "version" : {
            "created" : "7.14.0"
          },
          "creation_date_in_millis" : 1632892282869
        },
        "created_by" : "transform"
      },
      "properties" : {
        "event_count" : {
          "type" : "long"
        },
        "id" : {
          "type" : "keyword"
        }
      }
    },
    "settings" : {
      "index" : {
        "number_of_shards" : "1",
        "auto_expand_replicas" : "0-1"
      }
    },
    "aliases" : { }
  }
}

对于最终结果,它 “只是” 缺少正确的聚合:bucket 脚本聚合听起来很有希望。

使用 Bucket 脚本聚合进行转换

继续之前的转换,这个添加最早的开始时间戳,最晚的结束时间戳,以及两者之间的持续时间:

POST _transform/_preview
{
  "source": {
    "index": "test"
  },
  "dest": {
    "index": "test_transformed"
  },
  "pivot": {
    "group_by": {
      "id": {
        "terms": {
          "field": "uniqueID.keyword"
        }
      }
    },
    "aggregations": {
      "event_count": {
        "value_count": {
          "field": "_id"
        }
      },
      "start": {
        "min": {
          "field": "eventStart.timestamp"
        }
      },
      "stop": {
        "max": {
          "field": "eventStop.timestamp"
        }
      },
      "duration": {
        "bucket_script": {
          "buckets_path": {
            "start": "start.value",
            "stop": "stop.value"
          },
          "script": """
            return (params.stop - params.start)/1000;
          """
        }
      }
    }
  }
}

上面请求的响应为:

{
  "preview" : [
    {
      "duration" : 240.0,
      "stop" : "2020-07-01T13:54:55.000Z",
      "event_count" : 3,
      "start" : "2020-07-01T13:50:55.000Z",
      "id" : "A"
    },
    {
      "duration" : 35.0,
      "stop" : "2020-07-01T13:53:00.000Z",
      "event_count" : 2,
      "start" : "2020-07-01T13:52:25.000Z",
      "id" : "B"
    },
    {
      "stop" : null,
      "event_count" : 1,
      "start" : "2020-07-01T13:54:55.000Z",
      "id" : "C"
    }
  ],
  "generated_dest_index" : {
    "mappings" : {
      "_meta" : {
        "_transform" : {
          "transform" : "transform-preview",
          "version" : {
            "created" : "7.14.0"
          },
          "creation_date_in_millis" : 1632905272319
        },
        "created_by" : "transform"
      },
      "properties" : {
        "stop" : {
          "type" : "date"
        },
        "event_count" : {
          "type" : "long"
        },
        "start" : {
          "type" : "date"
        },
        "id" : {
          "type" : "keyword"
        }
      }
    },
    "settings" : {
      "index" : {
        "number_of_shards" : "1",
        "auto_expand_replicas" : "0-1"
      }
    },
    "aliases" : { }
  }
}

Painless 中的计算出奇的简单:(params.stop - params.start)/1000:

  • 不需要更复杂的 datetime API。 Elasticsearch 中的每个日期都存储为自 epoch 以来的长时间(以毫秒为单位),因此简单的差异就足够了。
  • 除以 1,000 就得到了以秒为单位。
  • 自动处理缺少的结束时间。

要创建 transform 作业而不仅仅是预览它,你需要将请求调整为以下内容:

PUT _transform/test_duration
{
  "description": "Calculate the duration of an event from multiple status updates (based on its uniqueID)",
  "frequency": "1m",
  "source": {
    "index": "test"
  },
  "dest": {
    "index": "test_transformed"
  },
  "pivot": {
    "group_by": {
      "id": {
        "terms": {
          "field": "uniqueID.keyword"
        }
      }
    },
    "aggregations": {
      "event_count": {
        "value_count": {
          "field": "_id"
        }
      },
      "start": {
        "min": {
          "field": "eventStart.timestamp"
        }
      },
      "stop": {
        "max": {
          "field": "eventStop.timestamp"
        }
      },
      "duration": {
        "bucket_script": {
          "buckets_path": {
            "start": "start.value",
            "stop": "stop.value"
          },
          "script": """
            return (params.stop - params.start)/1000;
          """
        }
      }
    }
  }
}

使用 GET _transform/test_duration 你可以看到 transform 作业。 并且你必须明确地用 POST _transform/test_duration/_start 启动它,否则它什么也做不了。我们执行如下的命令:

POST _transform/test_duration/_start

最后,stats API 非常适合查看工作正在进行或已经完成的工作:

GET _transform/test_duration/_stats

上面命令的结果为:

{
  "count" : 1,
  "transforms" : [
    {
      "id" : "test_duration",
      "state" : "stopped",
      "stats" : {
        "pages_processed" : 2,
        "documents_processed" : 6,
        "documents_indexed" : 3,
        "documents_deleted" : 0,
        "trigger_count" : 1,
        "index_time_in_ms" : 753,
        "index_total" : 1,
        "index_failures" : 0,
        "search_time_in_ms" : 7,
        "search_total" : 2,
        "search_failures" : 0,
        "processing_time_in_ms" : 8,
        "processing_total" : 2,
        "delete_time_in_ms" : 0,
        "exponential_avg_checkpoint_duration_ms" : 1012.0,
        "exponential_avg_documents_indexed" : 3.0,
        "exponential_avg_documents_processed" : 6.0
      },
      "checkpointing" : {
        "last" : {
          "checkpoint" : 1,
          "timestamp_millis" : 1632907175535
        },
        "changes_last_detected_at" : 1632907175535
      }
    }
  ]
}

从上面我们可以看出来状态为 stopped,它表示 transform 已经完成。我们打开 Kibana,并查看状态:

 

 

最后但并非最不重要的是,这些是生成的文档:

GET test_transformed/_search

上面命令显示的结果为:

{
  "took" : 1,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 3,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "test_transformed",
        "_type" : "_doc",
        "_id" : "QRRx52klPRvG45a5oLgZ95sAAAAAAAAA",
        "_score" : 1.0,
        "_source" : {
          "duration" : 240.0,
          "stop" : "2020-07-01T13:54:55.000Z",
          "event_count" : 3,
          "start" : "2020-07-01T13:50:55.000Z",
          "id" : "A"
        }
      },
      {
        "_index" : "test_transformed",
        "_type" : "_doc",
        "_id" : "Qq7col5MOHvjTNMiAGonnqAAAAAAAAAA",
        "_score" : 1.0,
        "_source" : {
          "duration" : 35.0,
          "stop" : "2020-07-01T13:53:00.000Z",
          "event_count" : 2,
          "start" : "2020-07-01T13:52:25.000Z",
          "id" : "B"
        }
      },
      {
        "_index" : "test_transformed",
        "_type" : "_doc",
        "_id" : "Q-N5zMGevsgbxCl0WsHH6CIAAAAAAAAA",
        "_score" : 1.0,
        "_source" : {
          "stop" : null,
          "event_count" : 1,
          "start" : "2020-07-01T13:54:55.000Z",
          "id" : "C"
        }
      }
    ]
  }
}

这就是计算持续时间。

没有 transform 的聚合

你需要转换来获得这个结果吗? 不。

通过一些小的修改,您可以通过常规聚合获得相同的结果:

POST test/_search
{
  "size": 0,
  "aggregations": {
    "group_by": {
      "terms": {
        "field": "uniqueID.keyword"
      },
      "aggregations": {
        "start": {
          "min": {
            "field": "eventStart.timestamp"
          }
        },
        "stop": {
          "max": {
            "field": "eventStop.timestamp"
          }
        },
        "duration": {
          "bucket_script": {
            "buckets_path": {
              "start": "start.value",
              "stop": "stop.value"
            },
            "script": """
              return (params.stop - params.start)/1000;
            """
          }
        }
      }
    }
  }
}

上面命令返回的结果为:

{
  "took" : 9,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 6,
      "relation" : "eq"
    },
    "max_score" : null,
    "hits" : [ ]
  },
  "aggregations" : {
    "group_by" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [
        {
          "key" : "A",
          "doc_count" : 3,
          "stop" : {
            "value" : 1.593611695E12,
            "value_as_string" : "2020-07-01T13:54:55.000Z"
          },
          "start" : {
            "value" : 1.593611455E12,
            "value_as_string" : "2020-07-01T13:50:55.000Z"
          },
          "duration" : {
            "value" : 240.0
          }
        },
        {
          "key" : "B",
          "doc_count" : 2,
          "stop" : {
            "value" : 1.59361158E12,
            "value_as_string" : "2020-07-01T13:53:00.000Z"
          },
          "start" : {
            "value" : 1.593611545E12,
            "value_as_string" : "2020-07-01T13:52:25.000Z"
          },
          "duration" : {
            "value" : 35.0
          }
        },
        {
          "key" : "C",
          "doc_count" : 1,
          "stop" : {
            "value" : null
          },
          "start" : {
            "value" : 1.593611695E12,
            "value_as_string" : "2020-07-01T13:54:55.000Z"
          }
        }
      ]
    }
  }
}

虽然结果的结构不同,但结果是相同的。一些附加说明:

  • 设置“size”: 0 这样无需返回任何文档。
  • 在术语聚合(terms aggregation)中,运行其它子聚合(sub aggregation)运行。
  • doc_count 中会自动计算涉及多少状态更新,因此不需要 value_count。
  • bucket_script 是相同的。

 

结论

希望这是 transform 或等效 aggregation 的有用蓝图。 现在你知道所有部分如何组合在一起以及要避免哪些陷阱。

该文档还描述了何时(不)使用转换,这导致了经典的 “取决于” 讨论:

译文:Elasticsearch Transforms: Calculate the Total Duration from Multiple Status Updates

以上是关于Elasticsearch:计算多个状态更新的总持续时间 - transform 应用案例的主要内容,如果未能解决你的问题,请参考以下文章

获取整个索引中的总词频(Elasticsearch)

使用 AFNetworking 计算下载多个文件的总进度

如何计算多个 android webview 所花费的总时间?

Elasticsearch集群“脑裂”现象

ElasticSearch如何更新集群的状态

计算多个选项卡/窗口中的总 XMPP 会话