Elastic Search(ES)使用笔记
Posted Smile_Miracle
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Elastic Search(ES)使用笔记相关的知识,希望对你有一定的参考价值。
ElasticSearch介绍:
ElasticSearch是一个基于Lucene的搜索服务器。它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口。Elasticsearch是用Java开发的,并作为Apache许可条款下的开放源码发布,是当前流行的企业级搜索引擎。设计用于云计算中,能够达到实时搜索,稳定,可靠,快速,安装使用方便。
我们建立一个网站或应用程序,并要添加搜索功能,但是想要完成搜索工作的创建是非常困难的。我们希望搜索解决方案要运行速度快,我们希望能有一个零配置和一个完全免费的搜索模式,我们希望能够简单地使用JSON通过HTTP来索引数据,我们希望我们的搜索服务器始终可用,我们希望能够从一台开始并扩展到数百台,我们要实时搜索,我们要简单的多租户,我们希望建立一个云的解决方案。因此我们利用Elasticsearch来解决所有这些问题及可能出现的更多其它问题。其作用如果你用过Solr的话,他们效果是差不多的,看你怎么用,但是在数据量过亿级别的话,ES的数据处理速度就比solr快很多。
使用
① maven依赖导入:
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>6.2.2</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>6.2.2</version>
</dependency>
② 工具类编写:
package com.mvs.utils;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Stack;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.action.admin.indices.exists.types.TypesExistsRequest;
import org.elasticsearch.action.admin.indices.exists.types.TypesExistsResponse;
import org.elasticsearch.action.deletebyquery.DeleteByQueryAction;
import org.elasticsearch.action.deletebyquery.DeleteByQueryRequestBuilder;
import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.MultiSearchRequestBuilder;
import org.elasticsearch.action.search.MultiSearchResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.MatchPhraseQueryBuilder;
import org.elasticsearch.index.query.Operator;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Repository;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.infochang.comm.utils.PUtils;
import com.mvs.common.BizException;
import com.mvs.model.EsSearchVo;
@Repository
@SuppressWarnings("resource")
public class ElasticSearchUtils
//public class ElasticSearchUtils implements InitializingBean
private static Logger log = LoggerFactory.getLogger(ElasticSearchUtils.class);
private static TransportClient client;
//取得实例
public static TransportClient getTransportClient()
if (client != null)
return client;
else
try
Settings settings = Settings.builder()
.put("cluster.name",PUtils.getString("es.cluster.name"))
.put("client.transport.sniff", true) //启动集群嗅探
.build();
client = new PreBuiltTransportClient(settings)
.addTransportAddress(new TransportAddress(InetAddress.getByName(PUtils.getString("es.cluster.url")),
Integer.valueOf(PUtils.getString("es.cluster.port"))));
catch (UnknownHostException e)
log.error(">>>>>>>>>ES连接初始化失败<<<<<<<<<", e);
client = null;
return client;
/**
* 创建索引
* @param indexName 索引名称,相当于数据库名称
* @param typeName 索引类型,相当于数据库中的表名
* @param id id名称,相当于每个表中某一行记录的标识
* @param jsonData json数据
*/
public static void createIndex(String indexName, String typeName, String id,
Map<String,Object> dataMap)
TransportClient transportClient = getTransportClient();
if(transportClient!=null)
if(dataMap!=null&&dataMap.get("summary")!=null)
log.info(">>>>>>>>>ES Temp Insert Data Summary Is:"+dataMap.get("summary").toString()+"<<<<<<<<<");
IndexRequestBuilder requestBuilder = transportClient.prepareIndex(indexName, typeName, id).setSource(dataMap);
requestBuilder.setRefreshPolicy(RefreshPolicy.IMMEDIATE);
requestBuilder.execute().actionGet();
// transportClient.close();
else
log.error(">>>>>>>>>ES连接初始化失败,创建索引失败<<<<<<<<<");
/**
* 批量创建索引
* @param indexName 索引名称,相当于数据库名称
* @param typeName 索引类型,相当于数据库中的表名
* @param id id名称,相当于每个表中某一行记录的标识
* @param jsonData json数据
*/
public static void createIndexDoBatch(String indexName, String typeName, String id, List<Map<String,Object>> dataLst)
TransportClient transportClient = getTransportClient();
if(transportClient!=null)
BulkRequestBuilder bulkRequest = transportClient.prepareBulk();
if(dataLst!=null&&dataLst.size()>0)
for (Map<String,Object> dataMap : dataLst)
bulkRequest.add(transportClient.prepareIndex(indexName, typeName).setSource(dataMap));
bulkRequest.setRefreshPolicy(RefreshPolicy.IMMEDIATE);
bulkRequest.execute().actionGet();
/**
* 执行搜索
* @param indexname 索引名称
* @param type 索引类型
* @param queryBuilder 查询条件
* @return
*/
public static SearchResponse searcher(String indexName, String typeName, QueryBuilder queryBuilder)
TransportClient transportClient = getTransportClient();
SearchResponse searchResponse = transportClient.prepareSearch(indexName)
.setTypes(typeName).setQuery(queryBuilder).execute()
.actionGet();//执行查询
// transportClient.close();
return searchResponse;
/**
* 执行搜索
* @param indexname 索引名称
* @param type 索引类型
* @param queryBuilder 查询条件
* @return
*/
@SuppressWarnings("unchecked")
public static List<Map<String,Object>> searcher(QueryBuilder queryBuilder,String indexName, String typeName)
List<Map<String,Object>> dataMap = new ArrayList<>();
TransportClient transportClient = getTransportClient();
try
SearchResponse searchResponse = transportClient.prepareSearch(indexName)
.setTypes(typeName).setQuery(queryBuilder).execute()
.actionGet();//执行查询
if(searchResponse!=null)
SearchHits hits = searchResponse.getHits();
for (int i = 0; i < hits.getHits().length; i++)
if (null == hits.getHits()[i])
continue;
else
dataMap.add(JSON.parseObject(hits.getHits()[i].getSourceAsString(), Map.class));
else
log.error("ES未查询到任何结果!!!");
catch (Exception e)
log.error("ES查询异常!!!");
throw new BizException(e.getMessage(), e.getCause());
return dataMap;
/**
* 更新索引
* @param indexName 索引名称
* @param typeName 索引类型
* @param id id名称
* @param jsonData json数据
*/
public static void updateIndex(String indexName, String typeName, String id, Map<String,Object> dataMap)
TransportClient transportClient = getTransportClient();
UpdateRequestBuilder updateRequest = transportClient.prepareUpdate(indexName, typeName, id).setDoc(dataMap);
updateRequest.setRefreshPolicy(RefreshPolicy.IMMEDIATE);
updateRequest.execute().actionGet();
// transportClient.close();
public static Map<String, Object> getEsMatchFields(Map<String,Object> dataMap)
Map<String,Object> fieldMap = new HashMap<>();
try
String fields = PUtils.getString("zabbix.es.match.field");
if(StringUtils.isBlank(fields))
log.error("*********请先在配置文件中配置Zabbix命中字段*********");
fieldMap = null;
else
String[] fieldArr = fields.split(",",-1);
for (String key : fieldArr)
if(dataMap.get(key) == null)
log.error("*********当前数据不包含要命中的字段,请检查数据字段*********");
break;
String fieldValue = dataMap.get(key).toString();
if(StringUtils.isBlank(fieldValue))
log.error("*********当前数据不包含要命中的字段,请检查数据字段*********");
break;
else
fieldMap.put(key, fieldValue);
catch (Exception e)
log.error("*********设置Zabbix命中字段数据异常*********",e);
return fieldMap;
/**
* 根据查询数据更新
* @param indexName 索引名称
* @param typeName 索引类型
* @param flag
* @param jsonData json数据
*/
@SuppressWarnings( "rawtypes")
public static void updateByQueryNotice(String indexName, String typeName, Map<String,Object> dataMap, boolean flag)
try
TransportClient transportClient = getTransportClient();
Map<String, Object> conMap = getEsMatchFields(dataMap);
if(conMap!=null&&conMap.size()>0)
List<String> fieldLst = new ArrayList<>(conMap.keySet()) ;
BoolQueryBuilder builder = null;
for (int i=0;i< fieldLst.size(); i++)
if(i==0)
builder = QueryBuilders.boolQuery().mustNot(QueryBuilders.matchQuery("warnStatus", "CLOSE"))
.must(QueryBuilders.matchQuery(fieldLst.get(i),conMap.get(fieldLst.get(i)).toString()));
else
builder = builder.must(QueryBuilders.matchQuery(fieldLst.get(i),conMap.get(fieldLst.get(i)).toString()));
SearchRequestBuilder responsebuilder = transportClient.prepareSearch(indexName).setTypes(typeName);
SearchResponse myresponse=responsebuilder.setQuery(builder).execute().actionGet();
if(myresponse!=null)
SearchHits hits = myresponse.getHits();
for (int i = 0; i < hits.getHits().length; i++)
if (null == hits.getHits()[i])
log.error("ES未查询到任何结果!!!");
else
if(flag)
log.info("ES Match Data Length Is:"+hits.getTotalHits());
Map map = JSONObject.parseObject(hits.getHits()[i].getSourceAsString(),Map.class);
if(map.get("repeat_count")!=null&&StringUtils.isNotEmpty(map.get("repeat_count").toString()))
dataMap.put("repeat_count",(Integer)map.get("repeat_count")+1);
updateIndex(indexName,typeName,hits.getHits()[i].getId(),dataMap);
else
updateIndex(indexName,typeName,hits.getHits()[i].getId(),dataMap);
else
log.error("ES未查询到任何结果!!!");
else
log.error("当前搜索没有条件数据,请先确认条件是否添加");
catch (Exception e)
log.error("ES Query Search Failed!!!",e);
/**
* 根据查询数据更新
* @param indexName 索引名称
* @param typeName 索引类型
* @param flag
* @param jsonData json数据
*/
@SuppressWarnings( "unchecked")
public static List<Map<String,Object>> getZabbixMatchData(String indexName, String typeName, Map<String,Object> dataMap)
List<Map<String,Object>> resultMap = new ArrayList<>();
TransportClient transportClient = getTransportClient();
try
if(dataMap!=null&&dataMap.size()>0)
List<String> fieldLst = new ArrayList<>(dataMap.keySet()) ;
BoolQueryBuilder builder = null;
for (int i=0;i< fieldLst.size(); i++)
if(i==0)
builder = QueryBuilders.boolQuery().mustNot(QueryBuilders.matchQuery("warnStatus", "CLOSE"))
.must(QueryBuilders.matchQuery(fieldLst.get(i),dataMap.get(fieldLst.get(i)).toString()));
else
builder = builder.must(QueryBuilders.matchQuery(fieldLst.get(i),dataMap.get(fieldLst.get(i)).toString()));
SearchRequestBuilder responsebuilder = transportClient.prepareSearch(indexName).setTypes(typeName);
SearchResponse myresponse=responsebuilder.setQuery(builder).execute().actionGet();
if(myresponse!=null)
SearchHits hits = myresponse.getHits();
for (int i = 0; i < hits.getHits().length; i++)
if (null == hits.getHits()[i])
continue;
else
resultMap.add(JSON.parseObject(hits.getHits()[i].getSourceAsString(), Map.class));
else
log.error("ES未查询到任何结果!!!");
else
log.error("当前搜索没有条件数据,请先确认条件是否添加");
catch (Exception e)
log.error("ES Query Search Failed!!!",e);
return resultMap;
/**
* 根据查询数据更新
* @param indexName 索引名称
* @param typeName 索引类型
* @param flag
* @param jsonData json数据
*/
@SuppressWarnings( "rawtypes")
public static void updateByQueryNew(String indexName, String typeName, Map<String,Object> dataMap, boolean flag)
try
TransportClient transportClient = getTransportClient();
JSONObject jsonObject = (JSONObject) JSONObject.toJSON(dataMap);
if(jsonObject!=null&&jsonObject.size()>0)
Set<String> keySet = jsonObject.keySet();
List<SearchRequestBuilder > dataLst = new ArrayList<>();
for (String string : keySet)
SearchRequestBuilder requestBuilder = transportClient.prepareSearch().setQuery(
QueryBuilders.matchQuery(string, jsonObject.get(string)).operator(Operator.AND))
.addAggregation(AggregationBuilders.terms(string).field(jsonObject.get(string).toString()));
dataLst.add(requestBuilder);
if(dataLst==null||dataLst.size()==0)
log.error(">>>>>>>>>Update Data is Not Right,Pleace Check Out Data Fileds!!!<<<<<<<<<");
else
//取最后一条聚合查询的数据
MultiSearchRequestBuilder mrBuilder = transportClient.prepareMultiSearch();
mrBuilder.add(dataLst.get(dataLst.size()-1));
MultiSearchResponse multiResponse = mrBuilder.execute().actionGet();
if(multiResponse!=null)
for (MultiSearchResponse.Item item : multiResponse.getResponses())
SearchResponse response = item.getResponse();
if(response!=null&&response.getHits().getTotalHits()>0)
SearchHit[] hits = response.getHits().getHits();
log.info("ES Match Data Length Is:"+response.getHits().getTotalHits());
for (SearchHit searchHit : hits)
if(flag)
Map map = JSONObject.parseObject(searchHit.getSourceAsString(),Map.class);
if(map.get("repeat_count")!=null&&StringUtils.isNotEmpty(map.get("repeat_count").toString()))
dataMap.put("repeat_count",(Integer)map.get("repeat_count")+1);
updateIndex(indexName,typeName,searchHit.getId(),dataMap);
else
updateIndex(indexName,typeName,searchHit.getId(),dataMap);
else
log.error(">>>>>>>>>ES DO NOT MATCH ANAY DATA!!!<<<<<<<<<");
else
log.error(">>>>>>>>>ES DO NOT MATCH ANAY DATA!!!<<<<<<<<<");
catch (Exception e)
log.error("ES Query Search Failed!!!",e);
/**
* 根据查询数据更新
* @param indexName 索引名称
* @param typeName 索引类型
* @param flag
* @param jsonData json数据
*/
public static void updateByQueryTemp(String indexName, String typeName, Map<String,Object> dataMap)
try
TransportClient transportClient = getTransportClient();
JSONObject jsonObject = (JSONObject) JSONObject.toJSON(dataMap);
if(jsonObject!=null&&jsonObject.size()>0)
Set<String> keySet = jsonObject.keySet();
List<MatchPhraseQueryBuilder > dataLst = new ArrayList<>();
for (String key : keySet)
if(key.equals("sourceId"))
dataLst.add(QueryBuilders.matchPhraseQuery(key,jsonObject.get(key)));
if(dataLst.size()!=1)
log.error(">>>>>>>>>Update Data is Not Right,Pleace Check Out Data Fileds!!!<<<<<<<<<");
else
QueryBuilder qb = QueryBuilders.boolQuery().must(dataLst.get(0));
SearchRequestBuilder responsebuilder = transportClient.prepareSearch(indexName).setTypes(typeName);
SearchResponse myresponse=responsebuilder.setQuery(qb).execute().actionGet();
if(myresponse!=null)
SearchHits hits = myresponse.getHits();
for (int i = 0; i < hits.getHits().length; i++)
if (null == hits.getHits()[i])
log.error("ES未查询到任何结果!!!");
else
log.info("ES Match Data Length Is:"+hits.getTotalHits());
// log.info("ES Temp Update Data Summary Is:"+dataMap.get("summary").toString());
updateIndex(indexName,typeName,hits.getHits()[i].getId(),dataMap);
else
log.error("ES未查询到任何结果!!!");
catch (Exception e)
log.error("ES Query Search Failed!!!",e);
/**
* 根据查询数据更新
* @param indexName 索引名称
* @param typeName 索引类型
* @param flag
* @param jsonData json数据
*/
public static void updateByQueryTempAlert(String indexName, String typeName, Map<String,Object> dataMap)
try
TransportClient transportClient = getTransportClient();
JSONObject jsonObject = (JSONObject) JSONObject.toJSON(dataMap);
if(jsonObject!=null&&jsonObject.size()>0)
Set<String> keySet = jsonObject.keySet();
List<MatchPhraseQueryBuilder > dataLst = new ArrayList<>();
for (String key : keySet)
if(key.equals("alertNum"))
dataLst.add(QueryBuilders.matchPhraseQuery(key,jsonObject.get(key)));
if(dataLst.size()!=1)
log.error(">>>>>>>>>Update Data is Not Right,Pleace Check Out Data Fileds!!!<<<<<<<<<");
else
QueryBuilder qb = QueryBuilders.boolQuery().must(dataLst.get(0));
SearchRequestBuilder responsebuilder = transportClient.prepareSearch(indexName).setTypes(typeName);
SearchResponse myresponse=responsebuilder.setQuery(qb).execute().actionGet();
if(myresponse!=null)
SearchHits hits = myresponse.getHits();
for (int i = 0; i < hits.getHits().length; i++)
if (null == hits.getHits()[i])
log.error("ES未查询到任何结果!!!");
else
log.info("ES Match Data Length Is:"+hits.getTotalHits());
// log.info("ES Temp Update Data Summary Is:"+dataMap.get("summary").toString());
updateIndex(indexName,typeName,hits.getHits()[i].getId(),dataMap);
else
log.error("ES未查询到任何结果!!!");
catch (Exception e)
log.error("ES Query Search Failed!!!",e);
/**
* 删除指定索引
* @param indexName
* @param typeName
* @param id
*/
public static void deleteIndex(String indexName, String typeName, String id)
TransportClient transportClient = getTransportClient();
transportClient.prepareDelete(indexName, typeName, id).setRefreshPolicy(RefreshPolicy.IMMEDIATE).get();
// transportClient.close();
/**
* 判断一个index中的type是否有数据
* @param index
* @param type
* @return
* @throws Exception
*/
public static Boolean existDocOfType(String index, String type) throws Exception
SearchRequestBuilder builder = client.prepareSearch(index).setTypes(type)
.setSearchType(SearchType.QUERY_THEN_FETCH)
.setSize(1);
SearchResponse response = builder.execute().actionGet();
long docNum = response.getHits().getTotalHits();
if (docNum == 0)
return false;
return true;
/**
* 根据type来删除数据
* @param index
* @param types
* @return
*/
public static long deleteDocByType(String index, String[] types)
TransportClient transportClient = getTransportClient();
long oldTime = System.currentTimeMillis();
StringBuilder b = new StringBuilder();
b.append("\\"query\\":\\"match_all\\":");
DeleteByQueryResponse response = new DeleteByQueryRequestBuilder(transportClient, DeleteByQueryAction.INSTANCE)
.setIndices(index).setTypes(types)
.setSource(b.toString())
.execute().actionGet();
Stack<String> allTypes = new Stack<String>();
for(String type : types)
allTypes.add(type);
while(!allTypes.isEmpty())
String type = allTypes.pop();
while(true)
try
if (existDocOfType(index, type) == false)
break;
catch (Exception e)
log.error("queryError: " + e.getMessage());
System.out.println(System.currentTimeMillis() - oldTime);
return response.getTotalDeleted();
/**
* 根据字段删除数据
* @param (indexName,typeName,field,value)
* @return
*/
public static boolean deleteByField(String indexName,String typeName,String field,String value)
boolean flag = false;
try
SearchResponse searchResponse = searcher(indexName,typeName,QueryBuilders.matchQuery(field, value));
if(searchResponse!=null)
SearchHits hits = searchResponse.getHits();
for (int i = 0; i < hits.getHits().length; i++)
if (null == hits.getHits()[i])
log.error("ES未查询到任何结果!!!");
else
log.info("Color Rules Will Be Deleted Length IS: "+hits.getTotalHits());
deleteIndex(indexName,typeName,hits.getHits()[i].getId());
flag = true;
log.info("ES Data Delete By Field Success!!!");
catch (Exception e)
log.error("ES Data Delete By Field Exception!!!",e);
flag = false;
return flag;
/**
* 分页查询
* @param sortType
* @param sort
* @param index
* @param types
* @return
*/
public static EsSearchVo findByPage(String indexName, Integer size,Integer pageNum,QueryBuilder queryBuilder, Object sort, Object sortType)
EsSearchVo vo = new EsSearchVo();
TransportClient transportClient = getTransportClient();
SearchResponse res = null;
SearchResponse searchResponse = null;
if(sort!=null&&sortType.toString().equals("DESC"))
searchResponse = transportClient.prepareSearch(indexName)
.setQuery(queryBuilder)
.setSearchType(SearchType.DFS_QUERY_THEN_FETCH)//这种方式返回的document与用户要求的size是相等的。
.setSize(size)
.addSort(sort.toString(), SortOrder.DESC)
.setExplain(true)// 设置是否按查询匹配度排序
.setScroll(new TimeValue(20000)).execute() //设置TimeValue表示需要保持搜索的上下文时间。
.actionGet();//注意:首次搜索已经包含数据
else if(sort!=null&&sortType.toString().equals("ASC"))
searchResponse = transportClient.prepareSearch(indexName)
.setQuery(queryBuilder)
.setSearchType(SearchType.DFS_QUERY_THEN_FETCH)//这种方式返回的document与用户要求的size是相等的。
.setSize(size)
.addSort(sort.toString(), SortOrder.ASC)
.setExplain(true)// 设置是否按查询匹配度排序
.setScroll(new TimeValue(20000)).execute() //设置TimeValue表示需要保持搜索的上下文时间。
.actionGet();//注意:首次搜索已经包含数据
else
searchResponse = transportClient.prepareSearch(indexName)
.setQuery(queryBuilder)
.setSearchType(SearchType.DFS_QUERY_THEN_FETCH)//这种方式返回的document与用户要求的size是相等的。
.setSize(size)
.setExplain(true)// 设置是否按查询匹配度排序
.setScroll(new TimeValue(20000)).execute() //设置TimeValue表示需要保持搜索的上下文时间。
.actionGet();//注意:首次搜索已经包含数据
//获取总数量
long totalCount = searchResponse.getHits().getTotalHits();
vo.setTotal(totalCount);
int page = 0;
int pageCount=0;
if(totalCount%size==0)
pageCount = (int)totalCount/(size);
else
pageCount = (int)totalCount/(size)+1;
if(totalCount<size)
page = 1;
else
page=(int)totalCount/(size);
log.info("*************************ES Page Query Size Number is:"+pageCount+"************************");
log.info("*************************ES Page Query Match Data Number is:"+totalCount+"************************");
for (int i = 1; i <=page; i++)
if(pageNum-1==0)
res = searchResponse;
break;
else if(pageNum-1==i)
//再次发送请求,并使用上次搜索结果的ScrollId
res = transportClient.prepareSearchScroll(searchResponse.getScrollId())
.setScroll(new TimeValue(20000)).execute()
.actionGet();
break;
else
searchResponse = transportClient.prepareSearchScroll(searchResponse.getScrollId())
.setScroll(new TimeValue(20000)).execute()
.actionGet();
vo.setSr(res);
return vo;
/* public static void main(String[] args)
List<Map<String,Object>> dataMap = new ArrayList<>();
Map<String,Object> m1=new HashMap<>();
m1.put("", true);
HttpUtils.doPost(url, params);
*/
/**
* 判断指定的索引名是否存在
*
* @param indexName
* 索引名
* @return 存在:true; 不存在:false;
*/
public static boolean isExistsIndex(String indexName)
IndicesExistsResponse response = getTransportClient().admin().indices().exists(new IndicesExistsRequest().indices(new String[] indexName ))
.actionGet();
return response.isExists();
/**
* 判断指定的索引的类型是否存在
*
* @param indexName
* 索引名
* @param indexType
* 索引类型
* @return 存在:true; 不存在:false;
*/
public static boolean isExistsType(String indexName, String indexType)
TypesExistsResponse response = getTransportClient().admin().indices().typesExists(new TypesExistsRequest(new String[] indexName , indexType))
.actionGet();
return response.isExists();
/**
* 创建索引
* @param indexName
* @param indexType
* @param isFielddata 是否需要支持排序
*/
public static void createIndexType(String indexName, String indexType, boolean isFielddata)
CreateIndexRequestBuilder cib = getTransportClient().admin().indices().prepareCreate(indexName);
XContentBuilder mapping;
try
mapping = XContentFactory.jsonBuilder().startObject()
.startObject("_all").field("enabled", false).endObject()// 关闭_all字段
.startObject("_source").field("enabled", true).endObject()// 打开_source字段
.startObject("properties") // 设置之定义字段
.startObject("_default_")
.field("type", "text") // 设置数据类型
// .field("date_detection", false) // 设置字段不做时间检测
.field("fielddata", isFielddata) // 设置支持排序
.endObject()
.endObject()
.endObject();
cib.addMapping(indexType, mapping);
cib.execute().actionGet();
catch (IOException e)
e.printStackTrace();
注意事项
① 在ES 5.6.9版本你可以直接用一个JSON字符串来进行新增和修改操作,但是在6.2.2版本中这个方法被取缔,使用的时候一定 要注意ES的版本,在6.0版本以后你可以直接传递一个Map来做数据的增改操作;
/**
* 创建索引
* @param indexName 索引名称,相当于数据库名称
* @param typeName 索引类型,相当于数据库中的表名
* @param id id名称,相当于每个表中某一行记录的标识
* @param jsonData json数据
*/
public static void createIndex(String indexName, String typeName, String id,
Map<String,Object> dataMap)
TransportClient transportClient = getTransportClient();
if(transportClient!=null)
IndexRequestBuilder requestBuilder = transportClient.prepareIndex(indexName, typeName, id).setSource(dataMap);
requestBuilder.setRefreshPolicy(RefreshPolicy.IMMEDIATE);
requestBuilder.execute().actionGet();
// transportClient.close();
else
log.error(">>>>>>>>>ES连接初始化失败,创建索引失败<<<<<<<<<");
ElasticSearchUtils.createIndex(indexName, indexName, null, tempMap);
② ES工具如果在接口中调用频繁,一定要做好相应的关闭或者构建该工具的单实例以免JAVA虚拟机被耗资源,这个耗费资源的 度还是非常大的;
//取得实例
public static TransportClient getTransportClient()
if (client != null)
return client;
else
try
Settings settings = Settings.builder()
.put("cluster.name",CLUSTER_NAME)
.put("client.transport.sniff", true) //启动集群嗅探
.build();
client = new PreBuiltTransportClient(settings)
.addTransportAddress(new TransportAddress(InetAddress.getByName(IP), PORT));
catch (UnknownHostException e)
log.error(">>>>>>>>>ES连接初始化失败<<<<<<<<<", e);
client = null;
return client;
③ Scroll分页查询的效率在数据量上来后比form to操作来的更好,但是要记住,在6.0版本后已经不存在SearType为SCAN的类 型,也就是说,它第一次查询就会给你返回数据,不会像之前一样查询第二次才给你返回数据,所以如果你想要获取全量数 据,那么代码你就得按规范来修改
/**
* 分页查询
* @param index
* @param types
* @return
*/
public static EsSearchVo findByPage(String indexName, Integer size,Integer pageNum,String sortField,QueryBuilder queryBuilder)
EsSearchVo vo = new EsSearchVo();
TransportClient transportClient = getTransportClient();
SearchResponse res = null;
SearchResponse searchResponse = transportClient.prepareSearch(indexName)
.setQuery(queryBuilder)
.setSearchType(SearchType.DFS_QUERY_THEN_FETCH)//这种方式返回的document与用户要求的size是相等的。
.setSize(size)
.addSort(sortField, SortOrder.DESC)
.setExplain(true)// 设置是否按查询匹配度排序
.setScroll(new TimeValue(20000)).execute() //设置TimeValue表示需要保持搜索的上下文时间。
.actionGet();//注意:首次搜索已经包含数据
//获取总数量
long totalCount = searchResponse.getHits().getTotalHits();
vo.setTotal(totalCount);
int page = 0;
int pageCount=0;
if(totalCount%size==0)
pageCount = (int)totalCount/(size);
else
pageCount = (int)totalCount/(size)+1;
if(totalCount<size)
page = 1;
else
page=(int)totalCount/(size);
log.info("*************************ES Page Query Size Number is:"+pageCount+"************************");
log.info("*************************ES Page Query Match Data Number is:"+totalCount+"************************");
for (int i = 1; i <=page; i++)
if(pageNum-1==0)
res = searchResponse;
break;
else if(pageNum-1==i)
//再次发送请求,并使用上次搜索结果的ScrollId
res = transportClient.prepareSearchScroll(searchResponse.getScrollId())
.setScroll(new TimeValue(20000)).execute()
.actionGet();
break;
else
searchResponse = transportClient.prepareSearchScroll(searchResponse.getScrollId())
.setScroll(new TimeValue(20000)).execute()
.actionGet();
vo.setSr(res);
return vo;
④ 如果你分页的单页数量比你ES中存在的数据量还大,举例说,你ES中该查询只有5条数据,但是你分页的pageSize是10,那 么如果你不对这一步查询做相关处理的话数据你是拿不到的,处理如下:
if(totalCount<size)
page = 1;
else
page=(int)totalCount/(size);
⑤ ES新建索引也就是新增数据的流程中如果伴随着查询操作的话,那么你的自增操作必须加上更新策略支持,不然在此流程中查 询是 获取不到数据的,常用策略有RefreshPolicy.IMMEDIATE(更新策略为立即更新),RefreshPolicy.NONE(更新策略为默认,也就是每几秒执行一次批量更行,这是ES的默认策略),RefreshPolicy.WAIT_UNTIL(这个暂时不清楚,字面意识是等待什么操作完成)。ElasticSearch 实际上是伪实时的,所有分片之间默认1s同步更新
③ 使用
@SuppressWarnings("unchecked")
private Map<String, Object> inserEsFirst(ConfigVo configVo, AlertWarnVo alertWarnVo, String indexName)
Map<String,Object> resultMap = new HashMap<>();
//用来关联元数据与丰富过后的数据的标识符
String sourceId = new SimpleDateFormat("yyyyMMddhhmmssSSS").format(new Date());
boolean flag = true;
try
//如果告警状态没有,强制设置为open
if(alertWarnVo.getEventValue()==null||StringUtils.isEmpty(alertWarnVo.getEventValue().toString()))
alertWarnVo.setWarnStatus("OPEN");
else if("0".equals(alertWarnVo.getEventValue()))
alertWarnVo.setWarnStatus("CLOSE");
else if("1".equals(alertWarnVo.getEventValue()))
alertWarnVo.setWarnStatus("OPEN");
Map<String, Object> tempMap = (Map<String, Object>) JSON.parse(JSON.toJSONString(alertWarnVo));
tempMap.put("source", configVo.getDataSource());
//设置默认状态过滤标识位为false
tempMap.put("isFilter", false);
tempMap.put("sourceId", sourceId);
resultMap.put("sourceId", sourceId);
ElasticSearchUtils.createIndex(indexName, indexName, null, tempMap);
catch (Exception e)
log.error(">>>>>>>>>Es Data Insert Failed!!!<<<<<<<<<", e);
flag =false;
resultMap.put("flag", flag);
return resultMap;
@SuppressWarnings("unchecked")
private Map<String,Object> updateESFilter(ConfigVo configVo, AlertWarnVo o,String indexName)
Map<String,Object> resultMap = new HashMap<>();
boolean flag = true;
try
Map<String, Object> tempMap = (Map<String, Object>) JSON.parse(JSON.toJSONString(o));
//根据sourceId存在修改
if(tempMap.get("sourceId")!=null&&StringUtils.isNotEmpty(tempMap.get("sourceId").toString()))
tempMap.put("isFilter", true);
//indexName与typeName一样
ElasticSearchUtils.updateByQueryTemp(indexName, indexName, tempMap);
else
log.error(">>>>>>>>>Es Data Update Failed Cause SourceId Is Empty!!!<<<<<<<<<" );
flag =false;
catch (Exception e)
log.error(">>>>>>>>>Es Data Update Failed!!!<<<<<<<<<", e);
flag =false;
resultMap.put("flag", flag);
return resultMap;
@Override
public Map<String, Object> queryBaseInfo()
Map<String, Object> dataMap = new HashMap<>();
try
Calendar baseCal = Calendar.getInstance();
baseCal.set(Calendar.HOUR_OF_DAY,0);
baseCal.set(Calendar.MINUTE, 0);
baseCal.set(Calendar.SECOND, 0);
baseCal.add(Calendar.DATE,0);
//查询Disaster数据,CLOSED数据不算其中
SearchResponse searcherDisaster = ElasticSearchUtils.searcher(PUtils.getString("es.index.notice"),
PUtils.getString("es.index.notice"),
QueryBuilders.boolQuery()
.must(QueryBuilders.matchQuery("severity", "Disaster"))
.must(QueryBuilders.rangeQuery("eventTime").gt(baseCal.getTimeInMillis()))
.mustNot(QueryBuilders.matchQuery("warnStatus", "CLOSED")));
if(searcherDisaster!=null&&searcherDisaster.getHits().getTotalHits()!=0)
dataMap.put("DisasterCount", searcherDisaster.getHits().getTotalHits());
else
dataMap.put("DisasterCount", 0);
//查询High数据,CLOSED数据不算其中
SearchResponse searcherHigh = ElasticSearchUtils.searcher(PUtils.getString("es.index.notice"),
PUtils.getString("es.index.notice"),
QueryBuilders.boolQuery()
.must(QueryBuilders.matchQuery("severity", "High"))
.must(QueryBuilders.rangeQuery("eventTime").gt(baseCal.getTimeInMillis()))
.mustNot(QueryBuilders.matchQuery("warnStatus", "CLOSED")));
if(searcherHigh!=null&&searcherHigh.getHits().getTotalHits()!=0)
dataMap.put("HighCount", searcherHigh.getHits().getTotalHits());
else
dataMap.put("HighCount", 0);
//查询Average数据,CLOSED数据不算其中
SearchResponse searcherAverage= ElasticSearchUtils.searcher(PUtils.getString("es.index.notice"),
PUtils.getString("es.index.notice"),
QueryBuilders.boolQuery()
.must(QueryBuilders.matchQuery("severity", "Average"))
.must(QueryBuilders.rangeQuery("eventTime").gt(baseCal.getTimeInMillis()))
.mustNot(QueryBuilders.matchQuery("warnStatus", "CLOSED")));
if(searcherAverage!=null&&searcherAverage.getHits().getTotalHits()!=0)
dataMap.put("AverageCount", searcherAverage.getHits().getTotalHits());
else
dataMap.put("AverageCount", 0);
//查询Warning数据,CLOSED数据不算其中
SearchResponse searcherWarning= ElasticSearchUtils.searcher(PUtils.getString("es.index.notice"),
PUtils.getString("es.index.notice"),
QueryBuilders.boolQuery()
.must(QueryBuilders.matchQuery("severity", "Warning"))
.must(QueryBuilders.rangeQuery("eventTime").gt(baseCal.getTimeInMillis()))
.mustNot(QueryBuilders.matchQuery("warnStatus", "CLOSED")));
if(searcherWarning!=null&&searcherWarning.getHits().getTotalHits()!=0)
dataMap.put("WarningCount", searcherWarning.getHits().getTotalHits());
else
dataMap.put("WarningCount", 0);
//查询Not Classified数据,CLOSED数据不算其中
SearchResponse searcherNotClassified= ElasticSearchUtils.searcher(PUtils.getString("es.index.notice"),
PUtils.getString("es.index.notice"),
QueryBuilders.boolQuery()
.must(QueryBuilders.matchQuery("severity", "Not Classified"))
.must(QueryBuilders.rangeQuery("eventTime").gt(baseCal.getTimeInMillis()))
.mustNot(QueryBuilders.matchQuery("warnStatus", "CLOSED")));
if(searcherNotClassified!=null&&searcherNotClassified.getHits().getTotalHits()!=0)
dataMap.put("NotClassifiedCount", searcherNotClassified.getHits().getTotalHits());
else
dataMap.put("NotClassifiedCount", 0);
//查询Information数据,CLOSED数据不算其中
SearchResponse searcherInformation= ElasticSearchUtils.searcher(PUtils.getString("es.index.notice"),
PUtils.getString("es.index.notice"),
QueryBuilders.boolQuery()
.must(QueryBuilders.matchQuery("severity", "Information"))
.must(QueryBuilders.rangeQuery("eventTime").gt(baseCal.getTimeInMillis()))
.mustNot(QueryBuilders.matchQuery("warnStatus", "CLOSED")));
if(searcherInformation!=null&&searcherInformation.getHits().getTotalHits()!=0)
dataMap.put("InformationCount", searcherInformation.getHits().getTotalHits());
else
dataMap.put("InformationCount", 0);
//查询近7天告警总数数据,CLOSED数据不算其中
List<Long> eventsLst = new ArrayList<>();
for(int i=0;i<7;i++)
RangeQueryBuilder rangequerybuilder = null;
if(i==6)
Calendar cal = Calendar.getInstance();
cal.set(Calendar.HOUR_OF_DAY,0);
cal.set(Calendar.MINUTE, 0);
cal.set(Calendar.SECOND, 0);
cal.add(Calendar.DATE,(i-6));
rangequerybuilder = QueryBuilders.rangeQuery("eventTime").gt(cal.getTimeInMillis());
else
Calendar cal1 = Calendar.getInstance();
cal1.set(Calendar.HOUR_OF_DAY,0);
cal1.set(Calendar.MINUTE, 0);
cal1.set(Calendar.SECOND, 0);
cal1.add(Calendar.DATE,(i-6));
Calendar cal2 = Calendar.getInstance();
cal2.set(Calendar.HOUR_OF_DAY,0);
cal2.set(Calendar.MINUTE, 0);
cal2.set(Calendar.SECOND, 0);
cal2.add(Calendar.DATE,(i-5));
rangequerybuilder = QueryBuilders.rangeQuery("eventTime").gt(cal1.getTimeInMillis()).lt(cal2.getTimeInMillis());
SearchResponse searcher = ElasticSearchUtils.searcher(PUtils.getString("es.index.notice"), PUtils.getString("es.index.notice"), rangequerybuilder);
if(searcher!=null&&searcher.getHits().getTotalHits()!=0)
eventsLst.add(searcher.getHits().getTotalHits());
else
eventsLst.add(0L);
dataMap.put("eventsLst", eventsLst);
catch (Exception e)
e.printStackTrace();
logger.error(">>>>>>>>>Query ES Base Source Has failed!!!<<<<<<<<<", e);
dataMap=null;
return dataMap;
@SuppressWarnings( "rawtypes", "unchecked" )
@Override
public List<Map<String, Object>> queryAllEvents(Integer pageNum, Integer pageSize)
List<Map<String, Object>> dataLst = new ArrayList<>();
try
EsSearchVo vo = ElasticSearchUtils.findByPage(PUtils.getString("es.index.notice"),pageSize,pageNum,"eventTime",
QueryBuilders.boolQuery()
.must(QueryBuilders.matchAllQuery()));
if(vo!=null&&vo.getSr()!=null)
SearchResponse searcher = vo.getSr();
logger.info("******************Total Match Sources Are:"+searcher.getHits().getTotalHits()+"********************");
SearchHits hits = searcher.getHits();
for (SearchHit searchHit : hits.getHits())
if(searchHit.getSourceAsString()!=null&&searchHit.getSourceAsString().length()>0)
Map parseMap = JSON.parseObject(searchHit.getSourceAsString(), Map.class);
parseMap.put("page", pageNum);
parseMap.put("size", pageSize);
parseMap.put("total", vo.getTotal()==null?0:vo.getTotal());
dataLst.add(parseMap);
else
logger.info("******************No Source Matched In ES!!!!********************");
catch (Exception e)
e.printStackTrace();
logger.error("****************** Query All Es Events Failed,Cause: ********************",e);
dataLst=null;
return dataLst;
@SuppressWarnings( "rawtypes", "unchecked" )
@Override
public List<Map<String, Object>> queryCloseEvents(Integer pageNum, Integer pageSize)
List<Map<String, Object>> dataLst = new ArrayList<>();
try
EsSearchVo vo = ElasticSearchUtils.findByPage(PUtils.getString("es.index.notice"),pageSize,pageNum,"eventTime",
QueryBuilders.boolQuery()
.must(QueryBuilders.matchQuery("warnStatus","CLOSED")));
if(vo!=null&&vo.getSr()!=null)
SearchResponse searcher = vo.getSr();
logger.info("******************Closed Match Sources Are:"+searcher.getHits().getTotalHits()+"********************");
SearchHits hits = searcher.getHits();
for (SearchHit searchHit : hits.getHits())
if(searchHit.getSourceAsString()!=null&&searchHit.getSourceAsString().length()>0)
Map parseMap = JSON.parseObject(searchHit.getSourceAsString(), Map.class);
parseMap.put("page", pageNum);
parseMap.put("size", pageSize);
parseMap.put("total", vo.getTotal()==null?0:vo.getTotal());
dataLst.add(parseMap);
catch (Exception e)
e.printStackTrace();
logger.error("****************** Query Closed Es Events Failed,Cause: ********************",e);
dataLst=null;
return dataLst;
@SuppressWarnings( "rawtypes", "unchecked" )
@Override
public List<Map<String, Object>> queryOpenEvents(Integer pageNum, Integer pageSize)
List<Map<String, Object>> dataLst = new ArrayList<>();
try
EsSearchVo vo = ElasticSearchUtils.findByPage(PUtils.getString("es.index.notice"),pageSize,pageNum,"eventTime",
QueryBuilders.boolQuery()
.must(QueryBuilders.matchQuery("warnStatus","OPEN")));
if(vo!=null&&vo.getSr()!=null)
SearchResponse searcher = vo.getSr();
logger.info("******************Open Match Sources Are:"+searcher.getHits().getTotalHits()+"********************");
SearchHits hits = searcher.getHits();
for (SearchHit searchHit : hits.getHits())
if(searchHit.getSourceAsString()!=null&&searchHit.getSourceAsString().length()>0)
Map parseMap = JSON.parseObject(searchHit.getSourceAsString(), Map.class);
parseMap.put("page", pageNum);
parseMap.put("size", pageSize);
parseMap.put("total", vo.getTotal()==null?0:vo.getTotal());
dataLst.add(parseMap);
catch (Exception e)
e.printStackTrace();
logger.error("****************** Query Open Es Events Failed,Cause: ********************",e);
dataLst=null;
return dataLst;
特别提醒:在刚启动ES的时候,ES中是没有库的,这个时候要在服务启动的时候就创建项目中默认用到的ES库,否则会报错,创建方式分为两种,一种为实现InitializingBean接口,或者实现ApplicationListener接口然后添加如下代码,前者要在该实现类配置相应的注解,@resource或者compnent都行,然后配置文件得扫描到该实现类所在的位置;后者要在spring配置文件中注册也就是<bean>的方式。
//判断temp库是否存在
if (!ElasticSearchUtils.isExistsIndex(PUtils.getString("es.index.temp")))
logger.error("ES库["+PUtils.getString("es.index.temp")+ "]不存在,创建新库.......");
//创建Temp库
ElasticSearchUtils.createIndexType(PUtils.getString("es.index.temp"), PUtils.getString("es.index.temp"), true);
if (!ElasticSearchUtils.isExistsIndex(PUtils.getString("es.index.notice")))
logger.error("ES库["+PUtils.getString("es.index.notice")+ "]不存在,创建新库.......");
//创建notice库
ElasticSearchUtils.createIndexType(PUtils.getString("es.index.notice"), PUtils.getString("es.index.notice"), true);
if (!ElasticSearchUtils.isExistsIndex(PUtils.getString("es.index.color")))
logger.error("ES库["+PUtils.getString("es.index.color")+ "]不存在,创建新库.......");
//创建color库
ElasticSearchUtils.createIndexType(PUtils.getString("es.index.color"), PUtils.getString("es.index.color"), true);
总结:
ES 的ClusterName相当于就是一个数据库mysql,indexName相当于他的库名,typeName相当于是表名,_id相当于此表的主键,当然你也可以自定义主键来完成相应的需求,它的效率与mysql在大数据量中相比要强大N倍。
以上是关于Elastic Search(ES)使用笔记的主要内容,如果未能解决你的问题,请参考以下文章