消息队列 RabbitMq高级特性

Posted baidawei

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了消息队列 RabbitMq高级特性相关的知识,希望对你有一定的参考价值。

一.消息的可靠投递

在使用RabbitMq的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败的场景。RabbitMQ为我们提供了两种方式用来控制消息的投递可靠性

rabbitMQ 整个消息投递过程为:

 producer ->  rabbitMQ broker -> exchange -> queue ->consumer

  1.confirm 确认模式

    消息从producer 到exchange 会返回一个 confirmCallback

  2.return 退回模式

    消息从exchange到queue投递失败则会返回一个returnCallbak

Confirm确认模式:

1.在配置文件中开启确认模式 publisher-confimrs :true

spring:
  rabbitmq:
    host: 10.211.55.4
    virtual-host: local
    port: 5672
    username: admin
    password: admin
    publisher-confirms: true

2.在rabbitTemplate定义ConfirmCallBack回调函数

@RequestMapping("/producer")
    public void producer(){
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            /**
             * ack exchange交换机是否收到了消息
             * cause 失败原因
             */
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if(ack){
                    System.out.println("confirm方法 成功了");
                }else{
                    System.out.println("confirm方法 失败了: " + cause);
                }
            }
        });

        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"boot.test","spring boot rabbit mq");
    }

 

Return退回模式:

1.开启退回模式

spring:
  rabbitmq:
    host: 10.211.55.4
    virtual-host: local
    port: 5672
    username: admin
    password: admin
    publisher-returns: true

2.设置returnCallBack

@RequestMapping("/return")
    public void returncallback(){
        //设置交换机处理失败消息的模式
        rabbitTemplate.setMandatory(true);

        //设置return call back
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            /**
             *
             * @param message   消息对象
             * @param replyCode 错误码
             * @param replyText 错误信息
             * @param exchange  交换机
             * @param routingKey 路由键
             */
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                System.out.println("return 执行了....");

                System.out.println(message);
                System.out.println(replyCode);
                System.out.println(replyText);
                System.out.println(exchange);
                System.out.println(routingKey);

                //处理
            }
        });

        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"zxcasd","return call back");
    }

 

二、Consumer ACK 确认机制

表示消费端收到消息后到确认方式

有三种确认方式:

1.自动确认 acknowledge="none" 默认

2.手动确认 acknowledge="manual"

3.根据异常情况确认: acknowledge="auto"

1.开启手动确认

spring:
  rabbitmq:
    host: 10.211.55.4
    virtual-host: local
    port: 5672
    username: admin
    password: admin
    publisher-returns: true
    listener:
      simple:
        acknowledge-mode: manual

2.成功调用channel.basicAck   异常调用channel.basicNack

@Component
public class RabbitMQListener implements ChannelAwareBatchMessageListener {

    @Override
    @RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try{
            System.out.println(new String(message.getBody()));
            int a = 1/0;
            //手动确认
            channel.basicAck(deliveryTag,true);
        }catch (Exception ex){
            //发生异常 拒绝确认
            //第三个参数,重回队列
            channel.basicNack(deliveryTag,true,true);
        }
    }
}

 

三、消费端 限流

技术图片

1.确保ack机制为手动确认

2.设置prefetch=1 每次从rabbitmq取几条,ack确认后再取下一条

server:
  port: 9999
spring:
  rabbitmq:
    host: 10.211.55.4
    virtual-host: local
    port: 5672
    username: admin
    password: admin
    publisher-returns: true
    listener:
      simple:
        acknowledge-mode: manual
      direct:
        prefetch: 1

 

四、TTL

Time To Live 存活时间/过期时间,当消息到达一定时间后,还没有被消费,会被自动清除。

RabbitMQ可以对消息设置过期时间,也可以对整个队列设置过期时间。

1.queue 队列设置过期时间

@Configuration
public class RabbitMQConfig {
    public static final String EXCHANGE_NAME = "exchange_ttl";
    public static final String QUEUE_NAME = "queue_ttl";
    public static final String Routing_Key = "ttl.#";

    // 1.交换机
    @Bean("bootExchange")
    public Exchange bootExchange() {
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
    }


    //2.Queue队列
    @Bean("bootQueue")
    public Queue bootQueue() {
        return QueueBuilder.durable(QUEUE_NAME).withArgument("x-message-ttl", 5000).build();
    }

    //3.绑定交换机和队列
    @Bean
    public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(Routing_Key).noargs();
    }
}

2.消息设置过期时间

@RequestMapping("/producer")
    public void producer(){

        MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setExpiration("5000"); // 消息过期时间
                return message;
            }
        };

        for(int i = 0;i<10;i++){
            if(i>5){
                rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"ttl.test","spring boot rabbit mq"+i,messagePostProcessor);
            }else{
                rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"ttl.test","spring boot rabbit mq"+i);
            }
        }
    }

 

五、死信队列

DLX、Dead Letter Exchange (死信交换机),当消息成为Dead Messag后,可以被重新发送到另一台交换机中。

技术图片

消息成为死信到三种情况:

1.队列消息长度达到限制,

2.消费者拒接消费信息,basicNack/basicReject 并且不把消息重新放入原目标队列,参数requeue=false。

3.原队列存在消息过期设置,消息到达超时时间未被消费

 

设定参数 x-dead-letter-exchange 和 x-dead-letter-routing-key

 

以上是关于消息队列 RabbitMq高级特性的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMq高级特性之延迟队列 通俗易懂 超详细 内含案例

RabbitMQ——RabbitMQ的高级特性(TTL死信队列延迟队列优先级队列RPC)

消息队列专题(高级特性篇):RabbitMQ 如何保证消息的可靠性投递传输和消费

消息队列专题(高级特性篇):RabbitMQ 如何保证消息的可靠性投递传输和消费

消息队列专题(高级特性篇):RabbitMQ 如何保证消息的可靠性投递传输和消费

RabbitMQ AMQP (高级消息队列协议)