springBoot集成rabbitmq 之延时(死信)队列

Posted 普通成员-铷弟

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了springBoot集成rabbitmq 之延时(死信)队列相关的知识,希望对你有一定的参考价值。

springBoot集成rabbitmq 之延时(死信)队列

comsumer 服务

配置类


import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitDeadQueueConfig 

    /**
        * Queue 可以有4个参数
     *      1.队列名
     *      2.durable       持久化消息队列 ,rabbitmq重启的时候不需要创建新的队列 默认true
     *      3.auto-delete   表示消息队列没有在使用时将被自动删除 默认是false
     *      4.exclusive     表示该消息队列是否只在当前connection生效,默认是false
     */

    /**
     * 死信队列跟交换机类型没有关系 不一定为directExchange  不影响该类型交换机的特性.
     *
     * @return the exchange
     */
    @Bean
    public Exchange deadLetterExchange() 
        return ExchangeBuilder.directExchange("EXCHANGE_DL").durable(true).build();
    

    /**
     * 声明一个死信队列.
     * x-dead-letter-exchange   对应  死信交换机
     * x-dead-letter-routing-key  对应 死信队列
     *
     * @return the queue
     */
    @Bean
    public Queue deadLetterQueue() 
        Map<String, Object> args = new HashMap<>(2);
//       x-dead-letter-exchange    声明  死信交换机
        args.put("x-dead-letter-exchange", "EXCHANGE_DL");
//       x-dead-letter-routing-key    声明 死信路由键
        args.put("x-dead-letter-routing-key","KEY_R");
        return QueueBuilder.durable("QUEUE_DL").withArguments(args).build();
    

    /**
     * 定义死信队列转发队列.
     *
     * @return the queue
     */
    @Bean
    public Queue redirectQueue() 
        return QueueBuilder.durable("QUEUE_REDIRECT").build();
    

    /**
     * 死信路由通过 DL_KEY 绑定键绑定到死信队列上.
     * @return the binding
     */
    @Bean
    public Binding deadLetterBinding() 
        return new Binding("QUEUE_DL", Binding.DestinationType.QUEUE,
                "EXCHANGE_DL", "KEY_DL", null);

    

    /**
     * 死信路由通过 KEY_R 绑定键绑定到最终处理队列上.
     * @return the binding
     */
    @Bean
    public Binding redirectBinding() 
        return new Binding("QUEUE_REDIRECT", Binding.DestinationType.QUEUE,
                "EXCHANGE_DL", "KEY_R", null);
    


接收者

import com.rabbitmq.client.Channel;
import org.springfram
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitDeadQueueConfig 

    /**
        * Queue 可以有4个参数
     *      1.队列名
     *      2.durable       持久化消息队列 ,rabbitmq重启的时候不需要创建新的队列 默认true
     *      3.auto-delete   表示消息队列没有在使用时将被自动删除 默认是false
     *      4.exclusive     表示该消息队列是否只在当前connection生效,默认是false
     */

    /**
     * 死信队列跟交换机类型没有关系 不一定为directExchange  不影响该类型交换机的特性.
     *
     * @return the exchange
     */
    @Bean
    public Exchange deadLetterExchange() 
        return ExchangeBuilder.directExchange("EXCHANGE_DL").durable(true).build();
    

    /**
     * 声明一个死信队列.
     * x-dead-letter-exchange   对应  死信交换机
     * x-dead-letter-routing-key  对应 死信队列
     *
     * @return the queue
     */
    @Bean
    public Queue deadLetterQueue() 
        Map<String, Object> args = new HashMap<>(2);
//       x-dead-letter-exchange    声明  死信交换机
        args.put("x-dead-letter-exchange", "EXCHANGE_DL");
