Connecting Spring Boot to Multiple RabbitMQ Sources

Posted by PacosonSWJTU


Reposted from: SpringBoot连接多RabbitMQ源 (掘金, Juejin): https://juejin.cn/post/6844904039797243917


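In real-world development, many scenarios call for asynchronous processing, which is where RabbitMQ comes in, and as those scenarios multiply an application may need to connect to more than one RabbitMQ instance. Spring Boot's default configuration makes it quick to connect to RabbitMQ, but only to a single instance. Once several instances are involved, the defaults no longer fit and each connection has to be written by hand.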

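In Spring Boot, the two classes we use most often are:

  • RabbitTemplate: used to produce and consume messages;
  • RabbitAdmin: used to declare and delete exchanges and queues, and to bind and unbind queues and exchanges.

So connecting to multiple RabbitMQ instances means creating a separate connection for each one and building these two classes again on top of each connection. The code is as follows: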

Configuration

The application.properties file needs to define two connections:


 

server.port=8080

# rabbitmq (v2 connection)
v2.spring.rabbitmq.host=host
v2.spring.rabbitmq.port=5672
v2.spring.rabbitmq.username=username
v2.spring.rabbitmq.password=password
v2.spring.rabbitmq.virtual-host=virtual-host
# consumer: manual ack
v2.spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 1. When the mandatory flag is true and the exchange cannot find a suitable queue
#    for the message based on its own type and the message's routing key,
#    the broker calls basic.return to send the message back to the producer.
# 2. When mandatory is false, the broker simply drops the message in that situation.
#    In short, mandatory tells the broker to route the message to at least one queue,
#    or else return it to the sender.
v2.spring.rabbitmq.template.mandatory=true
# publisher confirms
v2.spring.rabbitmq.publisher-confirms=true
# returns callback:
#    invoked for messages that reach the exchange but cannot be routed to any queue.
#    Note: in that case the publisher-confirms callback still reports ack=true.
#    A message that never reaches the exchange is reported by the confirm callback as a nack instead.
v2.spring.rabbitmq.publisher-returns=true
v2.spring.rabbitmq.listener.simple.prefetch=5

# rabbitmq (v1 connection)
v1.spring.rabbitmq.host=host
v1.spring.rabbitmq.port=5672
v1.spring.rabbitmq.username=username
v1.spring.rabbitmq.password=password
v1.spring.rabbitmq.virtual-host=virtual-host
# consumer: manual ack
v1.spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 1. When the mandatory flag is true and the exchange cannot find a suitable queue
#    for the message based on its own type and the message's routing key,
#    the broker calls basic.return to send the message back to the producer.
# 2. When mandatory is false, the broker simply drops the message in that situation.
#    In short, mandatory tells the broker to route the message to at least one queue,
#    or else return it to the sender.
v1.spring.rabbitmq.template.mandatory=true
# publisher confirms
v1.spring.rabbitmq.publisher-confirms=true
# returns callback:
#    invoked for messages that reach the exchange but cannot be routed to any queue.
#    Note: in that case the publisher-confirms callback still reports ack=true.
#    A message that never reaches the exchange is reported by the confirm callback as a nack instead.
v1.spring.rabbitmq.publisher-returns=true
v1.spring.rabbitmq.listener.simple.prefetch=5
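
The v1. and v2. prefixes are what keep the two connections apart: each connection reads only the keys under its own prefix. As a rough, framework-free sketch of that idea (plain java.util.Properties rather than Spring's property binding), the snippet below loads one settings object per prefix. The class name PrefixedRabbitSettings, its helper methods, and the example host names are hypothetical and exist only for illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Illustration only: the settings of one prefixed RabbitMQ connection. */
class PrefixedRabbitSettings {
    final String host;
    final int port;
    final String virtualHost;
    final boolean mandatory;
    final int prefetch;

    private PrefixedRabbitSettings(String host, int port, String virtualHost,
                                   boolean mandatory, int prefetch) {
        this.host = host;
        this.port = port;
        this.virtualHost = virtualHost;
        this.mandatory = mandatory;
        this.prefetch = prefetch;
    }

    /** Reads the "<prefix>.spring.rabbitmq.*" keys for a single connection. */
    static PrefixedRabbitSettings load(Properties props, String prefix) {
        String base = prefix + ".spring.rabbitmq.";
        return new PrefixedRabbitSettings(
                require(props, base + "host"),
                Integer.parseInt(require(props, base + "port")),
                require(props, base + "virtual-host"),
                Boolean.parseBoolean(require(props, base + "template.mandatory")),
                Integer.parseInt(require(props, base + "listener.simple.prefetch")));
    }

    /** Fails fast when a key is missing instead of silently using a default. */
    private static String require(Properties props, String key) {
        String value = props.getProperty(key);
        if (value == null) {
            throw new IllegalArgumentException("Missing property: " + key);
        }
        return value.trim();
    }

    /** Parses properties-file text; stands in for reading application.properties. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical values; only the key names follow the configuration above.
        Properties props = parse(String.join("\n",
                "v1.spring.rabbitmq.host=mq-one.example.com",
                "v1.spring.rabbitmq.port=5672",
                "v1.spring.rabbitmq.virtual-host=/orders",
                "v1.spring.rabbitmq.template.mandatory=true",
                "v1.spring.rabbitmq.listener.simple.prefetch=5",
                "v2.spring.rabbitmq.host=mq-two.example.com",
                "v2.spring.rabbitmq.port=5673",
                "v2.spring.rabbitmq.virtual-host=/billing",
                "v2.spring.rabbitmq.template.mandatory=false",
                "v2.spring.rabbitmq.listener.simple.prefetch=10"));

        PrefixedRabbitSettings v1 = load(props, "v1");
        PrefixedRabbitSettings v2 = load(props, "v2");
        System.out.println("v1 -> " + v1.host + ":" + v1.port + " prefetch=" + v1.prefetch);
        System.out.println("v2 -> " + v2.host + ":" + v2.port + " prefetch=" + v2.prefetch);
    }
}
```

Failing fast on a missing key is a deliberate choice in this sketch: with two near-identical property blocks, a typo in one prefix is easy to overlook.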

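Overriding the connection factory

Note that with multiple sources, one of the connections must be annotated with @Primary to mark it as the main connection, which is used by default.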

package com.example.config.rabbitmq;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

/**
 * Created by shuai on 2019/4/23.
 */
@Configuration
public class MultipleRabbitMQConfig {

    // Connection to the v2 broker, built from the v2.* properties.
    @Bean(name = "v2ConnectionFactory")
    public CachingConnectionFactory hospSyncConnectionFactory(
            @Value("${v2.spring.rabbitmq.host}") String host,
            @Value("${v2.spring.rabbitmq.port}") int port,
            @Value("${v2.spring.rabbitmq.username}") String username,
            @Value("${v2.spring.rabbitmq.password}") String password,
            @Value("${v2.spring.rabbitmq.virtual-host}") String virtualHost,
            @Value("${v2.spring.rabbitmq.publisher-confirms}") Boolean publisherConfirms,
            @Value("${v2.spring.rabbitmq.publisher-returns}") Boolean publisherReturns) {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualHost);
        // Deprecated since Spring AMQP 2.2 in favor of setPublisherConfirmType(...).
        connectionFactory.setPublisherConfirms(publisherConfirms);
        connectionFactory.setPublisherReturns(publisherReturns);
        return connectionFactory;
    }

    @Bean(name = "v2RabbitTemplate")
    public RabbitTemplate firstRabbitTemplate(
            @Qualifier("v2ConnectionFactory") ConnectionFactory connectionFactory,
            @Value("${v2.spring.rabbitmq.template.mandatory}") Boolean mandatory) {
        RabbitTemplate v2RabbitTemplate = new RabbitTemplate(connectionFactory);
        v2RabbitTemplate.setMandatory(mandatory);
        v2RabbitTemplate.setConfirmCallback((correlationData, ack, s) -> {
            if (!ack) {
                // Log the failed publisher confirm (nack) here.
            } else {
                // Log the successful publisher confirm (ack) here.
            }
        });
        // Deprecated since Spring AMQP 2.3 in favor of setReturnsCallback(...).
        v2RabbitTemplate.setReturnCallback((message, code, s, exchange, routingKey) -> {
            // Log the returned message here: it could not be routed to any queue.
        });
        return v2RabbitTemplate;
    }

    @Bean(name = "v2ContainerFactory")
    public SimpleRabbitListenerContainerFactory hospSyncFactory(
            @Qualifier("v2ConnectionFactory") ConnectionFactory connectionFactory,
            @Value("${v2.spring.rabbitmq.listener.simple.acknowledge-mode}") String acknowledge,
            @Value("${v2.spring.rabbitmq.listener.simple.prefetch}") Integer prefetch) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setAcknowledgeMode(AcknowledgeMode.valueOf(acknowledge.toUpperCase()));
        factory.setPrefetchCount(prefetch);
        return factory;
    }

    @Bean(name = "v2RabbitAdmin")
    public RabbitAdmin iqianzhanRabbitAdmin(
            @Qualifier("v2ConnectionFactory") ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }

    // Primary MQ connection (v1), built from the v1.* properties.
    @Bean(name = "v1ConnectionFactory")
    @Primary
    public CachingConnectionFactory publicConnectionFactory(
            @Value("${v1.spring.rabbitmq.host}") String host,
            @Value("${v1.spring.rabbitmq.port}") int port,
            @Value("${v1.spring.rabbitmq.username}") String username,
            @Value("${v1.spring.rabbitmq.password}") String password,
            @Value("${v1.spring.rabbitmq.virtual-host}") String virtualHost,
            @Value("${v1.spring.rabbitmq.publisher-confirms}") Boolean publisherConfirms,
            @Value("${v1.spring.rabbitmq.publisher-returns}") Boolean publisherReturns) {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualHost);
        // Deprecated since Spring AMQP 2.2 in favor of setPublisherConfirmType(...).
        connectionFactory.setPublisherConfirms(publisherConfirms);
        connectionFactory.setPublisherReturns(publisherReturns);
        return connectionFactory;
    }

    @Bean(name = "v1RabbitTemplate")
    @Primary
    public RabbitTemplate publicRabbitTemplate(
            @Qualifier("v1ConnectionFactory") ConnectionFactory connectionFactory,
            @Value("${v1.spring.rabbitmq.template.mandatory}") Boolean mandatory) {
        RabbitTemplate v1RabbitTemplate = new RabbitTemplate(connectionFactory);
        v1RabbitTemplate.setMandatory(mandatory);
        v1RabbitTemplate.setConfirmCallback((correlationData, ack, s) -> {
            if (!ack) {
                // Log the failed publisher confirm (nack) here.
            } else {
                // Log the successful publisher confirm (ack) here.
            }
        });
        // Deprecated since Spring AMQP 2.3 in favor of setReturnsCallback(...).
        v1RabbitTemplate.setReturnCallback((message, code, s, exchange, routingKey) -> {
            // Log the returned message here: it could not be routed to any queue.
        });
        return v1RabbitTemplate;
    }

    @Bean(name = "v1ContainerFactory")
    @Primary
    public SimpleRabbitListenerContainerFactory insMessageListenerContainer(
            @Qualifier("v1ConnectionFactory") ConnectionFactory connectionFactory,
            @Value("${v1.spring.rabbitmq.listener.simple.acknowledge-mode}") String acknowledge,
            @Value("${v1.spring.rabbitmq.listener.simple.prefetch}") Integer prefetch) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setAcknowledgeMode(AcknowledgeMode.valueOf(acknowledge.toUpperCase()));
        factory.setPrefetchCount(prefetch);
        return factory;
    }

    @Bean(name = "v1RabbitAdmin")
    @Primary
    public RabbitAdmin publicRabbitAdmin(
            @Qualifier("v1ConnectionFactory") ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }
}
 
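The configuration enables the mandatory flag, publisher confirms and publisher returns together, and how they interact is easy to mix up. The following is a toy in-memory model of the outcomes described in the property comments. It is not the RabbitMQ client or Spring AMQP API; the class PublishOutcomeModel, its Outcome enum and the exchange and queue names are all hypothetical, and routing is simplified to exact key matches.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of publish outcomes; NOT the RabbitMQ client or Spring AMQP API. */
class PublishOutcomeModel {

    enum Outcome {
        ROUTED_ACK,    // routed to a queue; confirm callback sees ack=true
        RETURNED_ACK,  // unroutable and mandatory=true: returned to the producer, confirm still ack=true
        DROPPED_ACK,   // unroutable and mandatory=false: silently dropped, confirm still ack=true
        NACK           // exchange does not exist: confirm callback sees ack=false
    }

    // exchange name -> (routing key -> queue name); exact-match routing only
    private final Map<String, Map<String, String>> bindings = new HashMap<>();

    void declareExchange(String exchange) {
        bindings.computeIfAbsent(exchange, name -> new HashMap<>());
    }

    void bind(String exchange, String routingKey, String queue) {
        declareExchange(exchange);
        bindings.get(exchange).put(routingKey, queue);
    }

    /** Returns what the producer would observe for one published message. */
    Outcome publish(String exchange, String routingKey, boolean mandatory) {
        Map<String, String> routes = bindings.get(exchange);
        if (routes == null) {
            return Outcome.NACK;
        }
        if (routes.containsKey(routingKey)) {
            return Outcome.ROUTED_ACK;
        }
        return mandatory ? Outcome.RETURNED_ACK : Outcome.DROPPED_ACK;
    }

    public static void main(String[] args) {
        PublishOutcomeModel broker = new PublishOutcomeModel();
        broker.bind("exchange.demo", "key.demo", "queue.demo");

        System.out.println(broker.publish("exchange.demo", "key.demo", true));   // ROUTED_ACK
        System.out.println(broker.publish("exchange.demo", "key.other", true));  // RETURNED_ACK
        System.out.println(broker.publish("exchange.demo", "key.other", false)); // DROPPED_ACK
        System.out.println(broker.publish("exchange.none", "key.demo", true));   // NACK
    }
}
```

The point of the model is the second and third cases: an unroutable message still produces a positive confirm, so the return callback is the only place where it shows up.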

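Creating the Exchange and Queue and binding them

Once the RabbitAdmin beans are in place, we use them to create the corresponding exchange and queue and to bind the two together.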

package com.example.config.rabbitmq;

import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;

/**
 * Creates the Queue and Exchange and binds them together.
 * Created by shuai on 2019/5/16.
 */
@Configuration
public class MyRabbitMQCreateConfig {

    @Resource(name = "v2RabbitAdmin")
    private RabbitAdmin v2RabbitAdmin;

    @Resource(name = "v1RabbitAdmin")
    private RabbitAdmin v1RabbitAdmin;

    @PostConstruct
    public void rabbitInit() {
        // Nothing is declared on the v1 broker here, so the exchange must already exist there
        // (or be declared through v1RabbitAdmin in the same way) before publishing to it.
        v2RabbitAdmin.declareExchange(new TopicExchange("exchange.topic.example.new", true, false));
        v2RabbitAdmin.declareQueue(new Queue("queue.example.topic.new", true));
        v2RabbitAdmin.declareBinding(
                BindingBuilder
                        .bind(new Queue("queue.example.topic.new", true))        // the durable queue
                        .to(new TopicExchange("exchange.topic.example.new", true, false))    // the topic exchange to bind it to
                        .with("routing.key.example.new"));    // the routing key
    }
}

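The binding above uses the literal routing key routing.key.example.new, so only messages published with exactly that key reach the queue. A topic exchange also accepts wildcards in the binding key: * stands for exactly one dot-separated word and # for zero or more words. As a small sketch of those matching rules (a standalone illustration, not how the broker implements them), with the hypothetical class name TopicPatternMatcher:

```java
/** Illustration of topic-exchange binding-key matching; not broker code. */
class TopicPatternMatcher {

    /** Returns true when the routing key satisfies the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        String[] patternWords = pattern.isEmpty() ? new String[0] : pattern.split("\\.");
        String[] keyWords = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return match(patternWords, 0, keyWords, 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            // Pattern exhausted: it matches only if the key is exhausted too.
            return j == k.length;
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words; try every possible split point.
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            // Pattern still expects a word but the key has none left.
            return false;
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            // '*' matches exactly one word; a literal must match verbatim.
            return match(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        String key = "routing.key.example.new";
        System.out.println(matches("routing.key.example.new", key)); // true
        System.out.println(matches("routing.key.*.new", key));       // true
        System.out.println(matches("routing.#", key));               // true
        System.out.println(matches("routing.*", key));               // false
    }
}
```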
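Producer

To verify later that each connection has been established and can publish messages, the producer sends one message through each of the newly created RabbitTemplate beans.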

package com.example.topic;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Component
public class TopicProducer {

    @Resource(name = "v1RabbitTemplate")
    private RabbitTemplate v1RabbitTemplate;

    @Resource(name = "v2RabbitTemplate")
    private RabbitTemplate v2RabbitTemplate;

    public void sendMessageByTopic() {
        // Publish one message over the v1 connection.
        String content1 = "This is a topic type of the RabbitMQ message example from v1RabbitTemplate";
        v1RabbitTemplate.convertAndSend(
                "exchange.topic.example.new",
                "routing.key.example.new",
                content1);

        // Publish one message over the v2 connection.
        String content2 = "This is a topic type of the RabbitMQ message example from v2RabbitTemplate";
        v2RabbitTemplate.convertAndSend(
                "exchange.topic.example.new",
                "routing.key.example.new",
                content2);
    }
}

Consumer

Note that when configuring the consumed queue, the listener must name the ContainerFactory it should use.

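package com.example.topic;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

// containerFactory selects the listener container factory built on the v2 connection.
@Component
@RabbitListener(queues = "queue.example.topic.new", containerFactory = "v2ContainerFactory")
public class TopicConsumer {

    // The container factory uses manual acknowledge mode, so a production listener
    // would also need to acknowledge each message (for example via Channel.basicAck).
    @RabbitHandler
    public void consumer(String message) {
        System.out.println(message);
    }
}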

That completes the example of connecting Spring Boot to multiple RabbitMQ sources. Next, a short test verifies it.

Test and verification

package com.example.test;

import com.example.topic.TopicProducer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitMQMultipleTest {

    @Autowired
    private TopicProducer topicProducer;

    // Sends one message through each RabbitTemplate.
    @Test
    public void topicProducerTest() {
        topicProducer.sendMessageByTopic();
    }
}

 

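Run the test code to verify the result (the original post showed a screenshot of the output here).

Verified: Spring Boot connects to multiple RabbitMQ sources successfully!

GitHub repository: Spring Boot tutorials, tech stack, and sample code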

 
