Elasticsearch之指标,分桶,管道聚合之操作类ElasticsearchRestTemplate和RestHighLevelClient以及dsl

Posted 江#

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Elasticsearch之指标,分桶,管道聚合之操作类ElasticsearchRestTemplate和RestHighLevelClient以及dsl相关的知识,希望对你有一定的参考价值。

聚合概念

聚合就相当于是数据库中的分组(GROUP BY) 但是他比GROUP BY更加的强大 \\

聚合类型三大类

Bucketing(桶聚合)

  • Date Histogram Aggregation:根据日期阶梯分组,例如给定阶梯为周,会自动每周分为一组
  • Histogram Aggregation:根据数值阶梯分组,与日期类似
  • Terms Aggregation:根据词条内容分组,词条内容完全匹配的为一组
  • Range Aggregation:数值和日期的范围分组,指定开始和结束,然后按段分组
  • Missing Aggregation:统计文档中缺失字段的数量,缺失字段包含值为null的情况
  • Filter Aggregation:对经过Filter条件过滤后的结果集进行聚合查询

每个桶都与一个键和一个文档标准相关联,通过桶的聚合查询,我们将得到一个桶的列表,即:满足条 件的文档集合。是按照某种方式对数据进行分组 比如对1,2,3,1,3使用Terms对其聚合,可以得到1桶,2桶,3桶。

看出ES的分组方式相当强大,mysql的group by只能实现类似Terms Aggregation的分组效果,而ES还可以根据阶梯和范围来分组。

Pipeline(管道)
对其他聚合的输出或相关指标进行二次聚合 比如Terms聚合后拿到了1,2,3桶这个时候我们可以在对他其他的属性进行聚合 也就是对结果在聚合 和数据库中 多字段分组一个意思(多字段分组可以使用script脚本去聚合需要添加配置 但是我试了下没什么用 如果有那个大神有好的解决办法可以分享出来 我这边用的是管道聚合也就是聚合套聚合 或者是在洗数据的时候拼接好分组字段)

Metric(指标)
指标聚合类似于 COUNT() 、 SUM() 、 MAX() 等统计方法

  • Avg Aggregation:求平均值
  • Max Aggregation:求最大值
  • Min Aggregation:求最小值
  • Percentiles Aggregation:求百分比
  • Stats Aggregation:同时返回avg、max、min、sum、count等
  • Sum Aggregation:求和
  • Top hits Aggregation:求前几
  • Value Count Aggregation:求总数

Term Aggregation

GET pre_package/_search

  "aggs": 
    "Group": 
      "terms": 
        "field": "productGroupId",
        "size": 10
      
    
  


这个表示,查询索引为pre_package中的文档数据,并按照cargoOwnerName进行聚合查询,命名为:Group,且只查询前10条

Range Aggregation

GET pre_package/_search

  "aggs": 
    "Group": 
      "range": 
        "field": "isPrePackage",
        "ranges": [
          
            "to": 1
          ,
          
            "from": 1,
            "to": 2
          ,
          
            "from": 2
          
        ]
      
    
  

按照isPrePackage属性,分为三档,分别为:小于1,1到2,大于2

Date Range Aggregation

GET pre_package/_search

  "aggs": 
    "Group": 
      "date_range": 
        "field": "update_date",
        
        "ranges": [
          
            "to": "2020-05-01 00:00:00"
          ,
          
            "from": "2020-05-02 00:00:00",
            "to": "2020-08-01 00:00:00"
          ,
          
            "from": "2020-08-02 00:00:00"
          
        ]
      
    
  

基于时间范围的聚合查询
Filter Aggregation

GET pre_package/_search

  "aggs": 
    "flight_Miles": 
      "filter": 
         "term": 
          "cargoOwnerName": "联合利华测试"
        
      
    
  

对经过Filter条件过滤后的结果集进行聚合查询
Missing Aggregation

GET pre_package/_search

  "aggs": 
    "without_age": 
      "missing": 
        "field": "orderTypeName"
      
    
  


统计文档中缺失字段的数量,缺失字段包含值为null的情况
Histogram Aggregation

GET pre_package/_search

  "aggs": 
    "test": 
      "histogram": 
        "field": "id",
        "interval": 100
      
    
  


直方图聚合,可按照一定的区间进行统计

ElasticsearchRestTemplate操作帮助类

package com.xxl.job.executor.utils;

