基于JavaKafkaElasticSearch的搜索框架的设计与实现

Posted CSDN大数据

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了基于JavaKafkaElasticSearch的搜索框架的设计与实现相关的知识,希望对你有一定的参考价值。

↑ 点击上方蓝字关注我们,和小伙伴一起聊技术!



Jkes是一个基于Java、Kafka、ElasticSearch的搜索框架。Jkes提供了注解驱动的JPA风格的对象/文档映射,使用rest api用于文档搜索。


项目主页:https://github.com/chaokunyang/jkes


安装


可以参考jkes-integration-test项目快速掌握jkes框架的使用方法。jkes-integration-test是我们用来测试功能完整性的一个Spring Boot Application。


  • 安装jkes-index-connectorjkes-delete-connector到Kafka Connect类路径

  • 安装 Smart Chinese Analysis Plugin

sudo bin/elasticsearch-plugin install analysis-smartcn


配置


  • 引入jkes-spring-data-jpa依赖

  • 添加配置

@EnableAspectJAutoProxy@EnableJkes@Configurationpublic class JkesConfig {  @Bean  public PlatformTransactionManager transactionManager(EntityManagerFactory factory, EventSupport eventSupport) {    return new SearchPlatformTransactionManager(new JpaTransactionManager(factory), eventSupport);  } }


  • 提供JkesProperties Bean

@Component@Configurationpublic class JkesConf extends DefaultJkesPropertiesImpl {    @PostConstruct    public void setUp() {        Config.setJkesProperties(this);    }    @Override    public String getKafkaBootstrapServers() {        return "k1-test.com:9292,k2-test.com:9292,k3-test.com:9292";    }    @Override    public String getKafkaConnectServers() {        return "http://k1-test.com:8084,http://k2-test.com:8084,http://k3-test.com:8084";    }    @Override    public String getEsBootstrapServers() {        return "http://es1-test.com:9200,http://es2-test.com:9200,http://es3-test.com:9200";    }    @Override    public String getDocumentBasePackage() {        return "com.timeyang.jkes.integration_test.domain";    }    @Override    public String getClientId() {        return "integration_test";    } }


这里可以很灵活,如果使用Spring Boot,可以使用@ConfigurationProperties提供配置。


  • 增加索引管理端点 因为我们不知道客户端使用的哪种web技术,所以索引端点需要在客户端添加。比如在Spring MVC中,可以按照如下方式添加索引端点。

@RestController@RequestMapping("/api/search")public class SearchEndpoint {    private Indexer indexer;    @Autowired    public SearchEndpoint(Indexer indexer) {        this.indexer = indexer;    }    @RequestMapping(value = "/start_all", method = RequestMethod.POST)    public void startAll() {        indexer.startAll();    }    @RequestMapping(value = "/start/{entityClassName:.+}", method = RequestMethod.POST)    public void start(@PathVariable("entityClassName") String entityClassName) {        indexer.start(entityClassName);    }    @RequestMapping(value = "/stop_all", method = RequestMethod.PUT)    public Map<String, Boolean> stopAll() {        return indexer.stopAll();    }    @RequestMapping(value = "/stop/{entityClassName:.+}", method = RequestMethod.PUT)    public Boolean stop(@PathVariable("entityClassName") String entityClassName) {        return indexer.stop(entityClassName);    }    @RequestMapping(value = "/progress", method = RequestMethod.GET)    public Map<String, IndexProgress> getProgress() {        return indexer.getProgress();    } }


快速开始


索引API


使用com.timeyang.jkes.core.annotation包下相关注解标记实体。