//       x-dead-letter-routing-key    声明 死信路由键
        args.put("x-dead-letter-routing-key","KEY_R");
        return QueueBuilder.durable("QUEUE_DL").withArguments(args).build();
    

    /**
     * 定义死信队列转发队列.
     *
     * @return the queue
     */
    @Bean
    public Queue redirectQueue() 
        return QueueBuilder.durable("QUEUE_REDIRECT").build();
    

    /**
     * 死信路由通过 DL_KEY 绑定键绑定到死信队列上.
     * @return the binding
     */
    @Bean
    public Binding deadLetterBinding() 
        return new Binding("QUEUE_DL", Binding.DestinationType.QUEUE,
                "EXCHANGE_DL", "KEY_DL", null); 
    

    /**
     * 死信路由通过 KEY_R 绑定键绑定到最终处理队列上.
     * @return the binding
     */
    @Bean
    public Binding redirectBinding() 
        return new Binding("QUEUE_REDIRECT", Binding.DestinationType.QUEUE,
                "EXCHANGE_DL", "KEY_R", null);
    

ework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Date; 

@Component
@RabbitListener(queues = "QUEUE_REDIRECT")
public class DeadQueueReceiver  
    @RabbitHandler
    public void process(String hello, Message message, Channel channel)  
        try 
            //告诉服务器收到这条消息 已经被我消费了 可以在队列删掉 这样以后就不会再发了 否则消息服务器以为这条消息没处理掉 后续还会在发
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            System.out.println("消息消费成功!");
         catch (Exception e) 
            System.out.println("消息消费失败:"+e.getMessage() + e);
            //丢弃这条消息
            //channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
        
        System.out.println("接收者 : " + hello +","+ new Date());
    

producer服务

发送者


import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;
import java.util.UUID;

@Component
public class DeadQueueSender implements  RabbitTemplate.ConfirmCallback ,RabbitTemplate.ReturnCallback 


	@Autowired
	private RabbitTemplate rabbitTemplate;

	public void send() 
		//设置回调对象
		this.rabbitTemplate.setConfirmCallback(this);
		this.rabbitTemplate.setReturnCallback(this);
		CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
		String content = "死信队列 = " + new Date() + ", 内容= " + UUID.randomUUID().toString();
		MessagePostProcessor messagePostProcessor = message -> 
			MessageProperties messageProperties = message.getMessageProperties();
//            设置编码
			messageProperties.setContentEncoding("utf-8");
//            设置过期时间10*1000毫秒
			messageProperties.setExpiration("10000");
			return message;
		;
//         向DL_QUEUE 发送消息  10*1000毫秒后过期 形成死信
		rabbitTemplate.convertAndSend("EXCHANGE_DL", "KEY_DL", content, messagePostProcessor, correlationData);
		System.out.println("发送成功,"+new Date()+","+content);
	

	/**
	 * 消息回调确认方法
	 * 如果消息没有到exchange,则confirm回调,ack=false
	 * 如果消息到达exchange,则confirm回调,ack=true
	 * @param
	 */
	@Override
	public void confirm(CorrelationData correlationData, boolean isSendSuccess, String s) 
		System.out.println("confirm--message:回调消息ID为: " + correlationData.getId());
		if (isSendSuccess) 
			System.out.println("确认--消息:消息发送成功");
		 else 
			System.out.println("确认--消息:消息发送失败" + s);
		
	

	/**
	 * exchange到queue成功,则不回调return
	 * exchange到queue失败,则回调return(需设置mandatory=true,否则不回回调,消息就丢了)
	 */
	@Override
	public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) 
		System.out.println("返回--消息:" + new String(message.getBody()) + ",replyCode:" + replyCode
				+ ",replyText:" + replyText + ",exchange:" + exchange + ",routingKey:" + routingKey);
	

测试

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController; 
@RestController
public class DeadQueueTests 
    @Autowired
    private DeadQueueSender deadQueueSender; 
    @GetMapping("DeadQueueTests")
    public void hello() 
        deadQueueSender.send();
    

通过执行测试类,查看到了消息消费的情况,生产者共计生产了 1 个消息,被消费者消费了一次,但发送消息时间及实际消费时间差 10 秒钟。

以上是关于springBoot集成rabbitmq 之延时(死信)队列的主要内容,如果未能解决你的问题,请参考以下文章

SpringBoot集成RabbitMQ之死信队列限流队列延迟队列(第四节)

springBoot集成rabbitmq 之主题模式(topics)

springBoot集成rabbitmq 之路由模式模式(Routing)

springBoot集成rabbitmq 之发布/订阅模式模式(Publish/Subscribe)

springboot 微服务之集成RabbitMQ消息中间件

08-rabbitMQ-springboot-延时队列