RabbitMQ整合SpringBoot

Posted 永旗狍子

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ整合SpringBoot相关的知识,希望对你有一定的参考价值。

 

RabbitMQ整合SpringBoot

一.RabbitMQ整合SpringBoot(手动ack)

1.创建SpringBoot工程

2.导入依赖

 <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

3.编写配置文件

application.yml

spring:
  rabbitmq:
    host: 192.168.247.128
    port: 5672
    username: root
    password: 123
    virtual-host: /root
    listener:
      simple:
        acknowledge-mode: manual  #手动ack
    publisher-confirm-type: simple #开启config机制
    publisher-returns: true #开启return机制

4.声明exchange,queue

@Configuration
public class RabbitMQConfig 

    //1. 创建exchange - topic
    @Bean   // 把方法的返回值放入到IOC容器中,方法的名字就是bean的ID
    public TopicExchange getTopicExchange()
        return new TopicExchange("boot-topic-exchange",true,false);
    
    //2. 创建queue
    @Bean
    public Queue getQueue()
        return new Queue("boot-queue",true,false,false,null);
    
    //3. 绑定在一起
    @Bean
    public Binding getBinding(TopicExchange topicExchange,Queue queue)
        return BindingBuilder.bind(queue).to(topicExchange).with("*.red.*");
    

5.发布消息到RabbitMQ

在test测试方法中

@SpringBootTest
class SpringbootRabbitmqApplicationTests 


    @Autowired
    //    xxxxxxTemplate;
    //    JdbcTemplate; // Spring和JDBC的整合
    //    RedisTemplate; // SPring和Redis
    //    SqlSessionTemplate; // Sprign和MyBatis
    //    HibnerateTemplate;
    // Spring和RabbitMQ整合的bean,可以通过这个bean来操作MQ
    private RabbitTemplate rabbitTemplate;

    @Test
    void contextLoads() 
        System.out.println(rabbitTemplate);
        for (int i = 0; i < 1; i++) 
            rabbitTemplate.convertAndSend("boot-topic-exchange", "text", "红色大狼狗" + i);
        
        System.out.println("消息发送完成。。。。。");
    

6.创建消费者监听消息

    // 手动ack
    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

@Component
public class Consumer 

//    @RabbitListener(queues = "boot-queue")
//    public void getMessage(String message) 
//        System.out.println("接受到的消息:" + message);
//    

    @RabbitListener(queues = "boot-queue")  // 指定监听那个队列中的消息
    public void getMessage(String data, Channel channel, Message message) 
        // 因为在消费的过程中可能会出现异常,所以把所有的业务代码都要用try包起来

        try 
            // 1、消费这个消息
            System.out.println("监听者获取到的消息是:" + data);
            Thread.sleep(500);

//            int i = 10 / 0; // 模拟消费过程中出现了异常

            // 2.获取消息的表示
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            try 
                // 3.把消息消费成功,告诉MQ这个消息我已经消费了
                // 第一个参数是应答消息的标识(10)
                // 第二个参数是否批量应答(如果开启的批量应答,会把小于10的消息全部应答)
                channel.basicAck(deliveryTag, false);
                System.out.println("消息【" + deliveryTag + "】应答MQ--》已完成");


             catch (IOException e) 
//                e.printStackTrace();
            
         catch (InterruptedException e) 
            System.out.println("消费者在消费的过程中出现了异常。。。");
            // 既然出现异常了,说明消息没有正常消费,应该通知MQ这个消息我无法消费,把这个消息转发给其他的消费者
            long deliveryTag = message.getMessageProperties().getDeliveryTag();

            try 
                // 应答MQ,这个消息我消费不了
                // 第一个参数是应答消息的标识(10)
                // 第二个参数是否批量应答(如果开启的批量应答,会把小于10的消息全部应答)
                // 第三个参数:该消息是否重入队列,压倒队列的头部
                //          false:该消息就会丢失
                channel.basicNack(deliveryTag, false, true);
                System.out.println("消息【" + deliveryTag + "】应答MQ--》未完成");
             catch (IOException ex) 
                ex.printStackTrace();
            
        

    

解决:消息是否到达交换机?消息是否到达队列?

  • 采用Confirm保证消息到达exchange,无法保证消息可以被exchange分发到指定queue。
  • exchange是不能持久化消息的,queue是可以持久化消息。
  • 采用Return机制来监听消息是否从exchange送到了指定的queue中。

7.开启Confirm和Return机制

@Component
public class RabbitMQConfirm implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback 

    @Autowired
   private RabbitTemplate rabbitTemplate;

    // init-mtehod<bean init-method>
    @PostConstruct // 在对象实例化完成以后调用
    public void init()
        // 消息发送出去后就会调用这个类中的confirm();
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
    

    // confirm是确认消息是否到达交换机
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String context) 
        if (ack) 
            System.out.println("消息已经发送到了交换机上:" + context);
        else 
            System.out.println("消息没有发送到了交换机上:" + context);
        
    

    // 消息没有发送到队列时调用
    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) 
        byte[] body = returnedMessage.getMessage().getBody();
        System.out.println("消息没有发送到队列"+returnedMessage.getRoutingKey()+"-->"+new String(body));
    

解决:消费者在消费消息时出现闪断的问题?

如下图所示:

为了解决这个问题我们需要借助Redis,下一篇文章解决。

以上是关于RabbitMQ整合SpringBoot的主要内容,如果未能解决你的问题,请参考以下文章

SpringBoot 整合RabbitMq 自定义消息监听容器来实现消息批量处理

springboot整合消息队列——RabbitMQ

Springboot整合RabbitMQ(三)——Topic主题交换机

SpringBoot 整合RabbitMQ

SpringBoot整合RabbitMQ--重试/消息序列化--方法/实例

springboot系列-springboot整合RabbitMQ