elasticsearch 进行聚合+去重查询

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了elasticsearch 进行聚合+去重查询相关的知识,希望对你有一定的参考价值。

参考技术A 已客户customer为例
我想查询每日的客户数。
先按照日期分桶,然后在桶内按照 姓名来去重 来计算客户数(实际会按照客户id 来区分客户)
测试数据见 文章末尾
一共是9条数据, 名字分别为:
river Lucy 1 Lucy frank tom lily lily tom tom
不同的名字是 6 个。

查询结果为:

2019-04-10 当天查出了9条数据,去重后是6条。

打印结果为

这是我们期望的结果。

ElasticSearch聚合分析

聚合用于分析查询结果集的统计指标,我们以观看日志分析为例,介绍各种常用的ElasticSearch聚合操作。

目录:

首先展示一下我们要分析的文档结构:

{
    "video_id": 1289643545120062253, // 视频id
    "video_uid": 3931482202390368051, // 视频发布者id
    "uid": 47381776787453866, // 观看用户id
    "time": 1533891263224, // 时间发生时间
    "watch_duration": 30 // 观看时长
}

每个文档记录了一个观看事件,我们通过聚合分析用户的观看行为。

ElasticSearch引入了两个相关概念:

  • 桶(Buckets): 满足特定条件的文档的集合
  • 指标(Metrics): 桶中文档的统计值,如特定字段的平均值

查询用户观看视频数和观看时长

首先用sql语句描述这个查询:

SELECT uid, count(*) as view_count, avg(watch_duration) as avg_duration 
FROM view_log
WHERE time >= #{since} AND time <= #{to} 
GROUP BY uid;
GET /view_log/_search
{
   "size" : 0,
   "query": {
       "range": {
           "time": {
               "gte": 0, // since
               "lte": 0 // to
           }
       }
   },
   "aggs": {
      "agg": { // agg为聚合的名称
        "terms": { // 聚合的条件为 uid 相同
          "field": "uid"
        },
        "aggs": { // 添加统计指标(Metrics)
          "avg_duration": { 
              "avg": { // 统计 watch_duration 的平均值
                "field": "watch_duration" 
              }
          }
        }
      }
   }
}

response:

{
  "took": 10,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 100000,
    "max_score": 0,
    "hits": []
  },
  "aggregations": {
    "agg": {
      "buckets": [
        {
          "key": 21836334489858688,
          "doc_count": 4026,
          "avg_duration": {
            "value": 12778.882352941177
          }
        },
        {
          "key": 31489302390368051,
          "doc_count": 2717,
          "avg_duration": {
            "value": 2652.5714285714284
          }
        }
      ]
    }
}

result.aggregations.agg.buckets列表中包含了查询的结果。

因为我们按照terms:uid进行聚合,每个bucket为uid相同的文档集合,key字段即为uid。

doc_count 字段表明bucket中文档的数目即sql语句中的count(*) as view_count

avg_duration.value 表示 watch_duration 的平均值即该用户的平均观看时长。

聚合分页器

在实际应用中用户的数量非常惊人, 不可能通过一次查询得到全部结果因此我们需要分页器分批取回:

GET /view_log/_search
{
   "size" : 0,
   "query": {
       "range": {
           "time": {
               "gte": 0, // since
               "lte": 0 // to
           }
       }
   },
   "aggs": {
      "agg": { 
        "terms": { 
            "field": "uid",
            "size": 10000, // bucket 的最大个数
            "include": { // 将聚合结果分为10页,序号为[0,9], 取第一页
                "partition": 0,
                "num_partitions": 10 
            }
        },
        "aggs": { 
          "avg_duration": { 
              "avg": { 
                "field": "watch_duration" 
              }
          }
        }
      }
   }
}

上述查询与上节的查询几乎完全相同,只是在aggs.agg.terms字段中添加了include字段进行分页。

查询视频uv

单个视频uv

uv是指观看一个视频的用户数(user view),与此相对没有按照用户去重的观看数称为pv(page view)。

用SQL语句来描述:

SELECT video_id, count(*) as pv, count(distinct uid) as uv
FROM view_log
WHERE video_id = #{video_id};

ElasticSearch可以方便的进行count(distinct)查询:

GET /view_log/_search
{
    "aggs": {
      "uv": {
        "cardinality": {
          "field": "uid"
        }
      }
   }
}

response:

{
  "took": 255,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 17579,
    "max_score": 0,
    "hits": []
  },
  "aggregations": {
    "uv": {
      "value": 11
    }
  }
}

批量查询视频uv

ElasticSearch也可以批量查询count(distinct), 先用SQL进行描述:

SELECT video_id, count(*) as pv, count(distinct uid) as uv
FROM view_log
GROUP BY video_id;

查询:

GET /view_log/_search
{
    "size": 0,
    "aggs": {
      "video": {
        "terms": {
          "field": "video_id"
        },
        "aggs": {
          "uv": {
              "cardinality": {
                "field": "uid"
              }
          }
        }
      }
   }
}

response:

{
  "took": 313,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 16940,
    "max_score": 0,
    "hits": []
  },
  "aggregations": {
    "video": {
      "buckets": [
        {
          "key": 25417499722062, // 视频id
          "doc_count": 427, // 视频观看次数 pv
          "uv": {
            "value": 124 // 观看视频的用户数 uv
          }
        },
        {
          "key": 72446898144,
          "doc_count": 744,
          "uv": {
            "value":233
          }
        }
      ]
    }
  }
}

Having查询

SQL可以使用HAVING语句根据聚合结果进行过滤,ElasticSearch可以使用pipeline aggregations达到此效果不过语法较为繁琐。

根据 count 进行过滤

使用SQL查询观看超过200次的视频:

SELECT video_id, count(*) as view_count
FROM view_log
GROUP BY video_id
HAVING count(*) > 200;
GET /view_log/_search
{
  "size": 0,
  "aggs": {
    "view_count": {
      "terms": {
        "field": "video_id"
      },
      "aggs": {
        "having": {
          "bucket_selector": {
            "buckets_path": { // 选择 view_count 聚合的 doc_count 进行过滤
              "view_count": "_count"
            },
            "script": {
              "source": "params.view_count > 200"
            }
          }
        }
      }
    }
  }
}

response:

{
  "took": 83,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 775,
    "max_score": 0,
    "hits": []
  },
  "aggregations": {
    "view_count": {
      "buckets": [
        {
          "key": 35025417499764062,
          "doc_count": 529
        },
        {
          "key": 19913672446898144,
          "doc_count": 759
        }
      ]
    }
  }
}

ElasticSearch实现类似HAVING查询的关键在于使用[bucket_selector]选择聚合结果进行过滤。

根据其它指标进行过滤

接下来我们尝试查询平均观看时长大于5分钟的视频, 用SQL描述该查询:

SELECT video_id FROM view_log
GROUP BY video_id
HAVING avg(watch_duration) > 300;
GET /view_log/_search
{
  "size": 0,
  "aggs": {
    "video": {
      "terms": {
        "field": "video_id"
      },
      "aggs": {
        "avg_duration": {
          "avg": {
            "field": "watch_duration"
          } 
        },
        "avg_duration_filter": {
          "bucket_selector": {
            "buckets_path": {
              "avg_duration": "avg_duration"
              },
              "script": {
                "source": "params.avg_duration > 200"
              }
          }  
        }
      }
    }
  }
}

response:

{
  "took": 137,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 255,
    "max_score": 0,
    "hits": []
  },
  "aggregations": {
    "video": {
      "buckets": [
        {
          "key": 5417499764062,
          "doc_count": 91576,
          "avg_duration": {
            "value": 103
          }
        },
        {
          "key": 19913672446898144,
          "doc_count": 15771,
          "avg_duration": {
            "value": 197
          }
        }
      ]
    }
  }
}

以上是关于elasticsearch 进行聚合+去重查询的主要内容,如果未能解决你的问题,请参考以下文章

elasticSearch Java API 怎么将查询出来的数据类似sql 一样的distinct 去重某个字段

Elasticsearch聚合后分页

聚合查询越来越慢?——详解Elasticsearch的Global Ordinals与High Cardinality

聚合查询越来越慢?——详解Elasticsearch的Global Ordinals与High Cardinality

ElasticSearch进阶:一文全览各种ES查询在Java中的实现

ElasticSearch聚合分析