ElasticSearch工具类封装

Posted by 张志翔

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ElasticSearch工具类封装相关的知识,希望对你有一定的参考价值。

        最近在项目中看到一种比较实用的 ElasticSearch 工具类封装方式(基于 Jest 客户端),特此记录,便于日后查阅。

        1、controller层

@RequestMapping(value = "/uri/page", method = RequestMethod.GET)
public DataResult page(
        @RequestParam(name = "pageIndex") Integer pageIndex,
        @RequestParam(name = "pageSize") Integer pageSize,
        @RequestParam(name = "uri") String uri,
        @RequestParam(name = "api") String api,
        @RequestParam(name = "method") String method) 
    try 
        List<Filter> filters = new ArrayList();
        if(StringUtils.isNotBlank(uri))
            filters.add(new Filter("uri", FilterEnum.INCLUDE_NO_SPLIT.getType(), uri));
        
        if(StringUtils.isNotBlank(api))
            filters.add(new Filter("api", FilterEnum.EQUAL.getType(), api.toLowerCase()));
        
        if(StringUtils.isNotBlank(method))
            filters.add(new Filter("method", FilterEnum.EQUAL.getType(), method.toLowerCase()));
        
        return new DataResult(SystemStatusCode.NORMAL.getValue(), "ok", topStatisticsService.findTopUriByCondition(filters, pageIndex, pageSize));
     catch (Exception e) 
        LOGGER.error("获取uri top分页数据失败", e);
        return new DataResult(SystemStatusCode.ERROR.getValue(), "fail", e.getMessage());
    

       2、FilterEnum

package com.fenqile.sgp.api.common;

/**
 * Comparison operators that a filter can apply to a field, each identified by a numeric type code.
 *
 * <p>NOTE(review): {@link #INCLUDE_NO_SPLIT} carries the same type code (1) as {@link #INCLUDE},
 * so code that dispatches on {@link #getType()} cannot tell the two apart. This looks like a
 * copy/paste slip, but the intended value is not visible here; confirm before changing it.
 *
 * <p>NOTE(review): the setters mutate state shared by every user of the constant. They are kept
 * only for compatibility with existing callers.
 *
 * @author sherrycao
 * @version 2019/3/12
 */
public enum FilterEnum {

    INCLUDE(1, "包含"),
    EXCLUDE(2, "不包含"),
    EQUAL(3, "等于"),
    UNEQUAL(4, "不等于"),
    INCLUDE_NO_SPLIT(1, "包含,但不分词"),
    EQUAL_NO_SPLIT(5, "等于,但不分词");

    /** Numeric code passed to filters to select the operator. */
    private int type;

    /** Human-readable description of the operator. */
    private String desc;

    FilterEnum(int type, String desc) {
        this.type = type;
        this.desc = desc;
    }

    /** @return the numeric code of this operator */
    public int getType() {
        return type;
    }

    /** @param type new numeric code for this constant */
    public void setType(int type) {
        this.type = type;
    }

    /** @return the description of this operator */
    public String getDesc() {
        return desc;
    }

    /** @param desc new description for this constant */
    public void setDesc(String desc) {
        this.desc = desc;
    }
}

        3、Service层

public PageInfo<TreeComplexVo> findTopUriByCondition(List<Filter> filters, int pageIndex, int pageSize) throws Exception 

    PageInfo<TreeComplexVo> pageInfo = new PageInfo<>();

    // 获取所有index
    List<String> indices = Arrays.asList(EsConstants.USER_TOP_URI_INDEX_PREFIX);

    for (String index : indices) 
        elasticSearchDao.createIndex(EsClient.getClient(), index);
    


    SearchSourceBuilder searchSourceBuilder = structSearchSourceBuilder(filters, pageIndex, pageSize);
    searchSourceBuilder.sort("complexScore", SortOrder.DESC);

    List<TreeComplexVo> treeComplexVo = new ArrayList();
    String tableName = topProviderService.getEsTableName(EsConstants.TOP_URI, topProviderService.getEsTableIndex(EsConstants.TOP_URI));
    SearchResult searchResult = elasticSearchDao.search(EsClient.getClient(), indices, tableName, searchSourceBuilder.toString());
    List<SearchResult.Hit<JSONObject, Void>> hits = searchResult.getHits(JSONObject.class);

    for (SearchResult.Hit<JSONObject, Void> hit : hits) 
        treeComplexVo.add(JSON.parseObject(hit.source.toString(), TreeComplexVo.class));
    

    Long total = searchResult.getTotal();
    pageInfo.setTotal(total);
    pageInfo.setPageNum(pageIndex);
    pageInfo.setPageSize(pageSize);
    pageInfo.setList(treeComplexVo);

    Long num = total % pageSize;
    int page;
    if (num == 0) 
        page = (int) (total / pageSize);
     else 
        page = (int) (total / pageSize) + 1;
    
    pageInfo.setPages(page);
    // 生成query
    return pageInfo;


