SpringBoot 搭建 Rabbitmq

Posted 技术博客这里开始

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringBoot 搭建 Rabbitmq相关的知识,希望对你有一定的参考价值。

本文使用的 Spring Boot 版本是 2.1.7.RELEASE。

pom.xml

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

 

application.yml


# RabbitMQ connection settings plus listener-container tuning values.
spring:
  rabbitmq:
    virtual-host: / 
    host: localhost 
    port: 5672
    username: guest
    # NOTE(review): credential stored in plain text — presumably for local testing only;
    # confirm it is externalized (env var / secret store) outside of development.
    password: 123456    
    listener:
      simple:
        # These three keys are also read explicitly by RabbitmqConfig.multiListenerContainer().
        concurrency: 10
        max-concurrency: 20
        prefetch: 5

# Environment tag; interpolated as a prefix into the queue/exchange/routing-key names below.
mq:
  env: test

# Names for the cart "place order" flow. Read by QueueConfig (queue, exchange, routing key)
# and by the @RabbitListener placeholder ${cart.place.order.queue}.
cart:
  place:
    order:
      queue: ${mq.env}.cart.place.order.queue
      exchange: ${mq.env}.cart.place.order.exchange
      routing:
        key: ${mq.env}.cart.place.order.routing.key

QueueConfig.java

/**
 * Declares the queue, direct exchange and binding used by the cart "place order" flow.
 *
 * <p>The queue name, exchange name and routing key are read from the Spring
 * {@link Environment}. Each key is mandatory: a missing key fails at bean creation with an
 * {@link IllegalStateException} naming the key, instead of passing {@code null} on to the
 * AMQP declarations.
 */
@Configuration
public class QueueConfig {

    private static final String QUEUE_KEY = "cart.place.order.queue";
    private static final String EXCHANGE_KEY = "cart.place.order.exchange";
    private static final String ROUTING_KEY_KEY = "cart.place.order.routing.key";

    @Autowired
    private Environment environment;

    /**
     * Queue for cart place-order requests.
     *
     * <p>Declared with {@code durable = true} so that the queue does not have to be
     * re-created after RabbitMQ restarts.
     *
     * @return the durable place-order queue
     */
    @Bean
    public Queue placeOrderQueue() {
        return new Queue(environment.getRequiredProperty(QUEUE_KEY), true);
    }

    /**
     * Direct exchange for cart place-order requests, declared durable and not auto-deleted.
     *
     * <p>Exchange types, for reference:
     * <ol>
     *   <li>Direct exchange: a message is delivered to a queue when the routing keys match.</li>
     *   <li>Fanout exchange: has no concept of routing keys; a message is sent to every queue
     *       bound to the exchange.</li>
     *   <li>Topic exchange: forwards messages to queues by fuzzy (pattern) matching of the
     *       routing key.</li>
     * </ol>
     *
     * @return the durable place-order direct exchange
     */
    @Bean
    public DirectExchange placeOrderExchange() {
        return new DirectExchange(environment.getRequiredProperty(EXCHANGE_KEY), true, false);
    }

    /**
     * Binds {@link #placeOrderQueue()} to {@link #placeOrderExchange()} with the configured
     * routing key, so messages published to the exchange with that key reach the queue.
     *
     * @return the queue-to-exchange binding
     */
    @Bean
    public Binding placeOrderBinding() {
        return BindingBuilder
                .bind(placeOrderQueue())
                .to(placeOrderExchange())
                .with(environment.getRequiredProperty(ROUTING_KEY_KEY));
    }

}

 

RabbitmqConfig.java

/**
 * RabbitMQ infrastructure: a {@link RabbitTemplate} with publisher confirm/return logging,
 * two listener container factories (single-consumer and multi-consumer, both with manual
 * acknowledgement), and a {@link #sendMessage} helper for producers.
 */
@Configuration
public class RabbitmqConfig {
    private static final Logger logger = LogManager.getLogger(RabbitmqConfig.class);

    @Autowired
    private Environment environment;

    @Autowired
    private CachingConnectionFactory cachingConnectionFactory;

    @Autowired
    private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;

    @Autowired
    private ObjectMapper objectMapper;

