java操作elasticsearch

Posted weishao-lsv

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java操作elasticsearch相关的知识,希望对你有一定的参考价值。

Elasticsearch是一个搜索引擎,建立在Lucene之上

 

集群 (cluster)

  代表一个集群,集群中有多个节点,其中有一个为主节点,这个主节点是可以通过选举产生的,主从节点是对于集群内部来说的。
  es的一个概念就是去中心化,字面上理解就是无中心节点,这是对于集群外部来说的,因为从外部来看es集群,在逻辑上是个整体,
  你与任何一个节点的通信和与整个es集群通信是等价的。

 

节点(node)

  每一个运行实例称为一个节点,每一个运行实例既可以在同一机器上,也可以在不同的机器上.所谓运行实例,就是一个服务器进程.
  在测试环境内,可以在一台服务器上运行多个服务器进程,在生产环境建议每台服务器运行一个服务器进程

 

索引(index)

  这里的索引是名词不是动词,在elasticsearch里面支持多个索引。类似于关系数据库里面每一个服务器可以支持多个数据库一样。
  在每一索引下面又支持多种类型,类似于关系数据库里面的一个数据库可以有多张表。但是本质上和关系数据库有很大的区别。

 

分片(shards) 

  把一个索引分解为多个小的索引,每一个小的索引叫做分片。分片后就可以把各个分片分配到不同的节点中,构成分布式搜索
  分片的数量只能在索引创建前指定,并且索引创建后不能更改

 

副本(replicas)

  副本的作用一是提高系统的容错性,当个某个节点某个分片损坏或丢失时可以从副本中恢复。二是提高es的查询效率,es会自动对搜索请求进行负载均衡

 

recovery

  代表数据恢复或叫数据重新分布,es在有节点加入或退出时会根据机器的负载对索引分片进行重新分配,挂掉的节点重新启动时也会进行数据恢复。

 

river

  代表es的一个数据源,也是其它存储方式(如:数据库)同步数据到es的一个方法。它是以插件方式存在的一个es服务,通过读取river中的数据并把它索引到es中,
        
   官方的river有couchDB的,RabbitMQ的,Twitter的,Wikipedia的,river这个功能将会在后面的文件中重点说到。

 

gateway

  代表es索引的持久化存储方式,es默认是先把索引存放到内存中,当内存满了时再持久化到硬盘。当这个es集群关闭再重新启动时就会从gateway中读取索引数据。
        
    es支持多种类型的gateway,有本地文件系统(默认),分布式文件系统,Hadoop的HDFS和amazon的s3云存储服务。

    ---将各种集群状态信息、索引配置信息等全部持久存放在网关中

 

discovery.zen

  代表es的自动发现节点机制,es是一个基于p2p的系统,它先通过广播寻找存在的节点,再通过多播协议来进行节点之间的通信,同时也支持点对点的交互。

 

Transport

  代表es内部节点或集群与客户端的交互方式,默认内部是使用tcp协议进行交互,同时它支持http协议(json格式)、thrift、servlet、
  memcached、zeroMQ等的传输协议(通过插件方式集成)。

 

索引(Index)

  ElaticSearch将数据存放在一个或多个索引当中。一个索引相当于一个数据库,里面存放用户文档数据。在底层,ElasticSearch实际上还是
   使用Lucene完成读写数据的操作,ElasticSearch索引是由一个或多个Lucene索引组成,所以ES中的分片或副本实际上就是一个Lucene索引。
 

 

文档(Document)

  文档是ES中主要的实体,所有ES的查询都是基于存放在ES中文档资源的查询。每个文档都是由各种域(Field)组成,每个域(Field)有一个名
  称和一个或多个值构成。实际上,从用户的角度看,一个ES文档就是一个JSON对象。

 

映射(Mapping)

  映射用于定义文档域的属性,这些属性包括分词器,字段类型,存储类型等。对于没有定义的字段类型的属性,ES可以自动通过其字段值进行识别。

 

类型(Type)

  ES中每个文档必须有一个类型定义。这里的类型相当于数据库当中的表,类型定义了字段映射(类似数据库表结构),
  这样一来,每个索引可以包含多种文档类型,而每种文档类型定义一种映射关系。

 

