springBoot集成rabbitmq 之延时(死信)队列
Posted 普通成员-铷弟
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了springBoot集成rabbitmq 之延时(死信)队列相关的知识,希望对你有一定的参考价值。
springBoot集成rabbitmq 之延时(死信)队列
comsumer 服务
配置类
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitDeadQueueConfig
/**
* Queue 可以有4个参数
* 1.队列名
* 2.durable 持久化消息队列 ,rabbitmq重启的时候不需要创建新的队列 默认true
* 3.auto-delete 表示消息队列没有在使用时将被自动删除 默认是false
* 4.exclusive 表示该消息队列是否只在当前connection生效,默认是false
*/
/**
* 死信队列跟交换机类型没有关系 不一定为directExchange 不影响该类型交换机的特性.
*
* @return the exchange
*/
@Bean
public Exchange deadLetterExchange()
return ExchangeBuilder.directExchange("EXCHANGE_DL").durable(true).build();
/**
* 声明一个死信队列.
* x-dead-letter-exchange 对应 死信交换机
* x-dead-letter-routing-key 对应 死信队列
*
* @return the queue
*/
@Bean
public Queue deadLetterQueue()
Map<String, Object> args = new HashMap<>(2);
// x-dead-letter-exchange 声明 死信交换机
args.put("x-dead-letter-exchange", "EXCHANGE_DL");
// x-dead-letter-routing-key 声明 死信路由键
args.put("x-dead-letter-routing-key","KEY_R");
return QueueBuilder.durable("QUEUE_DL").withArguments(args).build();
/**
* 定义死信队列转发队列.
*
* @return the queue
*/
@Bean
public Queue redirectQueue()
return QueueBuilder.durable("QUEUE_REDIRECT").build();
/**
* 死信路由通过 DL_KEY 绑定键绑定到死信队列上.
* @return the binding
*/
@Bean
public Binding deadLetterBinding()
return new Binding("QUEUE_DL", Binding.DestinationType.QUEUE,
"EXCHANGE_DL", "KEY_DL", null);
/**
* 死信路由通过 KEY_R 绑定键绑定到最终处理队列上.
* @return the binding
*/
@Bean
public Binding redirectBinding()
return new Binding("QUEUE_REDIRECT", Binding.DestinationType.QUEUE,
"EXCHANGE_DL", "KEY_R", null);
接收者
import com.rabbitmq.client.Channel;
import org.springfram
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitDeadQueueConfig
/**
* Queue 可以有4个参数
* 1.队列名
* 2.durable 持久化消息队列 ,rabbitmq重启的时候不需要创建新的队列 默认true
* 3.auto-delete 表示消息队列没有在使用时将被自动删除 默认是false
* 4.exclusive 表示该消息队列是否只在当前connection生效,默认是false
*/
/**
* 死信队列跟交换机类型没有关系 不一定为directExchange 不影响该类型交换机的特性.
*
* @return the exchange
*/
@Bean
public Exchange deadLetterExchange()
return ExchangeBuilder.directExchange("EXCHANGE_DL").durable(true).build();
/**
* 声明一个死信队列.
* x-dead-letter-exchange 对应 死信交换机
* x-dead-letter-routing-key 对应 死信队列
*
* @return the queue
*/
@Bean
public Queue deadLetterQueue()
Map<String, Object> args = new HashMap<>(2);
// x-dead-letter-exchange 声明 死信交换机
args.put("x-dead-letter-exchange", "EXCHANGE_DL");
// x-dead-letter-routing-key 声明 死信路由键
args.put("x-dead-letter-routing-key","KEY_R");
return QueueBuilder.durable("QUEUE_DL").withArguments(args).build();
/**
* 定义死信队列转发队列.
*
* @return the queue
*/
@Bean
public Queue redirectQueue()
return QueueBuilder.durable("QUEUE_REDIRECT").build();
/**
* 死信路由通过 DL_KEY 绑定键绑定到死信队列上.
* @return the binding
*/
@Bean
public Binding deadLetterBinding()
return new Binding("QUEUE_DL", Binding.DestinationType.QUEUE,
"EXCHANGE_DL", "KEY_DL", null);
/**
* 死信路由通过 KEY_R 绑定键绑定到最终处理队列上.
* @return the binding
*/
@Bean
public Binding redirectBinding()
return new Binding("QUEUE_REDIRECT", Binding.DestinationType.QUEUE,
"EXCHANGE_DL", "KEY_R", null);
ework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Date;
@Component
@RabbitListener(queues = "QUEUE_REDIRECT")
public class DeadQueueReceiver
@RabbitHandler
public void process(String hello, Message message, Channel channel)
try
//告诉服务器收到这条消息 已经被我消费了 可以在队列删掉 这样以后就不会再发了 否则消息服务器以为这条消息没处理掉 后续还会在发
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
System.out.println("消息消费成功!");
catch (Exception e)
System.out.println("消息消费失败:"+e.getMessage() + e);
//丢弃这条消息
//channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
System.out.println("接收者 : " + hello +","+ new Date());
producer服务
发送者
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.UUID;
@Component
public class DeadQueueSender implements RabbitTemplate.ConfirmCallback ,RabbitTemplate.ReturnCallback
@Autowired
private RabbitTemplate rabbitTemplate;
public void send()
//设置回调对象
this.rabbitTemplate.setConfirmCallback(this);
this.rabbitTemplate.setReturnCallback(this);
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
String content = "死信队列 = " + new Date() + ", 内容= " + UUID.randomUUID().toString();
MessagePostProcessor messagePostProcessor = message ->
MessageProperties messageProperties = message.getMessageProperties();
// 设置编码
messageProperties.setContentEncoding("utf-8");
// 设置过期时间10*1000毫秒
messageProperties.setExpiration("10000");
return message;
;
// 向DL_QUEUE 发送消息 10*1000毫秒后过期 形成死信
rabbitTemplate.convertAndSend("EXCHANGE_DL", "KEY_DL", content, messagePostProcessor, correlationData);
System.out.println("发送成功,"+new Date()+","+content);
/**
* 消息回调确认方法
* 如果消息没有到exchange,则confirm回调,ack=false
* 如果消息到达exchange,则confirm回调,ack=true
* @param
*/
@Override
public void confirm(CorrelationData correlationData, boolean isSendSuccess, String s)
System.out.println("confirm--message:回调消息ID为: " + correlationData.getId());
if (isSendSuccess)
System.out.println("确认--消息:消息发送成功");
else
System.out.println("确认--消息:消息发送失败" + s);
/**
* exchange到queue成功,则不回调return
* exchange到queue失败,则回调return(需设置mandatory=true,否则不回回调,消息就丢了)
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey)
System.out.println("返回--消息:" + new String(message.getBody()) + ",replyCode:" + replyCode
+ ",replyText:" + replyText + ",exchange:" + exchange + ",routingKey:" + routingKey);
测试
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class DeadQueueTests
@Autowired
private DeadQueueSender deadQueueSender;
@GetMapping("DeadQueueTests")
public void hello()
deadQueueSender.send();
通过执行测试类,查看到了消息消费的情况,生产者共计生产了 1 个消息,被消费者消费了一次,但发送消息时间及实际消费时间差 10 秒钟。
以上是关于springBoot集成rabbitmq 之延时(死信)队列的主要内容,如果未能解决你的问题,请参考以下文章
SpringBoot集成RabbitMQ之死信队列限流队列延迟队列(第四节)
springBoot集成rabbitmq 之主题模式(topics)
springBoot集成rabbitmq 之路由模式模式(Routing)
springBoot集成rabbitmq 之发布/订阅模式模式(Publish/Subscribe)