import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
import com.xxl.job.executor.es.EsSortPage;
import com.xxl.job.executor.es.QueryCondition;
import com.xxl.job.executor.es.TermQueryReq;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.AvgAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.ParsedAvg;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
import org.springframework.data.elasticsearch.core.IndexOperations;
import org.springframework.data.elasticsearch.core.SearchHit;
import org.springframework.data.elasticsearch.core.SearchHits;
import org.springframework.data.elasticsearch.core.document.Document;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.*;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
import org.springframework.stereotype.Component;

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;


/**
 * @description: es操作类(ElasticsearchRestTemplate)
 * @Author: 
 * @NAME: ElasticsearchRestEsUtils
 * @date: 2022/7/1 14:17
 */
@Component
public class ElasticsearchRestEsUtils 

    @Autowired
    private ElasticsearchRestTemplate elasticsearchTemplate;


    /**
     * @description: 创建索引
     * @author: 
     * @date 2022/7/1 14:07
     * @param: [cla]
     * @return: boolean
     */
    public boolean createIndexOps(Class cla) 
        // 索引别名
        IndexOperations ops = elasticsearchTemplate.indexOps(cla);
        if (!ops.exists()) 
            ops.create();
            ops.refresh();
            ops.putMapping(ops.createMapping());
        
        return true;
    

    /**
     * @description: 删除索引
     * @author: 
     * @date 2022/7/1 15:23
     * @param: [cla]
     * @return: void
     */
    public void deleteIndex(Class cla) 
        // 这里使用了 restTemplate 的 indexOps() 获取 IndexOperations 对象操作索引
        // dao 所提供的方法不支持操作索引
        boolean delete = elasticsearchTemplate.indexOps(cla).delete();
        System.out.println("delete = " + delete);
    

    /**
     * @description: 分页查询全部
     * @author: 
     * @date 2022/7/1 15:37
     * @param: [esSortPage]
     * @return: java.util.List<?>
     */
    public List<?> findByPageable(EsSortPage esSortPage) 
        List<Object> relist = new ArrayList<>();
        // 设置排序(排序方式,正序还是倒序,排序的 id)
        Sort sort = null;
        if (esSortPage.getSort() == 1) 
            sort = Sort.by(Sort.Direction.DESC, esSortPage.getSortField());
         else 
            sort = Sort.by(Sort.Direction.ASC, esSortPage.getSortField());
        

        int currentPage = esSortPage.getCurrentPage();
        int pageSize = esSortPage.getPageSize();

        // 设置查询分页
        PageRequest pageRequest = PageRequest.of(currentPage, pageSize, sort);
        ElasticsearchRepository<?, Long> dao = esSortPage.getDao();
        //分页查询
        Page<?> daoAll = dao.findAll(pageRequest);
        List<?> content = daoAll.getContent();
        for (Object o : content) 
            relist.add(o);
        
        return relist;
    

    /**
     * @description: 根据组装条件分页查询  会根据类型去匹配全等还是分词模糊查询
     * @author: 
     * @date 2022/7/3 11:07
     * @param: [termQueryReq]
     * @return: java.util.List<?>
     */
    public List<?> findByTermQuery(TermQueryReq termQueryReq) 
        List<Object> relist = new ArrayList<>();
        // 设置排序(排序方式,正序还是倒序)
        FieldSortBuilder balance = null;
        if (termQueryReq.getSort() == 1) 
            balance = new FieldSortBuilder(termQueryReq.getSortField()).order(SortOrder.DESC);
         else 
            balance = new FieldSortBuilder(termQueryReq.getSortField()).order(SortOrder.ASC);
        
        //分页条件
        int currentPage = termQueryReq.getCurrentPage();
        int pageSize = termQueryReq.getPageSize();
        // 构建查询条件
        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
        ;
        List<QueryCondition> mapList = termQueryReq.getMapList();
        for (QueryCondition queryCondition : mapList) 
            if (queryCondition.getFiltration() == 1) //过滤
                RangeQueryBuilder date = QueryBuilders.rangeQuery(queryCondition.getKey()).gte(queryCondition.getValue()).lte(queryCondition.getEndValue());
                boolQueryBuilder.filter(date);
             else 
                if (queryCondition.getExtractive() == 1) 
                    if (queryCondition.getValue() != null) 
                        boolQueryBuilder.must(QueryBuilders.matchQuery(queryCondition.getKey(), queryCondition.getValue()).minimumShouldMatch(termQueryReq.getScore()));//多条件全匹配 and  因为text会分词匹配 所以得设置一个百分比得分 越高月精确
                    
                 else 
                    if (queryCondition.getValue() instanceof Integer[]) 
                        Integer[] pulldown = (Integer[]) queryCondition.getValue();
                        for (Integer integer : pulldown) 
                            boolQueryBuilder.should(QueryBuilders.matchQuery(queryCondition.getKey(), integer));//多条件匹配之一 or
                        
                    
                
            
        

        // 分页
        Pageable pageable = PageRequest.of(currentPage, pageSize);

        // 执行查询
        NativeSearchQuery query = new NativeSearchQueryBuilder()
                .withQuery(boolQueryBuilder)
                .withPageable(pageable)
                .withSort(balance)
                .build();

        List<Object> resList = new ArrayList<>();
        SearchHits<?> search = elasticsearchTemplate.search(query, termQueryReq.getCls());
        for (SearchHit<?> searchHit : search) 
            resList.add(searchHit.getContent());
        
        return resList;
    

    /**
     * @description: 聚合搜索 类似于group by,对termQueryReq.getAggregate()字段进行聚合,
     * @author: 
     * @date 2022/7/3 11:32
     * @param: [termQueryReq]
     * @return: java.util.List<?>
     */
    public List<Map<String, Object>> findPolymerization(TermQueryReq termQueryReq) 
        NativeSearchQuery query = new NativeSearchQueryBuilder()
                .addAggregation(AggregationBuilders.terms("count").field(termQueryReq.getAggregate() + ".keyword"))
                .build();

        SearchHits<?> searchHits = elasticsearchTemplate.search(query, termQueryReq.getCls());

        //取出聚合结果
        Aggregations aggregations = searchHits.getAggregations();
        Terms terms = (Terms) aggregations.asMap().get("count");
        List<Map<String, Object>> mapList = new ArrayList<>();
        for (Terms.Bucket bucket : terms.getBuckets()) 
            Map<String, Object> map = new HashMap<>();
            String keyAsString = bucket.getKeyAsString();   // 聚合字段列的值
            long docCount = bucket.getDocCount();           // 聚合字段对应的数量
            map.put("keyAsString", keyAsString);
            map.put("docCount", docCount);
            mapList.add(map);
        
        return mapList;
    

    /**
     * @description: 嵌套聚合 统计出相同termQueryReq.getAggregate()的文档数量,再统计出termQueryReq.getNes()的平均值,带排序
     * @author: 
     * @date 2022/7/3 11:36
     * @param: [termQueryReq]
     * @return: java.util.List<?>
     */
    public List<Map<String, Object>> findNest(TermQueryReq termQueryReq) 
        // 创建聚合查询条件
        TermsAggregationBuilder stateAgg = AggregationBuilders.terms("count").field(termQueryReq.getAggregate() + ".keyword");
        AvgAggregationBuilder balanceAgg = AggregationBuilders.avg("avg_" + termQueryReq.getNes()).field(termQueryReq.getNes());
        // 嵌套
        stateAgg.subAggregation(balanceAgg);
        // 按balance的平均值降序排序
        if (termQueryReq.getSort() == 1) 
            stateAgg.order(BucketOrder.aggregation("avg_" + termQueryReq.getNes(), true));
         else 
            stateAgg.order(BucketOrder.aggregation("avg_" + termQueryReq.getNes(), false));
        


        NativeSearchQuery build = new NativeSearchQueryBuilder()
                .addAggregation(stateAgg)
                .build();
        //执行查询
        SearchHits<?> searchHits = elasticsearchTemplate.search(build, termQueryReq.getCls());
        // 取出聚合结果
        Aggregations aggregations = searchHits.getAggregations();
        Terms terms = (Terms) aggregations.asMap().get("count");

        List<Map<String, Object>> mapList = new ArrayList<>(

以上是关于Elasticsearch之指标,分桶,管道聚合之操作类ElasticsearchRestTemplate和RestHighLevelClient以及dsl的主要内容,如果未能解决你的问题,请参考以下文章

ElasticSearch 聚合函数

elasticsearch2

七.全文检索ElasticSearch经典入门-聚合查询

Elasticsearch:Bucket script 聚合

Elasticsearch 聚合功能

MongoDB——聚合管道之$limit&$skip&$sort操作