SpringBoot整合RabbitMq广播模式动态队列名称

Posted 铨✌�

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringBoot整合RabbitMq广播模式动态队列名称相关的知识,希望对你有一定的参考价值。

优雅的实现多实例广播

  • 在业务场景中我们需要广播通知一个服务的所有实例,如多实例内存缓存数据准实时同步等
  • RabbitMq有Exchange的概念,一个Exchange可以绑定多个队列,它的广播模式是依靠广播交换机FanoutExchange实现的,投递消息时我们将消息投递给FanoutExchange, FanoutExchange 再将消息发送给每一个与之绑定的队列中,也就是说我们在实际场景中同一个服务的多个实例需要用不同的队列名绑定到同一个FanoutExchange上,从而实现消息广播。
  • 那么问题来了,在使用@RabbitListener注解时,一个服务多个实例如何使用不同的队列名呢?
  • @RabbitListener支持使用配置,可以从配置中获取队列名,但是这又需要我们在启动不同实例的时候修改不同的配置来启动,有没有更好的方式解决这个问题呢?有,本文将介绍一个优雅的方式来解决这个问题。

版本说明

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
	<version>1.7.3.RELEASE</version>
</dependency>

项目配置

spring: 
  rabbitmq:
  	# mq ip或host
    host: 127.0.0.1
    # mq 端口,默认5672
    port: 5672

配置类编写

  • 定义一个QueueExchangeBindConfig 类用来初始化队列、交换机以及绑定队列和交换机。
  • 声明队列时在队列名称末尾拼上当前机器IP地址,以保证每个实例的队列名称不重复(队列名称重复的话仅会有一台实例消费)。
  • 当前机器IP获取用的InetUtils,一般你有用到注册中心整合都会在spring容器中有这个类,如果项目里没有的话可以参考spring-cloud-commons包里的实现自己写一个(注意排除回环ip),或者引入这个包。
  • 队列名称以#dynamic#开头是为了使用@RabbitListener监听的时候拦截做处理(监听的时候也需要将当前机器IP地址拼上),我们后面还需要一个后置处理器处理监听队列名。
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.annotation.RabbitListenerAnnotationBeanPostProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.commons.util.InetUtils;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@Slf4j
public class QueueExchangeBindConfig 
    public static final String DYNAMIC_HOST_NAME_EXCHANGE = "dynamic_host_name_exchange";//交换器
    public static final String DYNAMIC_HOST_NAME_ROUTING = "dynamic_host_name_routing";//路由
    public static final String DYNAMIC_HOST_NAME_QUEUE = "#dynamic#_dynamic_host_name_queue_";// 消息队列名

    @Autowired
    InetUtils inetUtils;

    @Bean("dynamicHostNameQueue")
    public Queue DynamicHostNameQueue() 
        InetUtils.HostInfo firstNonLoopbackHostInfo = inetUtils.findFirstNonLoopbackHostInfo();
        String ipAddress = firstNonLoopbackHostInfo.getIpAddress();
        String queueName = DYNAMIC_HOST_NAME_QUEUE.concat(ipAddress);
        Queue queue = new Queue(queueName, true);
        log.info("DynamicHostNameQueue init success queueName:", queueName);
        return queue;
    

    @Bean("dynamicHostNameFanoutExchange")
    public FanoutExchange DynamicHostNameFanoutExchange() 
        FanoutExchange fanoutExchange = new FanoutExchange(DYNAMIC_HOST_NAME_ROUTING );
        log.info("DynamicHostNameFanoutExchange init success:", fanoutExchange.getName());
        return fanoutExchange;
    

    @Bean
    public Binding DynamicHostNameBinding(@Qualifier("dynamicHostNameQueue") Queue queue,
                                           @Qualifier("dynamicHostNameFanoutExchange") FanoutExchange fanoutExchange) 
        Binding binding = BindingBuilder.bind(queue).to(fanoutExchange);
        log.info("DynamicHostNameBinding bing  to  success.", queue.getName(), fanoutExchange.getName());
        return binding;
    


队列监听类编写(消费者)

  • 注意,@RabbitListener注解中填的队列名称跟上面配置类的名称一致,以#dynamic#开头, 后面我们会解析这个开头标记拼接当前机器IP

  • 这里我们用的是自动ack,请根据实际场景需要切换手动模式

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = QueueExchangeBindConfig.DYNAMIC_HOST_NAME_QUEUE)
@Slf4j
public class DynamicHostNameQueueListener 
    @RabbitHandler
    public void process(String msg) 
        log.info("[DynamicHostNameQueueListener] receive sync msg:", msg);
        if (StringUtils.isRealEmpty(msg)) 
            return;
        
        try 
            // process the msg
         catch (Exception e) 
            log.error("[DynamicHostNameQueueListener] process failed, msg:", msg ,e);
        
    