路由(Routing)

  ES给每个文档建索引后,通过路由可以算出所查的文档处在哪个分片上,因为在建立索引之初使用公式:shard = hash(routting) % number_of_pr
  imary_shards进行文档分配。routing值是一个任意的字符串,默认是文档的ID,通过人工指定就可以控制文档存放在哪个shard的位置了。

 

索引别名(Index Alias)

  索引别名相当于快捷方式或软链接,可以指向一个或多个索引,甚至可以指向带路由的分片。

 

近实时性 near realtime (nrt)

  Elasticsearch是一个近实时性的搜索平台,所以对于刚建过的索引文件进行查询时需要一个轻微的等待时间(通常为1秒)。

 

java操作elastic:

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <runSuite>**/MainTestSuite.class</runSuite>
        <elasticsearch.plugin.name>sql</elasticsearch.plugin.name>
        <elasticsearch.plugin.site>true</elasticsearch.plugin.site>
        <elasticsearch.plugin.jvm>true</elasticsearch.plugin.jvm>
        <elasticsearch.version>5.6.2</elasticsearch.version>
        <elasticsearch.rest.version>5.5.2</elasticsearch.rest.version>
        <slf4j.version>1.7.7</slf4j.version>
        <elasticsearch.plugin.classname>org.elasticsearch.plugin.nlpcn.SqlPlug</elasticsearch.plugin.classname>
    </properties>
    
    <repositories>
        <repository>
            <id>elasticsearch-releases</id>
            <url>https://artifacts.elastic.co/maven</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>
    
    <dependencies>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>4.0.2.RELEASE</version>
        </dependency>
        
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>${elasticsearch.version}</version>
            <scope>provided</scope>
        </dependency>
            
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>transport</artifactId>
            <version>${elasticsearch.version}</version>
            <scope>provided</scope>
        </dependency>
        
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>rest</artifactId>
            <version>${elasticsearch.rest.version}</version>
        </dependency>
        
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>x-pack-transport</artifactId>
            <version>${elasticsearch.version}</version>
        </dependency>
            
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.4</version>
        </dependency>
            
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>15.0</version>
        </dependency>
        
        <dependency>
            <groupId>org.hamcrest</groupId>
            <artifactId>hamcrest-all</artifactId>
            <version>1.3</version>
            <scope>test</scope>
        </dependency>
            
        <dependency>
            <groupId>org.locationtech.spatial4j</groupId>
            <artifactId>spatial4j</artifactId>
            <version>0.6</version>
        </dependency>
        
        <dependency>
            <groupId>com.vividsolutions</groupId>
            <artifactId>jts</artifactId>
            <version>1.13</version>
        </dependency>
         
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.1.41</version>
            <scope>test</scope>
        </dependency>
        
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.0.15</version>
        </dependency>
        
        <!-- LOGGING begin -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <!-- common-logging 实际调用slf4j -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>jcl-over-slf4j</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <!-- java.util.logging 实际调用slf4j -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>jul-to-slf4j</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <!-- LOGGING end -->
                         
        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>servlet-api</artifactId>
            <version>2.5</version>
            <scope>provided</scope>
        </dependency>
        
        <dependency>  
            <groupId>junit</groupId>  
            <artifactId>junit</artifactId>  
            <version>4.12</version>  
            <scope>test</scope>  
        </dependency>
    </dependencies>

 

EsQuery

public class EsQuery {
    
    protected static Logger logger = LoggerFactory.getLogger(EsQuery.class);
    
    /**
     * 集群状态
     */
    public void clusterStatus(){
        ClusterAdminClient clusterAdminClient = ElasticUtil.getClusterClient().admin().cluster();
        ClusterHealthResponse healths = clusterAdminClient.prepareHealth().get();
        String clusterName = healths.getClusterName();
        int numberOfDataNodes = healths.getNumberOfDataNodes();
        int numberOfNodes = healths.getNumberOfNodes();
        ClusterHealthStatus status = healths.getStatus();
        System.out.println("集群名称:"+clusterName);
        System.out.println("数据节点:"+numberOfDataNodes);
        System.out.println("正常节点:"+numberOfNodes);
        System.out.println("状态值:"+status.name());
    }
    
    /**
     * 判断索引库是否存在
     * @param indexName
     * @return
     */
    public boolean isIndexExists(String indexName) {
        IndicesExistsRequest inExistsRequest = new IndicesExistsRequest(indexName);
        IndicesExistsResponse inExistsResponse = ElasticUtil.getClusterClient().admin().indices()
                .exists(inExistsRequest).actionGet();
        return inExistsResponse.isExists();
    }
    
