什么是Spring Cloud Bus 消息总线,读完这篇文章你就懂了

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了什么是Spring Cloud Bus 消息总线,读完这篇文章你就懂了相关的知识,希望对你有一定的参考价值。

参考技术A 消息代理中间件构建一个共用的消息主题让所有微服务实例订阅,当该消息主题产生消息时会被所有微服务实例监听和消费。

  消息代理又是什么?消息代理是一个消息验证、传输、路由的架构模式,主要用来实现接收和分发消息,并根据设定好的消息处理流来转发给正确的应用。它在微服务之间起到通信调度作用,减少了服务之间的依赖。

什么是 Spring Cloud Bus

  Spring Cloud Bus 是 Spring Cloud 体系内的消息总线,用来连接分布式系统的所有节点。

  Spring Cloud Bus 将分布式的节点用轻量的消息代理(RibbitMQ、Kafka)连接起来。可以通过消息代理广播配置文件的更改,或服务之间的通讯,也可以用于监控。解决了微服务数据变更,及时同步的问题。

什么时候使用 Spring Cloud Bus

  微服务一般都采用集群方式部署,而且在高并发下经常需要对服务进行扩容、缩容、上线、下线的操作。比如我们需要更新配置,又或者需要同时失效所有服务器上的某个缓存,需要向所有相关的服务器发送命令,此时就可以选择使用 Spring Cloud Bus 了。

  总的来说,就是在我们需要把一个操作散发到所有后端相关服务器的时候,就可以选择使用 Spring Cloud Bus 了。

  接下来我们通过 Spring Cloud Bus 实现微服务架构的配置刷新。

环境准备

  RibbitMQ v3.8.2 地址:192.168.10.101

  bus-demo 聚合工程 SpringBoot 2.2.4.RELEASE、Spring Cloud Hoxton.SR1。

eureka-server:注册中心

eureka-server02:注册中心

config-server:配置中心服务端

config-server02:配置中心服务端

order-service:订单服务(配置中心客户端)

order-service02:订单服务(配置中心客户端)

  配置文件 order-service-prod.yml

spring:

  application:

    name: order-service # 应用名称

# 配置 Eureka Server 注册中心

eureka:

  instance:

    prefer-ip-address: true      # 是否使用 ip 地址注册

    instance-id: $spring.cloud.client.ip-address:$server.port # ip:port

  client:

    service-url:                  # 设置服务注册中心地址

      defaultZone: http://localhost:8761/eureka/,http://localhost:8762/eureka/

# 自定义配置

name: order-service-prod

password: root

Spring Cloud Bus 实现配置刷新

客户端发起通知

  消息总线(Bus)的典型应用场景就是配置中心客户端刷新。

  我们在学习 Spring Cloud Config 配置中心时给大家讲了基于 Actuator 的配置刷新,当时的案例只有一个 Config Client,我们可以使用 Webhook,设置手动刷新都不算太费事,但是如果客户端比较多的情况下,一个一个去手动刷新未免有点复杂,这种方案就不太适合了。使用 Spring Cloud Bus 可以完美解决这一问题。

  借助 Spring Cloud Bus 的广播功能,让 Config Client 都订阅配置更新事件,当配置更新时,触发其中一个端的更新事件,Spring Cloud Bus 就把此事件广播到其他订阅客户端,以此来达到批量更新。

Webhook 监听被触发,给 ConfigClient A 发送 bus-refresh 请求刷新配置

ConfigClient A 读取 ConfigServer 中的配置,并且发送消息给 Bus

Bus 接收消息后广播通知其他 ConfigClient

其他 ConfigClient 收到消息重新读取最新配置

添加依赖

  Config Client 添加 spring cloud starter bus amqp 依赖。

<!-- spring cloud starter bus amqp 依赖 -->

<dependency>

    <groupId>org.springframework.cloud</groupId>

    <artifactId>spring-cloud-starter-bus-amqp</artifactId>

</dependency>

配置文件

  配置文件需要配置 消息队列 和 bus-refresh 自动刷新端点。/actuator/bus-refresh 端点会清除 @RefreshScope 缓存重新绑定属性。

  Config Client 的 bootstrap.yml 核心配置。

spring:

  cloud:

    config:

      name: order-service # 配置文件名称,对应 git 仓库中配置文件前半部分

      label: master # git 分支

      profile: prod # 指定环境

      discovery:

        enabled: true # 开启

        service-id: config-server # 指定配置中心服务端的 service-id

  # 消息队列

  rabbitmq:

    host: 192.168.10.106

