微服务之消息总线

Posted 阿灿的学习笔记

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了微服务之消息总线相关的知识,希望对你有一定的参考价值。


简介

         

在上篇文章《微服务之配置中心》中写到,客户端可从服务端获取配置信息,当Git仓库中的配置文件修改后,为了让客户端获取最新的配置信息,可以通过执行refresh操作进行手动刷新。但是这样有问题,当客户端很多时(随之系统的不断扩大),如果需要每个客户端都执行一遍,那就蛋疼了,显然这种方案就不适合了。Spring Cloud作为微服务架构的一个综合解决方案,也提供了对应的解决方案Spring Cloud Bus,即消息总线。

这里要理解一个概念,消息总线。简单理解就是一个消息中心,众多微服务实例可以连接到总线上,实例可以往消息中心发送或接收信息(通过监听)。比如:实例A发送一条消息到总线上,总线上的实例B可以接收到信息(实例B订阅了实例A),这样的话,消息总线就充当一个中间者的角色,使得实例A和实例B解偶了,很方便。



消息总线(Spring Cloud Bus


原理


         Spring Cloud Bus通过建立多个应用之间的通信频道,管理和传播应用间的消息,从技术角度来说,应用了AMQP消息代理作为通道,通过MQ的广播机制实现消息的发送和接收。以其典型应用——配置中心客户端刷新为例,说明下工作流程:

微服务之消息总线

1)修改配置文件,触发webhookclientA发送bus/refresh

2clientA重新从配置中心获取新的配置信息,同时发送消息到Spring Cloud Bus

3Spring Cloud Bus收到消息,同时通知clientBclientC(订阅配置更新事件);

4clientBclientC收到通知,重新请求配置中心,获取新的配置信息。

       这样,三个客户端均得到最新的配置。


消息代理


       这个过程中,作为通道的AMQP消息代理很重要。AMQP(高级消息队列协议,是一个标准)是一个网络协议,从扮演角色来说,消息代理从生产者(producers)那儿接收消息,并根据既定的路由规则把接收到的消息发送给处理消息的消费者(consumers,这个过程中的发布者,消费者,消息代理可以存在于不同的设备上,下面简单介绍下工作流程(其实跟上面的类似):

微服务之消息总线

       消息(message)被发布者(publisher)发送给交换机(exchange),交换机常常被比喻成邮局或者邮箱。然后交换机将收到的消息根据路由规则分发给绑定的队列(queue)。最后AMQP代理会将消息投递给订阅了此队列的消费者,或者消费者按照需求自行获取。

       AMQP作为一个标准协议,主要实现方案有RabbitMQActiveMQQpid等。这里我主要以RabbitMQ为例进行说明,它是一个优秀的微服务架构消息中间件,与Spring Cloud Bus能够很好的结合使用。

       下图显示了RabbitMQWeb管理首页:

微服务之消息总线

1Broker:消息队列服务器,即负责接收生产者消息,发送至消费者的;

2Connections:连接,即发送者、消息接收者、消费者之间的物理连接;

3Channel:通道,连接生产者、消费者的逻辑结构。一个Connection可以对应多个Channel

微服务之消息总线

4Exchange:消息交换机,消息第一个到达的地方,可以指定路由规则,决定消息分发到不同的消息队列中去;

5Queue:消息队列,消息经Exchange路由转发至此,进入逻辑等待状态(等待消费,即客户端获取);

6Binding:绑定,把ExchangeQueue按照路由规则进行绑定,即决定Exchange接收消息后,需要发送到哪些Queue中:

微服务之消息总线


消息发送-接收原理图

 

微服务之消息总线


配置步骤


Config Server服务端


(1)在Config Server添加RabbitMQ依赖,非常简单:

微服务之消息总线

(2)修改配置文件,添加RabbitMQ的配置信息:

微服务之消息总线

(3)启动类加注解:

微服务之消息总线


Config Client客户端


(1)在Config Client添加RabbitMQ依赖,非常简单: 

微服务之消息总线

(2)修改配置文件,添加RabbitMQ的配置信息:

微服务之消息总线


Eureka Server注册中心


    省略。


测试


1)启动Eureka Server注册中心、ConfigServer服务端、Config Client客户端(开启两个实例),如图:

微服务之消息总线

(2)客户端获取配置文件的值,如图:

微服务之消息总线

微服务之消息总线

微服务之消息总线

(3)修改配置文件值,改为apps.caac.net/demo,如图:

微服务之消息总线

(4)通过curl执行刷新操作(配置服务端执行/bus/refresh),如图:

微服务之消息总线

(5)客户端重新获取配置文件的值,可知已获取最新配置信息,如图:

微服务之消息总线

微服务之消息总线


局部刷新


         有时我们只想刷新部分微服务的配置,此时可通过/bus/refresh端点的destination参数来定位要刷新的应用程序。


刷新指定实例(某一服务)


       执行:/bus/refresh?destination=customers:9000

其中,customers:9000指的是各个微服务的ApplicationContext ID(默认为:${spring.application.name}:${server.port})。


刷新全部实例(某一服务)


       执行:/bus/refresh?destination=customers:**,这样就可以触发customers微服务所有实例的配置刷新。

 

 

注意:这里的${spring.application.name}区分大小写。

        

 


以上是关于微服务之消息总线的主要内容,如果未能解决你的问题,请参考以下文章

面向业务的微服务消息总线

微服务实战:落地微服务架构到直销系统(构建消息总线框架接口)

微服务实战:落地微服务架构到直销系统(将生产者与消费者接入消息总线)

Bus:消息总线

Bus:消息总线

Spring Cloud构建微服务架构消息总线(续:Kafka)