使用rabbitmq广播模式来处理集群下的websocket消息推送
Posted 没有技术的小彭
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用rabbitmq广播模式来处理集群下的websocket消息推送相关的知识,希望对你有一定的参考价值。
websocket属于长连接,当客户端连接上服务端后,将保持于服务端的连接。
而当websocket服务端存在集群的情况,如果需要将某个消息发送到客户端时,通过接口调用发送,这种情况只能将消息发送到与这台服务端连接的客户端,会存在部分客户无法接收消息的情况。
后续通过搜集资料选择采用了用rabbitmq来做websocket的集群,即通过使用mq的广播交换机,然后结合服务端启动创建动态队列来绑定同一个交换机,这样便能使集群中的每一个服务端都能收到消息,然后再去往连在当前服务端的websocket推送消息,不在的则跳过不发。
交换机以及队列设置
/**
* @author peng
* @program
* @description 交换机以及队列设置
* @create 2022/03/30 22:08
**/
@Configuration
public class BroadcastMqConfig
private final static String BROADCAST_EXCHANGE = "broadcast_exchange";
@Value("$server.port")
private String serverPort;
public static String getQueueName()
return "broadcast:" + getLocalIP() + ":";
/**
* websocket 交换机
*
* @return
*/
@Bean
public FanoutExchange fanoutDirectExchange()
return new FanoutExchange(BROADCAST_EXCHANGE);
/**
* websocket 队列
*
* @return
*/
@Bean
public Queue queueFanoutQueue()
return QueueBuilder.durable(BROADCAST_EXCHANGE + ":" + getLocalIP() + ":" + serverPort).build();
/**
* websocket 队列交换机绑定
*
* @param callQueueFanoutQueue
* @param broadcastmsgFanoutDirectExchange
* @return
*/
@Bean
public Binding broadcastFanoutBinding(Queue callQueueFanoutQueue, FanoutExchange broadcastmsgFanoutDirectExchange)
return BindingBuilder
.bind(callQueueFanoutQueue)
.to(broadcastmsgFanoutDirectExchange);
/**
* 取当前系统站点本地地址 linux下 和 window下可用
*
* @return
*/
public static String getLocalIP()
String ipStr = "";
InetAddress ip = null;
try
// 如果是Windows操作系统
if (isWindowsOS())
ip = InetAddress.getLocalHost();
else
boolean findIp = false;
Enumeration<NetworkInterface> netInterfaces = NetworkInterface
.getNetworkInterfaces();
while(netInterfaces.hasMoreElements())
if (findIp)
break;
NetworkInterface ni = netInterfaces.nextElement();
// 跳过docker网卡
if (ni.getName().contains("docker"))
continue;
// 遍历所有ip
Enumeration<InetAddress> ips = ni.getInetAddresses();
while(ips.hasMoreElements())
ip = (InetAddress) ips.nextElement();
// 127.开头的都是lookback地址
if (ip.isSiteLocalAddress() && !ip.isLoopbackAddress()
&& !ip.getHostAddress().contains(":"))
findIp = true;
break;
catch (Exception e)
e.printStackTrace();
if (null != ip)
ipStr = ip.getHostAddress();
return ipStr;
/**
* 判断当前系统是否windows
*
* @return boolean
*/
public static boolean isWindowsOS()
boolean isWindowsOS = false;
String osName = System.getProperty("os.name");
if (osName.toLowerCase().contains("windows"))
isWindowsOS = true;
return isWindowsOS;
上面主要是通过动态获取当前服务器的ip以及应用启动的端口来作为队列名字,但是绑定交换机是同一个,由于交换机使用的是广播交换机,此模式下会将消息发送到所有队列。
当应用启动后可自动创建以 当前应用的的ip + 端口 为名称的队列
生产者
/**
* @author peng
* @program
* @description 生产者设置
* @create 2022/03/30 22:08
**/
@Component
@Slf4j
public class BroadcastmsgProducer
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 发送推送websocket消息
*
* @param param
*/
public void sendMsg(BroadcastSendDto param)
rabbitTemplate.convertAndSend(BroadcastMqConfig.BROADCAST_EXCHANGE,
"",
param);
以上为生产者代码,因为是广播模式,所以路由key为空也可以广播到所有队列。
消费者
/**
* @author peng
* @program
* @description 消费者
* @create 2022/03/30 22:08
**/
@Component
@Slf4j
public class BroadcastmsgReceiver
@Autowired
private BoadcastWebSocketServer webSocketServer;
@RabbitListener(queues = "#T(com.server.mq.BroadcastMqConfig).getBroadcastmsgQueueName()$server.port")
public void pushMessageToUser(Channel channel, BroadcastSendDto param, @Header(AmqpHeaders.DELIVERY_TAG) long tag)
try
log.info("接收推送至具体用户websocket mq :", param);
// 发送websocket消息
webSocketServer.sendMessageToUser(param);
catch (Exception e)
log.error("获取推送至具体用户websocket 异常 ...", e);
finally
try
channel.basicAck(tag, false);
catch (IOException e)
log.error("---------消息确认异常---------", e);
以上为消费者设置,消费者使用动态队列名称监听,“#T(com.server.mq.BroadcastMqConfig).getBroadcastmsgQueueName()*” 是表示使用代码块获取返回字符串,“$server.port”是动态获取配置文件参数值,获取端口。
当应用启动后可监听以 当前应用的的ip + 端口 为名称的队列消息
调用入口
/**
* 发送消息至某个用户
* @param sendDto
* @return
* @throws IOException
*/
@PostMapping("/sendMsg")
public R sendMsg(@RequestBody @Validated BroadcastSendDto sendDto)
log.info("发送消息:", JSON.toJSONString(sendDto));
broadcastmsgProducer.sendMsg(sendDto);
return R.ok();
以上为通过接口调用的入口,我们此部分由于是微服务的存在,所以对外提供接口调用,便在接口处修改为发送mq消息。
如若处于设计阶段且是单机应用,可考虑直接需发送消息代码处直接发送mq消息。
以上就是我使用的处理集群的方式,这种属于比较简单粗暴的,看到网上有通过记录所有连接与哪一台服务端连接的信息,然后再通过redis等中间来获取精准通知到具体某一台服务端来推送消息。但是由于时间有限,就没有仔细研究了。
以上是关于使用rabbitmq广播模式来处理集群下的websocket消息推送的主要内容,如果未能解决你的问题,请参考以下文章
aliyun-ons (node版)之 集群模式下模拟广播消费
Windows RabbitMQ 镜像队列 (高可用性一台宕机自动切换另一台) 使用 RabbitMQ 自带的Web 管理工具