ElasticSearch工具类

Posted by seufelix

tags:

篇首语：本文由小常识网（cha138.com）小编为大家整理，主要介绍了ElasticSearch工具类相关的知识，希望对你有一定的参考价值。

import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.poi.ss.formula.functions.T;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.terms.ParsedStringTerms;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.Cardinality;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import org.trimps.ticp.fuzhou.telecom.vo.pagination.BasePageCondition;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

 47 @Component
 48 @Slf4j
 49 public class ESUtils {
 50 
 51     @Autowired
 52     @Qualifier("highLevelClient")
 53     RestHighLevelClient client;
 54 
 55     public long countLogs(String esIndex, SearchSourceBuilder builder) {
 56         SearchRequest searchRequest = new SearchRequest();
 57         searchRequest.indices(esIndex);
 58         searchRequest.source(builder);
 59         try {
 60             SearchResponse result = client.search(searchRequest, RequestOptions.DEFAULT);
 61             return result.getHits().getTotalHits().value;
 62         } catch (Exception e) {
 63             log.error("error query data", e);
 64             return 0;
 65         }
 66     }
 67 
 68     /**
 69      * 根据id精确查询
 70      *
 71      * @param esIndex
 72      * @param id
 73      * @return
 74      */
 75     public Map<String, Object> getOne(String esIndex, String id) {
 76         GetRequest getRequest = new GetRequest(esIndex, id);
 77         try {
 78             GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);
 79             if (getResponse.getSource() == null) {
 80                 return null;
 81             }
 82             return getResponse.getSourceAsMap();
 83         } catch (Exception e) {
 84             log.warn("getOne from ES Exception : ", e);
 85             return null;
 86         }
 87     }
 88 
 89     public SearchHit[] queryDocsByIds(String esIndex, List<String> ids) {
 90         SearchSourceBuilder builder = new SearchSourceBuilder();
 91         SearchRequest searchRequest = new SearchRequest();
 92         builder.query(QueryBuilders.termsQuery("_id", ids));
 93         searchRequest.indices(esIndex);
 94         searchRequest.source(builder);
 95         try {
 96             SearchResponse result = client.search(searchRequest, RequestOptions.DEFAULT);
 97             return result.getHits().getHits();
 98         } catch (Exception e) {
 99             log.error("Batch query docs by ids failed!", e);
100         }
101         return null;
102     }
103 
104     public void insertOrUpdateOne(String index, String id, String data) {
105         IndexRequest request = new IndexRequest(index);
106         request.id(id);
107         request.source(data, XContentType.JSON);
108         try {
109             client.index(request, RequestOptions.DEFAULT);
110         } catch (Exception e) {
111             log.error("error save info", e);
112         }
113     }
114 
115     /**
116      * 修改文档-只需要给要修改的字段
117      */
118     public void updateOne(String index, String id, Map<String, Object> map) {
119         UpdateRequest updateRequest = new UpdateRequest(index, id);
120         updateRequest.doc(map);
121         try {
122             UpdateResponse updateResponse = client.update(updateRequest, RequestOptions.DEFAULT);
123             log.info("updateOne result:{}", JSON.toJSONString(updateResponse));
124         } catch (Exception e) {
125             log.warn("updateOne to ES Exception : ", e);
126             throw new RuntimeException(e);
127         }
128     }
129 
130     /**
131      * 批量插入文档
132      *
133      * @param index     ES索引
134      * @param documents 待提交的批量文档
135      * @param uuidKey   文档中ID字段对应的key值
136      */
137     public BulkResponse insertDocumentsAsBatch(String index, List<Map<String, Object>> documents, String uuidKey) {
138         BulkResponse response = null;
139         if (StringUtils.isBlank(index) || CollectionUtils.isEmpty(documents)) {
140             log.warn("Es index is blank or documents is empty.");
141             return response;
142         }
143 
144         try {
145             int size = documents.size();
146             BulkRequest bulkRequest = new BulkRequest();
147             for (int i = 0; i < size; i++) {
148                 Map<String, Object> document = documents.get(i);
149                 if (MapUtils.isEmpty(document) || !document.containsKey(uuidKey)) {
150                     continue;
151                 }
152                 bulkRequest.add(new IndexRequest(index).opType("create").id(document.get(uuidKey).toString()).source(document));
153             }
154             response = client.bulk(bulkRequest, RequestOptions.DEFAULT);
155         } catch (Exception e) {
156             log.error("Insert documents to es as batch failed!", e);
157         }
158         return response;
159     }
160 
161 
162     /**
163      * 批量更新文档
164      *
165      * @param index     ES索引
166      * @param documents 待提交的批量文档
167      * @param uuidKey   文档中ID字段对应的key值
168      */
169     public BulkResponse updateDocumentsAsBatch(String index, List<Map<String, Object>> documents, String uuidKey) {
170         BulkResponse response = null;
171         if (StringUtils.isBlank(index) || CollectionUtils.isEmpty(documents)) {
172             log.warn("Es index is blank or documents is empty.");
173             return response;
174         }
175 
176         try {
177             int size = documents.size();
178             BulkRequest bulkRequest = new BulkRequest();
179             for (int i = 0; i < size; i++) {
180                 Map<String, Object> document = documents.get(i);
181                 if (MapUtils.isEmpty(document) || !document.containsKey(uuidKey)) {
182                     continue;
183                 }
184                 bulkRequest.add(new UpdateRequest(index, document.get(uuidKey).toString()).doc(document));
185             }
186             response = client.bulk(bulkRequest, RequestOptions.DEFAULT);
187         } catch (Exception e) {
188             log.error("Update documents to es as batch failed!", e);
189         }
190         return response;
191     }
192 
193     public SearchResponse queryData2(SearchSourceBuilder sourceBuilder, String... esIndex) {
194         SearchRequest searchRequest = new SearchRequest();
195         searchRequest.indices(esIndex);
196         searchRequest.source(sourceBuilder);
197         try {
198             SearchResponse result = client.search(searchRequest, RequestOptions.DEFAULT);
199             return result;
200         } catch (Exception e) {
201             log.error("error query info", e);
202         }
203         return null;
204     }
205 
206     /**
207      * 判断索引名是否存在
208      *
209      * @param indexName
210      * @param
211      * @return
212      */
213     public boolean isExistsIndex(String indexName) {
214         GetIndexRequest request = new GetIndexRequest(indexName);
215         try {
216             boolean response = client.indices().exists(request, RequestOptions.DEFAULT);
217             return response;
218         } catch (IOException e) {
219             log.error("error", e);
220             return false;
221         }
222 
223     }
224 
225     public long counts4Index(String esIndex) {
226         SearchRequest searchRequest = new SearchRequest();
227         searchRequest.indices(esIndex);
228         try {
229             SearchResponse result = client.search(searchRequest, RequestOptions.DEFAULT);
230             return result.getHits().getTotalHits().value;
231         } catch (Exception e) {
232             log.error("error query data", e);
233             return 0;
234         }
235     }
236 
237     public long countDistinctField(String esIndex, String countField, SearchSourceBuilder sourceBuilder) {
238         long count = 0;
239         if (StringUtils.isBlank(esIndex) || StringUtils.isBlank(countField)) {
240             return count;
241         }
242 
243         SearchRequest searchRequest = new SearchRequest();
244         searchRequest.indices(esIndex);
245         AggregationBuilder aggregationBuilder = AggregationBuilders.cardinality("field_count").field(countField);
246         sourceBuilder.aggregation(aggregationBuilder);
247         sourceBuilder.size(0);
248         searchRequest.source(sourceBuilder);
249         try {
250             SearchResponse result = client.search(searchRequest, RequestOptions.DEFAULT);
251             Histogram histogram = (Histogram) result.getAggregations().asMap().get(countField);
252             long total_value = 0;
253             for (Histogram.Bucket t : histogram.getBuckets()) {
254                 Cardinality cardinality = t.getAggregations().get("field_count");
255                 long value = cardinality.getValue();
256                 total_value = total_value + value;
257             }
258             return total_value;
259         } catch (Exception e) {
260             log.error("Count field failed!", e);
261         }
262         return 0;
263     }
264 
265     public long countDistinctField(String esIndex, String countField) {
266         long count = 0;
267         if (StringUtils.isBlank(esIndex) || StringUtils.isBlank(countField)) {
268             return count;
269         }
270 
271         SearchRequest searchRequest = new SearchRequest();
272         searchRequest.indices(esIndex);
273         SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
274         AggregationBuilder aggregationBuilder = AggregationBuilders.cardinality("field_count").field(countField);
275         sourceBuilder.aggregation(aggregationBuilder);
276         sourceBuilder.size(0);
277         searchRequest.source(sourceBuilder);
278         try {
279             SearchResponse result = client.search(searchRequest, RequestOptions.DEFAULT);
280             Histogram histogram = (Histogram) result.getAggregations().asMap().get(countField);
281             long total_value = 0;
282             for (Histogram.Bucket t : histogram.getBuckets()) {
283                 Cardinality cardinality = t.getAggregations().get("field_count");
284                 long value = cardinality.getValue();
285                 total_value = total_value + value;
286             }
287             return total_value;
288         } catch (Exception e) {
289             log.error("Count field failed!", e);
290         }
291         return 0;
292     }
293 
294     public boolean deleteData(String esIndexName, String esId) {
295         org.elasticsearch.action.delete.DeleteRequest deleteRequest = new org.elasticsearch.action.delete.DeleteRequest(esIndexName, esId);
296         org.elasticsearch.action.delete.DeleteResponse deleteResponse = null;
297         try {
298             deleteResponse = client.delete(deleteRequest, RequestOptions.DEFAULT);
299         } catch (Exception e) {
300             log.error("error is " + e);
301             return false;
302 
303         }
304         return true;
305     }
306 
307     /**
308      * 分组统计
309      *
310      * @param esIndex    索引
311      * @param groupFiled 被统计字段
312      */
313     public Map<String, Long> groupCount(String esIndex, String groupFiled) {
314         Map<String, Long> statistics = new HashMap<>();
315         SearchRequest searchRequest = new SearchRequest();
316         searchRequest.indices(esIndex);
317 
318         SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
319         try {
320             sourceBuilder.size(0);
321             searchRequest.source(sourceBuilder);
322 
323             TermsAggregationBuilder aggregationBuilder = AggregationBuilders.terms("fieldCount").field(groupFiled);
324             sourceBuilder.aggregation(aggregationBuilder);
325 
326             SearchResponse result = client.search(searchRequest, RequestOptions.DEFAULT);
327             Map<String, Aggregation> aggregationMap = result.getAggregations().asMap();
328             ParsedStringTerms grageTerms = (ParsedStringTerms) aggregationMap.get("fieldCount");
329             List buckets = grageTerms.getBuckets();
330 
331             for (Object object : buckets) {
332                 ParsedStringTerms.ParsedBucket obj = (ParsedStringTerms.ParsedBucket) object;
333                 String name = obj.getKeyAsString();
334                 long count = obj.getDocCount();
335                 statistics.putIfAbsent(name, count);
336             }
337         } catch (Exception e) {
338             log.error("Group count failed!", e);
339         }
340         return statistics;
341     }
342 
343     /**
344      * 分组统计
345      *
346      * @param esIndex          索引
347      * @param groupFiled       被统计字段
348      * @param boolQueryBuilder 查询条件
349      */
350     public Map<String, Long> groupCount(String esIndex, String groupFiled, BoolQueryBuilder boolQueryBuilder) {
351         Map<String, Long> statistics = new HashMap<>();
352         SearchRequest searchRequest = new SearchRequest();
353         searchRequest.indices(esIndex);
354 
355         SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
356         try {
357             sourceBuilder.size(0);
358             searchRequest.source(sourceBuilder);
359             sourceBuilder.query(boolQueryBuilder);
360 
361             TermsAggregationBuilder aggregationBuilder = AggregationBuilders.terms("fieldCount").field(groupFiled);
362             aggregationBuilder.order(BucketOrder.count(false));
363             aggregationBuilder.size(10000);
364             sourceBuilder.aggregation(aggregationBuilder);
365             log.info("sourceBuilder:" + sourceBuilder.toString());
366 
367             SearchResponse result = client.search(searchRequest, RequestOptions.DEFAULT);
368             Map<String, Aggregation> aggregationMap = result.getAggregations().asMap();
369             ParsedStringTerms grageTerms = (ParsedStringTerms) aggregationMap.get("fieldCount");
370             List buckets = grageTerms.getBuckets();
371 
372             for (Object object : buckets) {
373                 ParsedStringTerms.ParsedBucket obj = (ParsedStringTerms.ParsedBucket) object;
374                 String name = obj.getKeyAsString();
375                 long count = obj.getDocCount();
376                 statistics.putIfAbsent(name, count);
377             }
378         } catch (Exception e) {
379             log.error("Group count failed!", e);
380         }
381         return statistics;
382     }
383 
384     public SearchHit[] queryDataBig(String esIndex, SearchSourceBuilder sourceBuilder) {
385         final Scroll scroll = new Scroll(TimeValue.timeValueMinutes(1));
386         SearchRequest searchRequest = new SearchRequest();
387         searchRequest.indices(esIndex);
388         searchRequest.source(sourceBuilder);
389         searchRequest.scroll(scroll);
390         try {
391             SearchResponse result = client.search(searchRequest, RequestOptions.DEFAULT);
392             String scrollId = result.getScrollId();
393             SearchHit[] searchHits2 = result.getHits().getHits();
394             List<SearchHit> searchHitList = new ArrayList<>();
395             for (int i = 0; i < searchHits2.length; i++) {
396                 searchHitList.add(searchHits2[i]);
397             }
398             if (result.getHits().getTotalHits().value > 100000) {
399                 long count = searchHits2.length;
400                 while (count < 100000) {
401                     SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
402                     scrollRequest.scroll(scroll);
403                     try {
404                         result = client.scroll(scrollRequest, RequestOptions.DEFAULT);
405                     } catch (Exception e) {
406                         log.error("error is " + e);
407                     }
408                     scrollId = result.getScrollId();
409                     searchHits2 = result.getHits().getHits();
410                     for (int i = 0; i < searchHits2.length; i++) {
411                         searchHitList.add(searchHits2[i]);
412                     }
413                     long size = searchHits2.length;
414                     count = count + size;
415 
416                 }
417                 SearchHit[] searchHits = new SearchHit[searchHitList.size()];
418                 searchHitList.toArray(searchHits);
419                 return searchHits;
420             } else {
421                 while (searchHits2 != null && searchHits2.length > 0) {
422                     SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
423                     scrollRequest.scroll(scroll);
424                     try {
425                         result = client.scroll(scrollRequest, RequestOptions.DEFAULT);
426                     } catch (Exception e) {
427                         log.error("error is " + e);
428                     }
429                     scrollId = result.getScrollId();
430                     searchHits2 = result.getHits().getHits();
431                     for (int i = 0; i < searchHits2.length; i++) {
432                         searchHitList.add(searchHits2[i]);
433                     }
434                 }
435                 SearchHit[] searchHits = new SearchHit[searchHitList.size()];
436                 searchHitList.toArray(searchHits);
437                 return searchHits;
438             }
439 
440         } catch (Exception e) {
441             log.error("error query info", e);
442         }
443         return null;
444     }
445 }

 

以上是关于ElasticSearch工具类的主要内容，如果未能解决你的问题，请参考以下文章：

ElasticSearch学习问题记录——Invalid shift value in prefixCoded bytes (is encoded value really an INT?)(代码片段)

ElasticSearch 插件

ElasticSearch工具类封装

SpringBoot Elasticsearch工具类封装

SpringBoot Elasticsearch工具类封装

java调用Linux执行Python爬虫,并将数据存储到elasticsearch中--(java后台代码)