# 度量指标监控与健康检查

management:

  endpoints:

    web:

      base-path: /actuator    # 访问端点根路径,默认为 /actuator

      exposure:

        include: bus-refresh  # 需要开启的端点

        #exclude:            # 不需要开启的端点

测试

查看端点

  可以看到已经开启了 bus-refresh 自动刷新端点。

修改 Git 仓库配置

  修改 Git 仓库配置信息如下:

# 自定义配置

name: order-service-prod-1.0

自动刷新

  刷新页面发现结果并未改变,没事正常。

通过 Post 方式调用 任意客户端 的自动刷新端点:再次访问结果如下:

查看队列

  再来观察一下消息队列的 UI 界面,发现多了一个 springCloudBus 的交换机。

  该交换机下绑定了两个队列对应我们的两个 Config Client。

客户端发起通知缺陷

打破了微服务的职责单一性。微服务本身是业务模块,它本不应该承担配置刷新的职责。

破坏了微服务各节点的对等性。

存在一定的局限性。例如,微服务在迁移时,它的网络地址常常会发生变化,此时如果想要做到自动刷新,就不得不修改Webhook 的配置。

服务端发起通知

  为了解决客户端发起通知缺陷,我们改用服务端发起通知。

Webhook监听被触发,给 ConfigServer 发送 bus-refresh 请求刷新配置

ConfigServer 发送消息给 Bus

Bus 接收消息后广播通知所有 ConfigClient

歌 ConfigClient 收到消息重新读取最新配置

添加依赖

  Config Server 添加 spring cloud starter bus amqp 依赖。

<!-- spring cloud starter bus amqp 依赖 -->

<dependency>

    <groupId>org.springframework.cloud</groupId>

    <artifactId>spring-cloud-starter-bus-amqp</artifactId>

</dependency>

配置文件

  配置文件需要配置 消息队列 和 bus-refresh 自动刷新端点。/actuator/bus-refresh 端点会清除 @RefreshScope 缓存重新绑定属性。

  Config Server 的 application.yml 核心配置。

spring:

  application:

    name: config-server # 应用名称

  cloud:

    config:

      server:

        git:

          uri: https://github.com/imrhelloworld/config-repo # 配置文件所在仓库地址

          #username:            # Github 等产品的登录账号

          #password:            # Github 等产品的登录密码

          #default-label: master # 配置文件分支

          #search-paths:        # 配置文件所在根目录

  # 消息队列

  rabbitmq:

    host: 192.168.10.106

# 配置 Eureka Server 注册中心

eureka:

  instance:

    prefer-ip-address: true      # 是否使用 ip 地址注册

    instance-id: $spring.cloud.client.ip-address:$server.port # ip:port

  client:

    service-url:                  # 设置服务注册中心地址

      defaultZone: http://localhost:8761/eureka/,http://localhost:8762/eureka/

# 度量指标监控与健康检查

management:

  endpoints:

    web:

      base-path: /actuator    # 访问端点根路径,默认为 /actuator

      exposure:

        include: bus-refresh  # 需要开启的端点

        #exclude:            # 不需要开启的端点

spring cloud 中消息总线(bus)使用

消息系统

说到消息系统大家耳熟能详的几个一般来说都有各自适用的场景,我们这里简单说一下几个常见的消息系统。


ActiveMQ是比较老牌的消息系统,当然了不一定是大家第一个熟知的消息系统,因为现在电商、互联网规模越来越大,不断进入程序员眼帘的大多是Kafka和RocketMQ。ActiveMQ出现的要比他们早,而且涵盖的功能也特别全,路由、备份、查询、事务、集群等等。他的美中不足是不能支撑超大规模、超高并发的互联网应用,ActiveMQ的并发承受能力在百万级别,大概500次/s的消息频率。


Kafka是新一代的消息系统,相对于ActiveMQ来说增加了分片功能,类似于数据库分库分表,一台Broker仅负责一部分数据收发,从而使得他的伸缩性特别好,通过增加Broker就可以不断增加处理能力。一般来说,Kafka被用来处理日志流,作为流计算的接入点。在电商的订单、库存等系统里边一般不用,主要顾虑的是Kafka异步刷盘机制可能导致数据丢失。当然,对于数据丢失这一点不同的工程师也有不同的看法,认为Kafka的Master-Slave的多写机制,完全能够避免数据丢失。