@Override
public Boolean loadServiceCount( int pageSize, Map<String, Integer> appMethodCountMap) 
    //计算新的表索引,这样轮流覆盖,暂时做0,1轮训
    Integer newIndex = getEsTableIndex(EsConstants.TOP_SERVICE)+1;
    String tableName = getEsTableName(EsConstants.TOP_SERVICE, newIndex);
    try
        //清理旧数据
        elasticSearchDao.deleteDocAll(EsClient.getClient(), EsConstants.USER_TOP_PROVIDER_INDEX_PREFIX, tableName);
    catch (Exception e)
        LOGGER.error("清理es数据失败", e);
    

    Map<String, ServiceCountVo> maps = new HashMap();

    for(EnvironmentType environmentType: EnvironmentType.values())
        try
            //切换当前的zk
            ServiceService serviceService = (ServiceService)GovernanceConfig.getEnvServiceMap().get(environmentType.getType()).get(SERVICE_PROVIDER_NAME);
            int pageIndex = 0;
            int resultSize = 0;
            do 
                //查询分页数据
                pageIndex +=1;
                PageInfo pageInfo = serviceService.getServiceCount(pageIndex, pageSize);
                resultSize = appendSingleZkServiceCount(maps, environmentType, pageInfo, appMethodCountMap);
            //分页数据刚好是指定分页大小就继续查询
            while(resultSize==pageSize);

        catch(Exception e)
            LOGGER.error("分页获取zk数据失败", e);
        

    

    /**
     * 分页插入数据到es
     */
    List results = new ArrayList(maps.values());
    int batchSize = results.size()%ES_STORE_BATCH_SIZE==0?results.size()/ES_STORE_BATCH_SIZE:1+results.size()/ES_STORE_BATCH_SIZE;
    for(int i=1;i<=batchSize;i++)
        int start = Math.max(0, i-1)*ES_STORE_BATCH_SIZE;
        int end = Math.min(results.size(), start+ES_STORE_BATCH_SIZE);
        elasticSearchDao.insertDocuments(EsClient.getClient(), results.subList(start, end), EsConstants.USER_TOP_PROVIDER_INDEX_PREFIX, tableName);
    
    //成功后更新表索引,方便查询使用
    setEsTableIndex(EsConstants.TOP_SERVICE, newIndex);
    return true;

        4、EsClient层

package com.fenqile.sgp.web.config;

import com.google.gson.GsonBuilder;
import io.searchbox.client.JestClient;
import io.searchbox.client.JestClientFactory;
import io.searchbox.client.config.HttpClientConfig;
import com.fenqile.sgp.business.helper.HippoHelper;

/**
 * @author sherrycao
 * @version 2019/3/6
 */
