什么是Spring Cloud Bus 消息总线,读完这篇文章你就懂了
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了什么是Spring Cloud Bus 消息总线,读完这篇文章你就懂了相关的知识,希望对你有一定的参考价值。
参考技术A 消息代理中间件构建一个共用的消息主题让所有微服务实例订阅,当该消息主题产生消息时会被所有微服务实例监听和消费。消息代理又是什么?消息代理是一个消息验证、传输、路由的架构模式,主要用来实现接收和分发消息,并根据设定好的消息处理流来转发给正确的应用。它在微服务之间起到通信调度作用,减少了服务之间的依赖。
什么是 Spring Cloud Bus
Spring Cloud Bus 是 Spring Cloud 体系内的消息总线,用来连接分布式系统的所有节点。
Spring Cloud Bus 将分布式的节点用轻量的消息代理(RibbitMQ、Kafka)连接起来。可以通过消息代理广播配置文件的更改,或服务之间的通讯,也可以用于监控。解决了微服务数据变更,及时同步的问题。
什么时候使用 Spring Cloud Bus
微服务一般都采用集群方式部署,而且在高并发下经常需要对服务进行扩容、缩容、上线、下线的操作。比如我们需要更新配置,又或者需要同时失效所有服务器上的某个缓存,需要向所有相关的服务器发送命令,此时就可以选择使用 Spring Cloud Bus 了。
总的来说,就是在我们需要把一个操作散发到所有后端相关服务器的时候,就可以选择使用 Spring Cloud Bus 了。
接下来我们通过 Spring Cloud Bus 实现微服务架构的配置刷新。
环境准备
RibbitMQ v3.8.2 地址:192.168.10.101
bus-demo 聚合工程 SpringBoot 2.2.4.RELEASE、Spring Cloud Hoxton.SR1。
eureka-server:注册中心
eureka-server02:注册中心
config-server:配置中心服务端
config-server02:配置中心服务端
order-service:订单服务(配置中心客户端)
order-service02:订单服务(配置中心客户端)
配置文件 order-service-prod.yml
spring:
application:
name: order-service # 应用名称
# 配置 Eureka Server 注册中心
eureka:
instance:
prefer-ip-address: true # 是否使用 ip 地址注册
instance-id: $spring.cloud.client.ip-address:$server.port # ip:port
client:
service-url: # 设置服务注册中心地址
defaultZone: http://localhost:8761/eureka/,http://localhost:8762/eureka/
# 自定义配置
name: order-service-prod
password: root
Spring Cloud Bus 实现配置刷新
客户端发起通知
消息总线(Bus)的典型应用场景就是配置中心客户端刷新。
我们在学习 Spring Cloud Config 配置中心时给大家讲了基于 Actuator 的配置刷新,当时的案例只有一个 Config Client,我们可以使用 Webhook,设置手动刷新都不算太费事,但是如果客户端比较多的情况下,一个一个去手动刷新未免有点复杂,这种方案就不太适合了。使用 Spring Cloud Bus 可以完美解决这一问题。
借助 Spring Cloud Bus 的广播功能,让 Config Client 都订阅配置更新事件,当配置更新时,触发其中一个端的更新事件,Spring Cloud Bus 就把此事件广播到其他订阅客户端,以此来达到批量更新。
Webhook 监听被触发,给 ConfigClient A 发送 bus-refresh 请求刷新配置
ConfigClient A 读取 ConfigServer 中的配置,并且发送消息给 Bus
Bus 接收消息后广播通知其他 ConfigClient
其他 ConfigClient 收到消息重新读取最新配置
添加依赖
Config Client 添加 spring cloud starter bus amqp 依赖。
<!-- spring cloud starter bus amqp 依赖 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
配置文件
配置文件需要配置 消息队列 和 bus-refresh 自动刷新端点。/actuator/bus-refresh 端点会清除 @RefreshScope 缓存重新绑定属性。
Config Client 的 bootstrap.yml 核心配置。
spring:
cloud:
config:
name: order-service # 配置文件名称,对应 git 仓库中配置文件前半部分
label: master # git 分支
profile: prod # 指定环境
discovery:
enabled: true # 开启
service-id: config-server # 指定配置中心服务端的 service-id
# 消息队列
rabbitmq:
host: 192.168.10.106
# 度量指标监控与健康检查
management:
endpoints:
web:
base-path: /actuator # 访问端点根路径,默认为 /actuator
exposure:
include: bus-refresh # 需要开启的端点
#exclude: # 不需要开启的端点
测试
查看端点
可以看到已经开启了 bus-refresh 自动刷新端点。
修改 Git 仓库配置
修改 Git 仓库配置信息如下:
# 自定义配置
name: order-service-prod-1.0
自动刷新
刷新页面发现结果并未改变,没事正常。
通过 Post 方式调用 任意客户端 的自动刷新端点:再次访问结果如下:
查看队列
再来观察一下消息队列的 UI 界面,发现多了一个 springCloudBus 的交换机。
该交换机下绑定了两个队列对应我们的两个 Config Client。
客户端发起通知缺陷
打破了微服务的职责单一性。微服务本身是业务模块,它本不应该承担配置刷新的职责。
破坏了微服务各节点的对等性。
存在一定的局限性。例如,微服务在迁移时,它的网络地址常常会发生变化,此时如果想要做到自动刷新,就不得不修改Webhook 的配置。
服务端发起通知
为了解决客户端发起通知缺陷,我们改用服务端发起通知。
Webhook监听被触发,给 ConfigServer 发送 bus-refresh 请求刷新配置
ConfigServer 发送消息给 Bus
Bus 接收消息后广播通知所有 ConfigClient
歌 ConfigClient 收到消息重新读取最新配置
添加依赖
Config Server 添加 spring cloud starter bus amqp 依赖。
<!-- spring cloud starter bus amqp 依赖 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
配置文件
配置文件需要配置 消息队列 和 bus-refresh 自动刷新端点。/actuator/bus-refresh 端点会清除 @RefreshScope 缓存重新绑定属性。
Config Server 的 application.yml 核心配置。
spring:
application:
name: config-server # 应用名称
cloud:
config:
server:
git:
uri: https://github.com/imrhelloworld/config-repo # 配置文件所在仓库地址
#username: # Github 等产品的登录账号
#password: # Github 等产品的登录密码
#default-label: master # 配置文件分支
#search-paths: # 配置文件所在根目录
# 消息队列
rabbitmq:
host: 192.168.10.106
# 配置 Eureka Server 注册中心
eureka:
instance:
prefer-ip-address: true # 是否使用 ip 地址注册
instance-id: $spring.cloud.client.ip-address:$server.port # ip:port
client:
service-url: # 设置服务注册中心地址
defaultZone: http://localhost:8761/eureka/,http://localhost:8762/eureka/
# 度量指标监控与健康检查
management:
endpoints:
web:
base-path: /actuator # 访问端点根路径,默认为 /actuator
exposure:
include: bus-refresh # 需要开启的端点
#exclude: # 不需要开启的端点
spring cloud 中消息总线(bus)使用
消息系统
说到消息系统大家耳熟能详的几个一般来说都有各自适用的场景,我们这里简单说一下几个常见的消息系统。
ActiveMQ是比较老牌的消息系统,当然了不一定是大家第一个熟知的消息系统,因为现在电商、互联网规模越来越大,不断进入程序员眼帘的大多是Kafka和RocketMQ。ActiveMQ出现的要比他们早,而且涵盖的功能也特别全,路由、备份、查询、事务、集群等等。他的美中不足是不能支撑超大规模、超高并发的互联网应用,ActiveMQ的并发承受能力在百万级别,大概500次/s的消息频率。
Kafka是新一代的消息系统,相对于ActiveMQ来说增加了分片功能,类似于数据库分库分表,一台Broker仅负责一部分数据收发,从而使得他的伸缩性特别好,通过增加Broker就可以不断增加处理能力。一般来说,Kafka被用来处理日志流,作为流计算的接入点。在电商的订单、库存等系统里边一般不用,主要顾虑的是Kafka异步刷盘机制可能导致数据丢失。当然,对于数据丢失这一点不同的工程师也有不同的看法,认为Kafka的Master-Slave的多写机制,完全能够避免数据丢失。
RocketMQ是阿里开源的一款消息系统,开发的初衷就是要支撑阿里庞大的电商系统。RocketMQ和Kafka有很多相似之处,由于RocketMQ开发中很大程度上参考了Kafka的实现。RocketMQ同样提供了优秀的分片机制,RocketMQ的分片比Kafka的分片有所增强,区分了绝对有序和非绝对有序两种选项。另外RocketMQ采用的是同步刷盘,一般认为不会造成数据丢失。
RabbitMQ类似于ActiveMQ也是一个相对小型的消息系统,他的优势在于灵活的路由机制,可以进行自由配置。
Redis的pub/sub功能,由于Redis是内存级的系统,所以速度和单机的并发能力是上述四个消息系统不能比拟的,但是也是由于内存存储的缘故,在消息的保障上就更弱一些。据说新浪博客系统选择了Redis的pub/sub作为消息系统,不能不说艺高人胆大。
什么时候用cloud bus
spring cloud bus在整个后端服务中起到联通的作用,联通后端的多台服务器。我们为什么需要他做联通呢?
后端服务器一般都做了集群化,很多台服务器,而且在大促活动期经常发生服务的扩容、缩容、上线、下线。这样,后端服务器的数量、IP就会变来变去,如果我们想进行一些线上的管理和维护工作,就需要维护服务器的IP。
比如我们需要更新配置、比如我们需要同时失效所有服务器上的某个缓存,都需要向所有的相关服务器发送命令,也就是调用一个接口。
你可能会说,我们一般会采用zookeeper的方式,统一存储服务器的ip地址,需要的时候,向对应服务器发送命令。这是一个方案,但是他的解耦性、灵活性、实时性相比消息总线都差那么一点。
总的来说,就是在我们需要把一个操作散发到所有后端相关服务器的时候,就可以选择使用cloud bus了。
使用cloud bus之后,我们的服务端架构会变成这样:
cloud bus能做什么
当前spring cloud bus提供了两个可用的接口:1./bus/env用于设置某一个配置项2./bus/refresh用于刷新所有绑定到刷新点的配置项。
这两个接口是使用spring boot actuator方式发布出来的(可以参见:深入SpringBoot:自定义Endpoint一文),接收到消息后会使用spring的stream框架(可以参考:张开涛的解Spring事件驱动模型一文)把消息传播到所有注册的相关服务器。
/bus/env的参数格式:
name=&value=&destination=
/bus/refresh的参数格式:
destination=
当然了,上述的destination参数都可以不提供。
spring cloud config 配合spring cloud bus实现配置信息更新
spring cloud config 配置更新有两种方式:1.配置git仓库的web hook,当git仓库有更新时自动调用bus提供的刷新接口,刷新缓存;2.手工调用bus提供的刷新接口。
不论一方案还是二方案区别仅在于是不同的人触发了刷新接口。实际上,线上服务器一般很少采用自动刷新的机制,都会在修改后,确认无误后再执行刷新。
关键的修改点是把所有的后端服务器连接到同一个消息系统上,然后监听配置更新消息。
安装RabbitMQ
安装方法很简单,直接在官网下载对应的安装文件就可以了。
因为RabbitMQ是Erlang语言写的,所以如果你的机器上没有安装Erlang,那么需要先安装Erlang。
增加bus包的引用
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
增加RabbitMQ配置
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5671
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
增加@RefreshScope注解
@SpringBootApplication
@RestController
@RefreshScope
public class ConfigClientApplication
public static void main(String[] args)
SpringApplication.run(ConfigClientApplication.class, args);
@Value("$app-name")
private String app_name;
@RequestMapping("hi")
public String hi()
return "hello "+ app_name;
spring cloud扩展消息总线方法
这里就不写了,要想了解可以参考spring cloud bus 扩展消息总线方法
以上是关于什么是Spring Cloud Bus 消息总线,读完这篇文章你就懂了的主要内容,如果未能解决你的问题,请参考以下文章
Spring Cloud学习记录 08Spring Cloud Bus服务总线
Spring Cloud 入门教程:和RabbitMQ的整合 -- 消息总线Spring Cloud Netflix Bus