RocketMQ是阿里开源的一款消息系统,开发的初衷就是要支撑阿里庞大的电商系统。RocketMQ和Kafka有很多相似之处,由于RocketMQ开发中很大程度上参考了Kafka的实现。RocketMQ同样提供了优秀的分片机制,RocketMQ的分片比Kafka的分片有所增强,区分了绝对有序和非绝对有序两种选项。另外RocketMQ采用的是同步刷盘,一般认为不会造成数据丢失。


RabbitMQ类似于ActiveMQ也是一个相对小型的消息系统,他的优势在于灵活的路由机制,可以进行自由配置。


Redis的pub/sub功能,由于Redis是内存级的系统,所以速度和单机的并发能力是上述四个消息系统不能比拟的,但是也是由于内存存储的缘故,在消息的保障上就更弱一些。据说新浪博客系统选择了Redis的pub/sub作为消息系统,不能不说艺高人胆大。

什么时候用cloud bus

spring cloud bus在整个后端服务中起到联通的作用,联通后端的多台服务器。我们为什么需要他做联通呢?

后端服务器一般都做了集群化,很多台服务器,而且在大促活动期经常发生服务的扩容、缩容、上线、下线。这样,后端服务器的数量、IP就会变来变去,如果我们想进行一些线上的管理和维护工作,就需要维护服务器的IP。

比如我们需要更新配置、比如我们需要同时失效所有服务器上的某个缓存,都需要向所有的相关服务器发送命令,也就是调用一个接口。

你可能会说,我们一般会采用zookeeper的方式,统一存储服务器的ip地址,需要的时候,向对应服务器发送命令。这是一个方案,但是他的解耦性、灵活性、实时性相比消息总线都差那么一点。

总的来说,就是在我们需要把一个操作散发到所有后端相关服务器的时候,就可以选择使用cloud bus了。

使用cloud bus之后,我们的服务端架构会变成这样:

cloud bus能做什么

当前spring cloud bus提供了两个可用的接口:1./bus/env用于设置某一个配置项2./bus/refresh用于刷新所有绑定到刷新点的配置项。

这两个接口是使用spring boot actuator方式发布出来的(可以参见:深入SpringBoot:自定义Endpoint一文),接收到消息后会使用spring的stream框架(可以参考:张开涛的解Spring事件驱动模型一文)把消息传播到所有注册的相关服务器。

/bus/env的参数格式:

name=&value=&destination=

/bus/refresh的参数格式:

destination=

当然了,上述的destination参数都可以不提供。

spring cloud config 配合spring cloud bus实现配置信息更新

spring cloud config 配置更新有两种方式:1.配置git仓库的web hook,当git仓库有更新时自动调用bus提供的刷新接口,刷新缓存;2.手工调用bus提供的刷新接口。

不论一方案还是二方案区别仅在于是不同的人触发了刷新接口。实际上,线上服务器一般很少采用自动刷新的机制,都会在修改后,确认无误后再执行刷新。

关键的修改点是把所有的后端服务器连接到同一个消息系统上,然后监听配置更新消息。

安装RabbitMQ

安装方法很简单,直接在官网下载对应的安装文件就可以了。

因为RabbitMQ是Erlang语言写的,所以如果你的机器上没有安装Erlang,那么需要先安装Erlang。

增加bus包的引用

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>

增加RabbitMQ配置

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5671
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

增加@RefreshScope注解

@SpringBootApplication
@RestController
@RefreshScope
public class ConfigClientApplication 

    public static void main(String[] args) 
        SpringApplication.run(ConfigClientApplication.class, args);
    

    @Value("$app-name")
    private String app_name;

    @RequestMapping("hi")
    public String hi()
        return "hello "+ app_name;
    

spring cloud扩展消息总线方法

这里就不写了,要想了解可以参考spring cloud bus 扩展消息总线方法

以上是关于什么是Spring Cloud Bus 消息总线,读完这篇文章你就懂了的主要内容,如果未能解决你的问题,请参考以下文章

Spring Cloud Bus 消息总线介绍

干货|Spring Cloud Bus 消息总线介绍

第九章 消息总线: Spring Cloud Bus

Spring Cloud学习记录 08Spring Cloud Bus服务总线

Spring Cloud 入门教程:和RabbitMQ的整合 -- 消息总线Spring Cloud Netflix Bus

Spring Cloud Bus 消息总线介绍