public class EsClient 


    private static JestClient client;

    private static JestClient accessLogClient;

    private static JestClient businessLogClient;

    private EsClient() 

    

    private static void build() 

        JestClientFactory factory = new JestClientFactory();
        factory.setHttpClientConfig(
                new HttpClientConfig
                        .Builder(HippoHelper.getEsUrls())
                        .multiThreaded(true)
                        //一个route 默认不超过2个连接  路由是指连接到某个远程注解的个数。总连接数=route个数 * defaultMaxTotalConnectionPerRoute
                        .defaultMaxTotalConnectionPerRoute(2)
                        //所有route连接总数
                        .maxTotalConnection(2)
                        .connTimeout(10000)
                        .readTimeout(10000)
                        .gson(new GsonBuilder()
                                .setDateFormat("yyyy-MM-dd HH:mm:ss")
                                .create())
                        .build()
        );
        client = factory.getObject();
    


    private static void buildAccessLogClient() 
        JestClientFactory factory = new JestClientFactory();
        factory.setHttpClientConfig(
                new HttpClientConfig
                        .Builder(HippoHelper.getEsAccessLogUrls()).defaultCredentials("elastic","6018C23DD614E02D")
                        .multiThreaded(true)
                        //一个route 默认不超过2个连接  路由是指连接到某个远程注解的个数。总连接数=route个数 * defaultMaxTotalConnectionPerRoute
                        .defaultMaxTotalConnectionPerRoute(2)
                        //所有route连接总数
                        .maxTotalConnection(2)
                        .connTimeout(20000)
                        .readTimeout(20000)
                        .gson(new GsonBuilder()
                                .setDateFormat("yyyy-MM-dd HH:mm:ss")
                                .create())
                        .build()
        );
        accessLogClient = factory.getObject();
    

    private static void buildBusinessLogClient() 
        JestClientFactory factory = new JestClientFactory();
        factory.setHttpClientConfig(
                new HttpClientConfig
                        .Builder(HippoHelper.getEsBusinessLogUrls()).defaultCredentials("elastic","6018C23DD614E02D")
                        .multiThreaded(true)
                        //一个route 默认不超过2个连接  路由是指连接到某个远程注解的个数。总连接数=route个数 * defaultMaxTotalConnectionPerRoute
                        .defaultMaxTotalConnectionPerRoute(2)
                        //所有route连接总数
                        .maxTotalConnection(2)
                        .connTimeout(20000)
                        .readTimeout(20000)
                        .gson(new GsonBuilder()
                                .setDateFormat("yyyy-MM-dd HH:mm:ss")
                                .create())
                        .build()
        );
        businessLogClient = factory.getObject();
    

    public static synchronized JestClient getClient() 
        if (client == null) 
            build();
        
        return client;
    

    public static synchronized JestClient getAccessLogClient() 
        if (accessLogClient == null) 
            buildAccessLogClient();
        
        return accessLogClient;
    

    public static synchronized JestClient getBusinessLogClient() 
        if (businessLogClient == null) 
            buildBusinessLogClient();
        
        return businessLogClient;
    

        5、ElasticSearchDao层

package com.fenqile.sgp.business.dao;

import io.searchbox.client.JestClient;
import io.searchbox.client.JestResult;
import io.searchbox.core.*;
import io.searchbox.indices.CreateIndex;
import io.searchbox.indices.DeleteIndex;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
 * @author sherrycao
 * @version 2019/3/11
 */
