Elasticsearch 5.6.8 创建 TransportClient 工具类

Posted chong-zuo3322

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了elasticsearch5.6.8 创建TransportClient工具类相关的知识,希望对你有一定的参考价值。

ProjectConfig:

技术图片
package com.ultiwill.utils;

import com.ultiwill.entity.EsConfigEntity;
import com.alibaba.fastjson.JSON;

/**
 * Convenience accessors for values stored in {@code application.yaml}.
 *
 * <p>Every call goes through {@link ConfigUtil}, so the YAML file is read again on each
 * invocation.
 */
public class ProjectConfig {

    /**
     * Loads the {@code es} section of {@code application.yaml} and binds it to an
     * {@link EsConfigEntity} by round-tripping the section through JSON.
     *
     * @return the bound Elasticsearch configuration
     */
    public static EsConfigEntity getESConf() {
        Object esSection = ConfigUtil.readYamlByPrefix("application.yaml", "es");
        String esSectionJson = JSON.toJSONString(esSection);
        return JSON.parseObject(esSectionJson, EsConfigEntity.class);
    }

    /**
     * Looks up a top-level entry of {@code application.yaml}.
     *
     * @param key top-level YAML key
     * @return the value stored under {@code key}, as produced by the YAML parser
     */
    public static Object getCommonValue(String key) {
        return ConfigUtil.readYaml("application.yaml").get(key);
    }

}
View Code

ConfigUtil:

技术图片
package com.ultiwill.utils;

import org.yaml.snakeyaml.Yaml;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * Static helpers for reading YAML configuration files.
 */
public class ConfigUtil {

    private ConfigUtil() {
    }

