Elasticsearch metric, bucket, and pipeline aggregations: helper classes for ElasticsearchRestTemplate and RestHighLevelClient, plus the DSL
Posted by 江
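Aggregation concepts
Aggregation is roughly the counterpart of grouping (GROUP BY) in a relational database, but considerably more powerful.
The three families of aggregations
Bucketing (bucket aggregations)
- Date Histogram Aggregation: groups by a time interval; with an interval of one week, for example, each week automatically becomes one group
- Histogram Aggregation: groups by a numeric interval, analogous to the date version
- Terms Aggregation: groups by term; documents whose term matches exactly fall into the same group
- Range Aggregation: groups numeric or date values into ranges defined by explicit start and end bounds
- Missing Aggregation: counts the documents that lack a given field, including documents where the field is null
- Filter Aggregation: runs the aggregation only over the documents that pass a filter condition
Each bucket is associated with a key and a document criterion. A bucket aggregation returns a list of buckets, each holding the set of documents that satisfy its criterion; in other words, it groups the data in some way. Applying a Terms aggregation to the values 1, 2, 3, 1, 3, for example, yields bucket 1, bucket 2, and bucket 3.
ES grouping is therefore quite flexible: a plain MySQL GROUP BY on a column gives roughly the effect of a Terms Aggregation, whereas ES can also group by interval and by range out of the box.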
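To make the bucket idea concrete, here is a minimal plain-Java sketch (no Elasticsearch involved) that mimics what a Terms aggregation does to the values 1, 2, 3, 1, 3. The class name `TermsBucketSketch` and the method `termsBuckets` are made up for illustration only.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class TermsBucketSketch {

    // Groups identical values into one bucket and counts the entries in it,
    // which is the "key -> doc_count" shape a Terms aggregation returns.
    // Buckets are sorted by key here for readability; Elasticsearch itself
    // orders terms buckets by document count by default.
    public static Map<Integer, Long> termsBuckets(List<Integer> values) {
        return values.stream()
                .collect(Collectors.groupingBy(v -> v, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        Map<Integer, Long> buckets = termsBuckets(Arrays.asList(1, 2, 3, 1, 3));
        buckets.forEach((key, count) -> System.out.println("bucket " + key + " -> " + count));
    }
}
```

Five input values collapse into three buckets, one per distinct value, each carrying its own count.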
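Pipeline (pipeline aggregations)
A pipeline aggregation performs a second round of aggregation on the output or metrics of other aggregations. For example, once a Terms aggregation has produced buckets 1, 2, and 3, we can aggregate again on another property inside each bucket, that is, aggregate the result of an aggregation, much like grouping by several columns in a database. (Strictly speaking, an aggregation nested inside a bucket is a sub-aggregation; pipeline aggregations proper, such as avg_bucket, take the output of other aggregations as their input.) Multi-field grouping can also be done with a script, which requires extra configuration, but it did not work when I tried it; if anyone has a better solution, please share it. What I use here is an aggregation nested inside an aggregation, or alternatively a combined grouping field that is concatenated while the data is being cleaned.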
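The two workarounds for multi-field grouping mentioned above (nesting one grouping inside another, or concatenating the fields into a single key beforehand) can be pictured with a plain-Java sketch. Nothing here talks to Elasticsearch; the `Doc` type, its field names, and the `|` separator are assumptions chosen for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class MultiFieldGroupSketch {

    // Hypothetical document with two fields we want to group by.
    public static final class Doc {
        public final String cargoOwnerName;
        public final String orderTypeName;

        public Doc(String cargoOwnerName, String orderTypeName) {
            this.cargoOwnerName = cargoOwnerName;
            this.orderTypeName = orderTypeName;
        }
    }

    // Option 1: group by the first field, then group again inside each group
    // (the analogue of a terms aggregation nested inside a terms aggregation).
    public static Map<String, Map<String, Long>> nested(List<Doc> docs) {
        return docs.stream().collect(
                Collectors.groupingBy((Doc d) -> d.cargoOwnerName,
                        Collectors.groupingBy((Doc d) -> d.orderTypeName, Collectors.counting())));
    }

    // Option 2: build one combined key per document and group once
    // (the analogue of storing a concatenated grouping field at ingest time).
    public static Map<String, Long> combinedKey(List<Doc> docs) {
        return docs.stream().collect(
                Collectors.groupingBy((Doc d) -> d.cargoOwnerName + "|" + d.orderTypeName,
                        Collectors.counting()));
    }

    public static void main(String[] args) {
        List<Doc> docs = Arrays.asList(
                new Doc("A", "x"), new Doc("A", "x"), new Doc("A", "y"), new Doc("B", "x"));
        System.out.println(nested(docs));
        System.out.println(combinedKey(docs));
    }
}
```

Both variants produce the same counts; the nested form keeps the hierarchy, while the combined key gives a flat list that is easier to page through.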
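Metric (metric aggregations)
Metric aggregations are similar to statistical functions such as COUNT(), SUM(), and MAX().
- Avg Aggregation: computes the average
- Max Aggregation: computes the maximum
- Min Aggregation: computes the minimum
- Percentiles Aggregation: computes percentiles
- Stats Aggregation: returns avg, max, min, sum, and count in one response
- Sum Aggregation: computes the sum
- Top hits Aggregation: returns the top matching documents
- Value Count Aggregation: counts the number of values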
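As a rough analogy for the Stats aggregation, the JDK's `DoubleSummaryStatistics` gathers the same five numbers (count, min, max, avg, sum) in one pass. This is only a local sketch; the class name `StatsSketch` and the sample values are invented.

```java
import java.util.DoubleSummaryStatistics;
import java.util.stream.DoubleStream;

public class StatsSketch {

    // Collects count, min, max, average and sum over the given values in a
    // single pass, similar in spirit to what a Stats aggregation reports.
    public static DoubleSummaryStatistics stats(double... values) {
        return DoubleStream.of(values).summaryStatistics();
    }

    public static void main(String[] args) {
        DoubleSummaryStatistics s = stats(10, 20, 30, 40);
        System.out.println("count=" + s.getCount()
                + " min=" + s.getMin()
                + " max=" + s.getMax()
                + " avg=" + s.getAverage()
                + " sum=" + s.getSum());
    }
}
```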
Terms Aggregation
GET pre_package/_search
{
  "aggs": {
    "Group": {
      "terms": {
        "field": "productGroupId",
        "size": 10
      }
    }
  }
}
This queries the documents in the pre_package index, aggregates them by productGroupId under the name Group, and returns only the top 10 buckets.
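Range Aggregation
GET pre_package/_search
{
  "aggs": {
    "Group": {
      "range": {
        "field": "isPrePackage",
        "ranges": [
          { "to": 1 },
          { "from": 1, "to": 2 },
          { "from": 2 }
        ]
      }
    }
  }
}
This splits the documents into three ranges by the isPrePackage field: less than 1, from 1 up to but not including 2, and 2 or greater (from is inclusive, to is exclusive).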
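The boundary behaviour of a range aggregation (lower bound inclusive, upper bound exclusive) is easy to get wrong, so here is a small plain-Java sketch of it. The `Range` helper, the `rangeBuckets` method, and the sample values are hypothetical; the bucket labels only imitate the style of keys Elasticsearch generates.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RangeBucketSketch {

    // One range; a null bound means "open on that side". Hypothetical helper type.
    public static final class Range {
        final Double from; // inclusive lower bound
        final Double to;   // exclusive upper bound

        public Range(Double from, Double to) {
            this.from = from;
            this.to = to;
        }

        boolean contains(double v) {
            return (from == null || v >= from) && (to == null || v < to);
        }

        String key() {
            return (from == null ? "*" : from.toString()) + "-" + (to == null ? "*" : to.toString());
        }
    }

    // Counts, for every range, how many values fall inside it.
    public static Map<String, Long> rangeBuckets(List<Double> values, List<Range> ranges) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (Range r : ranges) {
            long n = values.stream().filter(r::contains).count();
            counts.put(r.key(), n);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Range> ranges = Arrays.asList(
                new Range(null, 1.0), new Range(1.0, 2.0), new Range(2.0, null));
        System.out.println(rangeBuckets(Arrays.asList(0.0, 1.0, 1.5, 2.0, 3.0), ranges));
    }
}
```

The value 1.0 lands in the middle range and 2.0 in the last one, because each upper bound is excluded from its own range.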
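Date Range Aggregation
GET pre_package/_search
{
  "aggs": {
    "Group": {
      "date_range": {
        "field": "update_date",
        "ranges": [
          { "to": "2020-05-01 00:00:00" },
          { "from": "2020-05-02 00:00:00", "to": "2020-08-01 00:00:00" },
          { "from": "2020-08-02 00:00:00" }
        ]
      }
    }
  }
}
An aggregation over date ranges. Since from is inclusive and to is exclusive, the ranges as written leave gaps: a document dated within 2020-05-01 or within 2020-08-01 falls into no bucket.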
Filter Aggregation
GET pre_package/_search
{
  "aggs": {
    "flight_Miles": {
      "filter": {
        "term": {
          "cargoOwnerName": "联合利华测试"
        }
      }
    }
  }
}
Runs the aggregation only over the documents that pass the filter condition.
Missing Aggregation
GET pre_package/_search
{
  "aggs": {
    "without_age": {
      "missing": {
        "field": "orderTypeName"
      }
    }
  }
}
Counts the documents that lack the field, including documents where the field is null.
Histogram Aggregation
GET pre_package/_search
{
  "aggs": {
    "test": {
      "histogram": {
        "field": "id",
        "interval": 100
      }
    }
  }
}
A histogram aggregation buckets documents into fixed-size intervals, here by id in steps of 100.
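How a value is assigned to a histogram bucket can be sketched in plain Java: with no offset, the bucket key is the value rounded down to the nearest multiple of the interval. The class and method names below are invented, and the sketch only lists non-empty buckets, whereas Elasticsearch by default also returns the empty buckets in between.

```java
import java.util.Map;
import java.util.TreeMap;

public class HistogramSketch {

    // Rounds the value down to the nearest multiple of the interval.
    public static double bucketKey(double value, double interval) {
        return Math.floor(value / interval) * interval;
    }

    // Counts how many values fall into each interval-sized bucket.
    public static Map<Double, Long> histogram(double[] values, double interval) {
        Map<Double, Long> buckets = new TreeMap<>();
        for (double v : values) {
            buckets.merge(bucketKey(v, interval), 1L, Long::sum);
        }
        return buckets;
    }

    public static void main(String[] args) {
        // Hypothetical id values, bucketed in steps of 100 as in the request above.
        System.out.println(histogram(new double[] {5, 99, 100, 250}, 100));
    }
}
```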
ElasticsearchRestTemplate helper class
package com.xxl.job.executor.utils;
import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
import com.xxl.job.executor.es.EsSortPage;
import com.xxl.job.executor.es.QueryCondition;
import com.xxl.job.executor.es.TermQueryReq;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.AvgAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.ParsedAvg;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
import org.springframework.data.elasticsearch.core.IndexOperations;
import org.springframework.data.elasticsearch.core.SearchHit;
import org.springframework.data.elasticsearch.core.SearchHits;
import org.springframework.data.elasticsearch.core.document.Document;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.*;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
import org.springframework.stereotype.Component;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * @description: ES helper class (ElasticsearchRestTemplate)
 * @Author:
 * @NAME: ElasticsearchRestEsUtils
 * @date: 2022/7/1 14:17
 */
@Component
public class ElasticsearchRestEsUtils {

    @Autowired
    private ElasticsearchRestTemplate elasticsearchTemplate;
    /**
     * @description: create the index
     * @author:
     * @date 2022/7/1 14:07
     * @param: [cla]
     * @return: boolean
     */
    public boolean createIndexOps(Class<?> cla) {
        // Index operations bound to the entity class
        IndexOperations ops = elasticsearchTemplate.indexOps(cla);
        if (!ops.exists()) {
            ops.create();
            ops.refresh();
            // Derive the mapping from the entity annotations and apply it
            ops.putMapping(ops.createMapping());
        }
        return true;
    }
    /**
     * @description: delete the index
     * @author:
     * @date 2022/7/1 15:23
     * @param: [cla]
     * @return: void
     */
    public void deleteIndex(Class<?> cla) {
        // Use the template's indexOps() to obtain an IndexOperations object for the index;
        // the methods offered by the dao (repository) do not support index operations
        boolean delete = elasticsearchTemplate.indexOps(cla).delete();
        System.out.println("delete = " + delete);
    }
    /**
     * @description: paged query over all documents
     * @author:
     * @date 2022/7/1 15:37
     * @param: [esSortPage]
     * @return: java.util.List<?>
     */
    public List<?> findByPageable(EsSortPage esSortPage) {
        List<Object> relist = new ArrayList<>();
        // Sorting: 1 means descending, anything else ascending, on the given sort field
        Sort sort;
        if (esSortPage.getSort() == 1) {
            sort = Sort.by(Sort.Direction.DESC, esSortPage.getSortField());
        } else {
            sort = Sort.by(Sort.Direction.ASC, esSortPage.getSortField());
        }
        int currentPage = esSortPage.getCurrentPage();
        int pageSize = esSortPage.getPageSize();
        // Paging; note that PageRequest page indexes are zero-based
        PageRequest pageRequest = PageRequest.of(currentPage, pageSize, sort);
        ElasticsearchRepository<?, Long> dao = esSortPage.getDao();
        // Paged query
        Page<?> daoAll = dao.findAll(pageRequest);
        List<?> content = daoAll.getContent();
        for (Object o : content) {
            relist.add(o);
        }
        return relist;
    }
    /**
     * @description: paged query by assembled conditions; picks exact matching or analyzed (fuzzy) matching depending on the condition type
     * @author:
     * @date 2022/7/3 11:07
     * @param: [termQueryReq]
     * @return: java.util.List<?>
     */
    public List<?> findByTermQuery(TermQueryReq termQueryReq) {
        // Sorting: 1 means descending, anything else ascending
        FieldSortBuilder balance;
        if (termQueryReq.getSort() == 1) {
            balance = new FieldSortBuilder(termQueryReq.getSortField()).order(SortOrder.DESC);
        } else {
            balance = new FieldSortBuilder(termQueryReq.getSortField()).order(SortOrder.ASC);
        }
        // Paging parameters
        int currentPage = termQueryReq.getCurrentPage();
        int pageSize = termQueryReq.getPageSize();
        // Build the query conditions
        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
        List<QueryCondition> mapList = termQueryReq.getMapList();
        for (QueryCondition queryCondition : mapList) {
            if (queryCondition.getFiltration() == 1) {
                // Range filter
                RangeQueryBuilder date = QueryBuilders.rangeQuery(queryCondition.getKey())
                        .gte(queryCondition.getValue())
                        .lte(queryCondition.getEndValue());
                boolQueryBuilder.filter(date);
            } else if (queryCondition.getExtractive() == 1) {
                if (queryCondition.getValue() != null) {
                    // All conditions must match (AND). Text fields are analyzed, so a
                    // minimum-should-match percentage is set: the higher, the more precise
                    boolQueryBuilder.must(QueryBuilders.matchQuery(queryCondition.getKey(), queryCondition.getValue())
                            .minimumShouldMatch(termQueryReq.getScore()));
                }
            } else if (queryCondition.getValue() instanceof Integer[]) {
                Integer[] pulldown = (Integer[]) queryCondition.getValue();
                for (Integer integer : pulldown) {
                    // Any one of the values may match (OR)
                    boolQueryBuilder.should(QueryBuilders.matchQuery(queryCondition.getKey(), integer));
                }
            }
        }
        // Paging
        Pageable pageable = PageRequest.of(currentPage, pageSize);
        // Run the query
        NativeSearchQuery query = new NativeSearchQueryBuilder()
                .withQuery(boolQueryBuilder)
                .withPageable(pageable)
                .withSort(balance)
                .build();
        List<Object> resList = new ArrayList<>();
        SearchHits<?> search = elasticsearchTemplate.search(query, termQueryReq.getCls());
        for (SearchHit<?> searchHit : search) {
            resList.add(searchHit.getContent());
        }
        return resList;
    }
    /**
     * @description: aggregation search, similar to GROUP BY, on the field given by termQueryReq.getAggregate()
     * @author:
     * @date 2022/7/3 11:32
     * @param: [termQueryReq]
     * @return: java.util.List<java.util.Map<String, Object>>
     */
    public List<Map<String, Object>> findPolymerization(TermQueryReq termQueryReq) {
        // Terms aggregation on the keyword sub-field (assumes the field is a text field with a .keyword sub-field)
        NativeSearchQuery query = new NativeSearchQueryBuilder()
                .addAggregation(AggregationBuilders.terms("count").field(termQueryReq.getAggregate() + ".keyword"))
                .build();
        SearchHits<?> searchHits = elasticsearchTemplate.search(query, termQueryReq.getCls());
        // Extract the aggregation result
        Aggregations aggregations = searchHits.getAggregations();
        Terms terms = (Terms) aggregations.asMap().get("count");
        List<Map<String, Object>> mapList = new ArrayList<>();
        for (Terms.Bucket bucket : terms.getBuckets()) {
            Map<String, Object> map = new HashMap<>();
            String keyAsString = bucket.getKeyAsString(); // value of the aggregated field
            long docCount = bucket.getDocCount(); // number of documents with that value
            map.put("keyAsString", keyAsString);
            map.put("docCount", docCount);
            mapList.add(map);
        }
        return mapList;
    }
    /**
     * @description: nested aggregation: counts the documents sharing the same termQueryReq.getAggregate() value, then computes the average of termQueryReq.getNes() per bucket, with sorting
     * @author:
     * @date 2022/7/3 11:36
     * @param: [termQueryReq]
     * @return: java.util.List<java.util.Map<String, Object>>
     */
    public List<Map<String, Object>> findNest(TermQueryReq termQueryReq) {
        // Build the aggregations
        String avgName = "avg_" + termQueryReq.getNes();
        TermsAggregationBuilder stateAgg = AggregationBuilders.terms("count").field(termQueryReq.getAggregate() + ".keyword");
        AvgAggregationBuilder balanceAgg = AggregationBuilders.avg(avgName).field(termQueryReq.getNes());
        // Nest the average inside the terms buckets
        stateAgg.subAggregation(balanceAgg);
        // Order buckets by the average; sort == 1 means descending, as in the other methods
        // (the second argument of BucketOrder.aggregation is "asc")
        boolean asc = termQueryReq.getSort() != 1;
        stateAgg.order(BucketOrder.aggregation(avgName, asc));
        NativeSearchQuery build = new NativeSearchQueryBuilder()
                .addAggregation(stateAgg)
                .build();
        // Run the query
        SearchHits<?> searchHits = elasticsearchTemplate.search(build, termQueryReq.getCls());
        // Extract the aggregation result
        Aggregations aggregations = searchHits.getAggregations();
        Terms terms = (Terms) aggregations.asMap().get("count");
        List<Map<String, Object>> mapList = new ArrayList<>();
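The shape of the result that a terms aggregation with a nested average, ordered by that average, is expected to produce can be pictured with a plain-Java sketch. It does not call Elasticsearch or Spring Data; the `Doc` type, its `state` and `balance` fields, and the method name `avgPerBucket` are assumptions made up for illustration.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class NestedAvgSketch {

    // Hypothetical document: a grouping field and a numeric field to average.
    public static final class Doc {
        public final String state;
        public final double balance;

        public Doc(String state, double balance) {
            this.state = state;
            this.balance = balance;
        }
    }

    // Groups by state, averages balance per group, and orders the groups by
    // that average (ascending when asc is true, descending otherwise).
    public static LinkedHashMap<String, Double> avgPerBucket(List<Doc> docs, boolean asc) {
        Map<String, Double> avg = docs.stream().collect(
                Collectors.groupingBy((Doc d) -> d.state,
                        Collectors.averagingDouble((Doc d) -> d.balance)));
        Comparator<Map.Entry<String, Double>> byAvg = Map.Entry.comparingByValue();
        if (!asc) {
            byAvg = byAvg.reversed();
        }
        LinkedHashMap<String, Double> ordered = new LinkedHashMap<>();
        avg.entrySet().stream().sorted(byAvg)
                .forEach(e -> ordered.put(e.getKey(), e.getValue()));
        return ordered;
    }

    public static void main(String[] args) {
        List<Doc> docs = Arrays.asList(
                new Doc("CA", 100), new Doc("CA", 300), new Doc("NY", 50));
        System.out.println(avgPerBucket(docs, false));
    }
}
```

Each entry corresponds to one terms bucket, with the nested average as its value, in the requested order.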