使用rabbitmq广播模式来处理集群下的websocket消息推送

Posted 没有技术的小彭

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用rabbitmq广播模式来处理集群下的websocket消息推送相关的知识,希望对你有一定的参考价值。

websocket属于长连接,当客户端连接上服务端后,将保持于服务端的连接。
而当websocket服务端存在集群的情况,如果需要将某个消息发送到客户端时,通过接口调用发送,这种情况只能将消息发送到与这台服务端连接的客户端,会存在部分客户无法接收消息的情况。

后续通过搜集资料选择采用了用rabbitmq来做websocket的集群,即通过使用mq的广播交换机,然后结合服务端启动创建动态队列来绑定同一个交换机,这样便能使集群中的每一个服务端都能收到消息,然后再去往连在当前服务端的websocket推送消息,不在的则跳过不发。

交换机以及队列设置



/**
 * @author peng
 * @program 
 * @description 交换机以及队列设置
 * @create 2022/03/30 22:08
 **/
@Configuration
public class BroadcastMqConfig 


    private final static String BROADCAST_EXCHANGE = "broadcast_exchange";

    @Value("$server.port")
    private String serverPort;


    public static String getQueueName() 
        return "broadcast:" + getLocalIP() + ":";
    

    /**
     * websocket 交换机
     *
     * @return
     */
    @Bean
    public FanoutExchange fanoutDirectExchange() 
        return new FanoutExchange(BROADCAST_EXCHANGE);
    

    /**
     * websocket 队列
     *
     * @return
     */
    @Bean
    public Queue queueFanoutQueue() 
        return QueueBuilder.durable(BROADCAST_EXCHANGE + ":" + getLocalIP() + ":" + serverPort).build();
    


    /**
     * websocket 队列交换机绑定
     *
     * @param callQueueFanoutQueue
     * @param broadcastmsgFanoutDirectExchange
     * @return
     */
    @Bean
    public Binding broadcastFanoutBinding(Queue callQueueFanoutQueue, FanoutExchange broadcastmsgFanoutDirectExchange) 
        return BindingBuilder
                .bind(callQueueFanoutQueue)
                .to(broadcastmsgFanoutDirectExchange);
    


    /**
     * 取当前系统站点本地地址 linux下 和 window下可用
     *
     * @return
     */
    public static String getLocalIP() 
        String ipStr = "";
        InetAddress ip = null;
        try 
            // 如果是Windows操作系统
            if (isWindowsOS()) 
                ip = InetAddress.getLocalHost();
             else 
                boolean findIp = false;
                Enumeration<NetworkInterface> netInterfaces = NetworkInterface
                        .getNetworkInterfaces();
                while(netInterfaces.hasMoreElements()) 
                    if (findIp) 
                        break;
                    
                    NetworkInterface ni = netInterfaces.nextElement();
                    // 跳过docker网卡
                    if (ni.getName().contains("docker")) 
                        continue;
                    
                    // 遍历所有ip
                    Enumeration<InetAddress> ips = ni.getInetAddresses();
                    while(ips.hasMoreElements()) 
                        ip = (InetAddress) ips.nextElement();
                        // 127.开头的都是lookback地址
                        if (ip.isSiteLocalAddress() && !ip.isLoopbackAddress()
                                && !ip.getHostAddress().contains(":")) 
                            findIp = true;
                            break;
                        
                    
                
            
         catch (Exception e) 
            e.printStackTrace();
        
        if (null != ip) 
            ipStr = ip.getHostAddress();
        
        return ipStr;
    

    /**
     * 判断当前系统是否windows
     *
     * @return boolean
     */
    public static boolean isWindowsOS() 
        boolean isWindowsOS = false;
        String osName = System.getProperty("os.name");
        if (osName.toLowerCase().contains("windows")) 
            isWindowsOS = true;
        
        return isWindowsOS;
        


上面主要是通过动态获取当前服务器的ip以及应用启动的端口来作为队列名字,但是绑定交换机是同一个,由于交换机使用的是广播交换机,此模式下会将消息发送到所有队列。
当应用启动后可自动创建以 当前应用的的ip + 端口 为名称的队列

生产者

/**
 * @author peng
 * @program 
 * @description 生产者设置
 * @create 2022/03/30 22:08
 **/
@Component
@Slf4j
public class BroadcastmsgProducer 

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 发送推送websocket消息
     *
     * @param param
     */
    public void sendMsg(BroadcastSendDto param) 
        rabbitTemplate.convertAndSend(BroadcastMqConfig.BROADCAST_EXCHANGE,
                "",
                param);
    

以上为生产者代码,因为是广播模式,所以路由key为空也可以广播到所有队列。

消费者

/**
 * @author peng
 * @program 
 * @description 消费者
 * @create 2022/03/30 22:08
 **/
@Component
@Slf4j
public class BroadcastmsgReceiver 

    @Autowired
    private BoadcastWebSocketServer webSocketServer;

    @RabbitListener(queues = "#T(com.server.mq.BroadcastMqConfig).getBroadcastmsgQueueName()$server.port")
    public void pushMessageToUser(Channel channel, BroadcastSendDto param, @Header(AmqpHeaders.DELIVERY_TAG) long tag) 
        try 
            log.info("接收推送至具体用户websocket mq :", param);
            // 发送websocket消息
            webSocketServer.sendMessageToUser(param);
         catch (Exception e) 
            log.error("获取推送至具体用户websocket 异常 ...", e);
         finally 
            try 
                channel.basicAck(tag, false);
             catch (IOException e) 
                log.error("---------消息确认异常---------", e);
            
        
    

以上为消费者设置,消费者使用动态队列名称监听,“#T(com.server.mq.BroadcastMqConfig).getBroadcastmsgQueueName()*” 是表示使用代码块获取返回字符串,“$server.port”是动态获取配置文件参数值,获取端口。
当应用启动后可监听以 当前应用的的ip + 端口 为名称的队列消息

调用入口

	/**
     * 发送消息至某个用户
     * @param sendDto
     * @return
     * @throws IOException
     */
    @PostMapping("/sendMsg")
    public R sendMsg(@RequestBody @Validated BroadcastSendDto sendDto) 
        log.info("发送消息:", JSON.toJSONString(sendDto));
        broadcastmsgProducer.sendMsg(sendDto);
        return R.ok();
    

以上为通过接口调用的入口,我们此部分由于是微服务的存在,所以对外提供接口调用,便在接口处修改为发送mq消息。
如若处于设计阶段且是单机应用,可考虑直接需发送消息代码处直接发送mq消息。

以上就是我使用的处理集群的方式,这种属于比较简单粗暴的,看到网上有通过记录所有连接与哪一台服务端连接的信息,然后再通过redis等中间来获取精准通知到具体某一台服务端来推送消息。但是由于时间有限,就没有仔细研究了。

以上是关于使用rabbitmq广播模式来处理集群下的websocket消息推送的主要内容,如果未能解决你的问题,请参考以下文章

aliyun-ons (node版)之 集群模式下模拟广播消费

RabbitMQ:第四章:RabbitMQ集群搭建

RabbitMQ:第四章:RabbitMQ集群搭建

Windows RabbitMQ 镜像队列 (高可用性一台宕机自动切换另一台) 使用 RabbitMQ 自带的Web 管理工具

NetCore RabbitMQ 发布订阅模式,消息广播

redis模型:集群