后置处理监听队列名称

  • 使用BeanPostProcessor拦截所有使用了@RabbitListener的bean,反射获取队列名,当队列名以#dynamic#开头时,拼入当前机器IP
  • 注意,这个类的order为 2147483646,是因为spring整合rabbitMq时,注册监听队列用的后置处理类RabbitListenerAnnotationBeanPostProcessor 的order为2147483647,我们必须在注册之前将队列名称修改

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.annotation.RabbitListeners;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.cloud.commons.util.InetUtils;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.lang.reflect.Field;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.*;

@Component
@Slf4j
public class RabbitListenerDynamicQueueBeanPostProcessor implements BeanPostProcessor, Ordered, BeanFactoryAware 

    private BeanFactory beanFactory;

    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException 
        Class<?> targetClass = AopUtils.getTargetClass(bean);
        Collection<RabbitListener> listenerAnnotations = findListenerAnnotations(targetClass);
        if (!CollectionUtils.isEmpty(listenerAnnotations)) 
            for (RabbitListener rabbitListener : listenerAnnotations) 
                boolean isDynamic = false;
                String[] queues = rabbitListener.queues();
                if (queues.length > 0 ) 
                    for(int i = 0; i < queues.length; ++i) 
                        String queue = queues[i];
                        if (queue.startsWith("#dynamic#")) 
                            queues[i] = resolveDynamicQueueName(queues[i]);
                            isDynamic = true;
                        

                    
                

                if (isDynamic) 
                    try 
                        InvocationHandler invocationHandler = Proxy.getInvocationHandler(rabbitListener);
                        Field memberValues = invocationHandler.getClass().getDeclaredField("memberValues");
                        memberValues.setAccessible(true);
                        HashMap memberValuesValue = (HashMap)memberValues.get(invocationHandler);
                        memberValuesValue.put("queues", queues);
                     catch (Exception e) 
                        log.error("RabbitListenerDynamicQueueBeanPostProcessor can't process dynamic queue.", e);
                    
                
            

        
        return bean;
    

    private String resolveDynamicQueueName(String queue) 
        InetUtils inetUtils = this.beanFactory.getBean(InetUtils.class);
        InetUtils.HostInfo firstNonLoopbackHostInfo = inetUtils.findFirstNonLoopbackHostInfo();
        String ipAddress = firstNonLoopbackHostInfo.getIpAddress();
        queue = queue.concat(ipAddress);
        log.info("RabbitListenerDynamicQueueBeanPostProcessor modify queue:", queue);
        return queue;
    

    private Collection<RabbitListener> findListenerAnnotations(Class<?> clazz) 
        Set<RabbitListener> listeners = new HashSet<>();
        RabbitListener ann = AnnotationUtils.findAnnotation(clazz, RabbitListener.class);
        if (ann != null) 
            listeners.add(ann);
        

        RabbitListeners anns = AnnotationUtils.findAnnotation(clazz, RabbitListeners.class);
        if (anns != null) 
            Collections.addAll(listeners, anns.value());
        

        return listeners;
    


    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException 
        return bean;
    

    @Override
    public int getOrder() 
        return 2147483646;
    

    @Override
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException 
        this.beanFactory = beanFactory;
    


编写消息生产者类

生产者发送消息时只需要指定广播交换机名称即可

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class MsgFanoutProvider 
    
	@Autowired
    private RabbitTemplate rabbitTemplate;
	
	public void fanoutMsg() 
		String msg = "hello world.";
		try 
			rabbitTemplate.convertAndSend(QueueExchangeBindConfig.DYNAMIC_HOST_NAME_EXCHANGE,
			 QueueExchangeBindConfig.DYNAMIC_HOST_NAME_ROUTING, msg);
         catch (Exception e) 
            log.error("MsgFanoutProvider failed to publish fanout msg:", msg, e);
        
	

至此,一个基于rabbitMq优雅的多实例广播方案就实现啦。

以上是关于SpringBoot整合RabbitMq广播模式动态队列名称的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ:第二章:Spring整合RabbitMQ(简单模式,广播模式,路由模式,通配符模式,消息可靠性投递,防止消息丢失,TTL,死信队列,延迟队列,消息积压,消息幂等性)(代码

万字长文图文详解Spring整合RabbitMQ(简单模式,广播模式,路由模式,通配符模式,消息可靠性投递,TTL)

SpringBoot整合RabbitMQ -fanout模式

SpringBoot整合RabbitMQ -fanout模式

RabbitMQ:第三章:Springboot集成RabbitMQ(直连模式,工作队列模式,发布订阅模式,路由模式,通配符模式)

RabbitMQ03_Springboot整合RabbitMQ实现发布与订阅模式路由模式通配符模式