rabbitmq学习之路

Posted changecode

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了rabbitmq学习之路相关的知识,希望对你有一定的参考价值。

 

上一篇讲了消息发送确认,这一次来讲一讲 消息消费确认模式

 

消息发送确认,确认的是消息发送到交换机和队列的确认,消息消费确认则确认的是消息是否被消费者正常消费掉!

 

消息的确认模式有三种

 

  • AcknowledgeMode.NONE:自动确认
  • AcknowledgeMode.AUTO:根据情况确认
  • AcknowledgeMode.MANUAL:手动确认

 

手动确认是程序员控制度最高的一种模式,消息的自动确认是在消息发送给消费者后就确认了,并删除消息,如果消费者在执行过程中出错,依然会造成消息的丢失。第二种模式则会根据消费者的执行情况,抛出的异常来决定是确认还是拒绝等。

 

贴一下练习的代码

 

因为我是把生产者和消费者分开在两个项目中的,然后我发现 

spring.rabbitmq.listener.simple.acknowledge-mode=manual
这个配置需要两边都配置上,否则就不会开启手动确认模式!


@Component
public class Publisher 


    @Autowired
    private RabbitTemplate rabbitTemplate;


    public void send()
        String message = "mck learn rabbitmq";
        User user = new User();
        user.setName("mck");
        user.setAge(25);
        user.setSex("男");

        MessagePostProcessor messagePostProcessor = new MessagePostProcessor() 
            @Override
            public Message postProcessMessage(Message message) throws AmqpException 
                message.getMessageProperties().setHeader("error","mck");
                return message;
            
        ;

        rabbitTemplate.convertAndSend("hello",user,messagePostProcessor);

    

 

@Component

public class Receiver 


    @RabbitHandler
    @RabbitListener(queues = "hello")
    public void process(@Payload User user, Channel channel, @Headers Map<String,Object> map)
        System.out.println(user.getName());
        if (map.get("errorrr")!= null)
            System.out.println("错误的消息");
            try 
                channel.basicNack((Long)map.get(AmqpHeaders.DELIVERY_TAG),false,true);      //否认消息
                return;
             catch (IOException e) 
                e.printStackTrace();
            
        
        try 
            channel.basicAck((Long)map.get(AmqpHeaders.DELIVERY_TAG),false);            //确认消息
         catch (IOException e) 
            e.printStackTrace();
        
    


 

 

经过多次的测试发现了,如果开启手动确认模式之后,消息没有进行确认的话,消费者会继续消费下一条消息,就像我们前面的文章讲的,broker中,分为两部分,一部分是待发送的消息,一部分是已发送,但是没有收到确认的消息。

技术图片

 

 

    我们可以在控制台看到,这部分unacked 就是已发送,但是没有收到确认的消息。这时候如果你将消费者强制关闭,也就是使他断开连接,那么这条消息又会重新进入ready队列去,当你将消费者重启之后,就会再次收到这条消息!!!!

 

 

    我们在消费者代码中还能看到手动的nack消息,nack消息也就意味着告诉broker,这条消息我没有消费好,请重新放入队列,重来一遍,然后就是重新进入ready,再度去被消费者消费。除此之外还可以拒绝消息,

channel.basicReject((Long)map.get(AmqpHeaders.DELIVERY_TAG),false);      这样处理的消息就会直接丢失掉!!!

以上是关于rabbitmq学习之路的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ学习--RabbitMQ集群相关学习

RabbitMQ学习--RabbitMQ集群相关学习

rabbitmq学习:rabbitmq(消息队列)的作用以及rabbitmq之直连交换机

RabbitMQ学习和使用

python之路_RabbitMQ相关介绍

RabbitMQ学习系列: RabbitMQ安装与配置