ElasticSearch工具类封装
Posted 张志翔 ̮
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ElasticSearch工具类封装相关的知识,希望对你有一定的参考价值。
最近在项目中有看到一种比较实用的ElasticSearch工具类封装方式,特此记录便于日后查阅。
1、controller层
@RequestMapping(value = "/uri/page", method = RequestMethod.GET)
public DataResult page(
@RequestParam(name = "pageIndex") Integer pageIndex,
@RequestParam(name = "pageSize") Integer pageSize,
@RequestParam(name = "uri") String uri,
@RequestParam(name = "api") String api,
@RequestParam(name = "method") String method)
try
List<Filter> filters = new ArrayList();
if(StringUtils.isNotBlank(uri))
filters.add(new Filter("uri", FilterEnum.INCLUDE_NO_SPLIT.getType(), uri));
if(StringUtils.isNotBlank(api))
filters.add(new Filter("api", FilterEnum.EQUAL.getType(), api.toLowerCase()));
if(StringUtils.isNotBlank(method))
filters.add(new Filter("method", FilterEnum.EQUAL.getType(), method.toLowerCase()));
return new DataResult(SystemStatusCode.NORMAL.getValue(), "ok", topStatisticsService.findTopUriByCondition(filters, pageIndex, pageSize));
catch (Exception e)
LOGGER.error("获取uri top分页数据失败", e);
return new DataResult(SystemStatusCode.ERROR.getValue(), "fail", e.getMessage());
2、FilterEnum
package com.fenqile.sgp.api.common;
/**
* @author sherrycao
* @version 2019/3/12
*/
public enum FilterEnum
INCLUDE(1, "包含"),
EXCLUDE(2, "不包含"),
EQUAL(3, "等于"),
UNEQUAL(4, "不等于"),
INCLUDE_NO_SPLIT(1, "包含,但不分词"),
EQUAL_NO_SPLIT(5, "等于,但不分词");
private int type;
private String desc;
FilterEnum(int type, String desc)
this.type = type;
this.desc = desc;
public int getType()
return type;
public void setType(int type)
this.type = type;
public String getDesc()
return desc;
public void setDesc(String desc)
this.desc = desc;
3、Service层
public PageInfo<TreeComplexVo> findTopUriByCondition(List<Filter> filters, int pageIndex, int pageSize) throws Exception
PageInfo<TreeComplexVo> pageInfo = new PageInfo<>();
// 获取所有index
List<String> indices = Arrays.asList(EsConstants.USER_TOP_URI_INDEX_PREFIX);
for (String index : indices)
elasticSearchDao.createIndex(EsClient.getClient(), index);
SearchSourceBuilder searchSourceBuilder = structSearchSourceBuilder(filters, pageIndex, pageSize);
searchSourceBuilder.sort("complexScore", SortOrder.DESC);
List<TreeComplexVo> treeComplexVo = new ArrayList();
String tableName = topProviderService.getEsTableName(EsConstants.TOP_URI, topProviderService.getEsTableIndex(EsConstants.TOP_URI));
SearchResult searchResult = elasticSearchDao.search(EsClient.getClient(), indices, tableName, searchSourceBuilder.toString());
List<SearchResult.Hit<JSONObject, Void>> hits = searchResult.getHits(JSONObject.class);
for (SearchResult.Hit<JSONObject, Void> hit : hits)
treeComplexVo.add(JSON.parseObject(hit.source.toString(), TreeComplexVo.class));
Long total = searchResult.getTotal();
pageInfo.setTotal(total);
pageInfo.setPageNum(pageIndex);
pageInfo.setPageSize(pageSize);
pageInfo.setList(treeComplexVo);
Long num = total % pageSize;
int page;
if (num == 0)
page = (int) (total / pageSize);
else
page = (int) (total / pageSize) + 1;
pageInfo.setPages(page);
// 生成query
return pageInfo;
@Override
public Boolean loadServiceCount( int pageSize, Map<String, Integer> appMethodCountMap)
//计算新的表索引,这样轮流覆盖,暂时做0,1轮训
Integer newIndex = getEsTableIndex(EsConstants.TOP_SERVICE)+1;
String tableName = getEsTableName(EsConstants.TOP_SERVICE, newIndex);
try
//清理旧数据
elasticSearchDao.deleteDocAll(EsClient.getClient(), EsConstants.USER_TOP_PROVIDER_INDEX_PREFIX, tableName);
catch (Exception e)
LOGGER.error("清理es数据失败", e);
Map<String, ServiceCountVo> maps = new HashMap();
for(EnvironmentType environmentType: EnvironmentType.values())
try
//切换当前的zk
ServiceService serviceService = (ServiceService)GovernanceConfig.getEnvServiceMap().get(environmentType.getType()).get(SERVICE_PROVIDER_NAME);
int pageIndex = 0;
int resultSize = 0;
do
//查询分页数据
pageIndex +=1;
PageInfo pageInfo = serviceService.getServiceCount(pageIndex, pageSize);
resultSize = appendSingleZkServiceCount(maps, environmentType, pageInfo, appMethodCountMap);
//分页数据刚好是指定分页大小就继续查询
while(resultSize==pageSize);
catch(Exception e)
LOGGER.error("分页获取zk数据失败", e);
/**
* 分页插入数据到es
*/
List results = new ArrayList(maps.values());
int batchSize = results.size()%ES_STORE_BATCH_SIZE==0?results.size()/ES_STORE_BATCH_SIZE:1+results.size()/ES_STORE_BATCH_SIZE;
for(int i=1;i<=batchSize;i++)
int start = Math.max(0, i-1)*ES_STORE_BATCH_SIZE;
int end = Math.min(results.size(), start+ES_STORE_BATCH_SIZE);
elasticSearchDao.insertDocuments(EsClient.getClient(), results.subList(start, end), EsConstants.USER_TOP_PROVIDER_INDEX_PREFIX, tableName);
//成功后更新表索引,方便查询使用
setEsTableIndex(EsConstants.TOP_SERVICE, newIndex);
return true;
4、EsClient层
package com.fenqile.sgp.web.config;
import com.google.gson.GsonBuilder;
import io.searchbox.client.JestClient;
import io.searchbox.client.JestClientFactory;
import io.searchbox.client.config.HttpClientConfig;
import com.fenqile.sgp.business.helper.HippoHelper;
/**
* @author sherrycao
* @version 2019/3/6
*/
public class EsClient
private static JestClient client;
private static JestClient accessLogClient;
private static JestClient businessLogClient;
private EsClient()
private static void build()
JestClientFactory factory = new JestClientFactory();
factory.setHttpClientConfig(
new HttpClientConfig
.Builder(HippoHelper.getEsUrls())
.multiThreaded(true)
//一个route 默认不超过2个连接 路由是指连接到某个远程注解的个数。总连接数=route个数 * defaultMaxTotalConnectionPerRoute
.defaultMaxTotalConnectionPerRoute(2)
//所有route连接总数
.maxTotalConnection(2)
.connTimeout(10000)
.readTimeout(10000)
.gson(new GsonBuilder()
.setDateFormat("yyyy-MM-dd HH:mm:ss")
.create())
.build()
);
client = factory.getObject();
private static void buildAccessLogClient()
JestClientFactory factory = new JestClientFactory();
factory.setHttpClientConfig(
new HttpClientConfig
.Builder(HippoHelper.getEsAccessLogUrls()).defaultCredentials("elastic","6018C23DD614E02D")
.multiThreaded(true)
//一个route 默认不超过2个连接 路由是指连接到某个远程注解的个数。总连接数=route个数 * defaultMaxTotalConnectionPerRoute
.defaultMaxTotalConnectionPerRoute(2)
//所有route连接总数
.maxTotalConnection(2)
.connTimeout(20000)
.readTimeout(20000)
.gson(new GsonBuilder()
.setDateFormat("yyyy-MM-dd HH:mm:ss")
.create())
.build()
);
accessLogClient = factory.getObject();
private static void buildBusinessLogClient()
JestClientFactory factory = new JestClientFactory();
factory.setHttpClientConfig(
new HttpClientConfig
.Builder(HippoHelper.getEsBusinessLogUrls()).defaultCredentials("elastic","6018C23DD614E02D")
.multiThreaded(true)
//一个route 默认不超过2个连接 路由是指连接到某个远程注解的个数。总连接数=route个数 * defaultMaxTotalConnectionPerRoute
.defaultMaxTotalConnectionPerRoute(2)
//所有route连接总数
.maxTotalConnection(2)
.connTimeout(20000)
.readTimeout(20000)
.gson(new GsonBuilder()
.setDateFormat("yyyy-MM-dd HH:mm:ss")
.create())
.build()
);
businessLogClient = factory.getObject();
public static synchronized JestClient getClient()
if (client == null)
build();
return client;
public static synchronized JestClient getAccessLogClient()
if (accessLogClient == null)
buildAccessLogClient();
return accessLogClient;
public static synchronized JestClient getBusinessLogClient()
if (businessLogClient == null)
buildBusinessLogClient();
return businessLogClient;
5、ElasticSearchDao层
package com.fenqile.sgp.business.dao;
import io.searchbox.client.JestClient;
import io.searchbox.client.JestResult;
import io.searchbox.core.*;
import io.searchbox.indices.CreateIndex;
import io.searchbox.indices.DeleteIndex;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
/**
* @author sherrycao
* @version 2019/3/11
*/
@Service
public class ElasticSearchDao
private static final Logger LOGGER = LoggerFactory.getLogger(ElasticSearchDao.class);
/**
* 插入文档
* @param document
* @param indexName
* @param typeName
*/
public void insertDocument(JestClient client, Object document, String indexName, String typeName)
try
Bulk.Builder bulk = new Bulk.Builder().defaultIndex(indexName).defaultType(typeName);
Index index = new Index.Builder(document).build();
bulk.addAction(index);
BulkResult bulkResult = client.execute(bulk.build());
bulkResult.isSucceeded();
catch (IOException e)
LOGGER.error("写入es异常, indexName , typeName ", indexName, typeName);
LOGGER.error("", e);
/**
* 批量插入文档
* @param documents
* @param indexName
* @param typeName
*/
public void insertDocuments(JestClient client, List<Object> documents, String indexName, String typeName)
Bulk.Builder bulk = new Bulk.Builder().defaultIndex(indexName).defaultType(typeName);
for (Object document : documents)
Index index = new Index.Builder(document).build();
bulk.addAction(index);
try
BulkResult bulkResult = client.execute(bulk.build());
bulkResult.isSucceeded();
catch (IOException e)
LOGGER.error("批量写入es异常, indexName , typeName ", indexName, typeName);
LOGGER.error("", e);
/**
* 指定id
* @param client
* @param documentMap
* @param indexName
* @param typeName
*/
public void insertDocuments(JestClient client, Map<String, Object> documentMap, String indexName, String typeName)
Bulk.Builder bulk = new Bulk.Builder().defaultIndex(indexName).defaultType(typeName);
Iterator documentEntry = documentMap.entrySet().iterator();
while(documentEntry.hasNext())
Map.Entry<String, Object> entry = (Map.Entry)documentEntry.next();
Index index = new Index.Builder(entry.getValue()).id(entry.getKey()).build();
bulk.addAction(index);
try
BulkResult bulkResult = client.execute(bulk.build());
bulkResult.isSucceeded();
catch (IOException e)
LOGGER.error("批量写入es异常, indexName , typeName ", indexName, typeName);
LOGGER.error("", e);
public SearchResult search(JestClient client, String indexName, String typeName, String query) throws Exception
Search search = new Search.Builder(query)
.addIndex(indexName)
.addType(typeName)
.build();
return client.execute(search);
/**
* 使用type查询
* @param client
* @param indexName
* @param typeName
* @param query
* @return
* @throws Exception
*/
public SearchResult search(JestClient client, List<String> indexName, String typeName, String query) throws Exception
LOGGER.info(query);
Search search = new Search.Builder(query)
.addIndices(indexName)
.addType(typeName)
.build();
LOGGER.info(search.toString());
LOGGER.info(search.getPathToResult());
return client.execute(search);
/**
* 不使用type查询
* @param client
* @param indexName
* @param query
* @return
* @throws Exception
*/
public SearchResult search(JestClient client, List<String> indexName, String query) throws Exception
LOGGER.info(query);
Search search = new Search.Builder(query)
.addIndices(indexName)
.build();
LOGGER.info(search.toString());
LOGGER.info(search.getPathToResult());
return client.execute(search);
public Boolean createIndex(JestClient client, String indexName)
try
JestResult jr = client.execute(new CreateIndex.Builder(indexName).build());
return jr.isSucceeded();
catch (IOException e)
LOGGER.error("", e);
return false;
public boolean deleteDoc(JestClient client,String indexId, String indexName, String indexType)
Delete.Builder builder = new Delete.Builder(indexId);
builder.refresh(true);
Delete delete = builder.index(indexName).type(indexType).build();
try
JestResult result = client.execute(delete);
if (result != null && !result.isSucceeded())
throw new RuntimeException(result.getErrorMessage()+"删除文档失败!");
catch (Exception e)
LOGGER.error("",e);
return false;
return true;
public boolean deleteDocAll(JestClient client,String indexName, String indexType)
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
searchSourceBuilder.query(boolQueryBuilder);
DeleteByQuery.Builder builder = new DeleteByQuery.Builder(searchSourceBuilder.toString());
builder.refresh(true);
DeleteByQuery deleteByQuery = builder.addIndex(indexName).addType(indexType).build();
try
JestResult result = client.execute(deleteByQuery);
if (result != null && !result.isSucceeded())
throw new RuntimeException(result.getErrorMessage()+"删除文档失败!");
catch (Exception e)
LOGGER.error("",e);
return false;
return true;
/**
* 删除类型
* @param indexName
* @param indexType
*/
public boolean deleteType(JestClient client, String indexName, String indexType)
DeleteIndex deleteIndex = new DeleteIndex.Builder(indexName).type(indexType).build();
try
JestResult result = client.execute(deleteIndex);
if (result != null && result.isSucceeded())
throw new RuntimeException(result.getErrorMessage()+"删除类型失败!");
catch (Exception e)
LOGGER.error("",e);
return false;
return true;
/**
* 删除索引
* @param indexName
*/
public boolean deleteIndex(JestClient client, String indexName)
DeleteIndex deleteIndex = new DeleteIndex.Builder(indexName).build();
try
JestResult result = client.execute(deleteIndex);
if (result != null && result.isSucceeded())
throw new RuntimeException(result.getErrorMessage()+"删除索引失败!");
catch (Exception e)
LOGGER.error("",e);
return false;
return true;
/**
* 插入或更新文档
* @param id
* @param indexObject
* @param indexName
* @param indexType
* @return
*/
public boolean insertOrUpdateDoc(JestClient client,String id, Object indexObject, String indexName, String indexType)
Index.Builder builder = new Index.Builder(indexObject);
builder.id(id);
builder.refresh(true);
Index index = builder.index(indexName).type(indexType).build();
try
JestResult result = client.execute(index);
if (result != null && !result.isSucceeded())
throw new RuntimeException(result.getErrorMessage()+"插入更新索引失败!");
catch (Exception e)
LOGGER.error("",e);
return false;
return true;
到此 ElasticSearch工具类封装介绍完成。
以上是关于ElasticSearch工具类封装的主要内容,如果未能解决你的问题,请参考以下文章
Elasticsearch RestHighLevelClient客户端封装