SpringBoot整合RabbitMQ

Posted 海之浪子

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringBoot整合RabbitMQ相关的知识,希望对你有一定的参考价值。

  • SpringBoot整合RabbitMQ

    公司最近在开发CRM系统的时候,需要将ERP的订单数据实时的传输到CRM系统中,但是由于每天的订单量特别大,采用实时获取后并存储到数据库中,接口的相应速度较慢,性能较差。经过经过多方位评估采用在数据库与接口层添加RabbitMQ作为缓冲层来实现。

    具体为:

    1、ESB将订单数据实时推送至CRM Controller接口
    2、CRM Controller调用RabbitMQ的生产者将数据推送至RabbitMQ服务器
    3、消费者实时消费RabbitMQ的订单并进行处理并存储至数据库中

    这里只是实现了RabbitMQ的生产者和消费者,采用的RabbitMQ Direct模式,代码如下:

    @SpringBootApplication
    @EnableScheduling
    public class RabbitMQApplication {
    
        public static void main(String[] args) {
    
            SpringApplication.run(RabbitMQApplication.class, args);
    
        }
    
    }
    @Configuration
    public class ProducerConfig {
    
        private static final Logger logger = LoggerFactory.getLogger(ProducerConfig.class);
    
        @Autowired
        private CachingConnectionFactory connectionFactory;
    
        @Bean
        public RabbitTemplate rabbitTemplate() {
            connectionFactory.setPublisherConfirms(true); //Use full publisher confirms, with correlation data and a callback for each message.
            connectionFactory.setPublisherReturns(true);
            connectionFactory.setConnectionTimeout(6000); //超时时间
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); //采用默认模式创建模板
            rabbitTemplate.setMandatory(true); //强制
            rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                @Override
                public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                    logger.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause);
                }
            });
            rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
                @Override
                public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                    //TODO 数据没有投递成功,可以在这里进行一些操作,比如讲消息保存到数据库中,等待下次进行投递
                    logger.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message);
                }
            });
            return rabbitTemplate;
        }
    
    }
    @Component
    public class ProducerProcessing {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Scheduled(fixedDelay = 1000, initialDelay = 500)
        public void directSend() {
    
            rabbitTemplate.convertAndSend(ReciverProcessing.DIRECT_EXCHANGE, ReciverProcessing.DIRECT_ROUTINGKEY, "订单信息");
            System.out.println("消息投递完成");
        }
    
    
       // @Scheduled(fixedDelay = 1000, initialDelay = 500)
        public void fanoutSend() {
    
            rabbitTemplate.convertAndSend(ReciverProcessing.FANOUT_EXCHANGE, "", "订单信息");
            System.out.println("消息投递完成");
        }
    
    
    
    }
    @Configuration
    public class ReciverProcessing {
    
        //Direct模式
        public static final String DIRECT_EXCHANGE = "direct_exchange";
        public static final String DIRECT_ROUTINGKEY = "direct_rountingkey";
        public static final String DIRECT_QUEUE = "direct_queue";
    
        @Autowired
        private ConnectionFactory connectionFactory;
    
        //发布订阅模式
        public static final String FANOUT_EXCHANGE = "fanout_exchange";
        public static final String FANOUT_QUEUE1 = "fanout_queue1";
        public static final String FANOUT_QUEUE2 = "fanout_queue2";
    
    
        /**
         * 实例化 DirectExchange
         *
         * @return
         */
        @Bean
        public DirectExchange directExchange() {
            /**
             * Exchange名称
             * Exchange持久化
             * Exchange 是否自动删除
             */
            return new DirectExchange(DIRECT_EXCHANGE, true, false);
        }
    
        /**
         * 创建广播模式交换机
         *
         * @return
         */
        @Bean
        public FanoutExchange fanoutExchange() {
            return new FanoutExchange(FANOUT_EXCHANGE, true, false);
        }
    
    
        @Bean
        public Queue directQueue() {
            /**
             * 队列名称
             * 队列是否持久化
             */
            return new Queue(DIRECT_QUEUE, true); //队列持久
    
        }
    
    
    
    
        @Bean
        public Binding directBinding() {
            /**
             * 交换器与队列绑定
             */
            return BindingBuilder.bind(directQueue()).to(directExchange()).with(DIRECT_ROUTINGKEY);
        }
    
    
    
    
        @Bean
        public SimpleMessageListenerContainer DirectMessageContainer() {
    
            //创建队列监听容器
            SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
            //监听的队列,可多个
            simpleMessageListenerContainer.setQueues(directQueue());
            simpleMessageListenerContainer.setExposeListenerChannel(true);
            //最大消费者数量
            simpleMessageListenerContainer.setMaxConcurrentConsumers(10);
            //设置并发数
            simpleMessageListenerContainer.setConcurrentConsumers(3);
            //一次拉取消息的数量
            simpleMessageListenerContainer.setPrefetchCount(1);
            //确认模式
            simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
            simpleMessageListenerContainer.setMessageListener(new ChannelAwareMessageListener() {
    
    
                @Override
                public void onMessage(Message message, Channel channel) throws Exception {
                    String body = new String(message.getBody());
                    //TODO 这里开始处理消息,处理消息成功以后可以发送确认消息成功
                    System.out.println("当前线程 " + Thread.currentThread().getName() + " 收到的消息为: " + body);
                    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费
                    //channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);
                }
            });
            return simpleMessageListenerContainer;
    
    
        }
    
        /**
         * 队列1
         *
         * @return
         */
        @Bean
        public Queue fanoutQueue1() {
            return new Queue(FANOUT_QUEUE1, true);
        }
    
        /**
         * 队列2
         *
         * @return
         */
        @Bean
        public Queue fanoutQueue2() {
            return new Queue(FANOUT_QUEUE2, true);
        }
    
        /**
         * 队列1绑定
         *
         * @return
         */
        @Bean
        public Binding fanoutBinding1() {
            return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
        }
    
        /**
         * 队列2绑定
         *
         * @return
         */
        @Bean
        public Binding fanoutBinding2() {
            return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
        }
        /**
         * 监听队列进行消费,这里相当于一个消费者监听了两个队列,在实际开发应用中,发布订阅模式监听两个队列,收到的消息是一样的,所以这样做是没有什么意义的
         * 可以设置两个消费者,每个消费者只监听其中的一个队列,收到消息后根据需要来进行不同的处理
         *
         * @return
         */
    
        @Bean
        public SimpleMessageListenerContainer fanoutMessageContainer() {
    
            //创建队列监听容器
            SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
            //监听的队列,可多个
            simpleMessageListenerContainer.setQueues(fanoutQueue1(), fanoutQueue2());
            simpleMessageListenerContainer.setExposeListenerChannel(true);
            //最大消费者数量
            simpleMessageListenerContainer.setMaxConcurrentConsumers(10);
            //设置并发数
            simpleMessageListenerContainer.setConcurrentConsumers(3);
            //一次拉取消息的数量
            simpleMessageListenerContainer.setPrefetchCount(1);
            //确认模式
            simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
            simpleMessageListenerContainer.setMessageListener(new ChannelAwareMessageListener() {
    
    
                @Override
                public void onMessage(Message message, Channel channel) throws Exception {
                    String body = new String(message.getBody());
                    //TODO 这里开始处理消息,处理消息成功以后可以发送确认消息成功
                    System.out.println("当前线程 " + Thread.currentThread().getName() + " 收到的消息为: " + body);
                    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费
                    //channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);
                }
            });
            return simpleMessageListenerContainer;
    
    
        }
    
    }

    相关代码链接: https://github.com/albert-liu435/springmq

以上是关于SpringBoot整合RabbitMQ的主要内容,如果未能解决你的问题,请参考以下文章

springboot rabbitmq整合

Springboot整合Rabbitmq(史上最详细)

SpringBoot整合RabbitMQ实现死信队列

RabbitMQ基础组件和SpringBoot整合RabbitMQ简单示例

SpringBoot整合RabbitMQ,实现消息发送和消费

SpringBoot系列5SpringBoot整合RabbitMQ