    /**
     * Parses a YAML file into a map.
     *
     * <p>If {@code file} does not exist as given, it is looked up under
     * {@code <user.dir>/conf/<file>} instead.
     *
     * @param file path of the YAML file, or a bare file name to be resolved under the
     *             {@code conf} directory of the working directory
     * @return the parsed document, or {@code null} when the file cannot be opened or read
     */
    public static Map readYaml(String file) {
        String path = file;
        if (!new File(path).exists()) {
            path = System.getProperty("user.dir") + File.separator + "conf" + File.separator + file;
        }

        // try-with-resources: the stream used to be left open after parsing.
        try (FileInputStream in = new FileInputStream(path)) {
            return new Yaml().loadAs(in, HashMap.class);
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Reads a YAML file and returns the sub-map stored under a top-level key.
     *
     * @param file   path or file name, resolved as in {@link #readYaml(String)}
     * @param prefix top-level key whose value is expected to be a map
     * @return the sub-map, or {@code null} when the file could not be read or the key is absent
     */
    @SuppressWarnings("unchecked")
    public static Map<String, Object> readYamlByPrefix(String file, String prefix) {
        Map root = readYaml(file);
        if (root == null) {
            // readYaml already reported the failure; avoid a bare NullPointerException here.
            return null;
        }
        return (Map<String, Object>) root.get(prefix);
    }

}
View Code

EsConfigEntity:

技术图片
package com.ultiwill.entity;

public class EsConfigEntity {
    private String clusterName;
    private String clusterHttpPort;
    private String clusterTcpPort;
    private String clusterNodes;
    private Long splitLimitNum;
    private Integer primaryShard;
    private Integer replicateShard;
    private String indexTranslogInterval;
    private String env;

    public EsConfigEntity() {
    }

    public String getClusterName() {
        return this.clusterName;
    }

    public String getClusterHttpPort() {
        return this.clusterHttpPort;
    }

    public String getClusterTcpPort() {
        return this.clusterTcpPort;
    }

    public String getClusterNodes() {
        return this.clusterNodes;
    }

    public Long getSplitLimitNum() {
        return this.splitLimitNum;
    }

    public Integer getPrimaryShard() {
        return this.primaryShard;
    }

    public Integer getReplicateShard() {
        return this.replicateShard;
    }

    public String getIndexTranslogInterval() {
        return this.indexTranslogInterval;
    }

    public String getEnv() {
        return this.env;
    }

    public void setClusterName(String clusterName) {
        this.clusterName = clusterName;
    }

    public void setClusterHttpPort(String clusterHttpPort) {
        this.clusterHttpPort = clusterHttpPort;
    }

    public void setClusterTcpPort(String clusterTcpPort) {
        this.clusterTcpPort = clusterTcpPort;
    }

    public void setClusterNodes(String clusterNodes) {
        this.clusterNodes = clusterNodes;
    }

    public void setSplitLimitNum(Long splitLimitNum) {
        this.splitLimitNum = splitLimitNum;
    }

    public void setPrimaryShard(Integer primaryShard) {
        this.primaryShard = primaryShard;
    }

    public void setReplicateShard(Integer replicateShard) {
        this.replicateShard = replicateShard;
    }

    public void setIndexTranslogInterval(String indexTranslogInterval) {
        this.indexTranslogInterval = indexTranslogInterval;
    }

    public void setEnv(String env) {
        this.env = env;
    }

    public boolean equals(Object o) {
        if (o == this) {
            return true;
        } else if (!(o instanceof EsConfigEntity)) {
            return false;
        } else {
            EsConfigEntity other = (EsConfigEntity)o;
            if (!other.canEqual(this)) {
                return false;
            } else {
                label119: {
                    Object this$clusterName = this.getClusterName();
                    Object other$clusterName = other.getClusterName();
                    if (this$clusterName == null) {
                        if (other$clusterName == null) {
                            break label119;
                        }
                    } else if (this$clusterName.equals(other$clusterName)) {
                        break label119;
                    }

                    return false;
                }

                Object this$clusterHttpPort = this.getClusterHttpPort();
                Object other$clusterHttpPort = other.getClusterHttpPort();
                if (this$clusterHttpPort == null) {
                    if (other$clusterHttpPort != null) {
                        return false;
                    }
                } else if (!this$clusterHttpPort.equals(other$clusterHttpPort)) {
                    return false;
                }

                label105: {
                    Object this$clusterTcpPort = this.getClusterTcpPort();
                    Object other$clusterTcpPort = other.getClusterTcpPort();
                    if (this$clusterTcpPort == null) {
                        if (other$clusterTcpPort == null) {
                            break label105;
                        }
                    } else if (this$clusterTcpPort.equals(other$clusterTcpPort)) {
                        break label105;
                    }

                    return false;
                }

                Object this$clusterNodes = this.getClusterNodes();
                Object other$clusterNodes = other.getClusterNodes();
                if (this$clusterNodes == null) {
                    if (other$clusterNodes != null) {
                        return false;
                    }
                } else if (!this$clusterNodes.equals(other$clusterNodes)) {
                    return false;
                }

                label91: {
                    Object this$splitLimitNum = this.getSplitLimitNum();
                    Object other$splitLimitNum = other.getSplitLimitNum();
                    if (this$splitLimitNum == null) {
                        if (other$splitLimitNum == null) {
                            break label91;
                        }
                    } else if (this$splitLimitNum.equals(other$splitLimitNum)) {
                        break label91;
                    }

                    return false;
                }

                Object this$primaryShard = this.getPrimaryShard();
                Object other$primaryShard = other.getPrimaryShard();
                if (this$primaryShard == null) {
                    if (other$primaryShard != null) {
                        return false;
                    }
                } else if (!this$primaryShard.equals(other$primaryShard)) {
                    return false;
                }

                label77: {
                    Object this$replicateShard = this.getReplicateShard();
                    Object other$replicateShard = other.getReplicateShard();
                    if (this$replicateShard == null) {
                        if (other$replicateShard == null) {
                            break label77;
                        }
                    } else if (this$replicateShard.equals(other$replicateShard)) {
                        break label77;
                    }

                    return false;
                }

                label70: {
                    Object this$indexTranslogInterval = this.getIndexTranslogInterval();
                    Object other$indexTranslogInterval = other.getIndexTranslogInterval();
                    if (this$indexTranslogInterval == null) {
                        if (other$indexTranslogInterval == null) {
                            break label70;
                        }
                    } else if (this$indexTranslogInterval.equals(other$indexTranslogInterval)) {
                        break label70;
                    }

                    return false;
                }

                Object this$env = this.getEnv();
                Object other$env = other.getEnv();
                if (this$env == null) {
                    if (other$env != null) {
                        return false;
                    }
                } else if (!this$env.equals(other$env)) {
                    return false;
                }

                return true;
            }
        }
    }

    protected boolean canEqual(Object other) {
        return other instanceof EsConfigEntity;
    }



    public String toString() {
        return "EsConfigEntity(clusterName=" + this.getClusterName() + ", clusterHttpPort=" + this.getClusterHttpPort() + ", clusterTcpPort=" + this.getClusterTcpPort() + ", clusterNodes=" + this.getClusterNodes() + ", splitLimitNum=" + this.getSplitLimitNum() + ", primaryShard=" + this.getPrimaryShard() + ", replicateShard=" + this.getReplicateShard() + ", indexTranslogInterval=" + this.getIndexTranslogInterval() + ", env=" + this.getEnv() + ")";
    }
}
View Code

application.yaml:

# Section read by ProjectConfig.getESConf() (prefix "es") and bound to EsConfigEntity.
es:
  clusterName: cloud-shield
  clusterHttpPort: 9200
  # Port that ESClientUtil.assembleESAddress() pairs with every host in clusterNodes.
  clusterTcpPort: 9300
  # Comma-separated host list; ESClientUtil.assembleESAddress() splits it on ",".
  clusterNodes: 10.20.0.1,10.20.0.2,10.20.0.3,10.20.0.4,10.20.0.5,10.20.0.6,10.20.0.7,10.20.0.8,10.20.0.9,10.20.0.10,10.20.0.11,10.20.0.12,10.20.0.13,10.20.0.14,10.20.0.15,10.20.0.16,10.20.0.17,10.20.0.18,10.20.0.19
  splitLimitNum: 400000000
  primaryShard: 11
  replicateShard: 0
  indexTranslogInterval: 10s
  env: ultiwill

# Section fetched with ProjectConfig.getCommonValue("esClient") and handed as-is to
# Settings.builder().put(...) in ESClientUtil.initClient(); values are quoted so they load as strings.
esClient:
  "cluster.name": cloud-shield
  "client.transport.sniff": "false"
  # NOTE(review): credentials are stored here in plain text — consider supplying them from outside the file.
  "xpack.security.user": "aw:123456"
  "processors": "30"


ESClientUtil:

技术图片
package com.ultiwill.utils;

import org.elasticsearch.action.bulk.*;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.*;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.xpack.client.PreBuiltXPackTransportClient;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.*;

public class ESClientUtil implements Serializable {
    private static org.slf4j.Logger logger = LoggerFactory.getLogger(ESClientUtil.class);
    private static TransportClient esClusterClient;
    private static BulkProcessor bulkProcessor;

    /**
     *@DESC: 初始化ES客户端
     * @return
     * @throws UnknownHostException
     */
    private static TransportClient initClient() throws UnknownHostException {
        String key = ProjectConfig.getESConf().getClusterName();
        Map<String,String> esClientSetting = (Map<String,String>) ProjectConfig.getCommonValue("esClient");
        Settings settings = Settings.builder()
                //.put("cluster.name", key)
                //.put("client.transport.sniff", true)
                //.put("xpack.security.user", "aw:123456")
                //.put("xpack.security.user", "developer:elastic_developer")
                .put(esClientSetting)
                .build();
        return new PreBuiltXPackTransportClient(settings).addTransportAddresses(assembleESAddress().toArray(new InetSocketTransportAddress[assembleESAddress().size()]));
    }

    /**
     * @DESC: 获取TransportClient
     * @return
     */
    public static TransportClient getClient(){
        System.setProperty("es.set.netty.runtime.available.processors", "false");
        if(esClusterClient==null){
            synchronized (ESClientUtil.class){
                try{
                    if(esClusterClient==null){
                        esClusterClient = initClient();
                        esClusterClient.settings();
                    }
                }catch (Exception e){
                    logger.error("ESClient创建失败...." + esClusterClient,e);
                }
            }
        }
        return esClusterClient;
    }

    /**
     * @DESC: 组装ES的hosts为TransportAddress
     * */
    private static List<TransportAddress> assembleESAddress() throws UnknownHostException {
        ArrayList<TransportAddress> esAddrList = new ArrayList<>();
        String[] esAddrArray = ProjectConfig.getESConf().getClusterNodes().split(",");
        for (String es:esAddrArray){
            esAddrList.add(new InetSocketTransportAddress(InetAddress.getByName(es),Integer.valueOf(ProjectConfig.getESConf().getClusterTcpPort())));
        }
        return esAddrList;
    }

    /**
     *@DESC: 关闭client连接
     */
    public static void closeClient(){
        if (esClusterClient != null){
            synchronized (ESClientUtil.class){
                try {
                    esClusterClient.close();
                    logger.info("ES Client 关闭成功...");
                } catch (Exception e) {
                    logger.error("ES Client关闭失败...",e);
                }
            }
        }
    }

    /**
     * @Desc: 利用bulk API将消息批量写入ES,不考虑routing
     * */
    public static void save2ES(TransportClient client, String index, String type, List<Map<String,Object>> messages,boolean ifSetId) throws Exception{
        try {
            BulkRequestBuilder bulkRequestBuilder = client.prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
            for (Map<String,Object> message:messages) {
                IndexRequestBuilder indexRequestBuilder1;
                if (ifSetId) {
                    indexRequestBuilder1 = client.prepareIndex().setId(message.get("id").toString()).setOpType(IndexRequest.OpType.INDEX).setIndex(index).setType("doc");
                } else {
                    indexRequestBuilder1 = client.prepareIndex()
                            .setOpType(IndexRequest.OpType.INDEX) //如果该doc不存在则insert,存在则update
                            .setIndex(index).setType("doc");
                }
                IndexRequestBuilder indexRequestBuilder2 = indexRequestBuilder1.setSource(message);
                bulkRequestBuilder.add(indexRequestBuilder2);
            }
//            logger.info("开始通过单线程模式加载数据到ES.....");
            BulkResponse bulkResponse = bulkRequestBuilder.execute().actionGet();
            if (bulkResponse.hasFailures()){
                logger.error("写入ES失败:" + bulkResponse.buildFailureMessage());
                throw new Exception("写入ES失败"+bulkResponse.buildFailureMessage());
            }
        }catch (Exception e){
            logger.error("ES 写入ES失败...",e);
            throw new Exception("写入ES失败",e);
        }

    }

    /**
     * @Desc: 利用bulk API将消息批量写入ES的索引信息表
     * */
    public static void saveES4IndexInfo(TransportClient client, String index, List<Map<String,String>> messages){
        double begin = System.currentTimeMillis();
        try {
            BulkRequestBuilder bulkRequestBuilder = client.prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
            for (Map<String,String> message:messages) {
                IndexRequestBuilder indexRequestBuilder1 = client.prepareIndex()
                        .setId(message.get("id").toString())
                        .setOpType(IndexRequest.OpType.INDEX) //如果该doc不存在则insert,存在则update
                        .setIndex(index).setType("doc");
                message.remove("id");
                //message.remove("protocoltype");
                IndexRequestBuilder indexRequestBuilder2 = indexRequestBuilder1.setSource(message);
                bulkRequestBuilder.add(indexRequestBuilder2);
            }
            logger.info("开始通过单线程模式加载数据到ES.....");
            BulkResponse bulkResponse = bulkRequestBuilder.execute().actionGet();
            if (bulkResponse.hasFailures()){
                logger.error("写入ES失败:" + bulkResponse.buildFailureMessage());
            }
        } finally {
            //ESClientUtil.closeClient();
        }
        double end = System.currentTimeMillis();
        logger.info("The total load time cost " + (end - begin)/1000 + " S...");//打印写入到ES所花的总时间
    }

    /**
     * @Desc : 根据index从已经完成的index中获取最大时间和最小时间
     * */
    public static String getTimeRangeByIndex(TransportClient client,String index, String sortField){
        String fromTime = "";
        String endTime = "";
        try {
            SearchResponse ascResponse = client.prepareSearch(index).setSearchType(SearchType.DEFAULT)
                    .setQuery(QueryBuilders.matchAllQuery())
                    .setSize(1).setScroll(TimeValue.timeValueMinutes(8))
                    .addSort(sortField, SortOrder.ASC)
                    .execute().actionGet();
            SearchResponse descResponse = client.prepareSearch(index).setSearchType(SearchType.DEFAULT)
                    .setQuery(QueryBuilders.matchAllQuery())
                    .setSize(1).setScroll(TimeValue.timeValueMinutes(8))
                    .addSort(sortField, SortOrder.DESC)
                    .execute().actionGet();
            SearchHit[] ascHits = ascResponse.getHits().getHits();
            SearchHit[] descHits = descResponse.getHits().getHits();
            for (SearchHit hit:ascHits){
                fromTime = hit.getSourceAsMap().get(sortField).toString();
            }
            for (SearchHit hit:descHits){
                endTime = hit.getSourceAsMap().get(sortField).toString();
            }
        } catch (Exception e) {
            logger.error("从当前索引中获取时间范围失败...",e);
        }finally {
            //ESClientUtil.closeClient();
        }
        return fromTime + "-" + endTime;
    }
    
    /**
     * @DESC: 从index_info中获取当前正在进行索引的others协议的索引别名,用来对拼接数据入ES
     * */
    public static String getRunningOthersAlias(TransportClient client){
        try {
            SearchResponse searchResponse = client.prepareSearch("index_info").setSearchType(SearchType.DEFAULT)
                    .setQuery(QueryBuilders.matchAllQuery())
                    .setSize(1000).setScroll(TimeValue.timeValueMinutes(8))
                    .execute().actionGet();
            ArrayList<String> aliasList = new ArrayList<>();
            SearchHit[] resultHits = searchResponse.getHits().getHits();
            for (SearchHit hit:resultHits){

                if(hit.getType().startsWith("others_") && hit.getType().endsWith("9999999999999")){
                    aliasList.add(hit.getType());
                }
            }
            if (aliasList.size() > 1) {
                long maxNum = 0L;
                for (String alias:aliasList){
                    long eachNum = Long.parseLong(alias.split("_")[1].split("-")[0]);
                    if (maxNum < eachNum){
                        maxNum = eachNum;
                    }
                }
                return "others_" + maxNum + "-9999999999999";
            }
            else{
                return aliasList.get(0);
            }
        } finally {
            //ESClientUtil.closeClient();
        }
    }

    /**
     * @DESC: 通过index以及某个field来判断该doc是否存在
     * */
    public static int ifFieldExist(TransportClient client,String index,String field,String fieldValue){
        int length = 0;
        try {
            SearchResponse searchResponse = client.prepareSearch(index).setSearchType(SearchType.DEFAULT)
                    //.setQuery(QueryBuilders.termQuery("_id", id))
                    .setQuery(QueryBuilders.termQuery(field, fieldValue))
                    .execute().actionGet();
            length = searchResponse.getHits().getHits().length;
        } catch (Exception e) {
            logger.error("查找字段是否存在报错..." + field,e);
        } finally {
        }
        return length;
    }

    /**
     * @DESC: 通过index以及某个field来判断该doc是否存在
     * */
    public static HashSet<Object> ifFieldExistV2(TransportClient client, String index, String field, List<String> fieldValues){
        HashSet<Object> hashSet = new HashSet<>();
        try {
            int interval = 1000;
            int size = fieldValues.size();
            int  num = size/interval + 1;
            //es批量查询支持有限字段值,因而分批次查询
            for(int i=0;i < num;i++){
                int from = i*interval;
                int to = (i+1)*interval;
                if(to>size) to = size;
                SearchResponse searchResponse = client.prepareSearch(index).setSearchType(SearchType.DEFAULT)
                        .setQuery(QueryBuilders.termsQuery(field, fieldValues.subList(from,to)))
                        .setFetchSource(field,null)
                        .execute().actionGet();
                SearchHit[] hits = searchResponse.getHits().getHits();
                for(SearchHit hit : hits){
                    hashSet.add(hit.getSourceAsMap().get(field));
                }
            }
        } catch (Exception e) {
            logger.error("查找字段是否存在报错..." + field,e);
        }
        return hashSet;
    }


    /**
     * @DESC: 通过id获取ES对应的doc内容
     * */
    public static Map<String, Object> getRecordById(TransportClient client, String index, String id){
        Map<String, Object> rst = new HashMap<>();
        SearchResponse searchResponse = client.prepareSearch(index).setSearchType(SearchType.DEFAULT)
                .setQuery(QueryBuilders.termQuery("_id", id))
                .execute().actionGet();
        SearchHit[] resultHits = searchResponse.getHits().getHits();
        for(SearchHit hit : resultHits){
            rst = hit.getSourceAsMap();

        }
        return rst;
    }

    /**
     * 获取单例全局 BulkProcessor
     *
     * */
    public static BulkProcessor getBulkProcessor(){
        if(bulkProcessor==null){
            synchronized (ESClientUtil.class){
                try{
                    if(bulkProcessor==null){
                        bulkProcessor = bulkProcessor(getClient(),"",4);
                    }
                }catch (Exception e){
                    logger.error("BulkProcessor创建失败...." + bulkProcessor,e);
                }
            }
        }
        return bulkProcessor;
    }

    private  static BulkProcessor bulkProcessor(TransportClient client,String taskName,int dbNum){
        return    BulkProcessor.builder(
                client,
                new BulkProcessor.Listener() {
                    @Override
                    public void beforeBulk(long executionId,
                                           BulkRequest request) {
                    }

                    @Override
                    public void afterBulk(long executionId,
                                          BulkRequest request,
                                          BulkResponse response) {
                        try {
                            if(response.hasFailures()){
                                String failureMessage = response.buildFailureMessage();
                                String[] info = failureMessage.split("
");
                                if(info.length>2){
                                    String value=info[0]+":"+info[1];
                                    logger.info("写入es情况:  partitionId:"+ 22 +"  ,executionId: "+ executionId+"  ,数量: " +request.numberOfActions()+"  ,写入存在失败:"+value);
                                    //任务名@IP地址@executionId@当前时间@当前发送量@成功OR失败
                                }else{
                                    logger.info("写入es情况:  partitionId:"+ 22 +"  ,executionId: "+ executionId+"  ,数量: " +request.numberOfActions()+"  ,写入存在失败:");
                                }
                            }else{
                                logger.info("写入es情况:  partitionId:"+ 22 +"  ,executionId: "+ executionId+"  ,数量: " +request.numberOfActions()+"  ,写入成功");
                            }
                        }catch (Exception e){
                            logger.error("BulkProcessor信息写入Redis异常",e);
                        }finally {
                        }

                    }

                    @Override
                    public void afterBulk(long executionId,
                                          BulkRequest request,
                                          Throwable failure) {
                        failure.printStackTrace();
                        logger.info("写入es失败:  partitionId:"+ 44 +"  ,executionId: "+ executionId+"  ,数量: " +request.numberOfActions()+"   ,错误: "+failure);
                    }
                })
                .setBulkActions(15000)
                .setBulkSize(new ByteSizeValue(10, ByteSizeUnit.MB))
                .setFlushInterval(TimeValue.timeValueSeconds(120))
                .setConcurrentRequests(5)
                .setBackoffPolicy(
                        BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
                .build();
    }


}
View Code

 

以上是关于 Elasticsearch 5.6.8 创建 TransportClient 工具类的主要内容,如果未能解决你的问题,请参考以下文章:

搭建Elasticsearch5.6.8 分布式集群

SQL事务用法begin tran,commit tran和rollback tran的用法

sqlserver中事务总结:begin tran,rollback tran,commit tran

Begin Tran 和提交 Tran 的困惑

BEGIN TRAN...COMMIT TRAN 意思与用法

存储开头结尾使用begin tran,rollback tran作用?