spring cloud 中消息总线(bus)使用

Posted 不去天涯

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spring cloud 中消息总线(bus)使用相关的知识,希望对你有一定的参考价值。

消息系统

说到消息系统大家耳熟能详的几个一般来说都有各自适用的场景,我们这里简单说一下几个常见的消息系统。


ActiveMQ是比较老牌的消息系统,当然了不一定是大家第一个熟知的消息系统,因为现在电商、互联网规模越来越大,不断进入程序员眼帘的大多是Kafka和RocketMQ。ActiveMQ出现的要比他们早,而且涵盖的功能也特别全,路由、备份、查询、事务、集群等等。他的美中不足是不能支撑超大规模、超高并发的互联网应用,ActiveMQ的并发承受能力在百万级别,大概500次/s的消息频率。


Kafka是新一代的消息系统,相对于ActiveMQ来说增加了分片功能,类似于数据库分库分表,一台Broker仅负责一部分数据收发,从而使得他的伸缩性特别好,通过增加Broker就可以不断增加处理能力。一般来说,Kafka被用来处理日志流,作为流计算的接入点。在电商的订单、库存等系统里边一般不用,主要顾虑的是Kafka异步刷盘机制可能导致数据丢失。当然,对于数据丢失这一点不同的工程师也有不同的看法,认为Kafka的Master-Slave的多写机制,完全能够避免数据丢失。


RocketMQ是阿里开源的一款消息系统,开发的初衷就是要支撑阿里庞大的电商系统。RocketMQ和Kafka有很多相似之处,由于RocketMQ开发中很大程度上参考了Kafka的实现。RocketMQ同样提供了优秀的分片机制,RocketMQ的分片比Kafka的分片有所增强,区分了绝对有序和非绝对有序两种选项。另外RocketMQ采用的是同步刷盘,一般认为不会造成数据丢失。


RabbitMQ类似于ActiveMQ也是一个相对小型的消息系统,他的优势在于灵活的路由机制,可以进行自由配置。


Redis的pub/sub功能,由于Redis是内存级的系统,所以速度和单机的并发能力是上述四个消息系统不能比拟的,但是也是由于内存存储的缘故,在消息的保障上就更弱一些。据说新浪博客系统选择了Redis的pub/sub作为消息系统,不能不说艺高人胆大。

什么时候用cloud bus

spring cloud bus在整个后端服务中起到联通的作用,联通后端的多台服务器。我们为什么需要他做联通呢?

后端服务器一般都做了集群化,很多台服务器,而且在大促活动期经常发生服务的扩容、缩容、上线、下线。这样,后端服务器的数量、IP就会变来变去,如果我们想进行一些线上的管理和维护工作,就需要维护服务器的IP。

比如我们需要更新配置、比如我们需要同时失效所有服务器上的某个缓存,都需要向所有的相关服务器发送命令,也就是调用一个接口。

你可能会说,我们一般会采用zookeeper的方式,统一存储服务器的ip地址,需要的时候,向对应服务器发送命令。这是一个方案,但是他的解耦性、灵活性、实时性相比消息总线都差那么一点。

总的来说,就是在我们需要把一个操作散发到所有后端相关服务器的时候,就可以选择使用cloud bus了。

使用cloud bus之后,我们的服务端架构会变成这样:

cloud bus能做什么

当前spring cloud bus提供了两个可用的接口:1./bus/env用于设置某一个配置项2./bus/refresh用于刷新所有绑定到刷新点的配置项。

这两个接口是使用spring boot actuator方式发布出来的(可以参见:深入SpringBoot:自定义Endpoint一文),接收到消息后会使用spring的stream框架(可以参考:张开涛的解Spring事件驱动模型一文)把消息传播到所有注册的相关服务器。

/bus/env的参数格式:

name=&value=&destination=

/bus/refresh的参数格式:

destination=

当然了,上述的destination参数都可以不提供。

spring cloud config 配合spring cloud bus实现配置信息更新

spring cloud config 配置更新有两种方式:1.配置git仓库的web hook,当git仓库有更新时自动调用bus提供的刷新接口,刷新缓存;2.手工调用bus提供的刷新接口。

不论一方案还是二方案区别仅在于是不同的人触发了刷新接口。实际上,线上服务器一般很少采用自动刷新的机制,都会在修改后,确认无误后再执行刷新。

关键的修改点是把所有的后端服务器连接到同一个消息系统上,然后监听配置更新消息。

安装RabbitMQ

安装方法很简单,直接在官网下载对应的安装文件就可以了。

因为RabbitMQ是Erlang语言写的,所以如果你的机器上没有安装Erlang,那么需要先安装Erlang。

增加bus包的引用

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>

增加RabbitMQ配置

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5671
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

增加@RefreshScope注解

@SpringBootApplication
@RestController
@RefreshScope
public class ConfigClientApplication 

    public static void main(String[] args) 
        SpringApplication.run(ConfigClientApplication.class, args);
    

    @Value("$app-name")
    private String app_name;

    @RequestMapping("hi")
    public String hi()
        return "hello "+ app_name;
    

spring cloud扩展消息总线方法

这里就不写了,要想了解可以参考spring cloud bus 扩展消息总线方法

以上是关于spring cloud 中消息总线(bus)使用的主要内容,如果未能解决你的问题,请参考以下文章

Spring Cloud Bus消息总线

spring cloud 中消息总线(bus)使用

SpringCloud 教程 消息总线(Spring Cloud Bus)

第八篇:消息总线(Spring Cloud Bus)

Spring Cloud Bus 消息总线介绍

Spring Cloud Bus 消息总线介绍