RabbitMQ 节点消息广播&Ack
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ 节点消息广播&Ack相关的知识,希望对你有一定的参考价值。
参考技术A 镜像队列的配置通过添加policy完成,policy添加的命令为:rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority]
更简单的方式是开启management插件后在后台手动操作
通常队列由两部分组成:一部分是amqqueue_process,负责协议相关的消息处理,即接收 生产者发布的消息、向消费者投递消息、处理消息confirm、acknowledge等等;另一部分 是backing_queue,它提供了相关的接口供amqqueue_process调用,完成消息的存储以及 可能的持久化工作等。
镜像队列同样由这两部分组成,amqqueue_process仍旧进行协议相关的消息处理,backin g_queue则是由master节点和slave节点组成的一个特殊的backing_queue。master节点和s lave节点都由一组进程组成,一个负责消息广播的gm,一个负责对gm收到的广播消息进行 回调处理。在master节点上回调处理是coordinator(协调员),在slave节点上则是mirror _queue_slave。mirror_queue_slave中包含了普通的backing_queue进行消息的存储,mas ter节点中backing_queue包含在mirror_queue_master中由amqqueue_process进行调用。
当master即第一个GM挂掉后,slave监听到之后选举最早加入的slave当选master.
这是由于消息广播的机制所决定的,最早加入的slave的消息一定是最全的.
——————
节点之前进行数据同步的进程叫做GM : Guaranteed Multicast
rabbitmq中将所有的节点形成一个循环链表,每个节点都会监控位于自己左右两边的节点,当有节点新增时,相邻的节点保证当前广播的消息会复制到新的节点上; 当有节点失效时,相邻的节点会接管保证本次广播的消息会复制到所有节点。
在master节点和slave节点上的这些gm形成一个group,group的信息会记录在mnesia中。 不同的镜像队列形成不同的group。
消息发送
消息从master节点对应的gm发出后,顺着链表依次传送到所有节点,由于所有节点组成一 个循环链表,master节点对应的gm最终会收到自己发送的消息,这个时候master节点就知 道消息已经复制到所有slave节点了。
消息ACK
同样的ack消息会顺着 节点链表经过所有的slave节点,其作用是通知slave节点可以清除缓存的消息,当ack消 息回到master节点时对应广播消息的生命周期结束。
添加节点
master发布的消息是依次经过所有slave节点,在这期间的任何时刻,有可能有节点失效 ,那么相邻的节点可能需要重新发送给新的节点。例如,A->B->C->D->A形成的循环链表 ,A为master节点,广播消息发送给节点B,B再发送给C,如果节点C收到B发送的消息还未 发送给D时异常结束了,那么节点B感知后节点C失效后需要重新将消息发送给D。同样,如 果B节点将消息发送给C后,B,C节点中新增了E节点,那么B节点需要再将消息发送给新增 的E节点。
节点的失效
当slave节点失效时,仅仅是相邻节点感知,然后重新调整邻居节点信息、更新rabbit_qu eue、gm_group的记录等。如果是master节点失效,"资格最老"的slave节点被提升为mast er节点,slave节点会创建出新的coordinator,并告知gm修改回调处理为coordinator, 原来的mirror_queue_slave充当amqqueue_process处理生产者发布的消息,向消费者投递 消息等,来完成角色处理逻辑的变化.
RabbitMQ 如何实现对同一个应用的多个节点进行广播
1.背景了解过RabbitMQ的Fanout模式,应该知道它原本的Fanout模式就是用来做广播的。但是它的广播有一点区别,来回顾下它的含义:Fanout类型没有路由键的概念,只要队列绑定到了改exchange上面,就会接收到所有的消息。
使用过程一般就是先new 出一个Fanout类型的交换机,然后往这个交换机上绑定多个队列queue,不同的消费者各自监听不同的队列,这就实现了广播效果,因为同一个消息,会分发到所有队列中。
举个例子:
应用A监听了队列A,应用B监听了队列B,Fanout类型交换机同时绑定了队列A和B.假设生产者端发送了一条消息到Fanout类型交换机,交换机就会把消息分发到所有队列,这时应用A和应用B会收到同一条消息,这就是广播。
说了上面一大堆,只是为了强调,对于RabbitMQ的原本Fanout模式,它的设计就是多个消费者必须监听不同的队列,多个消费者之间才会形成广播关系。
那么问题来了,假如在Fanout工作模式下,多个消费者同时监听的是同一个队列,会怎样?实践过的同学应该都知道,这种情况下,这些消费者会形成竞争关系,现象是同一个消息只会被其中一个消费者接收,达不到广播的效果。。
2.需求
假如现在有一个需求,要做到对同一个应用的多个节点进行广播,怎么实现?
注意,这里所说的同一个应用多个节点,通俗点理解就是一个war包,布在多个服务器节点上。
在实际部署集群时,为了高可用,同一个应用可能会部署多个节点,那假如工程里已经通过配置定义某个队列,那多个节点它们定义的队列就会是相同的,那按照上面的背景,那这些节点间肯定就会存在竞争关系,即便是Fanout模式的交换机,一条消息也只能被其中一个节点接收,其他节点收不到,达不到广播的效果。那该如何做?
相信看到这里,有人会问,为何会有 对同一个应用的多个节点进行广播的需求场景?为什么要有这个需求。生产中的业务系统很多,自然而然场景就很多。
举两个经典的例子:
1.想要同时刷新所有节点的缓存
业务系统离不开缓存,有时会用内存缓存,假如我要刷新所有节点的内存缓存,多个节点前可能有负载均衡例如nginx之类的,我只需要访问其中一个节点,然后让这个节点做广播通知所有其他节点刷缓存。(广播刷缓存)
2.websocket会话寻找
websocket是比较受欢迎的实时消息推送方案。用过websocket应该知道,websocket只能与多个节点中的其中一个节点做长连接会话保持,也就是说用户的会话只会存在于一个节点上,假设服务端要主动向用户推一条消息,必须要知道用户的会话在哪个节点上,怎么得知?可以通过广播,通过消息广播,把消息发到多个节点上,然后节点收到消息只需要判断用户会话是否就在本节点上,假如在则主动推消息,不在,则丢弃这条消息。
类似上面这两种需求,就需要用到广播,并且是对同一个应用的多个节点进行广播。当然不用广播肯定也有其他通知方案,本文我们只讨论用MQ怎么做到。
3.思路
假如继续用RabbitMQ的Fanout模式,怎么做到对同一个应用的多个节点进行广播?
要起到广播效果,关键就是让多个应用节点间不要存在竞争关系或者存在竞争关系时它们的消息怎么共享?可以从这两个方向解决这个问题。
方法可能很多种,在这里,我只描述两种比较容易实现的方案。
方案1
过程大致如下
-
应用启动,多个节点监听同一个队列(此时多个节点是竞争关系,一条消息只会发到其中一个节点上)
-
消息生产者发送消息,同一条消息只被其中一个节点收到
- 收到消息的节点通过redis的发布订阅模式来通知其他兄弟节点
这种方案是最容易想到的,思路就是依赖其他组件来做消息共享,例如redis这种可以替换成其他方案,只要能做到消息共享就行,那么最终的效果就肯定是广播效果了。
方案2
过程大致如下
-
应用启动,利用监听器生成唯一ID
-
生成的唯一ID,通过文件写入的方式写到配置文件中
-
spring启动,把这个唯一ID加载为全局属性(为何要用唯一ID,就是为了用这个ID作为该节点的监听队列名,当然前缀可以用相同的,后缀用唯一ID区分即可,举个例子就是:节点1监听队列 kunghsu-123 节点2监听队列 kunghsu-456.必须保证它们的唯一ID是唯一的,不然还是会存在竞争关系)
-
多个节点监听了多个队列(让每个队列名都不同,目的就是让他们不存在竞争关系,没有竞争关系就不用做消息共享,只管由MQ分发即可,这时同一条消息就会发到多个节点上)
-
到MQ控制台,将所有节点生成的队列手动绑定到指定的Fanout交换机上(这一步是手动的,当然也可以通过API做到,下面会说到)
-
生产者发送消息指定的Fanout交换机,交换机将同一条消息被分发到多个节点上
- 广播效果达成!
这种方案,也比较容易。这样做,就是为了让多个节点间是广播关系。总的来说不麻烦,其中第五步手动操作其实有点挫,这种手动操作步骤其实是应该转成自动化,让应用程序来完成,方便以后自动化建设。
这种方案的spring配置也比较简单,参考Fanout模式的配置即可。本文重点在这个思路的实现过程。
只列举部分代码如下:
消息生产者
<!-- 只申明交换机,不定义队列 -->
<rabbit:fanout-exchange name="exchangeFour" durable="true" auto-delete="false" >
</rabbit:fanout-exchange>
<!--定义rabbit template用于数据的接收和发送 -->
<rabbit:template id="amqpTemplate4" connection-factory="connectionFactory2"
exchange="exchangeFour" />
消息消费者
<rabbit:queue name="${queue-name-fanout}" durable="true"
auto-delete="false" exclusive="false" declared-by="connectAdmin2" />
<bean id="fanoutTwoConsumer" class="com.lunch.foo.rabbitmq.FanoutTwoConsumer"></bean>
<rabbit:listener-container
connection-factory="connectionFactory2">
<rabbit:listener queues="${queue-name-fanout}" ref="fanoutOneConsumer" />
</rabbit:listener-container>
另外,RabbitMQ的客户端API支持让我们 将队列绑定到指定的交换机上。具体可参考我的工具类代码。
代码如下:
package com.lunch.foo.rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* Created by xuyaokun On 2019/3/10 2:26
* @desc:
*/
public class RabbitMQUtil {
private static final String HOST = "192.168.3.128";
private static final int PORT = AMQP.PROTOCOL.PORT;
private static final String USERNAME = "kunghsu";
private static final String PASSWORD = "123456";
private static final String VIRTUALHOST = "/";
public static void main(String[] args) {
String QUEUE_NAME = "queueOneX";
String EXCHANGE_NAME = "exchangeFour";
try {
queueBind(EXCHANGE_NAME, QUEUE_NAME);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
/**
* 获取会话链接
*
* @return
* @throws IOException
* @throws TimeoutException
*/
private static Connection getConnection() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(HOST);
factory.setPort(PORT);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
factory.setVirtualHost(VIRTUALHOST);
return factory.newConnection();
}
/**
* 绑定队列到指定交换机
*
* @param exchangeName
* @param queueName
* @throws IOException
* @throws TimeoutException
*/
public static void queueBind(String exchangeName, String queueName) throws IOException, TimeoutException {
Channel channel = null;
try{
channel = getConnection().createChannel();
} catch(Exception e){
System.out.println("获取RabbitMQ会话连接失败!取消做队列绑定。");
return ;
}
//默认持久化
channel.queueDeclare(queueName, true, false, false, null);
// 声明交换机:指定交换机的名称和类型(广播:fanout)
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, true);
// 在消费者端队列绑定
channel.queueBind(queueName, exchangeName, "");
channel.close();
}
}
总结
RabbitMQ的Fanout模式相关的文章,网上一抓一大把,但是几乎没有人讲到 如何实现 对同一个应用的多个节点进行广播。。希望通过这篇文章,能帮助到有需要的同学。另外,假如大家有更好的方案,欢迎交流。感谢阅读!
共同进步,学习分享
欢迎大家关注我的公众号【风平浪静如码】,海量Java相关文章,学习资料都会在里面更新,整理的资料也会放在里面。
觉得写的还不错的就点个赞,加个关注呗!点关注,不迷路,持续更新!!!
以上是关于RabbitMQ 节点消息广播&Ack的主要内容,如果未能解决你的问题,请参考以下文章