    /**
     * 创建索引 indexName 索引名称
     * @param indexName
     * @return
     * @throws IOException
     */
    public boolean createIndex(String indexName){
        if(isIndexExists(indexName)){
            return false;
        }
        CreateIndexResponse response = ElasticUtil.getClusterClient().admin().indices().prepareCreate(indexName).execute().actionGet();
        if(response.isAcknowledged()){
            return true;
        }
        return false;
    }
    

    /**
     * 删除索引库  
     * @param indexName
     * @return
     */
    public boolean dropIndex(String indexName) {
        if (!isIndexExists(indexName)) { 
            return false;
        } else {
            DeleteIndexResponse dResponse = ElasticUtil.getClusterClient().admin().indices().prepareDelete(indexName).execute().actionGet();
            if (dResponse.isAcknowledged()) {
                return true;
            }else{
                return false;
            }
        }
    }
    
    /**
     * 采用standard分词器-默认*/
    public boolean addType(String indexName,String typeName){
        XContentBuilder builder=null;
        try {
            builder = XContentFactory.jsonBuilder()
                    .startObject()
                    .startObject(typeName)
                    .endObject()
                    .endObject();
        } catch (IOException e) {
            e.printStackTrace();
        }
        PutMappingRequest mappingRequest = Requests.putMappingRequest(indexName).type(typeName).source(builder);
        try {
            PutMappingResponse mappingResponse = ElasticUtil.getClusterClient().admin().indices().putMapping(mappingRequest).actionGet();
            if (mappingResponse.isAcknowledged()) {
                return true;
            }else{
                return false;
            }
        } catch (IndexNotFoundException e) {
            System.out.println("索引不存在,创建失败...");
        }
        return false;
    }
    
    /**
     * 采用IK分词器
     */
    public boolean addIKType(String indexName,String typeName){
        XContentBuilder builder=null;
        try {
            builder = XContentFactory.jsonBuilder()
                    .startObject()
                    .startObject(typeName)
                    .startObject("properties")
                    .startObject("poi_id").field("type","integer").endObject()
                    .startObject("poi_title").field("type","text").field("analyzer","ik_max_word").endObject()
                    .startObject("poi_address").field("type","text").field("analyzer","ik_max_word").endObject()
                    .startObject("poi_tags").field("type","text").field("analyzer","ik_max_word").endObject()
                    .startObject("poi_phone").field("type","text").endObject()
                    .endObject()
                    .endObject()
                    .endObject();
        } catch (IOException e) {
            e.printStackTrace();
        }
        PutMappingRequest mappingRequest = Requests.putMappingRequest(indexName).type(typeName).source(builder);
        try {
            PutMappingResponse mappingResponse = ElasticUtil.getClusterClient().admin().indices().putMapping(mappingRequest).actionGet();
            if (mappingResponse.isAcknowledged()) {
                return true;
            }else{
                return false;
            }
        } catch (IndexNotFoundException e) {
            System.out.println("索引不存在,创建失败...");
        }
        return false;
    }
     
    
    /**
     * 添加或修改数据  设置自己的id
     * @param id
     * @param json
     */
    public static String insertOrUpdate(String indexName,String typeName,String id,Map<String, Object> json) {
        if(json==null){
            return null;
        }
        IndexResponse response = ElasticUtil.getClusterClient().prepareIndex(indexName, typeName,id).setSource(json).execute().actionGet();
        return response.getId();
    }
    

    /**
     * 添加或修改数据  使用随机id
     * @param json
     */
    public String insertOrUpdate(String indexName,String typeName,Map<String, Object> json) {
        if(json==null){
            return null;
        }
        IndexResponse response = ElasticUtil.getClusterClient().prepareIndex(indexName, typeName).setSource(json).execute().actionGet();
        return response.getId();
    }
    
    /**
     * 通过id查询单条数据
     * @param id
     * @return
     */
    public GetResponse getResourceById(String indexName,String typeName,String id){
        GetResponse response = ElasticUtil.getClusterClient().prepareGet(indexName,typeName, id).get();
        //Map<String, Object> source = response.getSource();
        return response;
    }
    
