基于Kafka实现的Spring Cloud消息总线

Posted 低端码农

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了基于Kafka实现的Spring Cloud消息总线相关的知识,希望对你有一定的参考价值。

        在上一篇文章《Spring Cloud 分布式配置中心》中,我们简单介绍了使用git仓库搭建配置中心及实现配置信息的动态刷新。但是,当我们去刷新配置中心时,只能实现单个节点的刷新,如果我们有多个微服务应用,比如分布式部署或者集群部署,那么如何实现多节点的配置刷新呢?在这里,我们简单介绍实现这种功能的方式--消息总线,并搭建极简的小Demo来做演示。


一、什么是消息总线?

        在微服务架构的系统中, 我们通常会使用轻量级的消息代理来构建个共用的消息主题让系统中所有微服务实例都连接上来, 由于该主题中产生的消息会被所有实例监听和消费, 所以我们称它为消息总线。 在总线上的各个实例都可以方便地广播些需要让其他连接在该主题上的实例都知道的消息, 例如配置信息的变更或者其他些管理操作等。由于消息总线在微服务架构系统中被广泛使用, 所以它同配置中心样, 几乎是微服务架构中的必备组件。

        Spring Cloud 作为微服务架构综合性的解决方案,对此自然也有自己
的实现, 这就是我们将要介绍的
Spring Cloud Bus

        既然是消息总线,就会用到消息中间件,目前Spring Cloud Bus仅支持RabbitMQ和Kafka。RabbitMQ是大家比较 熟悉和常用的中间件产品,此处不多介绍。Kafka也是一款经典地、高性能的分布式发布订阅消息系统,而且近年来大数据发展迅速,Kafka成为大数据最核心的技术,可以说,如果不了解Kafka,我们已经处在“OUT”的边缘。(sorry,这里是我自己瞎写的。)关于Kafka的知识大家可以自行搜索。下面简单介绍下Kafka的安装、启动以及基于Kafka实现的spring cloud 消息总线。


二、Kafka的安装与启动

        此处所介绍的,是在windows环境下的操作。Kafka依赖于zookeeper,因此运行Kafka之前必须安装并运行zookeeper。

        1、下载zookeeper并解压。解压完成后,在解压出的conf文件夹下有一个zoo_sample.cfg配置文件,我们备份此配置文件并更名为zoo.cfg。

2、编辑zoo.cfg文件,指定数据目录、日志目录,还可以设置连接时间、超时时间、端口号等。此处我们只设置两个目录,其余全部使用默认。zookeeper的默认端口是2181。

基于Kafka实现的Spring Cloud消息总线

3、我们进入bin目录,双击“zkServer.cmd”命令,即可启动zookeeper。

基于Kafka实现的Spring Cloud消息总线

注:安装zookeeper后,可以设置环境变量等,启动时无需进入bin目录,同时zookeeper还可以搭建集群,此处只是做spring cloud bus的演示,因此此处简单操作。(复杂的我没有手动试过啊!)

至此,zookeeper已经安装并启动起来。下面我们安装启动Kafka。


4、下载并解压Kafka,可以看到解压出来的conf文件件下有多个配置文件,其中就有zookeeper的配置文件,因此zookeeper必不可少。

基于Kafka实现的Spring Cloud消息总线

我们使用默认的zookeeper配置信息。


5、编辑conf目录下的server.properties配置文件,我们像编辑zookeeper的配置文件zoo.cfg一样,只指定日志目录。

基于Kafka实现的Spring Cloud消息总线


6、此时我们就可以启动Kafka了。(感觉越牛逼的软件安装起来越简单啊,就像Redis和Neo4j,牛逼得不行,可是安装也是so easy)

我们使用cmd进入Kafka的根目录,然后执行如下命令:

.inwindowskafka-server-start.bat .configserver.properties

启动窗口如下:

基于Kafka实现的Spring Cloud消息总线


至此,Kafka已经启动成功。下面我们把Kafka引入Spring Cloud Config。


二、Spring Cloud Bus 的搭建

        消息总线的主要目的是使多个微服务应用监听并消费消息,我们这里主要介绍的是如何实现同时刷新多个微服务节点的配置信息。此处有两种方式,一是发送如下post请求到任意一个config客户端:

curl -X POST http://127.0.0.1:1234/bus/refresh

另外就是发送此post请求到Config服务端。需要说明的是,如果发送此post请求到客户端,服务端是不必引入Kafka依赖的。建议客户端和服务端全部引入。

        1、在Config服务端和客户端中分别引入Kafka依赖。

基于Kafka实现的Spring Cloud消息总线

2、创建两个Config 客户端。可以直接复制之前搭建的客户端,然后修改端口号即可。

3、先后启动服务端,客户端。可以看到,启动日志中打印出了zookeeper和kafka的连接信息。

基于Kafka实现的Spring Cloud消息总线

基于Kafka实现的Spring Cloud消息总线

   4、我们可以在kafka的根目录使用cmd执行如下命令查看Topic,

.inwindowskafka-topics.bat --list --zookeeper localhost:2181

基于Kafka实现的Spring Cloud消息总线

可以看到,已经创建了一个springCloudBus的主题,所有的Config应用都会监听和消费此主题的消息。


5、我们分别发送如下get请求到两个客户端(端口不同),发现能够正常获取git仓库配置文件中参数from的值。

基于Kafka实现的Spring Cloud消息总线

基于Kafka实现的Spring Cloud消息总线

     现在修改git仓库配置文件,修改from参数的值为:from=git-test-1.14

基于Kafka实现的Spring Cloud消息总线


6、现在发送如下post请求到一个客户端(端口2345),

基于Kafka实现的Spring Cloud消息总线

整个请求执行了12s!大家可以去看没有使用bus时,请求时间只有几秒。


7、现在我们再次发送get请求到端口为2345的客户端:

可以看到2345客户端得到了更改后的from属性值。

我们再去请求23456端口的客户端:

可以看到,另外一个客户端也刷新到了修改后的from属性值。这正是Spring Cloud Bus的作用。


8、我们同样可以发送如下post请求到服务端,会发现两个客户端也会得到修改后的from值。此处效果与上述相同,不再演示。

curl -X POST http://127.0.0.1:1234/bus/refresh


9、部分刷新

       我们有时候只需要使指定的客户端刷新配置信息,而不是全部刷新,我们可以在上述post请求指定刷新范围,如:

curl -X POST http://127.0.0.1:1234/bus/refresh?destination=zzlspace:2345

即使用destination参数指定客户端的实例名,这样符合条件的客户端就会刷新配置,而其他客户端不会刷新配置。


总结:我们简单介绍了基于kakfa实现的spring cloud bus,所有的配置都是极简或默认配置,但是大体的功能已经实现。我们可以通过Spring Cloud Config实现微服务应用的配置信息集中外部管理,同时通过Spring Cloud Bus实现多个微服务配置信息的同时刷新。当我们使用了配置中心后,在修改配置信息的时候,无需重新启动应用,直接使用POST请求的方式刷新即可。

以上是关于基于Kafka实现的Spring Cloud消息总线的主要内容,如果未能解决你的问题,请参考以下文章

Spring Cloud(12)——基于Kafka的Stream实现

Spring Cloud构建微服务架构消息总线(续:Kafka)

基于 Spring functional 的Secured Kafka (kerberos) configuration

使用 Spring Cloud Stream Kafka Binder 批量使用带有密钥的 Kafka 消息

spring-cloud-stream-kafka 在应用程序启动后仅使用最新消息

Spring Cloud Stream