    /**
     * Builds a {@link RabbitTemplate} with JSON conversion, mandatory routing, and callbacks
     * that log publisher confirms and returned (unroutable) messages.
     *
     * <p>Declared prototype-scoped because, per the original author's note, callbacks cannot
     * be set multiple times on a singleton template.
     *
     * @return a configured template
     */
    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate rabbitTemplate() {
        // Confirms and returns must be enabled on the connection factory for the callbacks to fire.
        cachingConnectionFactory.setPublisherConfirms(true);
        cachingConnectionFactory.setPublisherReturns(true);

        RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());

        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                logger.info("message send succeed:correlationData({}),ack({}),cause({})", correlationData, ack, cause);
            } else {
                // A negative confirm means the broker did not accept the message: log above INFO.
                logger.warn("message send failed:correlationData({}),ack({}),cause({})", correlationData, ack, cause);
            }
        });

        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) ->
                logger.warn("message lose:exchange({}),route({}),replyCode({}),replyText({}),message:{}",
                        exchange, routingKey, replyCode, replyText, message));

        return rabbitTemplate;
    }

    /**
     * Listener container factory with exactly one consumer, prefetch 1, tx size 1 and manual
     * acknowledgement.
     *
     * @return the single-consumer factory, registered as {@code singleListenerContainer}
     */
    @Bean(name = "singleListenerContainer")
    public SimpleRabbitListenerContainerFactory singleListenerContainer() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(cachingConnectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        factory.setConcurrentConsumers(1);
        factory.setMaxConcurrentConsumers(1);
        factory.setPrefetchCount(1);
        factory.setTxSize(1);
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);

        return factory;
    }

    /**
     * Listener container factory whose concurrency, max concurrency and prefetch come from
     * the {@code spring.rabbitmq.listener.simple.*} properties, with manual acknowledgement.
     *
     * @return the multi-consumer factory, registered as {@code multiListenerContainer}
     * @throws IllegalStateException if one of the three listener properties is not set
     */
    @Bean(name = "multiListenerContainer")
    public SimpleRabbitListenerContainerFactory multiListenerContainer() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factoryConfigurer.configure(factory, cachingConnectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        factory.setConcurrentConsumers(requiredInt("spring.rabbitmq.listener.simple.concurrency"));
        factory.setMaxConcurrentConsumers(requiredInt("spring.rabbitmq.listener.simple.max-concurrency"));
        factory.setPrefetchCount(requiredInt("spring.rabbitmq.listener.simple.prefetch"));

        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);

        return factory;
    }

    /**
     * Reads a mandatory integer property, failing with an exception that names the key
     * rather than with a bare NullPointerException on unboxing.
     */
    private int requiredInt(String key) {
        return environment.getRequiredProperty(key, Integer.class);
    }

    /**
     * Publishes {@code content} to the given exchange with the given routing key.
     *
     * <p>Original author's note on persistence: messages sent via {@code convertAndSend} are
     * persistent by default ({@code new MessageProperties()} uses
     * {@code MessageDeliveryMode.PERSISTENT}, i.e. deliveryMode = 2), so they can still be
     * consumed after a restart.
     *
     * <p>Failures are logged (with stack trace) and not rethrown; callers are not notified
     * of a failed send.
     *
     * @param exchangeName   target exchange
     * @param routingKeyName routing key
     * @param content        payload to send
     * @param flag           when {@code true}, the payload is serialized to JSON bytes with
     *                       the injected {@code ObjectMapper} and sent as a raw message whose
     *                       correlation id property is {@code messageId}; when {@code false},
     *                       the payload is handed to the template's message converter
     * @param messageId      id used for publisher-confirm correlation (and as the message's
     *                       correlation id when {@code flag} is {@code true})
     * @param <T>            payload type
     */
    public <T> void sendMessage(String exchangeName, String routingKeyName, T content, boolean flag, String messageId) {
        // The original call supplied four arguments for five placeholders, so flag was never logged.
        logger.info("message send :messageId({}), exchangeName({}), routingKeyName({}), content({}), flag({})",
                messageId, exchangeName, routingKeyName, content, flag);

        // Always carry the id so confirm-callback logs can be matched to this send in both branches.
        CorrelationData correlationData = new CorrelationData(messageId);

        try {
            // NOTE(review): each call goes through the prototype-scoped bean method; whether that
            // yields a fresh template per send depends on Spring proxying this class — confirm.
            RabbitTemplate template = this.rabbitTemplate();
            if (flag) {
                MessageProperties properties = new MessageProperties();
                properties.setCorrelationId(messageId);

                template.convertAndSend(exchangeName, routingKeyName,
                        MessageBuilder.withBody(objectMapper.writeValueAsBytes(content)).andProperties(properties).build(),
                        correlationData);
            } else {
                template.convertAndSend(exchangeName, routingKeyName, content, correlationData);
            }
        } catch (Exception e) {
            // Best-effort send: keep swallowing, but pass the throwable so the stack trace is logged.
            logger.error("error message :e.getMessage({})", e.getMessage(), e);
        }

    }


}

 

