ElasticSearch工具类
Posted seufelix
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ElasticSearch工具类相关的知识,希望对你有一定的参考价值。
1 import com.alibaba.fastjson.JSON; 2 import lombok.extern.slf4j.Slf4j; 3 import org.apache.commons.collections4.CollectionUtils; 4 import org.apache.commons.collections4.MapUtils; 5 import org.apache.commons.lang.ArrayUtils; 6 import org.apache.commons.lang.StringUtils; 7 import org.apache.poi.ss.formula.functions.T; 8 import org.elasticsearch.action.bulk.BulkRequest; 9 import org.elasticsearch.action.bulk.BulkResponse; 10 import org.elasticsearch.action.get.GetRequest; 11 import org.elasticsearch.action.get.GetResponse; 12 import org.elasticsearch.action.index.IndexRequest; 13 import org.elasticsearch.action.search.SearchRequest; 14 import org.elasticsearch.action.search.SearchResponse; 15 import org.elasticsearch.action.search.SearchScrollRequest; 16 import org.elasticsearch.action.update.UpdateRequest; 17 import org.elasticsearch.action.update.UpdateResponse; 18 import org.elasticsearch.client.RequestOptions; 19 import org.elasticsearch.client.RestHighLevelClient; 20 import org.elasticsearch.client.indices.GetIndexRequest; 21 import org.elasticsearch.common.unit.TimeValue; 22 import org.elasticsearch.common.xcontent.XContentType; 23 import org.elasticsearch.index.query.BoolQueryBuilder; 24 import org.elasticsearch.index.query.QueryBuilders; 25 import org.elasticsearch.search.Scroll; 26 import org.elasticsearch.search.SearchHit; 27 import org.elasticsearch.search.aggregations.Aggregation; 28 import org.elasticsearch.search.aggregations.AggregationBuilder; 29 import org.elasticsearch.search.aggregations.AggregationBuilders; 30 import org.elasticsearch.search.aggregations.BucketOrder; 31 import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; 32 import org.elasticsearch.search.aggregations.bucket.terms.ParsedStringTerms; 33 import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; 34 import org.elasticsearch.search.aggregations.metrics.Cardinality; 35 import org.elasticsearch.search.builder.SearchSourceBuilder; 36 import 
org.springframework.beans.factory.annotation.Autowired; 37 import org.springframework.beans.factory.annotation.Qualifier; 38 import org.springframework.stereotype.Component; 39 import org.trimps.ticp.fuzhou.telecom.vo.pagination.BasePageCondition; 40 41 import java.io.IOException; 42 import java.util.ArrayList; 43 import java.util.HashMap; 44 import java.util.List; 45 import java.util.Map; 46 47 @Component 48 @Slf4j 49 public class ESUtils { 50 51 @Autowired 52 @Qualifier("highLevelClient") 53 RestHighLevelClient client; 54 55 public long countLogs(String esIndex, SearchSourceBuilder builder) { 56 SearchRequest searchRequest = new SearchRequest(); 57 searchRequest.indices(esIndex); 58 searchRequest.source(builder); 59 try { 60 SearchResponse result = client.search(searchRequest, RequestOptions.DEFAULT); 61 return result.getHits().getTotalHits().value; 62 } catch (Exception e) { 63 log.error("error query data", e); 64 return 0; 65 } 66 } 67 68 /** 69 * 根据id精确查询 70 * 71 * @param esIndex 72 * @param id 73 * @return 74 */ 75 public Map<String, Object> getOne(String esIndex, String id) { 76 GetRequest getRequest = new GetRequest(esIndex, id); 77 try { 78 GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT); 79 if (getResponse.getSource() == null) { 80 return null; 81 } 82 return getResponse.getSourceAsMap(); 83 } catch (Exception e) { 84 log.warn("getOne from ES Exception : ", e); 85 return null; 86 } 87 } 88 89 public SearchHit[] queryDocsByIds(String esIndex, List<String> ids) { 90 SearchSourceBuilder builder = new SearchSourceBuilder(); 91 SearchRequest searchRequest = new SearchRequest(); 92 builder.query(QueryBuilders.termsQuery("_id", ids)); 93 searchRequest.indices(esIndex); 94 searchRequest.source(builder); 95 try { 96 SearchResponse result = client.search(searchRequest, RequestOptions.DEFAULT); 97 return result.getHits().getHits(); 98 } catch (Exception e) { 99 log.error("Batch query docs by ids failed!", e); 100 } 101 return null; 102 } 103 
104 public void insertOrUpdateOne(String index, String id, String data) { 105 IndexRequest request = new IndexRequest(index); 106 request.id(id); 107 request.source(data, XContentType.JSON); 108 try { 109 client.index(request, RequestOptions.DEFAULT); 110 } catch (Exception e) { 111 log.error("error save info", e); 112 } 113 } 114 115 /** 116 * 修改文档-只需要给要修改的字段 117 */ 118 public void updateOne(String index, String id, Map<String, Object> map) { 119 UpdateRequest updateRequest = new UpdateRequest(index, id); 120 updateRequest.doc(map); 121 try { 122 UpdateResponse updateResponse = client.update(updateRequest, RequestOptions.DEFAULT); 123 log.info("updateOne result:{}", JSON.toJSONString(updateResponse)); 124 } catch (Exception e) { 125 log.warn("updateOne to ES Exception : ", e); 126 throw new RuntimeException(e); 127 } 128 } 129 130 /** 131 * 批量插入文档 132 * 133 * @param index ES索引 134 * @param documents 待提交的批量文档 135 * @param uuidKey 文档中ID字段对应的key值 136 */ 137 public BulkResponse insertDocumentsAsBatch(String index, List<Map<String, Object>> documents, String uuidKey) { 138 BulkResponse response = null; 139 if (StringUtils.isBlank(index) || CollectionUtils.isEmpty(documents)) { 140 log.warn("Es index is blank or documents is empty."); 141 return response; 142 } 143 144 try { 145 int size = documents.size(); 146 BulkRequest bulkRequest = new BulkRequest(); 147 for (int i = 0; i < size; i++) { 148 Map<String, Object> document = documents.get(i); 149 if (MapUtils.isEmpty(document) || !document.containsKey(uuidKey)) { 150 continue; 151 } 152 bulkRequest.add(new IndexRequest(index).opType("create").id(document.get(uuidKey).toString()).source(document)); 153 } 154 response = client.bulk(bulkRequest, RequestOptions.DEFAULT); 155 } catch (Exception e) { 156 log.error("Insert documents to es as batch failed!", e); 157 } 158 return response; 159 } 160 161 162 /** 163 * 批量更新文档 164 * 165 * @param index ES索引 166 * @param documents 待提交的批量文档 167 * @param uuidKey 文档中ID字段对应的key值 168 */ 
169 public BulkResponse updateDocumentsAsBatch(String index, List<Map<String, Object>> documents, String uuidKey) { 170 BulkResponse response = null; 171 if (StringUtils.isBlank(index) || CollectionUtils.isEmpty(documents)) { 172 log.warn("Es index is blank or documents is empty."); 173 return response; 174 } 175 176 try { 177 int size = documents.size(); 178 BulkRequest bulkRequest = new BulkRequest(); 179 for (int i = 0; i < size; i++) { 180 Map<String, Object> document = documents.get(i); 181 if (MapUtils.isEmpty(document) || !document.containsKey(uuidKey)) { 182 continue; 183 } 184 bulkRequest.add(new UpdateRequest(index, document.get(uuidKey).toString()).doc(document)); 185 } 186 response = client.bulk(bulkRequest, RequestOptions.DEFAULT); 187 } catch (Exception e) { 188 log.error("Update documents to es as batch failed!", e); 189 } 190 return response; 191 } 192 193 public SearchResponse queryData2(SearchSourceBuilder sourceBuilder, String... esIndex) { 194 SearchRequest searchRequest = new SearchRequest(); 195 searchRequest.indices(esIndex); 196 searchRequest.source(sourceBuilder); 197 try { 198 SearchResponse result = client.search(searchRequest, RequestOptions.DEFAULT); 199 return result; 200 } catch (Exception e) { 201 log.error("error query info", e); 202 } 203 return null; 204 } 205 206 /** 207 * 判断索引名是否存在 208 * 209 * @param indexName 210 * @param 211 * @return 212 */ 213 public boolean isExistsIndex(String indexName) { 214 GetIndexRequest request = new GetIndexRequest(indexName); 215 try { 216 boolean response = client.indices().exists(request, RequestOptions.DEFAULT); 217 return response; 218 } catch (IOException e) { 219 log.error("error", e); 220 return false; 221 } 222 223 } 224 225 public long counts4Index(String esIndex) { 226 SearchRequest searchRequest = new SearchRequest(); 227 searchRequest.indices(esIndex); 228 try { 229 SearchResponse result = client.search(searchRequest, RequestOptions.DEFAULT); 230 return 
result.getHits().getTotalHits().value; 231 } catch (Exception e) { 232 log.error("error query data", e); 233 return 0; 234 } 235 } 236 237 public long countDistinctField(String esIndex, String countField, SearchSourceBuilder sourceBuilder) { 238 long count = 0; 239 if (StringUtils.isBlank(esIndex) || StringUtils.isBlank(countField)) { 240 return count; 241 } 242 243 SearchRequest searchRequest = new SearchRequest(); 244 searchRequest.indices(esIndex); 245 AggregationBuilder aggregationBuilder = AggregationBuilders.cardinality("field_count").field(countField); 246 sourceBuilder.aggregation(aggregationBuilder); 247 sourceBuilder.size(0); 248 searchRequest.source(sourceBuilder); 249 try { 250 SearchResponse result = client.search(searchRequest, RequestOptions.DEFAULT); 251 Histogram histogram = (Histogram) result.getAggregations().asMap().get(countField); 252 long total_value = 0; 253 for (Histogram.Bucket t : histogram.getBuckets()) { 254 Cardinality cardinality = t.getAggregations().get("field_count"); 255 long value = cardinality.getValue(); 256 total_value = total_value + value; 257 } 258 return total_value; 259 } catch (Exception e) { 260 log.error("Count field failed!", e); 261 } 262 return 0; 263 } 264 265 public long countDistinctField(String esIndex, String countField) { 266 long count = 0; 267 if (StringUtils.isBlank(esIndex) || StringUtils.isBlank(countField)) { 268 return count; 269 } 270 271 SearchRequest searchRequest = new SearchRequest(); 272 searchRequest.indices(esIndex); 273 SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); 274 AggregationBuilder aggregationBuilder = AggregationBuilders.cardinality("field_count").field(countField); 275 sourceBuilder.aggregation(aggregationBuilder); 276 sourceBuilder.size(0); 277 searchRequest.source(sourceBuilder); 278 try { 279 SearchResponse result = client.search(searchRequest, RequestOptions.DEFAULT); 280 Histogram histogram = (Histogram) result.getAggregations().asMap().get(countField); 281 long 
total_value = 0; 282 for (Histogram.Bucket t : histogram.getBuckets()) { 283 Cardinality cardinality = t.getAggregations().get("field_count"); 284 long value = cardinality.getValue(); 285 total_value = total_value + value; 286 } 287 return total_value; 288 } catch (Exception e) { 289 log.error("Count field failed!", e); 290 } 291 return 0; 292 } 293 294 public boolean deleteData(String esIndexName, String esId) { 295 org.elasticsearch.action.delete.DeleteRequest deleteRequest = new org.elasticsearch.action.delete.DeleteRequest(esIndexName, esId); 296 org.elasticsearch.action.delete.DeleteResponse deleteResponse = null; 297 try { 298 deleteResponse = client.delete(deleteRequest, RequestOptions.DEFAULT); 299 } catch (Exception e) { 300 log.error("error is " + e); 301 return false; 302 303 } 304 return true; 305 } 306 307 /** 308 * 分组统计 309 * 310 * @param esIndex 索引 311 * @param groupFiled 被统计字段 312 */ 313 public Map<String, Long> groupCount(String esIndex, String groupFiled) { 314 Map<String, Long> statistics = new HashMap<>(); 315 SearchRequest searchRequest = new SearchRequest(); 316 searchRequest.indices(esIndex); 317 318 SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); 319 try { 320 sourceBuilder.size(0); 321 searchRequest.source(sourceBuilder); 322 323 TermsAggregationBuilder aggregationBuilder = AggregationBuilders.terms("fieldCount").field(groupFiled); 324 sourceBuilder.aggregation(aggregationBuilder); 325 326 SearchResponse result = client.search(searchRequest, RequestOptions.DEFAULT); 327 Map<String, Aggregation> aggregationMap = result.getAggregations().asMap(); 328 ParsedStringTerms grageTerms = (ParsedStringTerms) aggregationMap.get("fieldCount"); 329 List buckets = grageTerms.getBuckets(); 330 331 for (Object object : buckets) { 332 ParsedStringTerms.ParsedBucket obj = (ParsedStringTerms.ParsedBucket) object; 333 String name = obj.getKeyAsString(); 334 long count = obj.getDocCount(); 335 statistics.putIfAbsent(name, count); 336 } 337 } 
catch (Exception e) { 338 log.error("Group count failed!", e); 339 } 340 return statistics; 341 } 342 343 /** 344 * 分组统计 345 * 346 * @param esIndex 索引 347 * @param groupFiled 被统计字段 348 * @param boolQueryBuilder 查询条件 349 */ 350 public Map<String, Long> groupCount(String esIndex, String groupFiled, BoolQueryBuilder boolQueryBuilder) { 351 Map<String, Long> statistics = new HashMap<>(); 352 SearchRequest searchRequest = new SearchRequest(); 353 searchRequest.indices(esIndex); 354 355 SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); 356 try { 357 sourceBuilder.size(0); 358 searchRequest.source(sourceBuilder); 359 sourceBuilder.query(boolQueryBuilder); 360 361 TermsAggregationBuilder aggregationBuilder = AggregationBuilders.terms("fieldCount").field(groupFiled); 362 aggregationBuilder.order(BucketOrder.count(false)); 363 aggregationBuilder.size(10000); 364 sourceBuilder.aggregation(aggregationBuilder); 365 log.info("sourceBuilder:" + sourceBuilder.toString()); 366 367 SearchResponse result = client.search(searchRequest, RequestOptions.DEFAULT); 368 Map<String, Aggregation> aggregationMap = result.getAggregations().asMap(); 369 ParsedStringTerms grageTerms = (ParsedStringTerms) aggregationMap.get("fieldCount"); 370 List buckets = grageTerms.getBuckets(); 371 372 for (Object object : buckets) { 373 ParsedStringTerms.ParsedBucket obj = (ParsedStringTerms.ParsedBucket) object; 374 String name = obj.getKeyAsString(); 375 long count = obj.getDocCount(); 376 statistics.putIfAbsent(name, count); 377 } 378 } catch (Exception e) { 379 log.error("Group count failed!", e); 380 } 381 return statistics; 382 } 383 384 public SearchHit[] queryDataBig(String esIndex, SearchSourceBuilder sourceBuilder) { 385 final Scroll scroll = new Scroll(TimeValue.timeValueMinutes(1)); 386 SearchRequest searchRequest = new SearchRequest(); 387 searchRequest.indices(esIndex); 388 searchRequest.source(sourceBuilder); 389 searchRequest.scroll(scroll); 390 try { 391 SearchResponse result = 
client.search(searchRequest, RequestOptions.DEFAULT); 392 String scrollId = result.getScrollId(); 393 SearchHit[] searchHits2 = result.getHits().getHits(); 394 List<SearchHit> searchHitList = new ArrayList<>(); 395 for (int i = 0; i < searchHits2.length; i++) { 396 searchHitList.add(searchHits2[i]); 397 } 398 if (result.getHits().getTotalHits().value > 100000) { 399 long count = searchHits2.length; 400 while (count < 100000) { 401 SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId); 402 scrollRequest.scroll(scroll); 403 try { 404 result = client.scroll(scrollRequest, RequestOptions.DEFAULT); 405 } catch (Exception e) { 406 log.error("error is " + e); 407 } 408 scrollId = result.getScrollId(); 409 searchHits2 = result.getHits().getHits(); 410 for (int i = 0; i < searchHits2.length; i++) { 411 searchHitList.add(searchHits2[i]); 412 } 413 long size = searchHits2.length; 414 count = count + size; 415 416 } 417 SearchHit[] searchHits = new SearchHit[searchHitList.size()]; 418 searchHitList.toArray(searchHits); 419 return searchHits; 420 } else { 421 while (searchHits2 != null && searchHits2.length > 0) { 422 SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId); 423 scrollRequest.scroll(scroll); 424 try { 425 result = client.scroll(scrollRequest, RequestOptions.DEFAULT); 426 } catch (Exception e) { 427 log.error("error is " + e); 428 } 429 scrollId = result.getScrollId(); 430 searchHits2 = result.getHits().getHits(); 431 for (int i = 0; i < searchHits2.length; i++) { 432 searchHitList.add(searchHits2[i]); 433 } 434 } 435 SearchHit[] searchHits = new SearchHit[searchHitList.size()]; 436 searchHitList.toArray(searchHits); 437 return searchHits; 438 } 439 440 } catch (Exception e) { 441 log.error("error query info", e); 442 } 443 return null; 444 } 445 }
以上是关于ElasticSearch工具类的主要内容,如果未能解决你的问题,请参考以下文章
ElasticSearch学习问题记录——Invalid shift value in prefixCoded bytes (is encoded value really an INT?)(代码片段)