    /**
     * 删除数据
     * @param id
     */
    public void deleteResourceByIds(String indexName,String typeName,String[] ids) {
        if(ids==null||ids.length<1){
            return;
        }
        for(String id :ids){
            ElasticUtil.getClusterClient().prepareDelete(indexName, typeName, id)
            .execute().actionGet();
            System.out.println("删除id: "+id);
        }
        System.out.println("delete over..");
    }
    
    /**
     * 查询 index/type 数据
     * @param indexName
     */
    public static void simpleQuery(String indexName,String typeName){
        SearchRequestBuilder prepareSearch = ElasticUtil.getClusterClient().prepareSearch(indexName);
        if(typeName!=null && !"".equals(typeName.trim())){
            prepareSearch.setTypes(typeName);
        }
        //        prepareSearch.setFrom(1).setSize(10);
        SearchResponse response = prepareSearch.execute().actionGet();
        SearchHit[] searchHits = response.getHits().getHits();
        for (SearchHit sh : searchHits) {
            System.out.println(sh.getId()+"==>"+sh.getSource());
        }   
        System.out.println("total==> "+searchHits.length);
    }
    
    /**
     * 通过field字段过滤索引库
     * @param indexName
     * @param typeName
     * @param field
     * @param value
     */
    public void matchFieldQuery(String indexName,String typeName,String field,String value){
        QueryBuilder qb = QueryBuilders.matchQuery(field,value);
        SearchRequestBuilder requestBuilder = ElasticUtil.getClusterClient().prepareSearch(indexName);
        if(typeName!=null&&!"".equals(typeName.trim())){
            requestBuilder.setTypes(typeName);
        }
        SearchResponse response = requestBuilder.setQuery(qb).execute().actionGet();
        SearchHit[] searchHits = response.getHits().getHits();
        for (SearchHit sh : searchHits) {
            System.out.println(sh.getId()+"==>"+sh.getSource());
        }   
        System.out.println("total==> "+searchHits.length);
    }
    
    /**
     * 通过多个field字段过滤索引库
     * @param indexName
     * @param typeName
     * @param field1
     * @param field2
     * @param value
     */
    public void multiFieldMatchQuery(String indexName,String typeName,String field1,String field2,String value){
        QueryBuilder qb = QueryBuilders.multiMatchQuery(value,field1, field2);
        SearchRequestBuilder requestBuilder = ElasticUtil.getClusterClient().prepareSearch(indexName);
        if(typeName!=null&&!"".equals(typeName.trim())){
            requestBuilder.setTypes(typeName);
        }
        SearchResponse response = requestBuilder.setQuery(qb) .execute().actionGet();
        SearchHit[] searchHits = response.getHits().getHits();
        for (SearchHit sh : searchHits) {
            System.out.println(sh.getId()+"==>"+sh.getSource());
        }   
    }
    
    /**
     * 通过id 获取多条数据*/
    public void idsQuery(String indexName,String typeName,String[] ids){
        IdsQueryBuilder qb = QueryBuilders.idsQuery().addIds(ids);
        SearchRequestBuilder requestBuilder = ElasticUtil.getClusterClient().prepareSearch(indexName);
        if(typeName!=null&&!"".equals(typeName.trim())){
            requestBuilder.setTypes(typeName);
        }
        SearchResponse response = requestBuilder.setQuery(qb).execute().actionGet();
        SearchHit[] searchHits = response.getHits().getHits();
        for (SearchHit sh : searchHits) {
            System.out.println(sh.getId()+"==>"+sh.getSource());
        }   
        System.out.println("total==> "+searchHits.length);
    }
    
}

 










以上是关于java操作elasticsearch的主要内容,如果未能解决你的问题,请参考以下文章

ElasticSearch04_elasticsearch-Rest-Client整合SpringBoot中使用保存数据利用JAVA代码操作es

ElasticSearch04_elasticsearch-Rest-Client整合SpringBoot中使用保存数据利用JAVA代码操作es

ElasticSearch04_elasticsearch-Rest-Client整合SpringBoot中使用保存数据利用JAVA代码操作es

ElasticSearch学习问题记录——Invalid shift value in prefixCoded bytes (is encoded value really an INT?)(代码片段

商城项目19_elasticsearch-Rest-Client整合SpringBoot中使用保存数据利用JAVA代码操作es

商城项目19_elasticsearch-Rest-Client整合SpringBoot中使用保存数据利用JAVA代码操作es