生产者只需要调用 RabbitmqConfig 中的 sendMessage() 方法即可。如果消费者需要获取 correlationId,就将 flag 设置为 true,并把 messageId 作为 correlationId 传入,这样在消费端就可以通过 message.getMessageProperties().getCorrelationId() 取到该值。

// Publish the response under a freshly generated message id; the target exchange and
// routing key are looked up from configuration first.
String messageId = UUID.randomUUID().toString();
String resultExchange = environment.getProperty("cart.create.order.result.exchange");
String resultRoutingKey = environment.getProperty("cart.create.order.result.routing.key");
rabbitmqConfig.sendMessage(resultExchange, resultRoutingKey, response, flag, messageId);

 

消费者

    // Injected RabbitMQ configuration bean. NOTE(review): not referenced by the listener
    // method shown below — confirm whether it is still needed here.
    @Autowired
    private RabbitmqConfig rabbitmqConfig;

    // Jackson mapper used by placeOrder to deserialize the raw message body.
    @Autowired
    private ObjectMapper objectMapper;

    /**
     * Consumes place-order messages from the queue named by {@code cart.place.order.queue},
     * using the {@code multiListenerContainer} factory (manual acknowledgement).
     *
     * <p>The body is deserialized to {@code RequestInventoryItems} and logged together with
     * the message's correlation id, then the delivery is acknowledged. On any failure the
     * delivery is negatively acknowledged: it is requeued only if this was its first
     * delivery, so a message that keeps failing (for example an undeserializable body) is
     * not redelivered in an endless loop.
     *
     * @param message the raw AMQP message
     * @param channel the channel the message was received on, used for ack/nack
     */
    @RabbitListener(queues = "${cart.place.order.queue}", containerFactory = "multiListenerContainer")
    public void placeOrder(Message message, Channel channel) {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        try {
            // Pass values as log parameters; concatenating them left a literal "({})" in the output.
            logger.info("place order : message ({})", objectMapper.readValue(message.getBody(), RequestInventoryItems.class));
            logger.info("place order : message ({})", message.getMessageProperties().getCorrelationId());
            channel.basicAck(deliveryTag, false);
        } catch (Exception es) {
            logger.error("error message :es.getMessage({})", es.getMessage(), es);

            // Third basicNack parameter = requeue. Requeue only on first delivery; a message that
            // already failed once is rejected without requeue to break the redelivery loop.
            // NOTE(review): without a dead-letter exchange on the queue, such a message is
            // discarded by the broker — confirm a DLX is configured if it must be retained.
            boolean requeue = !Boolean.TRUE.equals(message.getMessageProperties().isRedelivered());
            try {
                channel.basicNack(deliveryTag, false, requeue);
            } catch (Exception e) {
                logger.error("error message :e.getMessage({})", e.getMessage(), e);
            }

        }

    }

 

以上就是集成Rabbitmq的全部步骤了。

 

 

 

 

 

 

 

以上是关于SpringBoot 搭建 Rabbitmq的主要内容,如果未能解决你的问题,请参考以下文章

SpringBoot整合RabbitMQ消息队列基本环境搭建

springboot+elasticsearch + rabbitMQ实现全文检索(项目搭建)

RabbitMQ——高级特性(SpringBoot实现)

spring cloudgradle父子项目微服务框架搭建---rabbitMQ延时队列

SpringBoot实战系列RabbitMQ实现消息发送并实现邮箱发送异常监控报警实战

SpringBoot整合RabbitMQ(2021.05.29)