Elasticsearch aggregations filtering by top one result from each bucket
Posted: 2014-12-07 16:02:19
Question:
Given a dataset like this in a single Elasticsearch index:
entityId | created    | status
---------+------------+----------
1        | 2000/01/01 | draft
1        | 2001/01/02 | approved
2        | 2000/01/01 | draft
2        | 2000/01/02 | approved
2        | 2001/01/03 | rejected
3        | 2000/01/01 | draft
3        | 2001/01/03 | approved
I want to select only the entities whose latest status is approved.
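I have been experimenting with aggregations and sub-aggregations, and I managed to get every entity with only its latest status, like this:

    {
      "size": 0,
      "aggs": {
        "newest-event-query": {
          "terms": {
            "field": "entityId"
          },
          "aggs": {
            "newest-event": {
              "top_hits": {
                "size": 1,
                "sort": [
                  {
                    "created": {
                      "order": "desc"
                    }
                  }
                ]
              }
            }
          }
        }
      }
    }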
That should give a result like this:

entityId | created    | status
---------+------------+----------
1        | 2001/01/02 | approved
2        | 2001/01/03 | rejected
3        | 2001/01/03 | approved
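But I want to filter that result further so that it contains only the approved records (entities 1 and 3), and ultimately be able to query against that result.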
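To pin down the intended semantics independently of Elasticsearch, here is a minimal Python sketch that takes the newest event per entity and then keeps only the approved ones. The names `EVENTS`, `latest_per_entity` and `entities_with_latest_status` are made up for illustration; the rows are the sample dataset from the question.

```python
# Sample events from the question: (entityId, created, status).
EVENTS = [
    (1, "2000/01/01", "draft"),
    (1, "2001/01/02", "approved"),
    (2, "2000/01/01", "draft"),
    (2, "2000/01/02", "approved"),
    (2, "2001/01/03", "rejected"),
    (3, "2000/01/01", "draft"),
    (3, "2001/01/03", "approved"),
]


def latest_per_entity(events):
    """Return {entityId: (created, status)} holding only the newest event."""
    latest = {}
    for entity_id, created, status in events:
        # yyyy/MM/dd strings sort chronologically, so string comparison is enough.
        if entity_id not in latest or created > latest[entity_id][0]:
            latest[entity_id] = (created, status)
    return latest


def entities_with_latest_status(events, wanted):
    """Return the ids of entities whose newest event has the wanted status."""
    return sorted(
        entity_id
        for entity_id, (_, status) in latest_per_entity(events).items()
        if status == wanted
    )


print(entities_with_latest_status(EVENTS, "approved"))  # [1, 3]
```

Entity 2 is excluded even though it was approved at one point, because its newest event is a rejection. That is the behaviour the aggregation needs to reproduce.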
Adding a further aggs block under the top_hits aggregation does not seem to work:

    {
      "size": 0,
      "aggs": {
        "newest-event-query": {
          "terms": {
            "field": "entityId"
          },
          "aggs": {
            "newest-event": {
              "top_hits": {
                "size": 1,
                "sort": [
                  {
                    "created": {
                      "order": "desc"
                    }
                  }
                ]
              },
              "aggs": {
                "approved-only": {
                  "filter": {
                    "term": {
                      "status": "approved"
                    }
                  }
                }
              }
            }
          }
        }
      }
    }
Result:

    {
      "error": "SearchPhaseExecutionException[Failed to execute phase [query], all shards failed; shardFailures [gupa9nwpQWmGa3JqFmF2NA][creations][0]: SearchParseException[[creations][0]: from[-1],size[0]: Parse Failure [Failed to parse source [{"size":0,"aggs":{"newest-event-query":{"terms":{"field":"entityId"},"aggs":{"newest-event":{"top_hits":{"size":1,"sort":[{"created":{"order":"desc"}}]},"aggs":{"aproved-only":{"filter":{"term":{"status":"approved"}}}}}}}}}]]]; nested: SearchParseException[[creations][0]: from[-1],size[0]: Parse Failure [Unknown key for a START_OBJECT in [newest-event]: [aggs].]]; (the same failure is repeated for [creations][1] through [creations][4] and [events][0] through [events][4]) ]",
      "status": 400
    }
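Any help is appreciated.
Edit: Filtering on approved up front will not work, because an entity can move from approved back to another status. I always need to filter by the latest status. The point of this exercise is to build an immutable data structure: a single entity can go through many stages, but we should always query only the latest one.
Edit 2: While looking for a solution I also considered a parent-child structure. It comes close but still has limitations, for example has_parent or has_child needs a fixed "id". Another obvious and performant solution would be to simply flag the latest item at write time, e.g. with a boolean. But I want atomicity, and resetting that boolean on one document while setting it on the new document is not an atomic operation.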
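Comments:
Why can't you just run a filtered query for approved? Are there any statuses after approved?
Why can't you add a filter query, so that the aggregation is applied to the filtered result set?
Added further explanation. Does it make sense?

Answer 1:
I used a terms aggregation together with a bucket selector aggregation. Under each term I created two sub-aggregations: a max aggregation on the created date field, which gives the date of the most recent entry, and a filter aggregation on status approved containing its own max aggregation on created, which gives the date of the most recent approved entry. With the bucket selector I then kept only the terms where the latest date and the latest approved date are the same.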
Entity: 1                                      --> using terms aggregation
    "Latest created date": "2001-01-02"        --> using max aggregation
    "Latest approved doc":                     --> using filter aggregation
        "Latest approved date": "2001-01-02"   --> using max aggregation
    "Bucket where Latest created date == Latest approved doc>Latest approved date"
                                               --> using bucket selector aggregation
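The outline above can be mimicked in plain Python to check the logic before running it against a cluster. This is only a sketch of the idea, not how Elasticsearch evaluates it internally; `DOCS` and `select_buckets` are illustrative names, and the rows mirror the five sample documents used in this answer.

```python
# Documents as (entityId, created, status), mirroring the answer's sample data.
DOCS = [
    (1, "2000-01-01", "draft"),
    (1, "2001-01-02", "approved"),
    (2, "2000-01-01", "draft"),
    (2, "2000-01-02", "approved"),
    (2, "2000-01-03", "rejected"),
]


def select_buckets(docs, wanted="approved"):
    """Keep entity buckets whose newest date equals their newest `wanted` date."""
    buckets = {}
    for entity_id, created, status in docs:
        # One bucket per entityId, like the terms aggregation.
        bucket = buckets.setdefault(
            entity_id, {"latest_entry": None, "approved_date": None}
        )
        # max(created) over every document in the bucket.
        if bucket["latest_entry"] is None or created > bucket["latest_entry"]:
            bucket["latest_entry"] = created
        # max(created) over the documents that pass the status filter.
        if status == wanted and (
            bucket["approved_date"] is None or created > bucket["approved_date"]
        ):
            bucket["approved_date"] = created
    # Bucket selector step: drop buckets where the two dates differ.
    return {
        entity_id: bucket
        for entity_id, bucket in buckets.items()
        if bucket["latest_entry"] == bucket["approved_date"]
    }


print(select_buckets(DOCS))
```

One property this makes visible: the comparison is on dates only, so if an approved event and a non-approved event for the same entity share the same created value, the bucket still passes. A created field with finer resolution than a day would reduce that risk.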
Mapping:

    {
      "index90" : {
        "mappings" : {
          "properties" : {
            "created" : {
              "type" : "date",
              "format" : "[yyyy-MM-dd]"
            },
            "entityId" : {
              "type" : "integer"
            },
            "status" : {
              "type" : "text",
              "fields" : {
                "keyword" : {
                  "type" : "text"
                }
              }
            }
          }
        }
      }
    }
Data:

    "hits" : [
      {
        "_index" : "index90",
        "_type" : "_doc",
        "_id" : "xZsmY3EBdTQt60iNXDQB",
        "_score" : 1.0,
        "_source" : {
          "entityId" : 1,
          "created" : "2000-01-01",
          "status" : "draft"
        }
      },
      {
        "_index" : "index90",
        "_type" : "_doc",
        "_id" : "xpsmY3EBdTQt60iNojQc",
        "_score" : 1.0,
        "_source" : {
          "entityId" : 1,
          "created" : "2001-01-02",
          "status" : "approved"
        }
      },
      {
        "_index" : "index90",
        "_type" : "_doc",
        "_id" : "x5smY3EBdTQt60iN7DQc",
        "_score" : 1.0,
        "_source" : {
          "entityId" : 2,
          "created" : "2000-01-01",
          "status" : "draft"
        }
      },
      {
        "_index" : "index90",
        "_type" : "_doc",
        "_id" : "yJsnY3EBdTQt60iNAzT7",
        "_score" : 1.0,
        "_source" : {
          "entityId" : 2,
          "created" : "2000-01-02",
          "status" : "approved"
        }
      },
      {
        "_index" : "index90",
        "_type" : "_doc",
        "_id" : "yZsnY3EBdTQt60iNIjQY",
        "_score" : 1.0,
        "_source" : {
          "entityId" : 2,
          "created" : "2000-01-03",
          "status" : "rejected"
        }
      }
    ]
Query:

    {
      "aggs": {
        "entitites": {
          "terms": {
            "field": "entityId",
            "size": 10
          },
          "aggs": {
            "latest_entry": {
              "max": {
                "field": "created"
              }
            },
            "latest_approved_entry": {
              "filter": {
                "term": {
                  "status.keyword": "approved"
                }
              },
              "aggs": {
                "approved_date": {
                  "max": {
                    "field": "created"
                  }
                }
              }
            },
            "select_bucket_with": {
              "bucket_selector": {
                "buckets_path": {
                  "latest_entry": "latest_entry",
                  "latest_approved_entry": "latest_approved_entry>approved_date"
                },
                "script": "params.latest_entry == params.latest_approved_entry"
              }
            }
          }
        }
      }
    }
Result:

    "aggregations" : {
      "entitites" : {
        "doc_count_error_upper_bound" : 0,
        "sum_other_doc_count" : 0,
        "buckets" : [
          {
            "key" : 1,
            "doc_count" : 2,
            "latest_entry" : {
              "value" : 9.783936E11,
              "value_as_string" : "2001-01-02"
            },
            "latest_approved_entry" : {
              "doc_count" : 1,
              "approved_date" : {
                "value" : 9.783936E11,
                "value_as_string" : "2001-01-02"
              }
            }
          }
        ]
      }
    }
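For client code, the request body and the response handling might be sketched as below. This is a hedged illustration that uses only the Python standard library and does not talk to a cluster; `build_latest_status_query`, `selected_entities` and `RESPONSE` are hypothetical names. The aggregation names (including the `entitites` spelling) and the response fragment are copied from this answer, and the selector script is written as a plain boolean expression.

```python
import json
from datetime import datetime, timezone


def build_latest_status_query(status, size=10):
    """Build a request body keeping entity buckets whose newest doc has `status`."""
    return {
        "aggs": {
            "entitites": {
                "terms": {"field": "entityId", "size": size},
                "aggs": {
                    # Date of the newest document in the bucket.
                    "latest_entry": {"max": {"field": "created"}},
                    # Date of the newest document with the wanted status.
                    "latest_approved_entry": {
                        "filter": {"term": {"status.keyword": status}},
                        "aggs": {"approved_date": {"max": {"field": "created"}}},
                    },
                    # Keep the bucket only when both dates are equal.
                    "select_bucket_with": {
                        "bucket_selector": {
                            "buckets_path": {
                                "latest_entry": "latest_entry",
                                "latest_approved_entry":
                                    "latest_approved_entry>approved_date",
                            },
                            "script":
                                "params.latest_entry == params.latest_approved_entry",
                        }
                    },
                },
            }
        }
    }


def selected_entities(response):
    """Return (entityId, latest date) pairs for the buckets that survived."""
    pairs = []
    for bucket in response["aggregations"]["entitites"]["buckets"]:
        millis = bucket["latest_entry"]["value"]  # epoch milliseconds
        day = datetime.fromtimestamp(millis / 1000, tz=timezone.utc).date()
        pairs.append((bucket["key"], day.isoformat()))
    return pairs


# Response trimmed to the fields read above, taken from the result shown earlier.
RESPONSE = {
    "aggregations": {
        "entitites": {
            "buckets": [
                {"key": 1, "doc_count": 2, "latest_entry": {"value": 9.783936e11}}
            ]
        }
    }
}

print(json.dumps(build_latest_status_query("approved"), indent=2))
print(selected_entities(RESPONSE))
```

Note that the terms aggregation returns at most `size` buckets before the selector runs, so for an index with many entities the `size` value (10 in the answer) would need to be raised or the buckets paged some other way.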