Elastic Search (ES) Usage Notes

Posted by Smile_Miracle


Introduction to ElasticSearch

  

ElasticSearch is a search server built on Lucene. It provides a distributed, multi-tenant full-text search engine exposed through a RESTful web interface. Elasticsearch is written in Java, released as open source under the Apache License, and is currently one of the most popular enterprise search engines. It is designed for cloud environments and delivers near-real-time search while being stable, reliable, fast, and easy to install and use.

Suppose we build a website or application and want to add search: building search from scratch is hard. We want a solution that is fast, ideally zero-configuration and completely free, lets us index data simply by sending JSON over HTTP, stays highly available, scales from a single node to hundreds, supports real-time search and simple multi-tenancy, and fits a cloud deployment. Elasticsearch solves all of these problems and more. If you have used Solr, the two serve much the same purpose and much depends on how you use them, but once the data volume passes the hundreds-of-millions mark, ES processes data noticeably faster than Solr.

  

 

Usage

① Import the Maven dependencies:

        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>6.2.2</version>
        </dependency>

        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>transport</artifactId>
            <version>6.2.2</version>
        </dependency>

② Write the utility class:

package com.mvs.utils;

import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Stack;

import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.action.admin.indices.exists.types.TypesExistsRequest;
import org.elasticsearch.action.admin.indices.exists.types.TypesExistsResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.deletebyquery.DeleteByQueryAction;
import org.elasticsearch.action.deletebyquery.DeleteByQueryRequestBuilder;
import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.MultiSearchRequestBuilder;
import org.elasticsearch.action.search.MultiSearchResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.MatchPhraseQueryBuilder;
import org.elasticsearch.index.query.Operator;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Repository;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.infochang.comm.utils.PUtils;
import com.mvs.common.BizException;
import com.mvs.model.EsSearchVo;

@Repository
@SuppressWarnings("resource")
public class ElasticSearchUtils {
//public class ElasticSearchUtils implements InitializingBean {
    private static Logger log = LoggerFactory.getLogger(ElasticSearchUtils.class);
    private static TransportClient client;

    // Obtain the shared client instance (lazily initialized)
    public static TransportClient getTransportClient() {
        if (client != null) {
            return client;
        } else {
            try {
                Settings settings = Settings.builder()
                        .put("cluster.name", PUtils.getString("es.cluster.name"))
                        .put("client.transport.sniff", true)  // enable cluster sniffing
                        .build();

                client = new PreBuiltTransportClient(settings)
                        .addTransportAddress(new TransportAddress(InetAddress.getByName(PUtils.getString("es.cluster.url")),
                                Integer.valueOf(PUtils.getString("es.cluster.port"))));
            } catch (UnknownHostException e) {
                log.error(">>>>>>>>>ES连接初始化失败<<<<<<<<<", e);
                client = null;
            }
        }
        return client;
    }
	    
    /**
     * Create an index document
     * @param indexName index name, analogous to a database name
     * @param typeName index type, analogous to a table name
     * @param id document id, analogous to a row's primary key
     * @param dataMap document data
     */
    public static void createIndex(String indexName, String typeName, String id, Map<String, Object> dataMap) {
        TransportClient transportClient = getTransportClient();
        if (transportClient != null) {
            if (dataMap != null && dataMap.get("summary") != null) {
                log.info(">>>>>>>>>ES Temp Insert Data Summary Is:" + dataMap.get("summary").toString() + "<<<<<<<<<");
            }
            IndexRequestBuilder requestBuilder = transportClient.prepareIndex(indexName, typeName, id).setSource(dataMap);
            requestBuilder.setRefreshPolicy(RefreshPolicy.IMMEDIATE);
            requestBuilder.execute().actionGet();
//          transportClient.close();
        } else {
            log.error(">>>>>>>>>ES连接初始化失败,创建索引失败<<<<<<<<<");
        }
    }
		
    /**
     * Bulk-create index documents
     * @param indexName index name, analogous to a database name
     * @param typeName index type, analogous to a table name
     * @param id unused here; document ids are generated by ES
     * @param dataLst list of document data maps
     */
    public static void createIndexDoBatch(String indexName, String typeName, String id, List<Map<String, Object>> dataLst) {
        TransportClient transportClient = getTransportClient();
        if (transportClient != null) {
            BulkRequestBuilder bulkRequest = transportClient.prepareBulk();
            if (dataLst != null && dataLst.size() > 0) {
                for (Map<String, Object> dataMap : dataLst) {
                    bulkRequest.add(transportClient.prepareIndex(indexName, typeName).setSource(dataMap));
                }
            }
            bulkRequest.setRefreshPolicy(RefreshPolicy.IMMEDIATE);
            bulkRequest.execute().actionGet();
        }
    }
    /**
     * Execute a search
     * @param indexName index name
     * @param typeName index type
     * @param queryBuilder query condition
     * @return the raw SearchResponse
     */
    public static SearchResponse searcher(String indexName, String typeName, QueryBuilder queryBuilder) {
        TransportClient transportClient = getTransportClient();
        SearchResponse searchResponse = transportClient.prepareSearch(indexName)
                .setTypes(typeName).setQuery(queryBuilder).execute()
                .actionGet();  // run the query
//      transportClient.close();
        return searchResponse;
    }
	    
    /**
     * Execute a search and return the hit sources as maps
     * @param queryBuilder query condition
     * @param indexName index name
     * @param typeName index type
     * @return list of hit sources
     */
    @SuppressWarnings("unchecked")
    public static List<Map<String, Object>> searcher(QueryBuilder queryBuilder, String indexName, String typeName) {
        List<Map<String, Object>> dataMap = new ArrayList<>();
        TransportClient transportClient = getTransportClient();
        try {
            SearchResponse searchResponse = transportClient.prepareSearch(indexName)
                    .setTypes(typeName).setQuery(queryBuilder).execute()
                    .actionGet();  // run the query

            if (searchResponse != null) {
                SearchHits hits = searchResponse.getHits();
                for (int i = 0; i < hits.getHits().length; i++) {
                    if (null == hits.getHits()[i]) {
                        continue;
                    } else {
                        dataMap.add(JSON.parseObject(hits.getHits()[i].getSourceAsString(), Map.class));
                    }
                }
            } else {
                log.error("ES未查询到任何结果!!!");
            }
        } catch (Exception e) {
            log.error("ES查询异常!!!");
            throw new BizException(e.getMessage(), e.getCause());
        }
        return dataMap;
    }
	    
    /**
     * Update an index document
     * @param indexName index name
     * @param typeName index type
     * @param id document id
     * @param dataMap document data
     */
    public static void updateIndex(String indexName, String typeName, String id, Map<String, Object> dataMap) {
        TransportClient transportClient = getTransportClient();
        UpdateRequestBuilder updateRequest = transportClient.prepareUpdate(indexName, typeName, id).setDoc(dataMap);
        updateRequest.setRefreshPolicy(RefreshPolicy.IMMEDIATE);
        updateRequest.execute().actionGet();
//      transportClient.close();
    }
	    
    /**
     * Pick the Zabbix match fields configured in zabbix.es.match.field out of the given data map
     */
    public static Map<String, Object> getEsMatchFields(Map<String, Object> dataMap) {
        Map<String, Object> fieldMap = new HashMap<>();
        try {
            String fields = PUtils.getString("zabbix.es.match.field");
            if (StringUtils.isBlank(fields)) {
                log.error("*********请先在配置文件中配置Zabbix命中字段*********");
                fieldMap = null;
            } else {
                String[] fieldArr = fields.split(",", -1);
                for (String key : fieldArr) {
                    if (dataMap.get(key) == null) {
                        log.error("*********当前数据不包含要命中的字段,请检查数据字段*********");
                        break;
                    }
                    String fieldValue = dataMap.get(key).toString();
                    if (StringUtils.isBlank(fieldValue)) {
                        log.error("*********当前数据不包含要命中的字段,请检查数据字段*********");
                        break;
                    } else {
                        fieldMap.put(key, fieldValue);
                    }
                }
            }
        } catch (Exception e) {
            log.error("*********设置Zabbix命中字段数据异常*********", e);
        }
        return fieldMap;
    }
		
		
    /**
     * Update documents matched by the configured Zabbix match fields (notice index)
     * @param indexName index name
     * @param typeName index type
     * @param dataMap document data
     * @param flag whether to increment repeat_count on the matched documents before updating
     */
    @SuppressWarnings("rawtypes")
    public static void updateByQueryNotice(String indexName, String typeName, Map<String, Object> dataMap, boolean flag) {
        try {
            TransportClient transportClient = getTransportClient();
            Map<String, Object> conMap = getEsMatchFields(dataMap);
            if (conMap != null && conMap.size() > 0) {
                List<String> fieldLst = new ArrayList<>(conMap.keySet());
                BoolQueryBuilder builder = null;
                for (int i = 0; i < fieldLst.size(); i++) {
                    if (i == 0) {
                        builder = QueryBuilders.boolQuery().mustNot(QueryBuilders.matchQuery("warnStatus", "CLOSE"))
                                .must(QueryBuilders.matchQuery(fieldLst.get(i), conMap.get(fieldLst.get(i)).toString()));
                    } else {
                        builder = builder.must(QueryBuilders.matchQuery(fieldLst.get(i), conMap.get(fieldLst.get(i)).toString()));
                    }
                }

                SearchRequestBuilder responsebuilder = transportClient.prepareSearch(indexName).setTypes(typeName);
                SearchResponse myresponse = responsebuilder.setQuery(builder).execute().actionGet();

                if (myresponse != null) {
                    SearchHits hits = myresponse.getHits();
                    for (int i = 0; i < hits.getHits().length; i++) {
                        if (null == hits.getHits()[i]) {
                            log.error("ES未查询到任何结果!!!");
                        } else {
                            if (flag) {
                                log.info("ES Match Data Length Is:" + hits.getTotalHits());
                                Map map = JSONObject.parseObject(hits.getHits()[i].getSourceAsString(), Map.class);
                                if (map.get("repeat_count") != null && StringUtils.isNotEmpty(map.get("repeat_count").toString())) {
                                    dataMap.put("repeat_count", (Integer) map.get("repeat_count") + 1);
                                    updateIndex(indexName, typeName, hits.getHits()[i].getId(), dataMap);
                                }
                            } else {
                                updateIndex(indexName, typeName, hits.getHits()[i].getId(), dataMap);
                            }
                        }
                    }
                } else {
                    log.error("ES未查询到任何结果!!!");
                }
            } else {
                log.error("当前搜索没有条件数据,请先确认条件是否添加");
            }
        } catch (Exception e) {
            log.error("ES Query Search Failed!!!", e);
        }
    }
			
			
    /**
     * Query documents matching the given field/value conditions (Zabbix match)
     * @param indexName index name
     * @param typeName index type
     * @param dataMap field/value conditions
     * @return list of matched document sources
     */
    @SuppressWarnings("unchecked")
    public static List<Map<String, Object>> getZabbixMatchData(String indexName, String typeName, Map<String, Object> dataMap) {
        List<Map<String, Object>> resultMap = new ArrayList<>();
        TransportClient transportClient = getTransportClient();
        try {
            if (dataMap != null && dataMap.size() > 0) {
                List<String> fieldLst = new ArrayList<>(dataMap.keySet());
                BoolQueryBuilder builder = null;
                for (int i = 0; i < fieldLst.size(); i++) {
                    if (i == 0) {
                        builder = QueryBuilders.boolQuery().mustNot(QueryBuilders.matchQuery("warnStatus", "CLOSE"))
                                .must(QueryBuilders.matchQuery(fieldLst.get(i), dataMap.get(fieldLst.get(i)).toString()));
                    } else {
                        builder = builder.must(QueryBuilders.matchQuery(fieldLst.get(i), dataMap.get(fieldLst.get(i)).toString()));
                    }
                }

                SearchRequestBuilder responsebuilder = transportClient.prepareSearch(indexName).setTypes(typeName);
                SearchResponse myresponse = responsebuilder.setQuery(builder).execute().actionGet();

                if (myresponse != null) {
                    SearchHits hits = myresponse.getHits();
                    for (int i = 0; i < hits.getHits().length; i++) {
                        if (null == hits.getHits()[i]) {
                            continue;
                        } else {
                            resultMap.add(JSON.parseObject(hits.getHits()[i].getSourceAsString(), Map.class));
                        }
                    }
                } else {
                    log.error("ES未查询到任何结果!!!");
                }
            } else {
                log.error("当前搜索没有条件数据,请先确认条件是否添加");
            }
        } catch (Exception e) {
            log.error("ES Query Search Failed!!!", e);
        }
        return resultMap;
    }
			
			
    /**
     * Update documents matched via a multi-search (aggregation) query
     * @param indexName index name
     * @param typeName index type
     * @param dataMap document data
     * @param flag whether to increment repeat_count on the matched documents before updating
     */
    @SuppressWarnings("rawtypes")
    public static void updateByQueryNew(String indexName, String typeName, Map<String, Object> dataMap, boolean flag) {
        try {
            TransportClient transportClient = getTransportClient();
            JSONObject jsonObject = (JSONObject) JSONObject.toJSON(dataMap);
            if (jsonObject != null && jsonObject.size() > 0) {
                Set<String> keySet = jsonObject.keySet();
                List<SearchRequestBuilder> dataLst = new ArrayList<>();
                for (String string : keySet) {
                    SearchRequestBuilder requestBuilder = transportClient.prepareSearch().setQuery(
                            QueryBuilders.matchQuery(string, jsonObject.get(string)).operator(Operator.AND))
                            .addAggregation(AggregationBuilders.terms(string).field(jsonObject.get(string).toString()));
                    dataLst.add(requestBuilder);
                }

                if (dataLst == null || dataLst.size() == 0) {
                    log.error(">>>>>>>>>Update Data is Not Right, Please Check the Data Fields!!!<<<<<<<<<");
                } else {
                    // keep only the last aggregation query
                    MultiSearchRequestBuilder mrBuilder = transportClient.prepareMultiSearch();
                    mrBuilder.add(dataLst.get(dataLst.size() - 1));

                    MultiSearchResponse multiResponse = mrBuilder.execute().actionGet();
                    if (multiResponse != null) {
                        for (MultiSearchResponse.Item item : multiResponse.getResponses()) {
                            SearchResponse response = item.getResponse();
                            if (response != null && response.getHits().getTotalHits() > 0) {
                                SearchHit[] hits = response.getHits().getHits();
                                log.info("ES Match Data Length Is:" + response.getHits().getTotalHits());
                                for (SearchHit searchHit : hits) {
                                    if (flag) {
                                        Map map = JSONObject.parseObject(searchHit.getSourceAsString(), Map.class);
                                        if (map.get("repeat_count") != null && StringUtils.isNotEmpty(map.get("repeat_count").toString())) {
                                            dataMap.put("repeat_count", (Integer) map.get("repeat_count") + 1);
                                            updateIndex(indexName, typeName, searchHit.getId(), dataMap);
                                        }
                                    } else {
                                        updateIndex(indexName, typeName, searchHit.getId(), dataMap);
                                    }
                                }
                            } else {
                                log.error(">>>>>>>>>ES DID NOT MATCH ANY DATA!!!<<<<<<<<<");
                            }
                        }
                    } else {
                        log.error(">>>>>>>>>ES DID NOT MATCH ANY DATA!!!<<<<<<<<<");
                    }
                }
            }
        } catch (Exception e) {
            log.error("ES Query Search Failed!!!", e);
        }
    }
			
    /**
     * Update documents matched by sourceId (temp index)
     * @param indexName index name
     * @param typeName index type
     * @param dataMap document data; must contain sourceId
     */
    public static void updateByQueryTemp(String indexName, String typeName, Map<String, Object> dataMap) {
        try {
            TransportClient transportClient = getTransportClient();
            JSONObject jsonObject = (JSONObject) JSONObject.toJSON(dataMap);
            if (jsonObject != null && jsonObject.size() > 0) {
                Set<String> keySet = jsonObject.keySet();
                List<MatchPhraseQueryBuilder> dataLst = new ArrayList<>();
                for (String key : keySet) {
                    if (key.equals("sourceId")) {
                        dataLst.add(QueryBuilders.matchPhraseQuery(key, jsonObject.get(key)));
                    }
                }

                if (dataLst.size() != 1) {
                    log.error(">>>>>>>>>Update Data is Not Right, Please Check the Data Fields!!!<<<<<<<<<");
                } else {
                    QueryBuilder qb = QueryBuilders.boolQuery().must(dataLst.get(0));
                    SearchRequestBuilder responsebuilder = transportClient.prepareSearch(indexName).setTypes(typeName);
                    SearchResponse myresponse = responsebuilder.setQuery(qb).execute().actionGet();

                    if (myresponse != null) {
                        SearchHits hits = myresponse.getHits();
                        for (int i = 0; i < hits.getHits().length; i++) {
                            if (null == hits.getHits()[i]) {
                                log.error("ES未查询到任何结果!!!");
                            } else {
                                log.info("ES Match Data Length Is:" + hits.getTotalHits());
                                // log.info("ES Temp Update Data Summary Is:" + dataMap.get("summary").toString());
                                updateIndex(indexName, typeName, hits.getHits()[i].getId(), dataMap);
                            }
                        }
                    } else {
                        log.error("ES未查询到任何结果!!!");
                    }
                }
            }
        } catch (Exception e) {
            log.error("ES Query Search Failed!!!", e);
        }
    }
						
    /**
     * Update documents matched by alertNum
     * @param indexName index name
     * @param typeName index type
     * @param dataMap document data; must contain alertNum
     */
    public static void updateByQueryTempAlert(String indexName, String typeName, Map<String, Object> dataMap) {
        try {
            TransportClient transportClient = getTransportClient();
            JSONObject jsonObject = (JSONObject) JSONObject.toJSON(dataMap);
            if (jsonObject != null && jsonObject.size() > 0) {
                Set<String> keySet = jsonObject.keySet();
                List<MatchPhraseQueryBuilder> dataLst = new ArrayList<>();
                for (String key : keySet) {
                    if (key.equals("alertNum")) {
                        dataLst.add(QueryBuilders.matchPhraseQuery(key, jsonObject.get(key)));
                    }
                }

                if (dataLst.size() != 1) {
                    log.error(">>>>>>>>>Update Data is Not Right, Please Check the Data Fields!!!<<<<<<<<<");
                } else {
                    QueryBuilder qb = QueryBuilders.boolQuery().must(dataLst.get(0));
                    SearchRequestBuilder responsebuilder = transportClient.prepareSearch(indexName).setTypes(typeName);
                    SearchResponse myresponse = responsebuilder.setQuery(qb).execute().actionGet();

                    if (myresponse != null) {
                        SearchHits hits = myresponse.getHits();
                        for (int i = 0; i < hits.getHits().length; i++) {
                            if (null == hits.getHits()[i]) {
                                log.error("ES未查询到任何结果!!!");
                            } else {
                                log.info("ES Match Data Length Is:" + hits.getTotalHits());
                                // log.info("ES Temp Update Data Summary Is:" + dataMap.get("summary").toString());
                                updateIndex(indexName, typeName, hits.getHits()[i].getId(), dataMap);
                            }
                        }
                    } else {
                        log.error("ES未查询到任何结果!!!");
                    }
                }
            }
        } catch (Exception e) {
            log.error("ES Query Search Failed!!!", e);
        }
    }
		
    /**
     * Delete a document by id
     * @param indexName index name
     * @param typeName index type
     * @param id document id
     */
    public static void deleteIndex(String indexName, String typeName, String id) {
        TransportClient transportClient = getTransportClient();
        transportClient.prepareDelete(indexName, typeName, id).setRefreshPolicy(RefreshPolicy.IMMEDIATE).get();
//      transportClient.close();
    }

    /**
     * Check whether a type inside an index still contains any documents
     * @param index index name
     * @param type index type
     * @return true if at least one document exists
     * @throws Exception
     */
    public static Boolean existDocOfType(String index, String type) throws Exception {
        SearchRequestBuilder builder = getTransportClient().prepareSearch(index).setTypes(type)
                .setSearchType(SearchType.QUERY_THEN_FETCH)
                .setSize(1);
        SearchResponse response = builder.execute().actionGet();
        long docNum = response.getHits().getTotalHits();
        if (docNum == 0) {
            return false;
        }
        return true;
    }
	  
    /**
     * Delete all documents of the given types
     * @param index index name
     * @param types index types
     * @return number of deleted documents
     */
    public static long deleteDocByType(String index, String[] types) {
        TransportClient transportClient = getTransportClient();
        long oldTime = System.currentTimeMillis();
        StringBuilder b = new StringBuilder();
        b.append("{\"query\":{\"match_all\":{}}}");
        DeleteByQueryResponse response = new DeleteByQueryRequestBuilder(transportClient, DeleteByQueryAction.INSTANCE)
                .setIndices(index).setTypes(types)
                .setSource(b.toString())
                .execute().actionGet();
        Stack<String> allTypes = new Stack<String>();
        for (String type : types) {
            allTypes.add(type);
        }
        // block until every type is actually empty
        while (!allTypes.isEmpty()) {
            String type = allTypes.pop();
            while (true) {
                try {
                    if (existDocOfType(index, type) == false) {
                        break;
                    }
                } catch (Exception e) {
                    log.error("queryError: " + e.getMessage());
                }
            }
        }
        System.out.println(System.currentTimeMillis() - oldTime);
        return response.getTotalDeleted();
    }
	    
    /**
     * Delete all documents whose field matches the given value
     * @param indexName index name
     * @param typeName index type
     * @param field field name
     * @param value field value
     * @return true on success
     */
    public static boolean deleteByField(String indexName, String typeName, String field, String value) {
        boolean flag = false;
        try {
            SearchResponse searchResponse = searcher(indexName, typeName, QueryBuilders.matchQuery(field, value));
            if (searchResponse != null) {
                SearchHits hits = searchResponse.getHits();
                for (int i = 0; i < hits.getHits().length; i++) {
                    if (null == hits.getHits()[i]) {
                        log.error("ES未查询到任何结果!!!");
                    } else {
                        log.info("Color Rules Will Be Deleted Length IS: " + hits.getTotalHits());
                        deleteIndex(indexName, typeName, hits.getHits()[i].getId());
                    }
                }
            }
            flag = true;
            log.info("ES Data Delete By Field Success!!!");
        } catch (Exception e) {
            log.error("ES Data Delete By Field Exception!!!", e);
            flag = false;
        }
        return flag;
    }
	    
    /**
     * Paged query via the scroll API
     * @param indexName index name
     * @param size page size
     * @param pageNum page number (1-based)
     * @param queryBuilder query condition
     * @param sort sort field (may be null)
     * @param sortType "ASC" or "DESC"
     * @return wrapper holding the SearchResponse of the requested page and the total hit count
     */
    public static EsSearchVo findByPage(String indexName, Integer size, Integer pageNum, QueryBuilder queryBuilder, Object sort, Object sortType) {
        EsSearchVo vo = new EsSearchVo();
        TransportClient transportClient = getTransportClient();
        SearchResponse res = null;
        SearchResponse searchResponse = null;
        if (sort != null && sortType.toString().equals("DESC")) {
            searchResponse = transportClient.prepareSearch(indexName)
                    .setQuery(queryBuilder)
                    .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)  // returns exactly the requested number of documents per batch
                    .setSize(size)
                    .addSort(sort.toString(), SortOrder.DESC)
                    .setExplain(true)  // request an explanation of how each hit was scored
                    .setScroll(new TimeValue(20000)).execute()  // how long to keep the scroll context alive
                    .actionGet();  // note: the first search already returns data
        } else if (sort != null && sortType.toString().equals("ASC")) {
            searchResponse = transportClient.prepareSearch(indexName)
                    .setQuery(queryBuilder)
                    .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
                    .setSize(size)
                    .addSort(sort.toString(), SortOrder.ASC)
                    .setExplain(true)
                    .setScroll(new TimeValue(20000)).execute()
                    .actionGet();
        } else {
            searchResponse = transportClient.prepareSearch(indexName)
                    .setQuery(queryBuilder)
                    .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
                    .setSize(size)
                    .setExplain(true)
                    .setScroll(new TimeValue(20000)).execute()
                    .actionGet();
        }

        // total hit count
        long totalCount = searchResponse.getHits().getTotalHits();
        vo.setTotal(totalCount);

        int page = 0;
        int pageCount = 0;
        if (totalCount % size == 0) {
            pageCount = (int) totalCount / (size);
        } else {
            pageCount = (int) totalCount / (size) + 1;
        }
        if (totalCount < size) {
            page = 1;
        } else {
            page = (int) totalCount / (size);
        }

        log.info("*************************ES Page Query Size Number is:" + pageCount + "************************");
        log.info("*************************ES Page Query Match Data Number is:" + totalCount + "************************");
        for (int i = 1; i <= page; i++) {
            if (pageNum - 1 == 0) {
                res = searchResponse;
                break;
            } else if (pageNum - 1 == i) {
                // issue the next request with the scroll id from the previous response
                res = transportClient.prepareSearchScroll(searchResponse.getScrollId())
                        .setScroll(new TimeValue(20000)).execute()
                        .actionGet();
                break;
            } else {
                searchResponse = transportClient.prepareSearchScroll(searchResponse.getScrollId())
                        .setScroll(new TimeValue(20000)).execute()
                        .actionGet();
            }
        }
        vo.setSr(res);
        return vo;
    }
	   		
/*  public static void main(String[] args) {
        List<Map<String, Object>> dataMap = new ArrayList<>();
        Map<String, Object> m1 = new HashMap<>();
        m1.put("", true);
        HttpUtils.doPost(url, params);
    } */
	   
    /**
     * Check whether the given index exists
     * @param indexName index name
     * @return true if it exists, false otherwise
     */
    public static boolean isExistsIndex(String indexName) {
        IndicesExistsResponse response = getTransportClient().admin().indices()
                .exists(new IndicesExistsRequest().indices(new String[] { indexName }))
                .actionGet();
        return response.isExists();
    }

    /**
     * Check whether the given type exists in the given index
     * @param indexName index name
     * @param indexType index type
     * @return true if it exists, false otherwise
     */
    public static boolean isExistsType(String indexName, String indexType) {
        TypesExistsResponse response = getTransportClient().admin().indices()
                .typesExists(new TypesExistsRequest(new String[] { indexName }, indexType))
                .actionGet();
        return response.isExists();
    }

    /**
     * Create an index with a type mapping
     * @param indexName index name
     * @param indexType index type
     * @param isFielddata whether fielddata (sorting support) should be enabled
     */
    public static void createIndexType(String indexName, String indexType, boolean isFielddata) {
        CreateIndexRequestBuilder cib = getTransportClient().admin().indices().prepareCreate(indexName);
        XContentBuilder mapping;
        try {
            mapping = XContentFactory.jsonBuilder().startObject()
                        .startObject("_all").field("enabled", false).endObject()    // disable the _all field
                        .startObject("_source").field("enabled", true).endObject()  // keep the _source field
                        .startObject("properties")                                  // custom field settings
                            .startObject("_default_")
                            .field("type", "text")                                  // data type
//                          .field("date_detection", false)                         // disable date detection for this field
                            .field("fielddata", isFielddata)                        // enable fielddata so the field can be sorted on
                            .endObject()
                        .endObject()
                    .endObject();
            cib.addMapping(indexType, mapping);
            cib.execute().actionGet();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}



Notes

 ① In ES 5.6.9 you could pass a raw JSON string directly when inserting or updating, but in 6.2.2 that approach no longer works, so pay close attention to which ES version you are on; from 6.0 onward you can simply pass a Map for insert and update operations:

    

    /**
     * Create an index document
     * @param indexName index name, analogous to a database name
     * @param typeName index type, analogous to a table name
     * @param id document id, analogous to a row's primary key
     * @param dataMap document data
     */
    public static void createIndex(String indexName, String typeName, String id, Map<String, Object> dataMap) {
        TransportClient transportClient = getTransportClient();
        if (transportClient != null) {
            IndexRequestBuilder requestBuilder = transportClient.prepareIndex(indexName, typeName, id).setSource(dataMap);
            requestBuilder.setRefreshPolicy(RefreshPolicy.IMMEDIATE);
            requestBuilder.execute().actionGet();
//          transportClient.close();
        } else {
            log.error(">>>>>>>>>ES连接初始化失败,创建索引失败<<<<<<<<<");
        }
    }

 

ElasticSearchUtils.createIndex(indexName, indexName, null, tempMap);

 

 ② If this utility is called frequently from your API layer, be sure either to close the client properly or to build it as a singleton, otherwise the JVM gets drained of resources, and the overhead is considerable:

 

    

    // Obtain the shared client instance (lazily initialized)
    public static TransportClient getTransportClient() {
        if (client != null) {
            return client;
        } else {
            try {
                Settings settings = Settings.builder()
                        .put("cluster.name", CLUSTER_NAME)
                        .put("client.transport.sniff", true)  // enable cluster sniffing
                        .build();

                client = new PreBuiltTransportClient(settings)
                        .addTransportAddress(new TransportAddress(InetAddress.getByName(IP), PORT));
            } catch (UnknownHostException e) {
                log.error(">>>>>>>>>ES连接初始化失败<<<<<<<<<", e);
                client = null;
            }
        }
        return client;
    }

 

③ Once the data volume grows, scroll-based paging is more efficient than from/size paging. Keep in mind, though, that the SCAN SearchType no longer exists after 6.0: the very first query already returns data instead of only returning data on the second request, so if you want to pull the full data set, the code has to be adapted to the new pattern.

 

   

    /**
     * Paged query via the scroll API
     * @param indexName index name
     * @param size page size
     * @param pageNum page number (1-based)
     * @param sortField field to sort on (descending)
     * @param queryBuilder query condition
     * @return wrapper holding the SearchResponse of the requested page and the total hit count
     */
    public static EsSearchVo findByPage(String indexName, Integer size, Integer pageNum, String sortField, QueryBuilder queryBuilder) {
        EsSearchVo vo = new EsSearchVo();
        TransportClient transportClient = getTransportClient();
        SearchResponse res = null;
        SearchResponse searchResponse = transportClient.prepareSearch(indexName)
                .setQuery(queryBuilder)
                .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)  // returns exactly the requested number of documents per batch
                .setSize(size)
                .addSort(sortField, SortOrder.DESC)
                .setExplain(true)  // request an explanation of how each hit was scored
                .setScroll(new TimeValue(20000)).execute()  // how long to keep the scroll context alive
                .actionGet();  // note: the first search already returns data
        // total hit count
        long totalCount = searchResponse.getHits().getTotalHits();
        vo.setTotal(totalCount);

        int page = 0;
        int pageCount = 0;
        if (totalCount % size == 0) {
            pageCount = (int) totalCount / (size);
        } else {
            pageCount = (int) totalCount / (size) + 1;
        }
        if (totalCount < size) {
            page = 1;
        } else {
            page = (int) totalCount / (size);
        }

        log.info("*************************ES Page Query Size Number is:" + pageCount + "************************");
        log.info("*************************ES Page Query Match Data Number is:" + totalCount + "************************");
        for (int i = 1; i <= page; i++) {
            if (pageNum - 1 == 0) {
                res = searchResponse;
                break;
            } else if (pageNum - 1 == i) {
                // issue the next request with the scroll id from the previous response
                res = transportClient.prepareSearchScroll(searchResponse.getScrollId())
                        .setScroll(new TimeValue(20000)).execute()
                        .actionGet();
                break;
            } else {
                searchResponse = transportClient.prepareSearchScroll(searchResponse.getScrollId())
                        .setScroll(new TimeValue(20000)).execute()
                        .actionGet();
            }
        }
        vo.setSr(res);
        return vo;
    }

④ If the page size is larger than the amount of matching data in ES, for example the query only matches 5 documents but pageSize is 10, you will not get any data back unless you handle that step explicitly, like this:

   

        if (totalCount < size) {
            page = 1;
        } else {
            page = (int) totalCount / (size);
        }

⑤ If the flow that creates an index entry (i.e. inserts data) also queries it right away, the write must set a refresh policy, otherwise the query inside that flow will not see the data. The common policies are RefreshPolicy.IMMEDIATE (refresh immediately), RefreshPolicy.NONE (the default: ES refreshes periodically in batches, roughly every second), and RefreshPolicy.WAIT_UNTIL (block until a refresh makes the change visible). ElasticSearch is in fact near-real-time; by default shards refresh about once per second.
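As a minimal sketch of the point above, the following reuses the createIndex and searcher helpers from the utility class (imports as in that class); the index name alert_notice and the field values are made up for illustration:

// Hypothetical index/type name and field values, for illustration only.
Map<String, Object> doc = new HashMap<>();
doc.put("summary", "disk usage over 90%");
doc.put("warnStatus", "OPEN");

// createIndex() sets RefreshPolicy.IMMEDIATE on the request internally, so the
// document is searchable as soon as the call returns; with RefreshPolicy.NONE the
// search below could miss it until the next periodic refresh (about 1s later).
ElasticSearchUtils.createIndex("alert_notice", "alert_notice", null, doc);
List<Map<String, Object>> hits = ElasticSearchUtils.searcher(
        QueryBuilders.matchQuery("warnStatus", "OPEN"), "alert_notice", "alert_notice");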

 

③ Usage:

@SuppressWarnings("unchecked")
private Map<String, Object> inserEsFirst(ConfigVo configVo, AlertWarnVo alertWarnVo, String indexName) {
    Map<String, Object> resultMap = new HashMap<>();
    // identifier used to link the raw data with the enriched data
    String sourceId = new SimpleDateFormat("yyyyMMddhhmmssSSS").format(new Date());
    boolean flag = true;
    try {
        // if there is no alert status, force it to OPEN
        if (alertWarnVo.getEventValue() == null || StringUtils.isEmpty(alertWarnVo.getEventValue().toString())) {
            alertWarnVo.setWarnStatus("OPEN");
        } else if ("0".equals(alertWarnVo.getEventValue())) {
            alertWarnVo.setWarnStatus("CLOSE");
        } else if ("1".equals(alertWarnVo.getEventValue())) {
            alertWarnVo.setWarnStatus("OPEN");
        }
        Map<String, Object> tempMap = (Map<String, Object>) JSON.parse(JSON.toJSONString(alertWarnVo));
        tempMap.put("source", configVo.getDataSource());
        // default the filter flag to false
        tempMap.put("isFilter", false);
        tempMap.put("sourceId", sourceId);
        resultMap.put("sourceId", sourceId);
        ElasticSearchUtils.createIndex(indexName, indexName, null, tempMap);
    } catch (Exception e) {
        log.error(">>>>>>>>>Es Data Insert Failed!!!<<<<<<<<<", e);
        flag = false;
    }
    resultMap.put("flag", flag);
    return resultMap;
}

@SuppressWarnings("unchecked")
private Map<String, Object> updateESFilter(ConfigVo configVo, AlertWarnVo o, String indexName) {
    Map<String, Object> resultMap = new HashMap<>();
    boolean flag = true;
    try {
        Map<String, Object> tempMap = (Map<String, Object>) JSON.parse(JSON.toJSONString(o));
        // update by sourceId when it is present
        if (tempMap.get("sourceId") != null && StringUtils.isNotEmpty(tempMap.get("sourceId").toString())) {
            tempMap.put("isFilter", true);
            // indexName and typeName are the same here
            ElasticSearchUtils.updateByQueryTemp(indexName, indexName, tempMap);
        } else {
            log.error(">>>>>>>>>Es Data Update Failed Cause SourceId Is Empty!!!<<<<<<<<<");
            flag = false;
        }
    } catch (Exception e) {
        log.error(">>>>>>>>>Es Data Update Failed!!!<<<<<<<<<", e);
        flag = false;
    }
    resultMap.put("flag", flag);
    return resultMap;
}
@Override
public Map<String, Object> queryBaseInfo() {
    Map<String, Object> dataMap = new HashMap<>();
    try {
        // start of today
        Calendar baseCal = Calendar.getInstance();
        baseCal.set(Calendar.HOUR_OF_DAY, 0);
        baseCal.set(Calendar.MINUTE, 0);
        baseCal.set(Calendar.SECOND, 0);
        baseCal.add(Calendar.DATE, 0);

        // count Disaster events, excluding CLOSED ones
        SearchResponse searcherDisaster = ElasticSearchUtils.searcher(PUtils.getString("es.index.notice"),
                PUtils.getString("es.index.notice"),
                QueryBuilders.boolQuery()
                        .must(QueryBuilders.matchQuery("severity", "Disaster"))
                        .must(QueryBuilders.rangeQuery("eventTime").gt(baseCal.getTimeInMillis()))
                        .mustNot(QueryBuilders.matchQuery("warnStatus", "CLOSED")));
        if (searcherDisaster != null && searcherDisaster.getHits().getTotalHits() != 0) {
            dataMap.put("DisasterCount", searcherDisaster.getHits().getTotalHits());
        } else {
            dataMap.put("DisasterCount", 0);
        }

        // count High events, excluding CLOSED ones
        SearchResponse searcherHigh = ElasticSearchUtils.searcher(PUtils.getString("es.index.notice"),
                PUtils.getString("es.index.notice"),
                QueryBuilders.boolQuery()
                        .must(QueryBuilders.matchQuery("severity", "High"))
                        .must(QueryBuilders.rangeQuery("eventTime").gt(baseCal.getTimeInMillis()))
                        .mustNot(QueryBuilders.matchQuery("warnStatus", "CLOSED")));
        if (searcherHigh != null && searcherHigh.getHits().getTotalHits() != 0) {
            dataMap.put("HighCount", searcherHigh.getHits().getTotalHits());
        } else {
            dataMap.put("HighCount", 0);
        }

        // count Average events, excluding CLOSED ones
        SearchResponse searcherAverage = ElasticSearchUtils.searcher(PUtils.getString("es.index.notice"),
                PUtils.getString("es.index.notice"),
                QueryBuilders.boolQuery()
                        .must(QueryBuilders.matchQuery("severity", "Average"))
                        .must(QueryBuilders.rangeQuery("eventTime").gt(baseCal.getTimeInMillis()))
                        .mustNot(QueryBuilders.matchQuery("warnStatus", "CLOSED")));
        if (searcherAverage != null && searcherAverage.getHits().getTotalHits() != 0) {
            dataMap.put("AverageCount", searcherAverage.getHits().getTotalHits());
        } else {
            dataMap.put("AverageCount", 0);
        }

        // count Warning events, excluding CLOSED ones
        SearchResponse searcherWarning = ElasticSearchUtils.searcher(PUtils.getString("es.index.notice"),
                PUtils.getString("es.index.notice"),
                QueryBuilders.boolQuery()
                        .must(QueryBuilders.matchQuery("severity", "Warning"))
                        .must(QueryBuilders.rangeQuery("eventTime").gt(baseCal.getTimeInMillis()))
                        .mustNot(QueryBuilders.matchQuery("warnStatus", "CLOSED")));
        if (searcherWarning != null && searcherWarning.getHits().getTotalHits() != 0) {
            dataMap.put("WarningCount", searcherWarning.getHits().getTotalHits());
        } else {
            dataMap.put("WarningCount", 0);
        }

        // count Not Classified events, excluding CLOSED ones
        SearchResponse searcherNotClassified = ElasticSearchUtils.searcher(PUtils.getString("es.index.notice"),
                PUtils.getString("es.index.notice"),
                QueryBuilders.boolQuery()
                        .must(QueryBuilders.matchQuery("severity", "Not Classified"))
                        .must(QueryBuilders.rangeQuery("eventTime").gt(baseCal.getTimeInMillis()))
                        .mustNot(QueryBuilders.matchQuery("warnStatus", "CLOSED")));
        if (searcherNotClassified != null && searcherNotClassified.getHits().getTotalHits() != 0) {
            dataMap.put("NotClassifiedCount", searcherNotClassified.getHits().getTotalHits());
        } else {
            dataMap.put("NotClassifiedCount", 0);
        }

        // count Information events, excluding CLOSED ones
        SearchResponse searcherInformation = ElasticSearchUtils.searcher(PUtils.getString("es.index.notice"),
                PUtils.getString("es.index.notice"),
                QueryBuilders.boolQuery()
                        .must(QueryBuilders.matchQuery("severity", "Information"))
                        .must(QueryBuilders.rangeQuery("eventTime").gt(baseCal.getTimeInMillis()))
                        .mustNot(QueryBuilders.matchQuery("warnStatus", "CLOSED")));
        if (searcherInformation != null && searcherInformation.getHits().getTotalHits() != 0) {
            dataMap.put("InformationCount", searcherInformation.getHits().getTotalHits());
        } else {
            dataMap.put("InformationCount", 0);
        }

        // alert totals per day over the last 7 days
        List<Long> eventsLst = new ArrayList<>();
        for (int i = 0; i < 7; i++) {
            RangeQueryBuilder rangequerybuilder = null;
            if (i == 6) {
                // today: from 00:00 until now
                Calendar cal = Calendar.getInstance();
                cal.set(Calendar.HOUR_OF_DAY, 0);
                cal.set(Calendar.MINUTE, 0);
                cal.set(Calendar.SECOND, 0);
                cal.add(Calendar.DATE, (i - 6));
                rangequerybuilder = QueryBuilders.rangeQuery("eventTime").gt(cal.getTimeInMillis());
            } else {
                // a full earlier day: from 00:00 of that day to 00:00 of the next day
                Calendar cal1 = Calendar.getInstance();
                cal1.set(Calendar.HOUR_OF_DAY, 0);
                cal1.set(Calendar.MINUTE, 0);
                cal1.set(Calendar.SECOND, 0);
                cal1.add(Calendar.DATE, (i - 6));

                Calendar cal2 = Calendar.getInstance();
                cal2.set(Calendar.HOUR_OF_DAY, 0);
                cal2.set(Calendar.MINUTE, 0);
                cal2.set(Calendar.SECOND, 0);
                cal2.add(Calendar.DATE, (i - 5));
                rangequerybuilder = QueryBuilders.rangeQuery("eventTime").gt(cal1.getTimeInMillis()).lt(cal2.getTimeInMillis());
            }

            SearchResponse searcher = ElasticSearchUtils.searcher(PUtils.getString("es.index.notice"), PUtils.getString("es.index.notice"), rangequerybuilder);
            if (searcher != null && searcher.getHits().getTotalHits() != 0) {
                eventsLst.add(searcher.getHits().getTotalHits());
            } else {
                eventsLst.add(0L);
            }
        }

        dataMap.put("eventsLst", eventsLst);
    } catch (Exception e) {
        e.printStackTrace();
        logger.error(">>>>>>>>>Query ES Base Source Has failed!!!<<<<<<<<<", e);
        dataMap = null;
    }
    return dataMap;
}

@SuppressWarnings({ "rawtypes", "unchecked" })
@Override
public List<Map<String, Object>> queryAllEvents(Integer pageNum, Integer pageSize) {
    List<Map<String, Object>> dataLst = new ArrayList<>();
    try {
        EsSearchVo vo = ElasticSearchUtils.findByPage(PUtils.getString("es.index.notice"), pageSize, pageNum, "eventTime",
                QueryBuilders.boolQuery()
                        .must(QueryBuilders.matchAllQuery()));
        if (vo != null && vo.getSr() != null) {
            SearchResponse searcher = vo.getSr();
            logger.info("******************Total Match Sources Are:" + searcher.getHits().getTotalHits() + "********************");
            SearchHits hits = searcher.getHits();
            for (SearchHit searchHit : hits.getHits()) {
                if (searchHit.getSourceAsString() != null && searchHit.getSourceAsString().length() > 0) {
                    Map parseMap = JSON.parseObject(searchHit.getSourceAsString(), Map.class);
                    parseMap.put("page", pageNum);
                    parseMap.put("size", pageSize);
                    parseMap.put("total", vo.getTotal() == null ? 0 : vo.getTotal());
                    dataLst.add(parseMap);
                }
            }
        } else {
            logger.info("******************No Source Matched In ES!!!!********************");
        }
    } catch (Exception e) {
        e.printStackTrace();
        logger.error("****************** Query All Es Events Failed,Cause: ********************", e);
        dataLst = null;
    }
    return dataLst;
}

@SuppressWarnings({ "rawtypes", "unchecked" })
@Override
public List<Map<String, Object>> queryCloseEvents(Integer pageNum, Integer pageSize) {
    List<Map<String, Object>> dataLst = new ArrayList<>();
    try {
        EsSearchVo vo = ElasticSearchUtils.findByPage(PUtils.getString("es.index.notice"), pageSize, pageNum, "eventTime",
                QueryBuilders.boolQuery()
                        .must(QueryBuilders.matchQuery("warnStatus", "CLOSED")));
        if (vo != null && vo.getSr() != null) {
            SearchResponse searcher = vo.getSr();
            logger.info("******************Closed Match Sources Are:" + searcher.getHits().getTotalHits() + "********************");
            SearchHits hits = searcher.getHits();
            for (SearchHit searchHit : hits.getHits()) {
                if (searchHit.getSourceAsString() != null && searchHit.getSourceAsString().length() > 0) {
                    Map parseMap = JSON.parseObject(searchHit.getSourceAsString(), Map.class);
                    parseMap.put("page", pageNum);
                    parseMap.put("size", pageSize);
                    parseMap.put("total", vo.getTotal() == null ? 0 : vo.getTotal());
                    dataLst.add(parseMap);
                }
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
        logger.error("****************** Query Closed Es Events Failed,Cause: ********************", e);
        dataLst = null;
    }
    return dataLst;
}

@SuppressWarnings({ "rawtypes", "unchecked" })
@Override
public List<Map<String, Object>> queryOpenEvents(Integer pageNum, Integer pageSize) {
    List<Map<String, Object>> dataLst = new ArrayList<>();
    try {
        EsSearchVo vo = ElasticSearchUtils.findByPage(PUtils.getString("es.index.notice"), pageSize, pageNum, "eventTime",
                QueryBuilders.boolQuery()
                        .must(QueryBuilders.matchQuery("warnStatus", "OPEN")));
        if (vo != null && vo.getSr() != null) {
            SearchResponse searcher = vo.getSr();
            logger.info("******************Open Match Sources Are:" + searcher.getHits().getTotalHits() + "********************");
            SearchHits hits = searcher.getHits();
            for (SearchHit searchHit : hits.getHits()) {
                if (searchHit.getSourceAsString() != null && searchHit.getSourceAsString().length() > 0) {
                    Map parseMap = JSON.parseObject(searchHit.getSourceAsString(), Map.class);
                    parseMap.put("page", pageNum);
                    parseMap.put("size", pageSize);
                    parseMap.put("total", vo.getTotal() == null ? 0 : vo.getTotal());
                    dataLst.add(parseMap);
                }
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
        logger.error("****************** Query Open Es Events Failed,Cause: ********************", e);
        dataLst = null;
    }
    return dataLst;
}

Special note: right after ES is started there are no indices in it yet, so the indices the project relies on must be created at service startup, otherwise errors will occur. There are two ways to do this: implement the InitializingBean interface, or implement the ApplicationListener interface, and then add the code below. With the former, the implementation class needs the appropriate annotation (@Resource or @Component both work) and the component scan in the configuration must cover its package; with the latter, the class has to be registered in the Spring configuration file as a <bean>.

 

        // create the temp index if it does not exist yet
        if (!ElasticSearchUtils.isExistsIndex(PUtils.getString("es.index.temp"))) {
            logger.error("ES库[" + PUtils.getString("es.index.temp") + "]不存在,创建新库.......");
            ElasticSearchUtils.createIndexType(PUtils.getString("es.index.temp"), PUtils.getString("es.index.temp"), true);
        }
        // create the notice index if it does not exist yet
        if (!ElasticSearchUtils.isExistsIndex(PUtils.getString("es.index.notice"))) {
            logger.error("ES库[" + PUtils.getString("es.index.notice") + "]不存在,创建新库.......");
            ElasticSearchUtils.createIndexType(PUtils.getString("es.index.notice"), PUtils.getString("es.index.notice"), true);
        }
        // create the color index if it does not exist yet
        if (!ElasticSearchUtils.isExistsIndex(PUtils.getString("es.index.color"))) {
            logger.error("ES库[" + PUtils.getString("es.index.color") + "]不存在,创建新库.......");
            ElasticSearchUtils.createIndexType(PUtils.getString("es.index.color"), PUtils.getString("es.index.color"), true);
        }
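For the first approach, here is a minimal sketch of how the snippet above could be wrapped in an InitializingBean; the class name EsIndexInitializer is made up, and it assumes the same logger, PUtils, and ElasticSearchUtils helpers used throughout this post (imports omitted as in the other snippets):

// Hypothetical wrapper class; annotate it (@Component here) and make sure the
// component scan covers its package so Spring calls afterPropertiesSet() on startup.
@Component
public class EsIndexInitializer implements InitializingBean {

    private static final Logger logger = LoggerFactory.getLogger(EsIndexInitializer.class);

    @Override
    public void afterPropertiesSet() throws Exception {
        // create the default indices the project relies on if they are missing
        if (!ElasticSearchUtils.isExistsIndex(PUtils.getString("es.index.temp"))) {
            logger.error("ES库[" + PUtils.getString("es.index.temp") + "]不存在,创建新库.......");
            ElasticSearchUtils.createIndexType(PUtils.getString("es.index.temp"), PUtils.getString("es.index.temp"), true);
        }
        // ...repeat the same check for es.index.notice and es.index.color as above
    }
}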
        

Summary:

 

       The ES cluster (cluster name) plays roughly the role of a MySQL instance: indexName corresponds to the database name, typeName to the table name, and _id to a row's primary key; you can also supply your own id when the requirement calls for it. On large data sets its query performance beats MySQL by a wide margin.
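To make the analogy concrete, here is how the createIndex call used earlier maps onto those concepts; the index name and id below are illustrative only:

// "alert_notice" serves as both indexName (the "database") and typeName (the "table"),
// and "1001" is the document _id (the "primary key"); pass null to let ES generate one.
ElasticSearchUtils.createIndex("alert_notice", "alert_notice", "1001", dataMap);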

 
