Integrating Elasticsearch 7.6.2 (Cluster) with Spring Boot 2.0
Hi everyone, I'm Lao Kou. Let's learn ES 7.6.2 together.
1. Add the dependencies to pom.xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>io.springfox</groupId>
    <artifactId>springfox-swagger2</artifactId>
</dependency>
<dependency>
    <groupId>io.springfox</groupId>
    <artifactId>springfox-swagger-ui</artifactId>
</dependency>
<!-- Exclude the transitive ES artifacts so the versions can be pinned to 7.6.2 below -->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.6.2</version>
    <exclusions>
        <exclusion>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-client</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client</artifactId>
    <version>7.6.2</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>7.6.2</version>
</dependency>
2. Configure application-dev.yml (for production, copy application-dev.yml and change the values)
elasticsearch:
  host: 192.168.1.1:9200,192.1.2.133:9200,192.168.1.3:9200
  cluster-name: laokou-elasticsearch
  username:
  password:
  synonym:
    path: http://192.168.1.1:9048/laokou-service/synonym
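Question to consider: suppose an article record has a title, content, and a view count. When it is stored in ES, the text fields need an analyzer, and the data must still be filterable by view count. How would you do it?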
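3. Define the ES annotation
The annotation can be placed on fields or methods (it has to be declared first).
type > the ES field type the property is mapped to
participle > the analyzer used for the property
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * @author Kou Shenhai
 */
@Target({ElementType.FIELD, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface FieldInfo {

    /**
     * ES field type, keyword by default.
     */
    String type() default "keyword";

    /**
     * Analyzer: 0 = not_analyzed, 1 = ik_smart, 2 = ik_max_word, 3 = ik-index (custom analyzer).
     */
    int participle() default 0;
}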
Holder for the type and analyzer of a single property
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Field name, type, and analyzer.
 * @author Kou Shenhai 2413176044@leimingtech.com
 * @version 1.0
 * @date 2021/2/9 10:20 AM
 */
@Data
@NoArgsConstructor
public class FieldMapping {

    private String field;
    private String type;
    private Integer participle;

    public FieldMapping(String field, String type, Integer participle) {
        this.field = field;
        this.type = type;
        this.participle = participle;
    }
}
Collect the type and analyzer of every property => List<FieldMapping>
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import lombok.extern.slf4j.Slf4j;

/**
 * Type and analyzer for each property.
 * @author Kou Shenhai 2413176044@leimingtech.com
 * @version 1.0
 * @date 2021/1/24 7:51 PM
 */
@Slf4j
public class FieldMappingUtil {

    public static List<FieldMapping> getFieldInfo(Class<?> clazz) {
        return getFieldInfo(clazz, null);
    }

    // Note: fieldName is currently unused; every annotated field is returned
    public static List<FieldMapping> getFieldInfo(Class<?> clazz, String fieldName) {
        // All fields declared on the class, private ones included (inherited fields are not)
        Field[] fields = clazz.getDeclaredFields();
        List<FieldMapping> fieldMappingList = new ArrayList<>();
        for (Field field : fields) {
            // Only fields carrying @FieldInfo take part in the mapping
            if (field.isAnnotationPresent(FieldInfo.class)) {
                FieldInfo fieldInfo = field.getAnnotation(FieldInfo.class);
                String name = field.getName();
                fieldMappingList.add(new FieldMapping(name, fieldInfo.type(), fieldInfo.participle()));
            }
        }
        return fieldMappingList;
    }
}
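To make the earlier article question concrete, here is a minimal, self-contained sketch of how an entity could be annotated and scanned. It is an illustration under assumptions, not the author's code: `ArticleIndex` and its field names are hypothetical, the nested `FieldInfo` is a local stand-in for the annotation above, and the type strings `text` and `long` are my guess at what such a mapping would use.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

public class FieldInfoDemo {

    // Local stand-in for the article's FieldInfo annotation
    @Target(ElementType.FIELD)
    @Retention(RetentionPolicy.RUNTIME)
    public @interface FieldInfo {
        String type() default "keyword";
        int participle() default 0;
    }

    // Hypothetical article entity: analyzed title/content, numeric view count
    public static class ArticleIndex {
        @FieldInfo
        private String id;
        @FieldInfo(type = "text", participle = 3)
        private String title;
        @FieldInfo(type = "text", participle = 3)
        private String content;
        @FieldInfo(type = "long")
        private Long readCount;
        // Not annotated, so the scan skips it
        private String remark;
    }

    // Collect "name:type:participle" for every annotated field
    public static List<String> describe(Class<?> clazz) {
        List<String> result = new ArrayList<>();
        for (Field field : clazz.getDeclaredFields()) {
            FieldInfo info = field.getAnnotation(FieldInfo.class);
            if (info != null) {
                result.add(field.getName() + ":" + info.type() + ":" + info.participle());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        describe(ArticleIndex.class).forEach(System.out::println);
    }
}
```

The view count keeps the default `participle = 0` (not analyzed) with a numeric type, which is what makes filtering by it possible, while the title and content opt into the custom analyzer.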
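4. Configure ES and Swagger
import java.util.Arrays;
import java.util.Objects;
// StringUtils is assumed to come from commons-lang3
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * ES configuration.
 * @author Kou Shenhai 2413176044@leimingtech.com
 * @version 1.0
 * @date 2020/8/9 2:01 PM
 */
@Configuration
public class ElasticsearchConfig {

    private static final String HTTP_SCHEME = "http";
    private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchConfig.class);

    /**
     * Credentials used for authentication.
     */
    final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();

    /**
     * ES hosts; Spring splits the comma-separated property into an array.
     */
    @Value("${elasticsearch.host}")
    private String[] host;

    @Value("${elasticsearch.username}")
    private String username;

    @Value("${elasticsearch.password}")
    private String password;

    @Bean
    public RestClientBuilder restClientBuilder() {
        HttpHost[] hosts = Arrays.stream(host)
                .map(this::makeHttpHost)
                .filter(Objects::nonNull)
                .toArray(HttpHost[]::new);
        LOGGER.info("host:{}", Arrays.toString(hosts));
        // Configure authentication
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
        return RestClient.builder(hosts)
                .setHttpClientConfigCallback(httpClientBuilder ->
                        httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
                                .setMaxConnPerRoute(100)
                                // Maximum total connections
                                .setMaxConnTotal(100))
                .setRequestConfigCallback(builder -> {
                    builder.setConnectTimeout(-1);
                    builder.setSocketTimeout(60000);
                    builder.setConnectionRequestTimeout(-1);
                    return builder;
                });
    }

    /**
     * Turn a "host:port" string into an HttpHost.
     * @param address host and port separated by a colon
     * @return the HttpHost, or null if the address is malformed
     */
    private HttpHost makeHttpHost(String address) {
        assert StringUtils.isNotEmpty(address);
        String[] hostAddress = address.split(":");
        if (hostAddress.length == 2) {
            String ip = hostAddress[0];
            Integer port = Integer.valueOf(hostAddress[1]);
            return new HttpHost(ip, port, HTTP_SCHEME);
        } else {
            return null;
        }
    }

    /**
     * Expose the RestHighLevelClient bean.
     * @param restClientBuilder the builder defined above
     * @return the high-level client
     */
    @Bean(name = "restHighLevelClient")
    public RestHighLevelClient restHighLevelClient(@Autowired RestClientBuilder restClientBuilder) {
        return new RestHighLevelClient(restClientBuilder);
    }
}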
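The host handling in `makeHttpHost` can be tried out without Spring or the ES client. The sketch below is a stand-in under assumptions: `HostListParser` and its `HostPort` holder are hypothetical names, and the comma split is done by hand here, whereas in the configuration class Spring performs it when binding the property to a `String[]`. Unlike the original, it also skips entries whose port is not a number instead of throwing.

```java
import java.util.ArrayList;
import java.util.List;

public class HostListParser {

    // Hypothetical holder standing in for org.apache.http.HttpHost
    public static final class HostPort {
        public final String host;
        public final int port;

        HostPort(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    // Split "h1:p1,h2:p2" and keep only well-formed host:port entries
    public static List<HostPort> parse(String hosts) {
        List<HostPort> result = new ArrayList<>();
        if (hosts == null || hosts.trim().isEmpty()) {
            return result;
        }
        for (String address : hosts.split(",")) {
            String[] parts = address.trim().split(":");
            if (parts.length != 2) {
                // Same rule as makeHttpHost: anything that is not host:port is dropped
                continue;
            }
            try {
                result.add(new HostPort(parts[0], Integer.parseInt(parts[1])));
            } catch (NumberFormatException e) {
                // Non-numeric port: skip the entry rather than fail at startup
            }
        }
        return result;
    }

    public static void main(String[] args) {
        for (HostPort hp : parse("192.168.1.1:9200,192.168.1.3:9200,bad-entry")) {
            System.out.println(hp.host + " -> " + hp.port);
        }
    }
}
```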
/**
 * Swagger configuration.
 * @author Kou Shenhai
 */
@Configuration
@EnableSwagger2
public class SwaggerConfig {

    @Bean
    public Docket createRestApi() {
        return new Docket(DocumentationType.SWAGGER_2)
                .apiInfo(apiInfo())
                .select()
                // Only document handler methods annotated with @ApiOperation
                .apis(RequestHandlerSelectors.withMethodAnnotation(ApiOperation.class))
                .paths(PathSelectors.any())
                .build();
    }

    private ApiInfo apiInfo() {
        return new ApiInfoBuilder()
                .title("API Documentation")
                .version("2.0.0")
                .description("API Documentation - Elasticsearch service")
                // Author details
                .contact(new Contact("寇申海", "https://blog.csdn.net/qq_39893313", "2413176044@qq.com"))
                .build();
    }
}
5. ES utility class (if the index settings are unfamiliar, see "elasticsearch 7.6.2 - index management")
/**
 * Elasticsearch utility class for working with ES.
 * @author Kou Shenhai 2413176044@leimingtech.com
 * @version 1.0
 * @date 2021/1/24 5:42 PM
 */
@Slf4j
@Component
public class ElasticsearchUtil {

    private static final String PRIMARY_KEY_NAME = "id";

    @Value("${elasticsearch.synonym.path}")
    private String synonymPath;

    @Autowired
    private RestHighLevelClient restHighLevelClient;
    /**
     * Sync a batch of documents to ES. The index is dropped and recreated first.
     * @param indexName index name
     * @param indexAlias index alias
     * @param jsonDataList JSON array of documents
     * @param clazz class whose annotated fields define the mapping
     * @return true if every document was written
     * @throws IOException
     */
    public boolean saveDataBatch(String indexName, String indexAlias, String jsonDataList, Class<?> clazz) throws IOException {
        // 1. Reject empty input
        if (StringUtils.isBlank(jsonDataList)) {
            return false;
        }
        // 2. Drop and recreate the index (syncIndex returns true on failure)
        if (syncIndex(indexName, indexAlias, clazz)) {
            return false;
        }
        // 3. Build the bulk request
        BulkRequest bulkRequest = packBulkIndexRequest(indexName, jsonDataList);
        if (bulkRequest.requests().isEmpty()) {
            return false;
        }
        final BulkResponse bulk = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
        if (bulk.hasFailures()) {
            for (BulkItemResponse item : bulk.getItems()) {
                log.error("Index [{}], id [{}]: bulk write failed, status [{}], error: {}", indexName, item.getId(), item.status(), item.getFailureMessage());
            }
            return false;
        }
        // Count created and updated documents
        int createdCount = 0;
        int updatedCount = 0;
        for (BulkItemResponse item : bulk.getItems()) {
            if (DocWriteResponse.Result.CREATED.equals(item.getResponse().getResult())) {
                createdCount++;
            } else if (DocWriteResponse.Result.UPDATED.equals(item.getResponse().getResult())) {
                updatedCount++;
            }
        }
        log.info("Index [{}]: bulk sync succeeded, {} created, {} updated", indexName, createdCount, updatedCount);
        return true;
    }
    /**
     * Update a document in ES.
     * @param indexName index name
     * @param id document id
     * @param paramJson document fields as JSON
     * @return true if the document was created, updated, or unchanged
     */
    public boolean updateData(String indexName, String id, String paramJson) {
        UpdateRequest updateRequest = new UpdateRequest(indexName, id);
        // Insert the document if it does not exist yet
        updateRequest.docAsUpsert(true);
        // Refresh immediately so the change is searchable
        updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        updateRequest.doc(paramJson, XContentType.JSON);
        try {
            UpdateResponse updateResponse = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
            log.info("Index [{}], id [{}], result: [{}]", indexName, id, updateResponse.getResult());
            if (DocWriteResponse.Result.CREATED.equals(updateResponse.getResult())) {
                // Created
                log.info("Index [{}], id [{}]: created", indexName, id);
                return true;
            } else if (DocWriteResponse.Result.UPDATED.equals(updateResponse.getResult())) {
                // Updated
                log.info("Index [{}], id [{}]: updated", indexName, id);
                return true;
            } else if (DocWriteResponse.Result.NOOP.equals(updateResponse.getResult())) {
                // No change
                log.info("Index [{}], id [{}]: unchanged", indexName, id);
                return true;
            }
        } catch (IOException e) {
            // Passing the exception as the last argument logs the stack trace
            log.error("Index [{}], id [{}]: update failed", indexName, id, e);
            return false;
        }
        return false;
    }
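    /**
     * Bulk update in ES, creating the index first if needed.
     * @param indexName index name
     * @param indexAlias index alias
     * @param jsonDataList JSON array of documents
     * @param clazz class whose annotated fields define the mapping
     * @return true if every document was updated
     * @throws IOException
     */
    public boolean updateDataBatch(String indexName, String indexAlias, String jsonDataList, Class<?> clazz) throws IOException {
        // 1. Create the index if it does not exist
        boolean createIndexFlag = createIndex(indexName, indexAlias, clazz);
        if (!createIndexFlag) {
            return false;
        }
        return this.updateDataBatch(indexName, jsonDataList);
    }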
    /**
     * Delete a document.
     * @param indexName index name
     * @param id document id
     * @return true if the document was deleted
     */
    public boolean deleteData(String indexName, String id) {
        DeleteRequest deleteRequest = new DeleteRequest(indexName);
        deleteRequest.id(id);
        deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        try {
            DeleteResponse deleteResponse = restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
            if (DocWriteResponse.Result.NOT_FOUND.equals(deleteResponse.getResult())) {
                log.error("Index [{}], id [{}]: delete failed, document not found", indexName, id);
                return false;
            } else {
                log.info("Index [{}], id [{}]: deleted", indexName, id);
                return true;
            }
        } catch (IOException e) {
            log.error("Index [{}], id [{}]: delete threw an exception", indexName, id, e);
            return false;
        }
    }
    /**
     * Bulk delete in ES.
     * @param indexName index name
     * @param ids document ids
     * @return true if the bulk delete had no failures
     */
    public boolean deleteDataBatch(String indexName, List<String> ids) {
        if (CollectionUtils.isEmpty(ids)) {
            return false;
        }
        BulkRequest bulkRequest = packBulkDeleteRequest(indexName, ids);
        try {
            BulkResponse bulk = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
            if (bulk.hasFailures()) {
                for (BulkItemResponse item : bulk.getItems()) {
                    log.error("Index [{}], id [{}]: delete failed, error: {}", indexName, item.getId(), item.getFailureMessage());
                }
                return false;
            }
            // Count the documents that were actually deleted
            int deleteCount = 0;
            for (BulkItemResponse item : bulk.getItems()) {
                if (DocWriteResponse.Result.DELETED.equals(item.getResponse().getResult())) {
                    deleteCount++;
                }
            }
            log.info("Index [{}]: bulk delete succeeded, {} deleted", indexName, deleteCount);
            return true;
        } catch (IOException e) {
            log.error("Index [{}]: bulk delete threw an exception", indexName, e);
            return false;
        }
    }
    /**
     * Build the bulk delete request.
     * @param indexName index name
     * @param ids document ids
     * @return the bulk request
     */
    private BulkRequest packBulkDeleteRequest(String indexName, List<String> ids) {
        BulkRequest bulkRequest = new BulkRequest();
        bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        ids.forEach(id -> {
            DeleteRequest deleteRequest = new DeleteRequest(indexName);
            deleteRequest.id(id);
            bulkRequest.add(deleteRequest);
        });
        return bulkRequest;
    }
    /**
     * Bulk update in ES.
     * @param indexName index name
     * @param jsonDataList JSON array of documents
     * @return true if every document was updated
     */
    public boolean updateDataBatch(String indexName, String jsonDataList) {
        // Reject empty input
        if (StringUtils.isBlank(jsonDataList)) {
            return false;
        }
        BulkRequest bulkRequest = packBulkUpdateRequest(indexName, jsonDataList);
        if (bulkRequest.requests().isEmpty()) {
            return false;
        }
        try {
            // Execute synchronously
            BulkResponse bulk = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
            if (bulk.hasFailures()) {
                for (BulkItemResponse item : bulk.getItems()) {
                    // The original call omitted item.getId(), shifting the remaining arguments by one
                    log.error("Index [{}], id [{}]: update failed, status [{}], error: {}", indexName, item.getId(), item.status(), item.getFailureMessage());
                }
                return false;
            }
            // Count created and updated documents
            int createCount = 0;
            int updateCount = 0;
            for (BulkItemResponse item : bulk.getItems()) {
                if (DocWriteResponse.Result.CREATED.equals(item.getResponse().getResult())) {
                    createCount++;
                } else if (DocWriteResponse.Result.UPDATED.equals(item.getResponse().getResult())) {
                    updateCount++;
                }
            }
            log.info("Index [{}]: bulk update succeeded, {} created, {} updated", indexName, createCount, updateCount);
        } catch (IOException e) {
            log.error("Index [{}]: bulk update threw an exception", indexName, e);
            return false;
        }
        return true;
    }
    /**
     * Build the bulk update request.
     * @param indexName index name
     * @param jsonDataList JSON array of documents
     * @return the bulk request (empty if the array is empty)
     */
    private BulkRequest packBulkUpdateRequest(String indexName, String jsonDataList) {
        BulkRequest bulkRequest = new BulkRequest();
        bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        JSONArray jsonArray = JSONArray.parseArray(jsonDataList);
        if (jsonArray.isEmpty()) {
            return bulkRequest;
        }
        jsonArray.forEach(o -> {
            // fastjson's getString also accepts a numeric id, whereas casting to
            // Map<String, String> throws ClassCastException when the id is not a string
            JSONObject json = (JSONObject) o;
            UpdateRequest updateRequest = new UpdateRequest(indexName, json.getString(PRIMARY_KEY_NAME));
            // Insert the document if it does not exist yet
            updateRequest.docAsUpsert(true);
            updateRequest.doc(JSON.toJSONString(o), XContentType.JSON);
            bulkRequest.add(updateRequest);
        });
        return bulkRequest;
    }
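    /**
     * Drop the index and create it again.
     * @param indexName index name
     * @param indexAlias index alias
     * @param clazz class whose annotated fields define the mapping
     * @return true if the sync FAILED, false if it succeeded
     * @throws IOException
     */
    private boolean syncIndex(String indexName, String indexAlias, Class<?> clazz) throws IOException {
        // 1. Delete the index if it exists. deleteIndex returns false for a missing index,
        //    so without the existence check the very first sync would always fail.
        if (isIndexExists(indexName) && !deleteIndex(indexName)) {
            return true;
        }
        // 2. Create the index
        boolean createIndexFlag = createIndex(indexName, indexAlias, clazz);
        if (!createIndexFlag) {
            return true;
        }
        return false;
    }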
    /**
     * Look up a document by id.
     * @param indexName index name
     * @param id document id
     * @return the document source as JSON, or an empty string if the index is missing or the request fails
     */
    public String getDataById(String indexName, String id) {
        // 1. Check that the index exists
        boolean result = isIndexExists(indexName);
        if (!result) {
            return "";
        }
        GetRequest getRequest = new GetRequest(indexName, id);
        try {
            GetResponse getResponse = restHighLevelClient.get(getRequest, RequestOptions.DEFAULT);
            String resultJson = getResponse.getSourceAsString();
            log.info("Index [{}], id [{}], result: [{}]", indexName, id, resultJson);
            return resultJson;
        } catch (IOException e) {
            log.error("Index [{}], id [{}]: query threw an exception", indexName, id, e);
            return "";
        }
    }
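    /**
     * Remove every document from an index (the index itself is kept).
     * @param indexName index name
     * @return true if all documents were deleted
     */
    public boolean deleteAll(String indexName) {
        // 1. Check that the index exists
        boolean result = isIndexExists(indexName);
        if (!result) {
            log.error("Index [{}] does not exist, nothing to clear", indexName);
            return false;
        }
        // 2. A DeleteRequest without an id fails validation, so delete by query instead.
        //    DeleteByQueryRequest and BulkByScrollResponse live in org.elasticsearch.index.reindex,
        //    QueryBuilders in org.elasticsearch.index.query.
        DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(indexName);
        deleteByQueryRequest.setQuery(QueryBuilders.matchAllQuery());
        deleteByQueryRequest.setRefresh(true);
        try {
            BulkByScrollResponse response = restHighLevelClient.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT);
            if (!response.getBulkFailures().isEmpty()) {
                log.error("Index [{}]: clearing failed: {}", indexName, response.getBulkFailures());
                return false;
            }
            log.info("Index [{}] cleared, {} documents deleted", indexName, response.getDeleted());
            return true;
        } catch (IOException e) {
            log.error("Index [{}]: clearing threw an exception", indexName, e);
            return false;
        }
    }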
    /**
     * Save a batch of documents to ES asynchronously.
     * Despite the "Sync" suffix in its name, this method returns before the bulk request completes.
     * @param indexName index name
     * @param indexAlias index alias
     * @param jsonDataList JSON array of documents
     * @param clazz class whose annotated fields define the mapping
     * @return true if the request was submitted
     * @throws IOException
     */
    public boolean saveDataBatchSync(String indexName, String indexAlias, String jsonDataList, Class<?> clazz) throws IOException {
        // 1. Reject empty input
        if (StringUtils.isBlank(jsonDataList)) {
            return false;
        }
        // 2. Drop and recreate the index (syncIndex returns true on failure)
        if (syncIndex(indexName, indexAlias, clazz)) {
            return false;
        }
        // 3. Build the bulk request
        BulkRequest bulkRequest = packBulkIndexRequest(indexName, jsonDataList);
        if (bulkRequest.requests().isEmpty()) {
            return false;
        }
        // 4. Execute asynchronously
        ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() {
            @Override
            public void onResponse(BulkResponse bulkItemResponses) {
                if (bulkItemResponses.hasFailures()) {
                    for (BulkItemResponse item : bulkItemResponses.getItems()) {
                        log.error("Index [{}], id [{}]: update failed, status [{}], error: {}", indexName, item.getId(),
                                item.status(), item.getFailureMessage());
                    }
                }
            }

            // Called when the request as a whole fails
            @Override
            public void onFailure(Exception e) {
                log.error("Index [{}]: async bulk update threw an exception", indexName, e);
            }
        };
        restHighLevelClient.bulkAsync(bulkRequest, RequestOptions.DEFAULT, listener);
        log.info("Index [{}]: bulk update in progress", indexName);
        return true;
    }
    /**
     * Delete an index.
     * @param indexName index name
     * @return true if the index was deleted
     * @throws IOException
     */
    public boolean deleteIndex(String indexName) throws IOException {
        // 1. Check that the index exists
        boolean result = isIndexExists(indexName);
        if (!result) {
            log.error("Index [{}] does not exist, delete failed", indexName);
            return false;
        }
        // 2. Build and send the delete request
        DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indexName);
        deleteIndexRequest.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
        AcknowledgedResponse acknowledgedResponse = restHighLevelClient.indices().delete(deleteIndexRequest, RequestOptions.DEFAULT);
        if (!acknowledgedResponse.isAcknowledged()) {
            log.error("Index [{}]: delete failed", indexName);
            return false;
        }
        log.info("Index [{}]: deleted", indexName);
        return true;
    }
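    /**
     * Build the bulk index request.
     * @param indexName index name
     * @param jsonDataList JSON array of documents
     * @return the bulk request (empty if the array is empty)
     */
    private BulkRequest packBulkIndexRequest(String indexName, String jsonDataList) {
        BulkRequest bulkRequest = new BulkRequest();
        // IMMEDIATE  > refresh right after the write (changes visible at once, highest resource cost)
        // WAIT_UNTIL > wait for the next refresh before responding (changes visible, lower resource cost)
        // NONE       > default policy (changes become visible only after a later refresh)
        bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        JSONArray jsonArray = JSONArray.parseArray(jsonDataList);
        if (jsonArray.isEmpty()) {
            return bulkRequest;
        }
        // Add one IndexRequest per document
        jsonArray.forEach(obj -> {
            // fastjson's getString also accepts a numeric id, whereas casting to
            // Map<String, String> throws ClassCastException when the id is not a string
            final JSONObject json = (JSONObject) obj;
            IndexRequest indexRequest = new IndexRequest(indexName);
            indexRequest.source(JSON.toJSONString(obj), XContentType.JSON);
            indexRequest.id(json.getString(PRIMARY_KEY_NAME));
            bulkRequest.add(indexRequest);
        });
        return bulkRequest;
    }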
    /**
     * Create the index if it does not exist.
     * @param indexName index name
     * @param indexAlias index alias
     * @param clazz class whose annotated fields define the mapping
     * @return true if the index already exists or was created
     * @throws IOException
     */
    public boolean createIndex(String indexName, String indexAlias, Class<?> clazz) throws IOException {
        // Only create the index when it is missing
        boolean result = isIndexExists(indexName);
        if (!result) {
            boolean createResult = createIndexAndCreateMapping(indexName, indexAlias, FieldMappingUtil.getFieldInfo(clazz));
            if (!createResult) {
                log.info("Index [{}]: creation failed", indexName);
                return false;
            }
            log.info("Index [{}]: created", indexName);
        }
        return true;
    }
    /**
     * Sync a single document to ES.
     * @param id document id
     * @param indexName index name
     * @param indexAlias index alias
     * @param jsonData document as JSON
     * @param clazz class whose annotated fields define the mapping
     * @return true if the document was created or updated
     */
    public boolean saveData(String id, String indexName, String indexAlias, String jsonData, Class<?> clazz) throws IOException {
        // 1. Create the index if it does not exist
        boolean createIndexFlag = createIndex(indexName, indexAlias, clazz);
        if (!createIndexFlag) {
            return false;
        }
        // 2. Build the index request
        IndexRequest indexRequest = new IndexRequest(indexName);
        // 3. Set the source, refresh policy, and id
        indexRequest.source(jsonData, XContentType.JSON);
        // IMMEDIATE > refresh right away
        indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        indexRequest.id(id);
        IndexResponse response = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
        // 4. Check whether the document was created or updated
        if (IndexResponse.Result.CREATED.equals(response.getResult())) {
            log.info("Index [{}]: document saved", indexName);
            return true;
        } else if (IndexResponse.Result.UPDATED.equals(response.getResult())) {
            log.info("Index [{}]: document updated", indexName);
            return true;
        }
        return false;
    }
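    /**
     * Check whether an index exists.
     * @param indexName index name
     * @return true if the index exists; false if it does not or the check fails
     */
    public boolean isIndexExists(String indexName) {
        try {
            GetIndexRequest getIndexRequest = new GetIndexRequest(indexName);
            return restHighLevelClient.indices().exists(getIndexRequest, RequestOptions.DEFAULT);
        } catch (Exception e) {
            log.error("Index [{}]: existence check threw an exception", indexName, e);
        }
        return false;
    }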
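    /**
     * Create the index together with its settings, mapping, and alias.
     * @param indexName index name
     * @param indexAlias index alias
     * @param fieldMappingList field definitions collected from the entity class
     * @return true if ES acknowledged the request
     * @throws IOException
     */
    private boolean createIndexAndCreateMapping(String indexName, String indexAlias, List<FieldMapping> fieldMappingList) throws IOException {
        // Build the mapping, then close "properties" and the root object
        XContentBuilder mapping = packEsMapping(fieldMappingList, null);
        mapping.endObject().endObject();
        mapping.close();
        // Build the create-index request
        CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
        // Analyzer settings and alias
        XContentBuilder settings = packSettingMapping();
        XContentBuilder aliases = packEsAliases(indexAlias);
        // Strings is org.elasticsearch.common.Strings; XContentBuilder has no useful toString()
        log.info("Index settings: {}", Strings.toString(settings));
        log.info("Index mapping: {}", Strings.toString(mapping));
        createIndexRequest.settings(settings);
        // mapping(type, builder) is the typed overload of the deprecated
        // org.elasticsearch.action.admin.indices.create.CreateIndexRequest; with
        // org.elasticsearch.client.indices.CreateIndexRequest use mapping(mapping) instead
        createIndexRequest.mapping("_doc", mapping);
        createIndexRequest.aliases(aliases);
        // Create the index synchronously
        CreateIndexResponse createIndexResponse = restHighLevelClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
        boolean acknowledged = createIndexResponse.isAcknowledged();
        if (acknowledged) {
            log.info("Index [{}]: created", indexName);
            return true;
        } else {
            log.error("Index [{}]: creation failed", indexName);
            return false;
        }
    }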
    /**
     * Build the alias definition.
     * @author Kou Shenhai
     * @param alias index alias
     * @return the alias builder
     * @throws IOException
     */
    private XContentBuilder packEsAliases(String alias) throws IOException {
        XContentBuilder aliases = XContentFactory.jsonBuilder().startObject()
                .startObject(alias).endObject();
        aliases.endObject();
        aliases.close();
        return aliases;
    }
    /**
     * Build the mapping.
     * @param fieldMappingList field definitions collected from the entity class
     * @param mapping an existing builder to append to, or null to start a new one
     * @return the builder, with "properties" still open
     * @throws IOException
     */
    private XContentBuilder packEsMapping(List<FieldMapping> fieldMappingList, XContentBuilder mapping) throws IOException {
        if (mapping == null) {
            // First call: open the root object and "properties"
            mapping = XContentFactory.jsonBuilder().startObject()
                    .field("dynamic", true)
                    .startObject("properties");
        }
        // Add one mapping entry per annotated field.
        // Only NOT_ANALYZED and IK_INDEX are handled; other fields fall back to dynamic mapping.
        for (FieldMapping fieldMapping : fieldMappingList) {
            String field = fieldMapping.getField();
            String dataType = fieldMapping.getType();
            Integer participle = fieldMapping.getParticiple();
            // Choose the analyzer settings
            if (Constant.NOT_ANALYZED.equals(participle)) {
                if (FieldTypeEnum.DATE.getValue().equals(dataType)) {
                    mapping.startObject(field)
                            .field("type", dataType)
                            .field("format", "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis")
                            .endObject();
                } else {
                    mapping.startObject(field)
                            .field("type", dataType)
                            .endObject();
                }
            } else if (Constant.IK_INDEX.equals(participle)) {
                mapping.startObject(field)
                        .field("type", dataType)
                        .field("eager_global_ordinals", true)
                        // fielddata=true allows aggregations on a text field
                        .field("fielddata", true)
                        .field("boost", 100.0)
                        .field("analyzer", "ik-index-synonym")
                        .field("search_analyzer", "ik-search-synonym")
                        .startObject("fields").startObject("pinyin")
                        .field("term_vector", "with_positions_offsets")
                        .field("analyzer", "ik-search-pinyin")
                        .field("type", dataType)
                        .field("boost", 100.0)
                        .endObject().endObject()
                        .endObject();
            }
        }
        return mapping;
    }
    /**
     * Build the index settings.
     * @return the settings builder
     * @throws IOException
     */
    private XContentBuilder packSettingMapping() throws IOException {
        XContentBuilder setting = XContentFactory.jsonBuilder().startObject()
                .startObject("index")
                .field("number_of_shards", 5)
                .field("number_of_replicas", 1)
                .field("refresh_interval", "120s")
                .endObject()
                .startObject("analysis");
        // Analyzers: IK tokenizer combined with synonym and pinyin filters
        setting.startObject("analyzer")
                .startObject("ik-search-pinyin")
                .field("type", "custom")
                .field("tokenizer", "ik_smart")
                .field("char_filter", new String[]{"html_strip"})
                .field("filter", new String[]{"laokou-pinyin", "word_delimiter", "lowercase", "asciifolding"})
                .endObject();
        setting.startObject("ik-index-synonym")
                .field("type", "custom")
                .field("tokenizer", "ik_max_word")
                .field("char_filter", new String[]{"html_strip"})
                .field("filter", new String[]{"laokou-remote-synonym"})
                .endObject();
        setting.startObject("ik-search-synonym")
                .field("type", "custom")
                .field("tokenizer", "ik_smart")
                .field("char_filter", new String[]{"html_strip"})
                .field("filter", new String[]{"laokou-remote-synonym"})
                .endObject();
        // Close "analyzer"
        setting.endObject();
        // Token filters: pinyin and remote synonyms
        setting.startObject("filter")
                .startObject("laokou-pinyin")
                .field("type", "pinyin")
                .field("keep_first_letter", false)
                .field("keep_separate_first_letter", false)
                .field("keep_full_pinyin", true)
                .field("keep_original", false)
                .field("keep_joined_full_pinyin", true)
                .field("limit_first_letter_length", 16)
                .field("lowercase", true)
                .field("remove_duplicated_term", true)
                .endObject()
                .startObject("laokou-remote-synonym")
                .field("type", "dynamic_synonym")
                .field("synonyms_path", synonymPath)
                .field("interval", 120)
                .field("dynamic_reload", true)
                .endObject()
                .endObject();
        // Close "analysis" and the root object
        setting.endObject().endObject();
        setting.close();
        return setting;
    }
}
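To see what `packEsMapping` produces without an ES cluster, its branch logic can be mirrored with plain maps. This is a sketch rather than the article's code: `MappingSketch` is a hypothetical name, and the constants `NOT_ANALYZED = 0`, `IK_INDEX = 3` and the string `"date"` are assumptions inferred from the `FieldInfo` comment, since `Constant` and `FieldTypeEnum` are not shown. The pinyin sub-field, `fielddata`, and boost settings are left out for brevity.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class MappingSketch {

    static final int NOT_ANALYZED = 0; // assumed value of Constant.NOT_ANALYZED
    static final int IK_INDEX = 3;     // assumed value of Constant.IK_INDEX

    // Build the mapping entry for one field, or null if the analyzer code is not handled
    public static Map<String, Object> fieldMapping(String type, int participle) {
        Map<String, Object> m = new LinkedHashMap<>();
        m.put("type", type);
        if (participle == NOT_ANALYZED) {
            if ("date".equals(type)) {
                // Date fields additionally declare the accepted formats
                m.put("format", "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis");
            }
            return m;
        }
        if (participle == IK_INDEX) {
            m.put("analyzer", "ik-index-synonym");
            m.put("search_analyzer", "ik-search-synonym");
            return m;
        }
        // ik_smart (1) and ik_max_word (2) have no branch in packEsMapping either
        return null;
    }

    public static void main(String[] args) {
        Map<String, Object> properties = new LinkedHashMap<>();
        properties.put("title", fieldMapping("text", IK_INDEX));
        properties.put("readCount", fieldMapping("long", NOT_ANALYZED));
        properties.put("createDate", fieldMapping("date", NOT_ANALYZED));

        Map<String, Object> mapping = new LinkedHashMap<>();
        mapping.put("dynamic", true);
        mapping.put("properties", properties);
        System.out.println(mapping);
    }
}
```

The field names `title`, `readCount`, and `createDate` are placeholders. With `readCount` mapped as `long`, a range query can filter on it, while `title` is analyzed by the IK-based analyzers defined in the settings.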
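Question to consider: suppose I have several kinds of records, such as articles, chat messages, and orders. Each needs its own index. How would you create a different index depending on the kind of data?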
6. Index management utility class
import java.util.HashMap;
import java.util.Map;

/**
 * Index management: maps an index name to the class that defines its fields.
 * @author Kou Shenhai 2413176044@leimingtech.com
 * @version 1.0
 * @date 2021/10/31 10:11 AM
 */
public class FieldUtil {

    public static final String MESSAGE_INDEX = "message";

    private static final Map<String, Class<?>> classMap = new HashMap<>(16);

    static {
        classMap.put(FieldUtil.MESSAGE_INDEX, MessageIndex.class);
    }

    // Unknown names fall back to Object.class
    public static Class<?> getClazz(final String indexName) {
        return classMap.getOrDefault(indexName, Object.class);
    }
}
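One way to approach the question about articles, chat messages, and orders is to register one class per index in the same kind of lookup table. The sketch below is self-contained; `IndexRegistry`, `ArticleIndex`, `ChatIndex`, `OrderIndex`, and the index names are hypothetical placeholders, not classes from the article.

```java
import java.util.HashMap;
import java.util.Map;

public class IndexRegistry {

    // Hypothetical index classes, one per kind of record
    public static class ArticleIndex { }
    public static class ChatIndex { }
    public static class OrderIndex { }

    private static final Map<String, Class<?>> CLASS_MAP = new HashMap<>(16);

    static {
        CLASS_MAP.put("article", ArticleIndex.class);
        CLASS_MAP.put("chat", ChatIndex.class);
        CLASS_MAP.put("order", OrderIndex.class);
    }

    // Unknown names fall back to Object.class, which declares no fields
    public static Class<?> getClazz(String indexName) {
        return CLASS_MAP.getOrDefault(indexName, Object.class);
    }

    public static void main(String[] args) {
        System.out.println(getClazz("order").getSimpleName());
        System.out.println(getClazz("unknown").getSimpleName());
    }
}
```

Because an unknown name resolves to `Object.class`, the field scan returns an empty list and the index is created with dynamic mapping only. Throwing an exception for unregistered names may be the safer choice if that silent fallback is not wanted.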
7. Test ES
/**
 * Elasticsearch API service.
 * @author Kou Shenhai 2413176044@leimingtech.com
 * @version 1.0
 * @date 2021/2/8 6:33 PM
 */
@RestController
@RequestMapping("/api")
@Api(tags = "Elasticsearch API service")
public class ElasticsearchController {

    @Autowired
    private ElasticsearchUtil elasticsearchUtil;

    @PostMapping("/sync")
    @ApiOperation("Sync a document to ES")
    @CrossOrigin
    public void syncIndex(@RequestBody final ElasticsearchModel model) throws IOException {
        String id = model.getId();
        String indexName = model.getIndexName();
        String indexAlias = model.getIndexAlias();
        String jsonData = model.getData();
        // Note: the mapping class is looked up by alias, so the alias must match a key registered in FieldUtil
        Class<?> clazz = FieldUtil.getClazz(indexAlias);
        elasticsearchUtil.saveData(id, indexName, indexAlias, jsonData, clazz);
    }

    @PostMapping("/batchSync")
    @ApiOperation("Save a batch of documents to ES asynchronously")
    @CrossOrigin
    public void batchSyncIndex(@RequestBody final ElasticsearchModel model) throws IOException {
        String indexName = model.getIndexName();
        String indexAlias = model.getIndexAlias();
        String jsonDataList = model.getData();
        Class<?> clazz = FieldUtil.getClazz(indexAlias);
        elasticsearchUtil.saveDataBatchSync(indexName, indexAlias, jsonDataList, clazz);
    }

    @PostMapping("/batch")
    @ApiOperation("Sync a batch of documents to ES")
    @CrossOrigin
    public void saveBatchIndex(@RequestBody final ElasticsearchModel model) throws IOException {
        String indexName = model.getIndexName();
        String indexAlias = model.getIndexAlias();
        String jsonDataList = model.getData();
        Class<?> clazz = FieldUtil.getClazz(indexAlias);
        elasticsearchUtil.saveDataBatch(indexName, indexAlias, jsonDataList, clazz);
    }

    @GetMapping("/get")
    @ApiOperation("Get a document from ES by id")
    @CrossOrigin
    @ApiImplicitParams({
            @ApiImplicitParam(name = "indexName", value = "index name", required = true, paramType = "query", dataType = "String"),
            @ApiImplicitParam(name = "id", value = "document id", required = true, paramType = "query", dataType = "String")
    })
    public HttpResultUtil<String> getDataById(@RequestParam("indexName") String indexName, @RequestParam("id") String id) {
        return new HttpResultUtil<String>().ok(elasticsearchUtil.getDataById(indexName, id));
    }

    @PutMapping("/batch")
    @ApiOperation("Bulk update in ES")
    @CrossOrigin
    public void updateDataBatch(@RequestBody final ElasticsearchModel model) throws IOException {
        String indexName = model.getIndexName();
        String indexAlias = model.getIndexAlias();
        String jsonDataList = model.getData();
        Class<?> clazz = FieldUtil.getClazz(indexAlias);
        elasticsearchUtil.updateDataBatch(indexName, indexAlias, jsonDataList, clazz);
    }

    @PutMapping("/sync")
    @ApiOperation("Update a document in ES")
    @CrossOrigin
    public void updateData(@RequestBody final ElasticsearchModel model) {
        String id = model.getId();
        String indexName = model.getIndexName();
        String paramJson = model.getData();
        elasticsearchUtil.updateData(indexName, id, paramJson);
    }

    @DeleteMapping("/batch")
    @ApiOperation("Bulk delete in ES")
    @CrossOrigin
    public void deleteDataBatch(@RequestParam("indexName") String indexName, @RequestParam("ids") List<String> ids) {
        elasticsearchUtil.deleteDataBatch(indexName, ids);
    }

    @DeleteMapping("/sync")
    @ApiOperation("Delete a document from ES")
    @CrossOrigin
    public void deleteData(@RequestParam("indexName") String indexName, @RequestParam("id") String id) {
        elasticsearchUtil.deleteData(indexName, id);
    }

    @DeleteMapping("/all")
    @ApiOperation("Clear an ES index")
    @CrossOrigin
    public void deleteAll(@RequestParam("indexName") String indexName) {
        elasticsearchUtil.deleteAll(indexName);
    }
}
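The controller depends on an `ElasticsearchModel` request body that the article does not show. Below is a minimal sketch of what it might look like, with the fields inferred only from the getters the controller calls (`getId`, `getIndexName`, `getIndexAlias`, `getData`). The real class may differ, for example by using Lombok's `@Data` or carrying extra fields.

```java
public class ElasticsearchModel {

    // Document id, used by the single-document endpoints
    private String id;
    // Target index name
    private String indexName;
    // Index alias; the controller also uses it to look up the mapping class
    private String indexAlias;
    // Payload as a JSON string: one object, or an array for the batch endpoints
    private String data;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getIndexName() {
        return indexName;
    }

    public void setIndexName(String indexName) {
        this.indexName = indexName;
    }

    public String getIndexAlias() {
        return indexAlias;
    }

    public void setIndexAlias(String indexAlias) {
        this.indexAlias = indexAlias;
    }

    public String getData() {
        return data;
    }

    public void setData(String data) {
        this.data = data;
    }
}
```

Since `data` is itself a string, a client has to send the document JSON as an escaped string value inside the request body rather than as a nested object.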
All done.
Note: you can partition the data according to your own business needs.
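The article does not say how to partition. One possible reading is time-based partitioning, where each record is written to a per-month index. The sketch below assumes that reading; `IndexPartitioner`, the `_yyyyMM` suffix, and the base name are all hypothetical choices.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

public class IndexPartitioner {

    private static final DateTimeFormatter MONTH = DateTimeFormatter.ofPattern("yyyyMM");

    // Derive the physical index name from a base name and the record's date
    public static String indexNameFor(String baseName, LocalDate date) {
        return baseName + "_" + date.format(MONTH);
    }

    public static void main(String[] args) {
        System.out.println(indexNameFor("message", LocalDate.of(2021, 10, 31)));
    }
}
```

A shared alias (the `indexAlias` parameter used throughout the utility class) could then point at all monthly indices so that searches do not need to know the partition.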