Integrating Elasticsearch 7.6.2 (cluster) with Spring Boot 2.0

Hi everyone, I'm Laokou. Let's learn ES 7.6.2 together.

Note: the source code is available through the link in the original post.

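Contents

1. Add the dependencies to pom.xml

2. Configure application-dev.yml (for production, copy application-dev and change it to the production settings)

3. Define the ES annotation

4. Configure ES and Swagger

5. ES utility class (if the index settings are unclear, see "elasticsearch 7.6.2 - index management")

6. Index management utility class

7. Test ES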

1. Add the dependencies to pom.xml

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>7.6.2</version>
            <exclusions>
                <exclusion>
                    <groupId>org.elasticsearch</groupId>
                    <artifactId>elasticsearch</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.elasticsearch.client</groupId>
                    <artifactId>elasticsearch-rest-client</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-client</artifactId>
            <version>7.6.2</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>7.6.2</version>
        </dependency>

2. Configure application-dev.yml (for production, copy application-dev and change it to the production settings)

elasticsearch:
  host: 192.168.1.1:9200,192.1.2.133:9200,192.168.1.3:9200
  cluster-name: laokou-elasticsearch
  username:
  password:
  synonym:
    path: http://192.168.1.1:9048/laokou-service/synonym
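
When `elasticsearch.host` is injected into a `String[]` field, Spring splits the comma-separated value into one entry per node, and section 4 then turns each `host:port` entry into an `HttpHost`. The following is only a dependency-free sketch of that parsing step. The `HostParser` and `Endpoint` names are made up for this illustration and are not part of the project; unlike the original `makeHttpHost`, the sketch also skips entries whose port is not a number.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the original project.
final class HostParser {

    /** Plain host/port pair standing in for org.apache.http.HttpHost. */
    static final class Endpoint {
        final String host;
        final int port;

        Endpoint(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    /** Parses "host:port,host:port" and skips entries that are not well formed. */
    static List<Endpoint> parse(String hosts) {
        List<Endpoint> endpoints = new ArrayList<>();
        if (hosts == null || hosts.trim().isEmpty()) {
            return endpoints;
        }
        for (String address : hosts.split(",")) {
            String[] parts = address.trim().split(":");
            if (parts.length != 2) {
                continue; // same rule as makeHttpHost, which returns null here
            }
            try {
                endpoints.add(new Endpoint(parts[0], Integer.parseInt(parts[1])));
            } catch (NumberFormatException e) {
                // assumption of this sketch: a non-numeric port is skipped, not fatal
            }
        }
        return endpoints;
    }
}
```

Skipping bad entries keeps one typo in the host list from taking down the whole client, at the cost of silently running against fewer nodes, so logging the skipped entries would be a sensible addition.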

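Something to think about: suppose an article record has a title, content, and a read count. When the data is stored in ES, an analyzer has to be configured for the text fields, and the data must also be filterable by read count. How would you do it?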

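3. Define the ES annotation

The annotation can be applied to fields or methods (it has to be defined first).

type > the field type to configure in ES

participle > the analyzer to configure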

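import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * @author Kou Shenhai
 */
@Target({ElementType.FIELD, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface FieldInfo {

    /**
     * ES field type; defaults to keyword
     */
    String type() default "keyword";

    /**
     * Analyzer code: 0 = not_analyzed, 1 = ik_smart, 2 = ik_max_word, 3 = ik-index (custom analyzer)
     */
    int participle() default 0;
}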

A holder for each field's name, type, and analyzer

import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Field name, type, and analyzer
 * @author Kou Shenhai 2413176044@leimingtech.com
 * @version 1.0
 * @date 2021/2/9 10:20 AM
 */
@Data
@NoArgsConstructor
public class FieldMapping {

    private String field;

    private String type;

    private Integer participle;

    public FieldMapping(String field, String type, Integer participle) {
        this.field = field;
        this.type = type;
        this.participle = participle;
    }
}

Collect the type and analyzer of every annotated field => List<FieldMapping>

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import lombok.extern.slf4j.Slf4j;

/**
 * Type and analyzer of every field
 * @author Kou Shenhai 2413176044@leimingtech.com
 * @version 1.0
 * @date 2021/1/24 7:51 PM
 */
@Slf4j
public class FieldMappingUtil {

    public static List<FieldMapping> getFieldInfo(Class<?> clazz) {
        return getFieldInfo(clazz, null);
    }

    /**
     * Note: fieldName is accepted but not used by this implementation.
     */
    public static List<FieldMapping> getFieldInfo(Class<?> clazz, String fieldName) {
        // All fields declared on the class, including private ones
        Field[] fields = clazz.getDeclaredFields();
        List<FieldMapping> fieldMappingList = new ArrayList<>();
        for (Field field : fields) {
            // Only fields annotated with @FieldInfo contribute to the mapping
            FieldInfo fieldInfo = field.getAnnotation(FieldInfo.class);
            if (fieldInfo == null) {
                continue;
            }
            fieldMappingList.add(new FieldMapping(field.getName(), fieldInfo.type(), fieldInfo.participle()));
        }
        return fieldMappingList;
    }
}
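
One possible answer to the question posed in section 2 is to annotate the article entity and let reflection collect the mapping. The sketch below is self-contained, so it carries its own copy of the annotation; the `Article` class, its field names, and the `describe` helper are assumptions made for this illustration. Declaring the read count as `long` rather than as text is what would make range filters on it possible.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.LinkedHashMap;
import java.util.Map;

final class ArticleMappingDemo {

    // Local copy of the annotation from this section so the sketch compiles on its own.
    @Target({ElementType.FIELD, ElementType.METHOD})
    @Retention(RetentionPolicy.RUNTIME)
    @interface FieldInfo {
        String type() default "keyword";
        int participle() default 0;
    }

    // Hypothetical entity: analyzed text fields plus a numeric read count.
    static final class Article {
        @FieldInfo
        private String id;
        @FieldInfo(type = "text", participle = 3)
        private String title;
        @FieldInfo(type = "text", participle = 3)
        private String content;
        @FieldInfo(type = "long")
        private Long readCount;
        // Not annotated, so it is left out of the explicit mapping.
        private String draftNote;
    }

    /** Returns field name -> "type/participle" for every annotated field. */
    static Map<String, String> describe(Class<?> clazz) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Field field : clazz.getDeclaredFields()) {
            FieldInfo info = field.getAnnotation(FieldInfo.class);
            if (info != null) {
                result.put(field.getName(), info.type() + "/" + info.participle());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        describe(Article.class).forEach((name, mapping) -> System.out.println(name + " -> " + mapping));
    }
}
```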

4. Configure ES and Swagger

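import java.util.Arrays;
import java.util.Objects;
// Assumed: StringUtils comes from Apache Commons Lang 3
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * ES configuration
 * @author Kou Shenhai 2413176044@leimingtech.com
 * @version 1.0
 * @date 2020/8/9 2:01 PM
 */
@Configuration
public class ElasticsearchConfig {

    private static final String HTTP_SCHEME = "http";

    private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchConfig.class);

    /**
     * Credentials used for authentication
     */
    final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();

    /**
     * ES hosts (the comma-separated property is split into an array)
     */
    @Value("${elasticsearch.host}")
    private String[] host;

    @Value("${elasticsearch.username}")
    private String username;

    @Value("${elasticsearch.password}")
    private String password;

    @Bean
    public RestClientBuilder restClientBuilder() {
        HttpHost[] hosts = Arrays.stream(host)
                .map(this::makeHttpHost)
                .filter(Objects::nonNull)
                .toArray(HttpHost[]::new);
        LOGGER.info("host:{}", Arrays.toString(hosts));
        // Configure authentication
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
        return RestClient.builder(hosts).setHttpClientConfigCallback(httpClientBuilder ->
                httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
                // Maximum connections per route
                .setMaxConnPerRoute(100)
                // Maximum total connections
                .setMaxConnTotal(100)
        ).setRequestConfigCallback(builder -> {
            builder.setConnectTimeout(-1);
            builder.setSocketTimeout(60000);
            builder.setConnectionRequestTimeout(-1);
            return builder;
        });
    }

    /**
     * Turn a "host:port" address into an HttpHost
     * @param address address in host:port form
     * @return the HttpHost, or null if the address is malformed
     */
    private HttpHost makeHttpHost(String address) {
        assert StringUtils.isNotEmpty(address);
        String[] hostAddress = address.split(":");
        if (hostAddress.length == 2) {
            String ip = hostAddress[0];
            int port = Integer.parseInt(hostAddress[1]);
            return new HttpHost(ip, port, HTTP_SCHEME);
        } else {
            return null;
        }
    }

    /**
     * Configure the highLevelClient bean
     * @param restClientBuilder the builder defined above
     * @return the high-level client
     */
    @Bean(name = "restHighLevelClient")
    public RestHighLevelClient restHighLevelClient(@Autowired RestClientBuilder restClientBuilder) {
        return new RestHighLevelClient(restClientBuilder);
    }
}
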
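import io.swagger.annotations.ApiOperation;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.builders.PathSelectors;
import springfox.documentation.builders.RequestHandlerSelectors;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Contact;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;

/**
 * @author Kou Shenhai
 */
@Configuration
@EnableSwagger2
public class SwaggerConfig {

    @Bean
    public Docket createRestApi() {
        return new Docket(DocumentationType.SWAGGER_2)
                .apiInfo(apiInfo())
                .select()
                // Only document methods annotated with @ApiOperation
                .apis(RequestHandlerSelectors.withMethodAnnotation(ApiOperation.class))
                .paths(PathSelectors.any())
                .build();
    }

    private ApiInfo apiInfo() {
        return new ApiInfoBuilder()
                .title("API documentation")
                .version("2.0.0")
                .description("API documentation - Elasticsearch service")
                // Author information
                .contact(new Contact("寇申海", "https://blog.csdn.net/qq_39893313", "2413176044@qq.com"))
                .build();
    }
}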

5. ES utility class (if the index settings are unclear, see "elasticsearch 7.6.2 - index management")

/**
 * Elasticsearch utility class for working with ES
 * @author Kou Shenhai 2413176044@leimingtech.com
 * @version 1.0
 * @date 2021/1/24 5:42 PM
 */
@Slf4j
@Component
public class ElasticsearchUtil {

    private static final String PRIMARY_KEY_NAME = "id";

    @Value("${elasticsearch.synonym.path}")
    private String synonymPath;

    @Autowired
    private RestHighLevelClient restHighLevelClient;

    /**
     * Rebuild the index and bulk-sync data to ES
     * @param indexName index name
     * @param indexAlias alias
     * @param jsonDataList JSON array of documents
     * @param clazz document class
     * @return true if every document was written
     * @throws IOException
     */
    public boolean saveDataBatch(String indexName, String indexAlias, String jsonDataList, Class<?> clazz) throws IOException {
        // Nothing to do for empty input
        if (StringUtils.isBlank(jsonDataList)) {
            return false;
        }
        // syncIndex returns true when the rebuild failed
        if (syncIndex(indexName, indexAlias, clazz)) {
            return false;
        }
        // 3. Build the bulk request
        BulkRequest bulkRequest = packBulkIndexRequest(indexName, jsonDataList);
        if (bulkRequest.requests().isEmpty()) {
            return false;
        }
        final BulkResponse bulk = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
        if (bulk.hasFailures()) {
            for (BulkItemResponse item : bulk.getItems()) {
                // Only report the items that actually failed
                if (item.isFailed()) {
                    log.error("Index [{}], id [{}]: update failed, status [{}], error: {}", indexName, item.getId(), item.status(), item.getFailureMessage());
                }
            }
            return false;
        }
        // Count created and updated documents
        int createdCount = 0;
        int updatedCount = 0;
        for (BulkItemResponse item : bulk.getItems()) {
            if (DocWriteResponse.Result.CREATED.equals(item.getResponse().getResult())) {
                createdCount++;
            } else if (DocWriteResponse.Result.UPDATED.equals(item.getResponse().getResult())) {
                updatedCount++;
            }
        }
        log.info("Index [{}] bulk sync succeeded: {} created, {} updated", indexName, createdCount, updatedCount);
        return true;
    }

    /**
     * Update a document in ES
     * @param indexName index name
     * @param id primary key
     * @param paramJson document JSON
     * @return true on created, updated, or no-op
     */
    public boolean updateData(String indexName, String id, String paramJson) {
        UpdateRequest updateRequest = new UpdateRequest(indexName, id);
        // Insert the document if it does not exist yet
        updateRequest.docAsUpsert(true);
        // Refresh immediately
        updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        updateRequest.doc(paramJson, XContentType.JSON);
        try {
            UpdateResponse updateResponse = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
            log.info("Index [{}], id [{}]: result [{}]", indexName, id, updateResponse.getResult());
            if (DocWriteResponse.Result.CREATED.equals(updateResponse.getResult())) {
                // Created
                log.info("Index [{}], id [{}]: created", indexName, id);
                return true;
            } else if (DocWriteResponse.Result.UPDATED.equals(updateResponse.getResult())) {
                // Updated
                log.info("Index [{}], id [{}]: updated", indexName, id);
                return true;
            } else if (DocWriteResponse.Result.NOOP.equals(updateResponse.getResult())) {
                // Unchanged
                log.info("Index [{}], id [{}]: no change", indexName, id);
                return true;
            }
        } catch (IOException e) {
            log.error("Index [{}], id [{}]: update failed", indexName, id, e);
            return false;
        }
        return false;
    }

    /**
     * Bulk update, creating the index first if needed
     * @param indexName index name
     * @param indexAlias alias
     * @param jsonDataList JSON array of documents
     * @param clazz document class
     * @return true on success
     * @throws IOException
     */
    public boolean updateDataBatch(String indexName, String indexAlias, String jsonDataList, Class<?> clazz) throws IOException {
        // 1. Create the index
        boolean createIndexFlag = createIndex(indexName, indexAlias, clazz);
        if (!createIndexFlag) {
            return false;
        }
        return this.updateDataBatch(indexName, jsonDataList);
    }

    /**
     * Delete a document
     * @param indexName index name
     * @param id primary key
     * @return true if the document was deleted
     */
    public boolean deleteData(String indexName, String id) {
        DeleteRequest deleteRequest = new DeleteRequest(indexName);
        deleteRequest.id(id);
        deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        try {
            DeleteResponse deleteResponse = restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
            if (DocWriteResponse.Result.NOT_FOUND.equals(deleteResponse.getResult())) {
                log.error("Index [{}], id [{}]: delete failed, document not found", indexName, id);
                return false;
            } else {
                log.info("Index [{}], id [{}]: deleted", indexName, id);
                return true;
            }
        } catch (IOException e) {
            log.error("Index [{}], id [{}]: delete threw an exception", indexName, id, e);
            return false;
        }
    }

    /**
     * Bulk delete
     * @param indexName index name
     * @param ids list of ids
     * @return true if every delete succeeded
     */
    public boolean deleteDataBatch(String indexName, List<String> ids) {
        if (CollectionUtils.isEmpty(ids)) {
            return false;
        }
        BulkRequest bulkRequest = packBulkDeleteRequest(indexName, ids);
        try {
            BulkResponse bulk = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
            if (bulk.hasFailures()) {
                for (BulkItemResponse item : bulk.getItems()) {
                    log.error("Index [{}], id [{}]: delete failed, message: {}", indexName, item.getId(), item.getFailureMessage());
                }
                return false;
            }
            // Count deleted documents
            int deleteCount = 0;
            for (BulkItemResponse item : bulk.getItems()) {
                if (DocWriteResponse.Result.DELETED.equals(item.getResponse().getResult())) {
                    deleteCount++;
                }
            }
            log.info("Index [{}] bulk delete succeeded: {} deleted", indexName, deleteCount);
            return true;
        } catch (IOException e) {
            log.error("Index [{}] bulk delete threw an exception", indexName, e);
            return false;
        }
    }

    /**
     * Build the bulk delete request
     * @param indexName index name
     * @param ids list of ids
     * @return the bulk request
     */
    private BulkRequest packBulkDeleteRequest(String indexName, List<String> ids) {
        BulkRequest bulkRequest = new BulkRequest();
        bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        ids.forEach(id -> {
            DeleteRequest deleteRequest = new DeleteRequest(indexName);
            deleteRequest.id(id);
            bulkRequest.add(deleteRequest);
        });
        return bulkRequest;
    }

    /**
     * Bulk update
     * @param indexName index name
     * @param jsonDataList JSON array of documents
     * @return true on success
     */
    public boolean updateDataBatch(String indexName, String jsonDataList) {
        // Nothing to do for empty input
        if (StringUtils.isBlank(jsonDataList)) {
            return false;
        }
        BulkRequest bulkRequest = packBulkUpdateRequest(indexName, jsonDataList);
        if (bulkRequest.requests().isEmpty()) {
            return false;
        }
        try {
            // Synchronous execution
            BulkResponse bulk = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
            if (bulk.hasFailures()) {
                for (BulkItemResponse item : bulk.getItems()) {
                    log.error("Index [{}], id [{}]: update failed, status [{}], error: {}", indexName, item.getId(), item.status(), item.getFailureMessage());
                }
                return false;
            }
            // Count created and updated documents
            int createCount = 0;
            int updateCount = 0;
            for (BulkItemResponse item : bulk.getItems()) {
                if (DocWriteResponse.Result.CREATED.equals(item.getResponse().getResult())) {
                    createCount++;
                } else if (DocWriteResponse.Result.UPDATED.equals(item.getResponse().getResult())) {
                    updateCount++;
                }
            }
            log.info("Index [{}] bulk update succeeded: {} created, {} updated", indexName, createCount, updateCount);
        } catch (IOException e) {
            log.error("Index [{}] bulk update threw an exception", indexName, e);
            return false;
        }
        return true;
    }

    /**
     * Build the bulk update request
     * @param indexName index name
     * @param jsonDataList JSON array of documents
     * @return the bulk request
     */
    private BulkRequest packBulkUpdateRequest(String indexName, String jsonDataList) {
        BulkRequest bulkRequest = new BulkRequest();
        bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        JSONArray jsonArray = JSONArray.parseArray(jsonDataList);
        if (jsonArray.isEmpty()) {
            return bulkRequest;
        }
        jsonArray.forEach(o -> {
            // Values may be numbers, so read the id as Object and convert it to a String
            Map<String, Object> map = (Map<String, Object>) o;
            Object id = map.get(PRIMARY_KEY_NAME);
            UpdateRequest updateRequest = new UpdateRequest(indexName, id == null ? null : id.toString());
            // Insert the document if it does not exist yet
            updateRequest.docAsUpsert(true);
            updateRequest.doc(JSON.toJSONString(o), XContentType.JSON);
            bulkRequest.add(updateRequest);
        });
        return bulkRequest;
    }

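    /**
     * Drop the index (if present) and create it again
     * @param indexName index name
     * @param indexAlias alias
     * @param clazz document class
     * @return true if the rebuild FAILED, false if it succeeded (callers treat true as failure)
     * @throws IOException
     */
    private boolean syncIndex(String indexName, String indexAlias, Class<?> clazz) throws IOException {
        // 1. Drop the old index. A missing index is not an error; otherwise the first sync could never succeed.
        if (isIndexExists(indexName) && !deleteIndex(indexName)) {
            return true;
        }
        // 2. Create the index
        boolean createIndexFlag = createIndex(indexName, indexAlias, clazz);
        if (!createIndexFlag) {
            return true;
        }
        return false;
    }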

    /**
     * Fetch a document by primary key
     * @param indexName index name
     * @param id primary key
     * @return the document source as JSON, or "" if the index is missing or the request fails
     */
    public String getDataById(String indexName, String id) {
        // 1. Check that the index exists
        boolean result = isIndexExists(indexName);
        if (!result) {
            return "";
        }
        GetRequest getRequest = new GetRequest(indexName, id);
        try {
            GetResponse getResponse = restHighLevelClient.get(getRequest, RequestOptions.DEFAULT);
            String resultJson = getResponse.getSourceAsString();
            log.info("Index [{}], id [{}]: query result [{}]", indexName, id, resultJson);
            return resultJson;
        } catch (IOException e) {
            log.error("Index [{}], id [{}]: query threw an exception", indexName, id, e);
            return "";
        }
    }

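    /**
     * Remove every document from an index (the index itself is kept)
     * @param indexName index name
     * @return true if the index was cleared
     */
    public boolean deleteAll(String indexName) {
        // 1. Check that the index exists
        boolean result = isIndexExists(indexName);
        if (!result) {
            log.error("Index [{}] does not exist, nothing to clear", indexName);
            return false;
        }
        // A DeleteRequest without an id fails request validation, so the index is cleared with delete-by-query.
        // Needs org.elasticsearch.index.reindex.DeleteByQueryRequest, org.elasticsearch.index.reindex.BulkByScrollResponse
        // and org.elasticsearch.index.query.QueryBuilders.
        DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(indexName);
        deleteByQueryRequest.setQuery(QueryBuilders.matchAllQuery());
        deleteByQueryRequest.setRefresh(true);
        try {
            BulkByScrollResponse response = restHighLevelClient.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT);
            if (!response.getBulkFailures().isEmpty()) {
                log.error("Index [{}]: clearing failed for {} documents", indexName, response.getBulkFailures().size());
                return false;
            }
            log.info("Index [{}] cleared, {} documents deleted", indexName, response.getDeleted());
            return true;
        } catch (IOException e) {
            log.error("Index [{}]: clearing threw an exception", indexName, e);
            return false;
        }
    }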

    /**
     * Rebuild the index and bulk-save data to ES asynchronously
     * (despite the "Sync" in its name, this method uses bulkAsync)
     * @param indexName index name
     * @param indexAlias index alias
     * @param jsonDataList JSON array of documents
     * @param clazz document class
     * @return true once the request has been submitted; failures are only logged by the listener
     * @throws IOException
     */
    public boolean saveDataBatchSync(String indexName, String indexAlias, String jsonDataList, Class<?> clazz) throws IOException {
        // Nothing to do for empty input
        if (StringUtils.isBlank(jsonDataList)) {
            return false;
        }
        if (syncIndex(indexName, indexAlias, clazz)) {
            return false;
        }
        // 3. Build the bulk request
        BulkRequest bulkRequest = packBulkIndexRequest(indexName, jsonDataList);
        if (bulkRequest.requests().isEmpty()) {
            return false;
        }
        // 4. Execute asynchronously
        ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() {
            @Override
            public void onResponse(BulkResponse bulkItemResponses) {
                if (bulkItemResponses.hasFailures()) {
                    for (BulkItemResponse item : bulkItemResponses.getItems()) {
                        log.error("Index [{}], id [{}]: update failed, status [{}], error: {}", indexName, item.getId(),
                                item.status(), item.getFailureMessage());
                    }
                }
            }

            // The whole request failed
            @Override
            public void onFailure(Exception e) {
                log.error("Index [{}] async bulk update threw an exception", indexName, e);
            }
        };
        restHighLevelClient.bulkAsync(bulkRequest, RequestOptions.DEFAULT, listener);
        log.info("Bulk update of index [{}] in progress", indexName);
        return true;
    }

    /**
     * Delete an index
     * @param indexName index name
     * @return true if the index was deleted
     * @throws IOException
     */
    public boolean deleteIndex(String indexName) throws IOException {
        // 1. Check that the index exists
        boolean result = isIndexExists(indexName);
        if (!result) {
            log.error("Index [{}] does not exist, delete failed", indexName);
            return false;
        }
        // 2. Build and send the delete request
        DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indexName);
        deleteIndexRequest.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
        AcknowledgedResponse acknowledgedResponse = restHighLevelClient.indices().delete(deleteIndexRequest, RequestOptions.DEFAULT);
        if (!acknowledgedResponse.isAcknowledged()) {
            log.error("Index [{}] delete failed", indexName);
            return false;
        }
        log.info("Index [{}] deleted", indexName);
        return true;
    }

    /**
     * Build the bulk index request
     * @param indexName index name
     * @param jsonDataList JSON array of documents
     * @return the bulk request
     */
    private BulkRequest packBulkIndexRequest(String indexName, String jsonDataList) {
        BulkRequest bulkRequest = new BulkRequest();
        // IMMEDIATE  > refresh right after the write (most real-time, highest resource cost)
        // WAIT_UNTIL > wait until a refresh makes the write visible (real-time, lower resource cost)
        // NONE       > default policy (least real-time)
        bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        JSONArray jsonArray = JSONArray.parseArray(jsonDataList);
        if (jsonArray.isEmpty()) {
            return bulkRequest;
        }
        // Wrap every element in an IndexRequest
        jsonArray.forEach(obj -> {
            // Values may be numbers, so read the id as Object and convert it to a String
            final Map<String, Object> map = (Map<String, Object>) obj;
            Object id = map.get(PRIMARY_KEY_NAME);
            IndexRequest indexRequest = new IndexRequest(indexName);
            indexRequest.source(JSON.toJSONString(obj), XContentType.JSON);
            indexRequest.id(id == null ? null : id.toString());
            bulkRequest.add(indexRequest);
        });
        return bulkRequest;
    }

    /**
     * Create the index if it does not exist yet
     * @param indexName index name
     * @param indexAlias alias
     * @param clazz document class
     * @return true if the index exists after the call
     * @throws IOException
     */
    public boolean createIndex(String indexName, String indexAlias, Class<?> clazz) throws IOException {
        // Check whether the index already exists
        boolean result = isIndexExists(indexName);
        if (!result) {
            boolean createResult = createIndexAndCreateMapping(indexName, indexAlias, FieldMappingUtil.getFieldInfo(clazz));
            if (!createResult) {
                log.info("Index [{}] creation failed", indexName);
                return false;
            }
        }
        log.info("Index [{}] is ready", indexName);
        return true;
    }

    /**
     * Sync a single document to ES
     * @param id primary key
     * @param indexName index name
     * @param indexAlias alias
     * @param jsonData document JSON
     * @param clazz document class
     * @return true if the document was created or updated
     */
    public boolean saveData(String id, String indexName, String indexAlias, String jsonData, Class<?> clazz) throws IOException {
        // 1. Create the index
        boolean createIndexFlag = createIndex(indexName, indexAlias, clazz);
        if (!createIndexFlag) {
            return false;
        }
        // 2. Create the request
        IndexRequest indexRequest = new IndexRequest(indexName);
        // 3. Fill in the document
        indexRequest.source(jsonData, XContentType.JSON);
        // IMMEDIATE > refresh right away
        indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        indexRequest.id(id);
        IndexResponse response = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
        // 4. Check whether the document was created or updated
        if (IndexResponse.Result.CREATED.equals(response.getResult())) {
            log.info("Index [{}]: document saved", indexName);
            return true;
        } else if (IndexResponse.Result.UPDATED.equals(response.getResult())) {
            log.info("Index [{}]: document updated", indexName);
            return true;
        }
        return false;
    }

    /**
     * Check whether an index exists
     * @param indexName index name
     * @return true if it exists; false if it does not or the check fails
     */
    public boolean isIndexExists(String indexName) {
        try {
            GetIndexRequest getIndexRequest = new GetIndexRequest(indexName);
            return restHighLevelClient.indices().exists(getIndexRequest, RequestOptions.DEFAULT);
        } catch (Exception e) {
            log.error("Index [{}]: existence check threw an exception", indexName, e);
        }
        return false;
    }

    /**
     * Create the index with its settings, mapping, and alias
     * @param indexName index name
     * @param indexAlias index alias
     * @param fieldMappingList field mappings collected from the document class
     * @return true if the creation was acknowledged
     * @throws IOException
     */
    private boolean createIndexAndCreateMapping(String indexName, String indexAlias, List<FieldMapping> fieldMappingList) throws IOException {
        // Build the mapping; packEsMapping leaves "properties" and the root object open
        XContentBuilder mapping = packEsMapping(fieldMappingList, null);
        mapping.endObject().endObject();
        mapping.close();
        // Build the create-index request
        CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
        // Analyzer settings and alias
        XContentBuilder settings = packSettingMapping();
        XContentBuilder aliases = packEsAliases(indexAlias);
        // Strings is org.elasticsearch.common.Strings; it renders the builder content as JSON text
        log.info("Index settings: {}", Strings.toString(settings));
        log.info("Index mapping: {}", Strings.toString(mapping));
        createIndexRequest.settings(settings);
        createIndexRequest.mapping("_doc", mapping);
        createIndexRequest.aliases(aliases);
        // Create the index synchronously
        CreateIndexResponse createIndexResponse = restHighLevelClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
        boolean acknowledged = createIndexResponse.isAcknowledged();
        if (acknowledged) {
            log.info("Index [{}] created", indexName);
            return true;
        } else {
            log.error("Index [{}] creation failed", indexName);
            return false;
        }
    }

    /**
     * Build the alias definition
     * @author Kou Shenhai
     * @param alias alias
     * @return the alias builder
     * @throws IOException
     */
    private XContentBuilder packEsAliases(String alias) throws IOException {
        XContentBuilder aliases = XContentFactory.jsonBuilder().startObject()
                .startObject(alias).endObject();
        aliases.endObject();
        aliases.close();
        return aliases;
    }

    /**
     * Build the mapping
     * @param fieldMappingList field mappings collected from the document class
     * @param mapping builder to append to, or null to start a new one
     * @return the builder, with "properties" and the root object still open
     * @throws IOException
     */
    private XContentBuilder packEsMapping(List<FieldMapping> fieldMappingList, XContentBuilder mapping) throws IOException {
        if (mapping == null) {
            // First call: open the root object and the "properties" object
            mapping = XContentFactory.jsonBuilder().startObject()
                    .field("dynamic", true)
                    .startObject("properties");
        }
        // Turn every field mapping into an ES property.
        // Only codes 0 (not_analyzed) and 3 (ik-index) are handled; fields with code 1 or 2 are left out of the mapping.
        for (FieldMapping fieldMapping : fieldMappingList) {
            String field = fieldMapping.getField();
            String dataType = fieldMapping.getType();
            Integer participle = fieldMapping.getParticiple();
            // Pick the analysis rule
            if (Constant.NOT_ANALYZED.equals(participle)) {
                if (FieldTypeEnum.DATE.getValue().equals(dataType)) {
                    mapping.startObject(field)
                            .field("type", dataType)
                            .field("format", "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis")
                            .endObject();
                } else {
                    mapping.startObject(field)
                            .field("type", dataType)
                            .endObject();
                }
            } else if (Constant.IK_INDEX.equals(participle)) {
                mapping.startObject(field)
                        .field("type", dataType)
                        .field("eager_global_ordinals", true)
                        // fielddata=true lets a text field be used in aggregations
                        .field("fielddata", true)
                        .field("boost", 100.0)
                        .field("analyzer", "ik-index-synonym")
                        .field("search_analyzer", "ik-search-synonym")
                        .startObject("fields").startObject("pinyin")
                        .field("term_vector", "with_positions_offsets")
                        .field("analyzer", "ik-search-pinyin")
                        .field("type", dataType)
                        .field("boost", 100.0)
                        .endObject().endObject()
                        .endObject();
            }
        }
        return mapping;
    }

    /**
     * Build the index settings
     * @return the settings builder
     * @throws IOException
     */
    private XContentBuilder packSettingMapping() throws IOException {
        XContentBuilder setting = XContentFactory.jsonBuilder().startObject()
                .startObject("index")
                .field("number_of_shards", 5)
                .field("number_of_replicas", 1)
                .field("refresh_interval", "120s")
                .endObject()
                .startObject("analysis");
        // Analyzers: IK tokenizer, synonyms, pinyin
        setting.startObject("analyzer")
                .startObject("ik-search-pinyin")
                .field("type", "custom")
                .field("tokenizer", "ik_smart")
                .field("char_filter", new String[]{"html_strip"})
                .field("filter", new String[]{"laokou-pinyin", "word_delimiter", "lowercase", "asciifolding"})
                .endObject();
        setting.startObject("ik-index-synonym")
                .field("type", "custom")
                .field("tokenizer", "ik_max_word")
                .field("char_filter", new String[]{"html_strip"})
                .field("filter", new String[]{"laokou-remote-synonym"})
                .endObject();
        setting.startObject("ik-search-synonym")
                .field("type", "custom")
                .field("tokenizer", "ik_smart")
                .field("char_filter", new String[]{"html_strip"})
                .field("filter", new String[]{"laokou-remote-synonym"})
                .endObject();
        setting.endObject();
        // Token filters: pinyin and remote synonyms
        setting.startObject("filter")
                .startObject("laokou-pinyin")
                .field("type", "pinyin")
                .field("keep_first_letter", false)
                .field("keep_separate_first_letter", false)
                .field("keep_full_pinyin", true)
                .field("keep_original", false)
                .field("keep_joined_full_pinyin", true)
                .field("limit_first_letter_length", 16)
                .field("lowercase", true)
                .field("remove_duplicated_term", true)
                .endObject()
                .startObject("laokou-remote-synonym")
                .field("type", "dynamic_synonym")
                .field("synonyms_path", synonymPath)
                .field("interval", 120)
                .field("dynamic_reload", true)
                .endObject()
                .endObject();
        setting.endObject().endObject();
        setting.close();
        return setting;
    }
}
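
The utility class refers to `Constant.NOT_ANALYZED`, `Constant.IK_INDEX`, and `FieldTypeEnum.DATE`, none of which are shown in the post. A minimal sketch of what they might look like follows. The numeric codes are taken from the `participle` documentation of `FieldInfo` in section 3; the `IK_SMART` and `IK_MAX_WORD` constants and every enum member other than `DATE` are assumptions added for illustration.

```java
// Hypothetical definitions; the original post does not show these classes.
final class Constant {
    // Integer (not int) so that Constant.X.equals(fieldMapping.getParticiple()) works as in packEsMapping
    static final Integer NOT_ANALYZED = 0;
    static final Integer IK_SMART = 1;     // assumed name, code from the FieldInfo javadoc
    static final Integer IK_MAX_WORD = 2;  // assumed name, code from the FieldInfo javadoc
    static final Integer IK_INDEX = 3;

    private Constant() {
    }
}

// Hypothetical enum of ES field types; only DATE is referenced by the utility class.
enum FieldTypeEnum {
    TEXT("text"),
    KEYWORD("keyword"),
    LONG("long"),
    DATE("date");

    private final String value;

    FieldTypeEnum(String value) {
        this.value = value;
    }

    String getValue() {
        return value;
    }
}
```

Declaring the codes as `Integer` matters because `packEsMapping` compares them with `equals` against an `Integer` field, which would also tolerate a null `participle` without throwing.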

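Something to think about: suppose there are several kinds of records, such as articles, chat messages, and orders. They belong to different indices, and each needs its own index. How would you create a different index for each data type?

6. Index management utility class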

import java.util.HashMap;
import java.util.Map;

/**
 * Index management
 * @author Kou Shenhai 2413176044@leimingtech.com
 * @version 1.0
 * @date 2021/10/31 10:11 AM
 */
public class FieldUtil {

    public static final String MESSAGE_INDEX = "message";

    private static final Map<String, Class<?>> classMap = new HashMap<>(16);

    static {
        // Register one document class per index alias
        classMap.put(FieldUtil.MESSAGE_INDEX, MessageIndex.class);
    }

    public static Class<?> getClazz(final String indexName) {
        // Unknown names fall back to Object.class
        return classMap.getOrDefault(indexName, Object.class);
    }
}
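
To cover the article, chat, and order records from the question above, the same registry can simply hold one entry per alias. The sketch below is self-contained and uses empty placeholder classes; `ArticleIndex`, `OrderIndex`, and the alias strings other than `message` are assumptions for illustration. An unknown alias falls back to `Object.class`, which has no `@FieldInfo` fields, so with the utility class above such an index would get an empty `properties` block and rely on dynamic mapping.

```java
import java.util.HashMap;
import java.util.Map;

final class IndexRegistryDemo {

    // Placeholder document classes; in the project these would carry @FieldInfo fields.
    static final class MessageIndex {
    }

    static final class ArticleIndex {
    }

    static final class OrderIndex {
    }

    private static final Map<String, Class<?>> CLASS_MAP = new HashMap<>(16);

    static {
        CLASS_MAP.put("message", MessageIndex.class);
        CLASS_MAP.put("article", ArticleIndex.class); // assumed alias
        CLASS_MAP.put("order", OrderIndex.class);     // assumed alias
    }

    /** Looks up the document class for an alias, falling back to Object.class. */
    static Class<?> getClazz(String indexAlias) {
        return CLASS_MAP.getOrDefault(indexAlias, Object.class);
    }

    public static void main(String[] args) {
        System.out.println(getClazz("order").getSimpleName());
        System.out.println(getClazz("unknown").getSimpleName());
    }
}
```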

7. Test ES

import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import java.io.IOException;
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

/**
 * Elasticsearch API service
 * @author Kou Shenhai 2413176044@leimingtech.com
 * @version 1.0
 * @date 2021/2/8 6:33 PM
 */
@RestController
@RequestMapping("/api")
@Api(tags = "Elasticsearch API service")
public class ElasticsearchController {

    @Autowired
    private ElasticsearchUtil elasticsearchUtil;

    @PostMapping("/sync")
    @ApiOperation("Sync a document to ES")
    @CrossOrigin
    public void syncIndex(@RequestBody final ElasticsearchModel model) throws IOException {
        String id = model.getId();
        String indexName = model.getIndexName();
        String indexAlias = model.getIndexAlias();
        String jsonData = model.getData();
        // The document class is looked up by alias
        Class<?> clazz = FieldUtil.getClazz(indexAlias);
        elasticsearchUtil.saveData(id, indexName, indexAlias, jsonData, clazz);
    }

    @PostMapping("/batchSync")
    @ApiOperation("Bulk save to ES (asynchronous)")
    @CrossOrigin
    public void batchSyncIndex(@RequestBody final ElasticsearchModel model) throws IOException {
        String indexName = model.getIndexName();
        String indexAlias = model.getIndexAlias();
        String jsonDataList = model.getData();
        Class<?> clazz = FieldUtil.getClazz(indexAlias);
        elasticsearchUtil.saveDataBatchSync(indexName, indexAlias, jsonDataList, clazz);
    }

    @PostMapping("/batch")
    @ApiOperation("Bulk sync to ES")
    @CrossOrigin
    public void saveBatchIndex(@RequestBody final ElasticsearchModel model) throws IOException {
        String indexName = model.getIndexName();
        String indexAlias = model.getIndexAlias();
        String jsonDataList = model.getData();
        Class<?> clazz = FieldUtil.getClazz(indexAlias);
        elasticsearchUtil.saveDataBatch(indexName, indexAlias, jsonDataList, clazz);
    }

    @GetMapping("/get")
    @ApiOperation("Get a document by primary key")
    @CrossOrigin
    @ApiImplicitParams({
            @ApiImplicitParam(name = "indexName", value = "index name", required = true, paramType = "query", dataType = "String"),
            @ApiImplicitParam(name = "id", value = "primary key", required = true, paramType = "query", dataType = "String")
    })
    public HttpResultUtil<String> getDataById(@RequestParam("indexName") String indexName, @RequestParam("id") String id) {
        return new HttpResultUtil<String>().ok(elasticsearchUtil.getDataById(indexName, id));
    }

    @PutMapping("/batch")
    @ApiOperation("Bulk update")
    @CrossOrigin
    public void updateDataBatch(@RequestBody final ElasticsearchModel model) throws IOException {
        String indexName = model.getIndexName();
        String indexAlias = model.getIndexAlias();
        String jsonDataList = model.getData();
        Class<?> clazz = FieldUtil.getClazz(indexAlias);
        elasticsearchUtil.updateDataBatch(indexName, indexAlias, jsonDataList, clazz);
    }

    @PutMapping("/sync")
    @ApiOperation("Update a document")
    @CrossOrigin
    public void updateData(@RequestBody final ElasticsearchModel model) {
        String id = model.getId();
        String indexName = model.getIndexName();
        String paramJson = model.getData();
        elasticsearchUtil.updateData(indexName, id, paramJson);
    }

    @DeleteMapping("/batch")
    @ApiOperation("Bulk delete")
    @CrossOrigin
    public void deleteDataBatch(@RequestParam("indexName") String indexName, @RequestParam("ids") List<String> ids) {
        elasticsearchUtil.deleteDataBatch(indexName, ids);
    }

    @DeleteMapping("/sync")
    @ApiOperation("Delete a document")
    @CrossOrigin
    public void deleteData(@RequestParam("indexName") String indexName, @RequestParam("id") String id) {
        elasticsearchUtil.deleteData(indexName, id);
    }

    @DeleteMapping("/all")
    @ApiOperation("Clear an index")
    @CrossOrigin
    public void deleteAll(@RequestParam("indexName") String indexName) {
        elasticsearchUtil.deleteAll(indexName);
    }
}
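
The controller binds its request bodies to `ElasticsearchModel`, which the post does not show. Judging only from the getters the controller calls, it could look roughly like the sketch below; the field types and the explicit setters are assumptions, and in a real project it would more likely be a public class in its own file. The `data` field holds a single JSON object for the `/sync` endpoints and a JSON array string for the batch endpoints, since the batch methods parse it with `JSONArray.parseArray`.

```java
// Hypothetical request body; field names are inferred from the getters used by the controller.
final class ElasticsearchModel {

    /** Primary key of the document (only used by the single-document endpoints). */
    private String id;

    /** Name of the physical index. */
    private String indexName;

    /** Alias, also used to look up the document class in FieldUtil. */
    private String indexAlias;

    /** Document JSON: one object for /sync, a JSON array for the batch endpoints. */
    private String data;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getIndexName() {
        return indexName;
    }

    public void setIndexName(String indexName) {
        this.indexName = indexName;
    }

    public String getIndexAlias() {
        return indexAlias;
    }

    public void setIndexAlias(String indexAlias) {
        this.indexAlias = indexAlias;
    }

    public String getData() {
        return data;
    }

    public void setData(String data) {
        this.data = data;
    }
}
```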

All done.

Addendum: you can partition the data according to your own business needs.
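
The post does not say how the data should be partitioned. One common scheme, shown here purely as an assumption, is one physical index per month, all sharing the same alias, which fits the separate `indexName` and `indexAlias` parameters used throughout. The `IndexPartition` class and the `alias_yyyyMM` naming pattern are made up for this sketch.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: derives a per-month index name from the alias.
final class IndexPartition {

    private static final DateTimeFormatter MONTH = DateTimeFormatter.ofPattern("yyyyMM");

    /** Returns names such as "message_202110" for the alias "message". */
    static String monthlyIndexName(String indexAlias, LocalDate date) {
        return indexAlias + "_" + date.format(MONTH);
    }

    public static void main(String[] args) {
        // Documents written today go to this month's index; searches can go through the alias.
        System.out.println(monthlyIndexName("message", LocalDate.now()));
    }
}
```

With such a scheme, writes would pass the monthly name as `indexName` while keeping `indexAlias` constant, so the class lookup in `FieldUtil` keeps working unchanged.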