@Service
public class ElasticSearchDao 
    private static final Logger LOGGER = LoggerFactory.getLogger(ElasticSearchDao.class);

    /**
     * 插入文档
     * @param document
     * @param indexName
     * @param typeName
     */
    public void insertDocument(JestClient client, Object document, String indexName, String typeName) 
        try 
            Bulk.Builder bulk = new Bulk.Builder().defaultIndex(indexName).defaultType(typeName);
            Index index = new Index.Builder(document).build();
            bulk.addAction(index);
            BulkResult bulkResult = client.execute(bulk.build());
            bulkResult.isSucceeded();
         catch (IOException e) 
            LOGGER.error("写入es异常, indexName , typeName ", indexName, typeName);
            LOGGER.error("", e);
        
    

    /**
     * 批量插入文档
     * @param documents
     * @param indexName
     * @param typeName
     */
    public void insertDocuments(JestClient client, List<Object> documents, String indexName, String typeName) 
        Bulk.Builder bulk = new Bulk.Builder().defaultIndex(indexName).defaultType(typeName);
        for (Object document : documents) 
            Index index = new Index.Builder(document).build();
            bulk.addAction(index);
        
        try 
            BulkResult bulkResult = client.execute(bulk.build());
            bulkResult.isSucceeded();
         catch (IOException e) 
            LOGGER.error("批量写入es异常, indexName , typeName ", indexName, typeName);
            LOGGER.error("", e);
        
    

    /**
     * 指定id
     * @param client
     * @param documentMap
     * @param indexName
     * @param typeName
     */
    public void insertDocuments(JestClient client, Map<String, Object> documentMap, String indexName, String typeName) 
        Bulk.Builder bulk = new Bulk.Builder().defaultIndex(indexName).defaultType(typeName);
        Iterator documentEntry = documentMap.entrySet().iterator();
        while(documentEntry.hasNext())
            Map.Entry<String, Object> entry = (Map.Entry)documentEntry.next();
            Index index = new Index.Builder(entry.getValue()).id(entry.getKey()).build();
            bulk.addAction(index);
        
        try 
            BulkResult bulkResult = client.execute(bulk.build());
            bulkResult.isSucceeded();
         catch (IOException e) 
            LOGGER.error("批量写入es异常, indexName , typeName ", indexName, typeName);
            LOGGER.error("", e);
        
    

    public SearchResult search(JestClient client, String indexName, String typeName, String query) throws Exception 
        Search search = new Search.Builder(query)
                .addIndex(indexName)
                .addType(typeName)
                .build();
        return client.execute(search);
    

    /**
     * 使用type查询
     * @param client
     * @param indexName
     * @param typeName
     * @param query
     * @return
     * @throws Exception
     */
    public SearchResult search(JestClient client, List<String> indexName, String typeName, String query) throws Exception 
        LOGGER.info(query);
        Search search = new Search.Builder(query)
                .addIndices(indexName)
                .addType(typeName)
                .build();
        LOGGER.info(search.toString());
        LOGGER.info(search.getPathToResult());
        return client.execute(search);
    

    /**
     * 不使用type查询
     * @param client
     * @param indexName
     * @param query
     * @return
     * @throws Exception
     */
    public SearchResult search(JestClient client, List<String> indexName, String query) throws Exception 
        LOGGER.info(query);
        Search search = new Search.Builder(query)
                .addIndices(indexName)
                .build();
        LOGGER.info(search.toString());
        LOGGER.info(search.getPathToResult());
        return client.execute(search);
    

    public Boolean createIndex(JestClient client, String indexName) 
        try 
            JestResult jr = client.execute(new CreateIndex.Builder(indexName).build());
            return jr.isSucceeded();
         catch (IOException e) 
            LOGGER.error("", e);
        
        return false;
    
    public boolean deleteDoc(JestClient client,String indexId, String indexName, String indexType) 
        Delete.Builder builder = new Delete.Builder(indexId);
        builder.refresh(true);
        Delete delete = builder.index(indexName).type(indexType).build();
        try 
            JestResult result = client.execute(delete);
            if (result != null && !result.isSucceeded()) 
                throw new RuntimeException(result.getErrorMessage()+"删除文档失败!");
            
         catch (Exception e) 
            LOGGER.error("",e);
            return false;
        
        return true;
    
    public boolean deleteDocAll(JestClient client,String indexName, String indexType) 

        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
        searchSourceBuilder.query(boolQueryBuilder);

        DeleteByQuery.Builder builder = new DeleteByQuery.Builder(searchSourceBuilder.toString());
        builder.refresh(true);
        DeleteByQuery deleteByQuery = builder.addIndex(indexName).addType(indexType).build();
        try 
            JestResult result = client.execute(deleteByQuery);
            if (result != null && !result.isSucceeded()) 
                throw new RuntimeException(result.getErrorMessage()+"删除文档失败!");
            
         catch (Exception e) 
            LOGGER.error("",e);
            return false;
        
        return true;
    

    /**
     * 删除类型
     * @param indexName
     * @param indexType
     */
    public boolean deleteType(JestClient client, String indexName, String indexType) 
        DeleteIndex deleteIndex = new DeleteIndex.Builder(indexName).type(indexType).build();
        try 
            JestResult result = client.execute(deleteIndex);
            if (result != null && result.isSucceeded()) 
                throw new RuntimeException(result.getErrorMessage()+"删除类型失败!");
            
         catch (Exception e) 
            LOGGER.error("",e);
            return false;
        
        return true;
    

    /**
     * 删除索引
     * @param indexName
     */
    public boolean deleteIndex(JestClient client, String indexName) 
        DeleteIndex deleteIndex = new DeleteIndex.Builder(indexName).build();
        try 
            JestResult result = client.execute(deleteIndex);
            if (result != null && result.isSucceeded()) 
                throw new RuntimeException(result.getErrorMessage()+"删除索引失败!");
            
         catch (Exception e) 
            LOGGER.error("",e);
            return false;
        
        return true;
    
    /**
     * 插入或更新文档
     * @param id
     * @param indexObject
     * @param indexName
     * @param indexType
     * @return
     */
    public boolean insertOrUpdateDoc(JestClient client,String id, Object indexObject, String indexName, String indexType) 
        Index.Builder builder = new Index.Builder(indexObject);
        builder.id(id);
        builder.refresh(true);
        Index index = builder.index(indexName).type(indexType).build();
        try 
            JestResult result = client.execute(index);
            if (result != null && !result.isSucceeded()) 
                throw new RuntimeException(result.getErrorMessage()+"插入更新索引失败!");
            
         catch (Exception e) 
            LOGGER.error("",e);
            return false;
        
        return true;
    

        到此 ElasticSearch工具类封装介绍完成。

以上是关于ElasticSearch工具类封装的主要内容,如果未能解决你的问题,请参考以下文章:

SpringBoot Elasticsearch工具类封装

Elasticsearch入门 API

Elasticsearch入门 API

Elasticsearch RestHighLevelClient客户端封装

分布式搜索引擎Elasticsearch PHP类封装 使用原生api

php 封装 elasticsearch