@lombok.Data@Entity@Documentpublic class Person extends AuditedEntity {    // @Id will be identified automatically    // @Field(type = FieldType.Long)    @Id    @GeneratedValue(strategy = GenerationType.IDENTITY)    private Long id;    @MultiFields(            mainField = @Field(type = FieldType.Text),            otherFields = {                    @InnerField(suffix = "raw", type = FieldType.Keyword),                    @InnerField(suffix = "english", type = FieldType.Text, analyzer = "english")            }    )    private String name;    @Field(type = FieldType.Keyword)    private String gender;    @Field(type = FieldType.Integer)    private Integer age;    // don't add @Field to test whether ignored    // @Field(type = FieldType.Text)    private String description;    @Field(type = FieldType.Object)    @ManyToOne(fetch = FetchType.EAGER)    @JoinColumn(name = "group_id")    private PersonGroup personGroup; }
@lombok.Data@Entity@Document(type = "person_group", alias = "person_group_alias")public class PersonGroup extends AuditedEntity {    @Id    @GeneratedValue(strategy = GenerationType.IDENTITY)    private Long id;    private String name;    private String interests;    @OneToMany(fetch = FetchType.EAGER, cascade = CascadeType.ALL, mappedBy = "personGroup", orphanRemoval = true)    private List<Person> persons;    private String description;    @DocumentId    @Field(type = FieldType.Long)    public Long getId() {        return id;    }    @MultiFields(            mainField = @Field(type = FieldType.Text),            otherFields = {                    @InnerField(suffix = "raw", type = FieldType.Keyword),                    @InnerField(suffix = "english", type = FieldType.Text, analyzer = "english")            }    )    public String getName() {        return name;    }    @Field(type = FieldType.Text)    public String getInterests() {        return interests;    }    @Field(type = FieldType.Nested)    public List<Person> getPersons() {        return persons;    }    /**     * 不加Field注解,测试序列化时是否忽略     */    public String getDescription() {        return description;    } }


当更新实体时,文档会被自动索引到ElasticSearch;删除实体时,文档会自动从ElasticSearch删除。


搜索API


启动搜索服务jkes-search-service,搜索服务是一个Spring Boot Application,提供rest搜索api,默认运行在9000端口。


  • URI query

curl -XPOST localhost:9000/api/v1/integration_test_person_group/person_group/_search?from=3&size=10


  • Nested query

integration_test_person_group/person_group/_search?from=0&size=10{  "query": {    "nested": {      "path": "persons",      "score_mode": "avg",      "query": {        "bool": {          "must": [            {              "range": {                "persons.age": {                  "gt": 5                }              }            }          ]        }      }    }  } }


  • match query

integration_test_person_group/person_group/_search?from=0&size=10{  "query": {      "match": {        "interests": "Hadoop"      }    } }


  • bool query

{  "query": {    "bool" : {      "must" : {        "match" : { "interests" : "Hadoop" }      },      "filter": {        "term" : { "name.raw" : "name0" }      },      "should" : [        { "match" : { "interests" : "Flink" } },        {            "nested" : {                "path" : "persons",                "score_mode" : "avg",                "query" : {                    "bool" : {                        "must" : [                        { "match" : {"persons.name" : "name40"} },                        { "match" : {"persons.interests" : "interests"} }                        ],                        "must_not" : {                            "range" : {                              "age" : { "gte" : 50, "lte" : 60 }                            }                          }                    }                }            }        }      ],      "minimum_should_match" : 1,      "boost" : 1.0    }  }}


  • Source filtering

integration_test_person_group/person_group/_search {    "_source": false,    "query" : {        "match" : { "name" : "name17" }    } }
integration_test_person_group/person_group/_search {    "_source": {            "includes": [ "name", "persons.*" ],            "excludes": [ "date*", "version", "persons.age" ]        },    "query" : {        "match" : { "name" : "name17" }    } }


  • prefix

integration_test_person_group/person_group/_search {  "query": {    "prefix" : { "name" : "name" }  } }


  • wildcard

integration_test_person_group/person_group/_search {    "query": {        "wildcard" : { "name" : "name*" }    } }


  • regexp

integration_test_person_group/person_group/_search {    "query": {        "regexp":{            "name": "na.*17"        }    } }


Jkes工作原理


索引工作原理:


  • 应用启动时,Jkes扫描所有标注@Document注解的实体,为它们构建元数据

  • 基于构建的元数据,创建indexmappingJson格式的配置,然后通过ElasticSearch Java Rest Client将创建/更新index配置

  • 为每个文档创建/更新Kafka ElasticSearch Connector,用于创建/更新文档

  • 为整个项目启动/更新Jkes Deleter Connector,用于删除文档

  • 拦截数据操作方法。将* save(*)方法返回的数据包装为SaveEvent保存到EventContainer;使用(* delete*(..)方法的参数,生成一个DeleteEvent/DeleteAllEvent保存到EventContainer

  • 拦截事务。在事务提交后使用JkesKafkaProducer发送SaveEvent中的实体到Kafka,Kafka会使用我们提供的JkesJsonSerializer序列化指定的数据,然后发送到Kafka。

  • SaveEvent不同,DeleteEvent会直接被序列化,然后发送到Kafka,而不是只发送一份数据

  • SaveEventDeleteEvent不同,DeleteAllEvent不会发送数据到Kafka,而是直接通过ElasticSearch Java Rest Client删除相应的index,然后重建该索引,重启Kafka ElasticSearch Connector


查询工作原理:


  • 查询服务通过rest api提供

  • 我们没有直接使用ElasticSearch进行查询,因为我们需要在后续版本使用机器学习进行搜索排序,而直接与ElasticSearch进行耦合,会增加搜索排序API的接入难度

  • 查询服务是一个Spring Boot Application,使用docker打包为镜像

  • 查询服务提供多版本API,用于API进化和兼容

  • 查询服务解析json请求,进行一些预处理后,使用ElasticSearch Java Rest Client转发到ElasticSearch,将得到的响应进行解析,进一步处理后返回到客户端

  • 为了便于客户端人员开发,查询服务提供了一个查询UI界面,开发人员可以在这个页面得到预期结果后再把json请求体复制到程序中


流程图



模块介绍


jkes-core


jkes-core是整个jkes的核心部分。主要包括以下功能:


  • annotation包提供了jkes的核心注解

  • elasticsearch包封装了elasticsearch相关的操作,如为所有的文档创建/更新索引,更新mapping

  • kafka包提供了Kafka 生产者,Kafka Json Serializer,Kafka Connect Client

  • metadata包提供了核心的注解元数据的构建与结构化模型

  • event包提供了事件模型与容器

  • exception包提供了常见的Jkes异常

  • http包基于Apache Http Client封装了常见的http json请求

  • support包暴露了Jkes核心配置支持

  • util包提供了一些工具类,便于开发。如:Asserts, ClassUtils, DocumentUtils, IOUtils, JsonUtils, ReflectionUtils, StringUtils


jkes-boot


jkes-boot用于与一些第三方开源框架进行集成。


当前,我们通过jkes-spring-data-jpa,提供了与spring data jpa的集成。通过使用Spring的AOP机制,对Repository方法进行拦截,生成SaveEvent/DeleteEvent/DeleteAllEvent保存到EventContainer。通过使用我们提供的SearchPlatformTransactionManager,对常用的事务管理器(如JpaTransactionManager)进行包装,提供事务拦截功能。


在后续版本,我们会提供与更多框架的集成。


jkes-spring-data-jpa说明:


  • ContextSupport类用于从bean工厂获取Repository Bean

  • @EnableJkes让客户端能够轻松开启Jkes的功能,提供了与Spring一致的配置模型

  • EventSupport处理事件的细节,在保存和删除数据时生成相应事件存放到EventContainer,在事务提交和回滚时处理相应的事件

  • SearchPlatformTransactionManager包装了客户端的事务管理器,在事务提交和回滚时加入了回调hook

  • audit包提供了一个简单的AuditedEntity父类,方便添加审计功能,版本信息可用于结合ElasticSearch的版本机制保证不会索引过期文档数据

  • exception包封装了常见异常

  • intercept包提供了AOP切点和切面

  • index包提供了全量索引功能。当前,我们提供了基于线程池的索引机制和基于ForkJoin的索引机制。在后续版本,我们会重构代码,增加基于阻塞队列生产者-消费者模式,提供并发性能


jkes-services


jkes-services主要用来提供一些服务。 目前,jkes-services提供了以下服务:


  • jkes-delete-connector

    • jkes-delete-connector是一个Kafka Connector,用于从kafka集群获取索引删除事件(DeleteEvent),然后使用Jest Client删除ElasticSearch中相应的文档。

    • 借助于Kafka Connect的rest admin api,我们轻松地实现了多租户平台上的文档删除功能。只要为每个项目启动一个jkes-delete-connector,就可以自动处理该项目的文档删除工作。避免了每启动一个新的项目,我们都得手动启动一个Kafka Consumer来处理该项目的文档删除工作。尽管可以通过正则订阅来减少这样的工作,但是还是非常不灵活

  • jkes-search-service

    • jkes-search-service是一个restful的搜索服务,提供了多版本的rest query api。查询服务提供多版本API,用于API进化和兼容

    • jkes-search-service目前支持URI风格的搜索和JSON请求体风格的搜索。

    • 我们没有直接使用ElasticSearch进行查询,因为我们需要在后续版本使用机器学习进行搜索排序,而直接与ElasticSearch进行耦合,会增加搜索排序的接入难度

    • 查询服务是一个Spring Boot Application,使用docker打包为镜像

    • 查询服务解析json请求,进行一些预处理后,使用ElasticSearch Java Rest Client转发到ElasticSearch,将得到的响应进行解析,进一步处理后返回到客户端。

    • 为了便于客户端人员开发,查询服务提供了一个查询UI界面,开发人员可以在这个页面得到预期结果后再把json请求体复制到程序中。


后续,我们将会基于zookeeper构建索引集群,提供集群索引管理功能。


jkes-integration-test


jkes-integration-test是一个基于Spring Boot集成测试项目,用于进行功能测试。同时测量一些常见操作的吞吐率。


开发


To build a development version you'll need a recent version of Kafka. You can build jkes with Maven using the standard lifecycle phases.


Contribute


  • Source Code: https://github.com/chaokunyang/jkes

  • Issue Tracker: https://github.com/chaokunyang/jkes/issues


LICENSE


This project is licensed under Apache License 2.0.


以上是关于基于JavaKafkaElasticSearch的搜索框架的设计与实现的主要内容,如果未能解决你的问题,请参考以下文章

推荐算法简介:基于用户的协同过滤基于物品的协同过滤基于内容的推荐

Nginx虚拟主机 (基于域名 基于端口 基于ip)

基于内容与基于协作的过滤?

依存句法分析:基于图的依存句法分析基于转移的依存句法分析基于神经网络的依存句法分析

事件处理:基于功能的组件与基于类的组件

基于项目和基于内容的协同过滤有啥区别?