Elasticsearch aggregations filtering by top one result from each bucket

【Posted】: 2014-12-07 16:02:19

【Question】:

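Given a dataset like this in a single Elasticsearch index:

Entity ID | Created    | Status
----------+------------+------------
1         | 2000/01/01 | draft
1         | 2001/01/02 | approved
2         | 2000/01/01 | draft
2         | 2000/01/02 | approved
2         | 2001/01/03 | rejected
3         | 2000/01/01 | draft
3         | 2001/01/03 | approved

I want to select only the entities whose latest status is approved.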

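So I have been experimenting with aggregations and sub-aggregations, and I managed to get every entity with only its latest status, like this:

{
  "size": 0,
  "aggs": {
    "newest-event-query": {
      "terms": {
        "field": "entityId"
      },
      "aggs": {
        "newest-event": {
          "top_hits": {
            "size": 1,
            "sort": [
              {
                "created": {
                  "order": "desc"
                }
              }
            ]
          }
        }
      }
    }
  }
}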

This should give a result like this:

Entity ID | Created    | Status
----------+------------+------------
1         | 2001/01/02 | approved
2         | 2001/01/03 | rejected
3         | 2001/01/03 | approved

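But I want to filter that result further so that it contains only the approved records (1, 3), and ultimately be able to query against that result.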
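One possible workaround, not part of the original question, is to post-filter the terms/top_hits response on the client. The sketch below assumes the usual response shape for the query above (aggregations, then `newest-event-query`, `buckets`, `newest-event`, `hits`, `hits`). The helper names `approved_entity_ids` and `_bucket` are hypothetical, and the sample response is hand-written to mirror the expected result table, not real Elasticsearch output.

```python
from typing import Any, Dict, List


def approved_entity_ids(response: Dict[str, Any], wanted: str = "approved") -> List[Any]:
    """Return the bucket keys whose single newest hit has the wanted status."""
    buckets = response["aggregations"]["newest-event-query"]["buckets"]
    result = []
    for bucket in buckets:
        hits = bucket["newest-event"]["hits"]["hits"]
        # top_hits was requested with size 1 sorted by created desc,
        # so hits[0] is the newest event of this entity.
        if hits and hits[0]["_source"].get("status") == wanted:
            result.append(bucket["key"])
    return result


def _bucket(key: int, created: str, status: str) -> Dict[str, Any]:
    """Build one hand-written bucket (illustrative data only)."""
    source = {"entityId": key, "created": created, "status": status}
    return {"key": key, "newest-event": {"hits": {"hits": [{"_source": source}]}}}


sample_response = {
    "aggregations": {
        "newest-event-query": {
            "buckets": [
                _bucket(1, "2001/01/02", "approved"),
                _bucket(2, "2001/01/03", "rejected"),
                _bucket(3, "2001/01/03", "approved"),
            ]
        }
    }
}

print(approved_entity_ids(sample_response))  # [1, 3]
```

This only sees the buckets that the terms aggregation actually returned (10 by default), and the filtering happens outside Elasticsearch, so the result cannot be queried further on the server.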

Adding an extra aggs section inside the top_hits aggregation does not seem to work:

{
  "size": 0,
  "aggs": {
    "newest-event-query": {
      "terms": {
        "field": "entityId"
      },
      "aggs": {
        "newest-event": {
          "top_hits": {
            "size": 1,
            "sort": [
              {
                "created": {
                  "order": "desc"
                }
              }
            ],
            "aggs": {
              "approved-only": {
                "filter": {
                  "term": {
                    "status": "approved"
                  }
                }
              }
            }
          }
        }
      }
    }
  }
}

Result:

"error": "SearchPhaseExecutionException[Failed to execute phase [query], all shards failed; shardFailures [gupa9nwpQWmGa3JqFmF2NA][creations][0]: SearchParseException[[creations][0]: from[-1],size[0]: Parse Failure [Failed to parse source ["size":0,"aggs":"newest-event-query":"terms":"field":"entityId","aggs":"newest-event":"top_hits":"size":1,"sort":["created":"order":"desc"],"aggs":"aproved-only":"filter":"term":"status":"approved"]]]; nested: SearchParseException[[creations][0]: from[-1],size[0]: Parse Failure [Unknown key for a START_OBJECT in [newest-event]: [aggs].]]; [gupa9nwpQWmGa3JqFmF2NA][events][0]: SearchParseException[[events][0]: from[-1],size[0]: Parse Failure [Failed to parse source ["size":0,"aggs":"newest-event-query":"terms":"field":"entityId","aggs":"newest-event":"top_hits":"size":1,"sort":["created":"order":"desc"],"aggs":"aproved-only":"filter":"term":"status":"approved"]]]; nested: SearchParseException[[events][0]: from[-1],size[0]: Parse Failure [Unknown key for a START_OBJECT in [newest-event]: [aggs].]]; [gupa9nwpQWmGa3JqFmF2NA][creations][1]: SearchParseException[[creations][1]: from[-1],size[0]: Parse Failure [Failed to parse source ["size":0,"aggs":"newest-event-query":"terms":"field":"entityId","aggs":"newest-event":"top_hits":"size":1,"sort":["created":"order":"desc"],"aggs":"aproved-only":"filter":"term":"status":"approved"]]]; nested: SearchParseException[[creations][1]: from[-1],size[0]: Parse Failure [Unknown key for a START_OBJECT in [newest-event]: [aggs].]]; [gupa9nwpQWmGa3JqFmF2NA][events][1]: SearchParseException[[events][1]: from[-1],size[0]: Parse Failure [Failed to parse source ["size":0,"aggs":"newest-event-query":"terms":"field":"entityId","aggs":"newest-event":"top_hits":"size":1,"sort":["created":"order":"desc"],"aggs":"aproved-only":"filter":"term":"status":"approved"]]]; nested: SearchParseException[[events][1]: from[-1],size[0]: Parse Failure [Unknown key for a START_OBJECT in [newest-event]: [aggs].]]; 
[gupa9nwpQWmGa3JqFmF2NA][creations][2]: SearchParseException[[creations][2]: from[-1],size[0]: Parse Failure [Failed to parse source ["size":0,"aggs":"newest-event-query":"terms":"field":"entityId","aggs":"newest-event":"top_hits":"size":1,"sort":["created":"order":"desc"],"aggs":"aproved-only":"filter":"term":"status":"approved"]]]; nested: SearchParseException[[creations][2]: from[-1],size[0]: Parse Failure [Unknown key for a START_OBJECT in [newest-event]: [aggs].]]; [gupa9nwpQWmGa3JqFmF2NA][events][2]: SearchParseException[[events][2]: from[-1],size[0]: Parse Failure [Failed to parse source ["size":0,"aggs":"newest-event-query":"terms":"field":"entityId","aggs":"newest-event":"top_hits":"size":1,"sort":["created":"order":"desc"],"aggs":"aproved-only":"filter":"term":"status":"approved"]]]; nested: SearchParseException[[events][2]: from[-1],size[0]: Parse Failure [Unknown key for a START_OBJECT in [newest-event]: [aggs].]]; [gupa9nwpQWmGa3JqFmF2NA][creations][3]: SearchParseException[[creations][3]: from[-1],size[0]: Parse Failure [Failed to parse source ["size":0,"aggs":"newest-event-query":"terms":"field":"entityId","aggs":"newest-event":"top_hits":"size":1,"sort":["created":"order":"desc"],"aggs":"aproved-only":"filter":"term":"status":"approved"]]]; nested: SearchParseException[[creations][3]: from[-1],size[0]: Parse Failure [Unknown key for a START_OBJECT in [newest-event]: [aggs].]]; [gupa9nwpQWmGa3JqFmF2NA][events][3]: SearchParseException[[events][3]: from[-1],size[0]: Parse Failure [Failed to parse source ["size":0,"aggs":"newest-event-query":"terms":"field":"entityId","aggs":"newest-event":"top_hits":"size":1,"sort":["created":"order":"desc"],"aggs":"aproved-only":"filter":"term":"status":"approved"]]]; nested: SearchParseException[[events][3]: from[-1],size[0]: Parse Failure [Unknown key for a START_OBJECT in [newest-event]: [aggs].]]; [gupa9nwpQWmGa3JqFmF2NA][creations][4]: SearchParseException[[creations][4]: from[-1],size[0]: Parse Failure [Failed 
to parse source ["size":0,"aggs":"newest-event-query":"terms":"field":"entityId","aggs":"newest-event":"top_hits":"size":1,"sort":["created":"order":"desc"],"aggs":"aproved-only":"filter":"term":"status":"approved"]]]; nested: SearchParseException[[creations][4]: from[-1],size[0]: Parse Failure [Unknown key for a START_OBJECT in [newest-event]: [aggs].]]; [gupa9nwpQWmGa3JqFmF2NA][events][4]: SearchParseException[[events][4]: from[-1],size[0]: Parse Failure [Failed to parse source ["size":0,"aggs":"newest-event-query":"terms":"field":"entityId","aggs":"newest-event":"top_hits":"size":1,"sort":["created":"order":"desc"],"aggs":"aproved-only":"filter":"term":"status":"approved"]]]; nested: SearchParseException[[events][4]: from[-1],size[0]: Parse Failure [Unknown key for a START_OBJECT in [newest-event]: [aggs].]]; ]",
"status": 400

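Any help is appreciated.

Edit: Simply filtering on approved will not work, because an event can move from the approved status back to another status. I always need to filter by the latest status. The point of this exercise is to build an immutable data structure: a single entity can go through many stages, but we should always query only the latest one.

Edit 2: While looking for a solution I also considered a parent-child structure. It comes close but still has limitations, for example has_parent or has_child needs a fixed "id". Another obvious and performant solution would be to simply flag the latest item at write time, e.g. with a boolean, but I want atomicity, and resetting that boolean on one document while setting it on the new document is not an atomic operation.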

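【Comments】:

Why can't you just run a filtered query for approved? Are there statuses that come after approval?

Why can't you add a filter query, so that the aggregation is applied to the filtered result set?

Added a further explanation. Does it make sense?

【Answer 1】:

I used a terms aggregation and a bucket selector aggregation. Under each term, I compute the date of the most recent entry with a max aggregation on the created field, and, inside a filter aggregation on the status "approved", the date of the most recent approved entry. With the bucket selector, I keep only the terms where the latest date and the latest approved date are the same.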

Entity: 1                                        --> using terms aggregation
     "Latest created date":"2001-01-02"          --> using max aggregation
     "Latest approved doc":                      --> using filter aggregation
            "Latest approved date":"2000-01-01"  --> Using max aggregation
     "Bucket where Latest created date==Latest approved doc>Latest approved date" 
                                                 --> using bucket selector aggregation
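
To make the selection rule concrete, here is a minimal pure-Python sketch of the same logic, run on the rows from the question's table rather than on Elasticsearch. The function name `entities_with_latest_status` and the `EVENTS` list are illustrative assumptions, not part of the answer.

```python
from datetime import date

# Rows copied from the question's table: (entity_id, created, status).
EVENTS = [
    (1, date(2000, 1, 1), "draft"),
    (1, date(2001, 1, 2), "approved"),
    (2, date(2000, 1, 1), "draft"),
    (2, date(2000, 1, 2), "approved"),
    (2, date(2001, 1, 3), "rejected"),
    (3, date(2000, 1, 1), "draft"),
    (3, date(2001, 1, 3), "approved"),
]


def entities_with_latest_status(events, wanted="approved"):
    """Keep entities whose newest event overall is also their newest `wanted` event."""
    latest = {}         # entity -> max(created) over all events (max aggregation)
    latest_wanted = {}  # entity -> max(created) over `wanted` events (filter + max)
    for entity_id, created, status in events:
        if entity_id not in latest or created > latest[entity_id]:
            latest[entity_id] = created
        if status == wanted:
            if entity_id not in latest_wanted or created > latest_wanted[entity_id]:
                latest_wanted[entity_id] = created
    # Bucket selector step: keep the entity only when both dates match.
    # Entities without any `wanted` event have no entry and are dropped.
    return sorted(e for e, d in latest.items() if latest_wanted.get(e) == d)


print(entities_with_latest_status(EVENTS))  # [1, 3]
```

Entity 2 is dropped because its newest event (rejected) is later than its newest approved event. One caveat of comparing dates: if two events of the same entity share the same created value but have different statuses, the rule cannot tell which one is the latest.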

Mapping:

{
  "index90" : {
    "mappings" : {
      "properties" : {
        "created" : {
          "type" : "date",
          "format" : "[yyyy-MM-dd]"
        },
        "entityId" : {
          "type" : "integer"
        },
        "status" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "text"
            }
          }
        }
      }
    }
  }
}

Data:

"hits" : [
      {
        "_index" : "index90",
        "_type" : "_doc",
        "_id" : "xZsmY3EBdTQt60iNXDQB",
        "_score" : 1.0,
        "_source" : {
          "entityId" : 1,
          "created" : "2000-01-01",
          "status" : "draft"
        }
      },
      {
        "_index" : "index90",
        "_type" : "_doc",
        "_id" : "xpsmY3EBdTQt60iNojQc",
        "_score" : 1.0,
        "_source" : {
          "entityId" : 1,
          "created" : "2001-01-02",
          "status" : "approved"
        }
      },
      {
        "_index" : "index90",
        "_type" : "_doc",
        "_id" : "x5smY3EBdTQt60iN7DQc",
        "_score" : 1.0,
        "_source" : {
          "entityId" : 2,
          "created" : "2000-01-01",
          "status" : "draft"
        }
      },
      {
        "_index" : "index90",
        "_type" : "_doc",
        "_id" : "yJsnY3EBdTQt60iNAzT7",
        "_score" : 1.0,
        "_source" : {
          "entityId" : 2,
          "created" : "2000-01-02",
          "status" : "approved"
        }
      },
      {
        "_index" : "index90",
        "_type" : "_doc",
        "_id" : "yZsnY3EBdTQt60iNIjQY",
        "_score" : 1.0,
        "_source" : {
          "entityId" : 2,
          "created" : "2000-01-03",
          "status" : "rejected"
        }
      }
    ]

Query:

{
  "aggs": {
    "entitites": {
      "terms": {
        "field": "entityId",
        "size": 10
      },
      "aggs": {
        "latest_entry": {
          "max": {
            "field": "created"
          }
        },
        "latest_approved_entry": {
          "filter": {
            "term": {
              "status.keyword": "approved"
            }
          },
          "aggs": {
            "approved_date": {
              "max": {
                "field": "created"
              }
            }
          }
        },
        "select_bucket_with": {
          "bucket_selector": {
            "buckets_path": {
              "latest_entry": "latest_entry",
              "latest_approved_entry": "latest_approved_entry>approved_date"
            },
            "script": "if(params['latest_entry']==params['latest_approved_entry']) return true;"
          }
        }
      }
    }
  }
}

Result:

"aggregations" : {
    "entitites" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [
        {
          "key" : 1,
          "doc_count" : 2,
          "latest_entry" : {
            "value" : 9.783936E11,
            "value_as_string" : "2001-01-02"
          },
          "latest_approved_entry" : {
            "doc_count" : 1,
            "approved_date" : {
              "value" : 9.783936E11,
              "value_as_string" : "2001-01-02"
            }
          }
        }
      ]
    }
  }
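
As a rough illustration of how a client might read this result, the sketch below walks the surviving buckets and converts the numeric `value` of the max aggregation, which for a date field is milliseconds since the Unix epoch, back into a calendar date. The function name `surviving_entities` is hypothetical, and the `response` dict is a trimmed copy of the result above.

```python
from datetime import datetime, timezone

# Trimmed copy of the aggregation result shown above.
response = {
    "aggregations": {
        "entitites": {
            "buckets": [
                {
                    "key": 1,
                    "doc_count": 2,
                    "latest_entry": {
                        "value": 9.783936e11,
                        "value_as_string": "2001-01-02",
                    },
                    "latest_approved_entry": {
                        "doc_count": 1,
                        "approved_date": {
                            "value": 9.783936e11,
                            "value_as_string": "2001-01-02",
                        },
                    },
                }
            ]
        }
    }
}


def surviving_entities(resp, agg_name="entitites"):
    """Return (entity id, latest date) for every bucket kept by the bucket selector."""
    out = []
    for bucket in resp["aggregations"][agg_name]["buckets"]:
        millis = bucket["latest_entry"]["value"]
        # Epoch milliseconds -> UTC date.
        latest = datetime.fromtimestamp(millis / 1000, tz=timezone.utc).date()
        out.append((bucket["key"], latest.isoformat()))
    return out


print(surviving_entities(response))  # [(1, '2001-01-02')]
```

Only entity 1 appears because the sample data in this answer contains entities 1 and 2, and entity 2's newest entry is rejected.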
