通过RabbitMQ的DIRECT模式以及死信队列实现延时任务
Posted 学习使得吾快乐
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了通过RabbitMQ的DIRECT模式以及死信队列实现延时任务相关的知识,希望对你有一定的参考价值。
运用RabbitMQ的DIRECT模式以及死信队列实现延时操作以及不同间隔时间后重试
一 、原理描述
图解:
- 一条绑定路由为【FOR_QUEUE1】的消息被发送到交换机【EXCHANGE】上
RabbitTemplate.convertSendAndReceive("EXCHANGE","FOR_QUEUE1","我是一条消息");
- 【EXCHANGE】根据自己身上队列绑定的路由,将这个消息发给了队列【QUEUE_NAME1】
- 消费者代码在处理队列【QUEUE_NAME1】中的这条消息的时候,没有整好,出现了异常,导致这条消息没有被消费
- 这条消息就会根据队列【QUEUE_NAME1】绑定的死信路由和死信交换机 变成一条
绑定路由为【FOR_DEAD_QUEUE1】的消息发往交换机【DEAD_EXCHANGE】 - 消息在死信队列的时候就不搞它了,等它蹲满日子——死信队列【DEAD_QUEUE_NAME1】的TTL:30s
- 日子蹲满后,这条消息就会根据死信队列【DEAD_QUEUE_NAME1】绑定的死信路由和死信交换机 自动变成一条
绑定路由为【FOR_QUEUE2】的消息发往交换机【EXCHANGE】
然后接着重复上面的流程·······
具体的效果就可以是:
处理RabbitMQ中的消息时,因为响应时间过久,导致出现异常。于是,系统会在30s再重新处理该消息。要是还出现异常,系统就会再1min后再次处理该消息。
最后还出现异常,就去他的,爱咋咋。
二、 具体代码
配置文件:src\\main\\resources\\application.yml
spring:
rabbitmq:
host: #地址
username: #用户名
password: #密码
port: #端口
virtual-host: #虚拟主机
listener:
type: simple #监听类型
simple:
default-requeue-rejected: false #无法消费的消息因此进入死信队列
acknowledge-mode: manual #设置手动消费消息
config:
rabbitmg:
ttl1: 30000 # 30 s
ttl2: 60000 # 1 min
RabbitMQ生成文件:com/study/config/RabbitMQConfig.java
@Configuration
public class RabbitMQConfig
//交换机
public static final String EXCHANGE_NAME = "exchange";
public static final String DEAD_EXCHANGE_NAME = "dead.exchange";
//队列
public static final String QUEUE_NAME1 = "queue1";
public static final String QUEUE_NAME2 = "queue2";
public static final String QUEUE_NAME3 = "queue3";
public static final String DEAD_QUEUE_NAME1 = "delay.queue1";
public static final String DEAD_QUEUE_NAME2 = "delay.queue2";
public static final String DEAD_QUEUE_NAME3 = "delay.queue3";
//各队列绑定的路由
public static final String QUEUE1_ROUTINGKEY = "key.notify1";
public static final String QUEUE2_ROUTINGKEY = "key.notify2";
public static final String QUEUE3_ROUTINGKEY = "key.notify3";
public static final String DEAD_QUEUE1_ROUTINGKEY = "key.delay1";
public static final String DEAD_QUEUE2_ROUTINGKEY = "key.delay2";
public static final String DEAD_QUEUE3_ROUTINGKEY = "key.delay3";
//各死信队列的ttl 未消费消息的过期时间
@Value("$config.rabbitmg.ttl1")
private int DEAD_QUEUE_TTL1;
@Value("$config.rabbitmg.ttl2")
private int DEAD_QUEUE_TTL2;
//声明业务EXchange
@Bean("exchange")
public DirectExchange exchange()
return new DirectExchange(EXCHANGE_NAME);
//声明私信Exchange
@Bean("deadExchange")
public DirectExchange deadExchange()
return new DirectExchange(DEAD_EXCHANGE_NAME);
//声明支付信息队列1
@Bean("queue1")
public Queue queue1()
Map<String,Object> args = new HashMap<>(2);
// x-dead-letter-exchange 这里声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange",DEAD_EXCHANGE_NAME);
// x-dead-letter-routing-key 这里声明当前队列的死信路由key
args.put("x-dead-letter-routing-key",DEAD_QUEUE1_ROUTINGKEY);
return QueueBuilder.durable(QUEUE_NAME1).withArguments(args).build();
//声明支付信息队列2
@Bean("queue2")
public Queue queue2()
Map<String,Object> args = new HashMap<>(2);
// x-dead-letter-exchange 这里声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange",DEAD_EXCHANGE_NAME);
// x-dead-letter-routing-key 这里声明当前队列的死信路由key
args.put("x-dead-letter-routing-key",DEAD_QUEUE2_ROUTINGKEY);
return QueueBuilder.durable(QUEUE_NAME2).withArguments(args).build();
//声明支付信息队列3
@Bean("queue3")
public Queue queue3()
Map<String,Object> args = new HashMap<>(1);
// x-dead-letter-exchange 这里声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange",DEAD_EXCHANGE_NAME);
// // x-dead-letter-routing-key 这里声明当前队列的死信路由key
args.put("x-dead-letter-routing-key",DEAD_QUEUE3_ROUTINGKEY);
return QueueBuilder.durable(QUEUE_NAME3).withArguments(args).build();
//声明死信队列1
@Bean("deadQueue1")
public Queue deadQueue1()
Map<String,Object> args = new HashMap<>(3);
// x-dead-letter-exchange 这里声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange",EXCHANGE_NAME);
// x-dead-letter-routing-key 这里声明当前队列的死信路由key
args.put("x-dead-letter-routing-key",QUEUE2_ROUTINGKEY);
// x-message-ttl 这里声明消息未被消费的过期时间
args.put("x-message-ttl",DEAD_QUEUE_TTL1);
return QueueBuilder.durable(DEAD_QUEUE_NAME1).withArguments(args).build();
//声明死信队列2
@Bean("deadQueue2")
public Queue deadQueue2()
Map<String,Object> args = new HashMap<>(3);
// x-dead-letter-exchange 这里声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange",EXCHANGE_NAME);
// x-dead-letter-routing-key 这里声明当前队列的死信路由key
args.put("x-dead-letter-routing-key",QUEUE3_ROUTINGKEY);
// x-message-ttl 这里声明消息未被消费的过期时间
args.put("x-message-ttl",DEAD_QUEUE_TTL2);
return QueueBuilder.durable(DEAD_QUEUE_NAME2).withArguments(args).build();
//声明死信队列3
@Bean("deadQueue3")
public Queue deadQueue3()
Map<String,Object> args = new HashMap<>(2);
// x-dead-letter-exchange 这里声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange",EXCHANGE_NAME);
// x-dead-letter-routing-key 这里声明当前队列的死信路由key
// args.put("x-dead-letter-routing-key",QUEUE4_ROUTINGKEY);
return QueueBuilder.durable(DEAD_QUEUE_NAME3).withArguments(args).build();
//声明支付信息序列1绑定的关系
@Bean
public Binding infoBinding1(@Qualifier("queue1") Queue queue,
@Qualifier("exchange") DirectExchange exchange)
return BindingBuilder.bind(queue).to(exchange).with(QUEUE1_ROUTINGKEY);
//声明支付信息序列2绑定的关系
@Bean
public Binding infoBinding2(@Qualifier("queue2") Queue queue,
@Qualifier("exchange") DirectExchange exchange)
return BindingBuilder.bind(queue).to(exchange).with(QUEUE2_ROUTINGKEY);
//声明支付信息序列3绑定的关系
@Bean
public Binding infoBinding3(@Qualifier("queue3") Queue queue,
@Qualifier("exchange") DirectExchange exchange)
return BindingBuilder.bind(queue).to(exchange).with(QUEUE3_ROUTINGKEY);
//声明死信队列1绑定的关系
@Bean
public Binding deadInfoBinding1(@Qualifier("deadQueue1") Queue queue,
@Qualifier("deadExchange") DirectExchange exchange)
return BindingBuilder.bind(queue).to(exchange).with(DEAD_QUEUE1_ROUTINGKEY);
//声明死信队列2绑定的关系
@Bean
public Binding deadInfoBinding2(@Qualifier("deadQueue2") Queue queue,
@Qualifier("deadExchange") DirectExchange exchange)
return BindingBuilder.bind(queue).to(exchange).with(DEAD_QUEUE2_ROUTINGKEY);
//声明死信队列3绑定的关系
@Bean
public Binding deadInfoBinding3(@Qualifier("deadQueue3") Queue queue,
@Qualifier("deadExchange") DirectExchange exchange)
return BindingBuilder.bind(queue).to(exchange).with(DEAD_QUEUE3_ROUTINGKEY);
消费者:com/test/listener/InfoListener.java
@Component
public class InfoListener
public static final Logger logger = LoggerFactory.getLogger(InfoListener.class);
@RabbitListener(queues = RabbitMQConfig.QUEUE_NAME1, RabbitMQConfig.QUEUE_NAME2, RabbitMQConfig.QUEUE_NAME3)
public void listenInfo(Message message, Channel channel) throws Exception
String queueName = message.getMessageProperties().getConsumerQueue(); //获取消息序列的名字
String msg = new String(message.getBody());
Exception e = null;
logger.info("-------------开始处理序列[" + queueName + "]中的消息-------------");
try
//相关的业务流程
if ("1".equals(msg))
throw new Exception("找茬的来了");
catch (Exception finalException)
e =finalException;
if (e == null)
logger.info("-------------成功处理了序列[" + queueName + "]中的消息-------------");
//因为配置文件中的acknowledge-mode设为了manual,需要手动ACK来标识该消息已被消费
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
else
logger.info("-------------处理序列[" + queueName + "]中的消息时出现了异常:" + e.getMessage()+"-------------");
//Nack后消息就会被发往死信序列
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
生产者:com/test/sender/InfoSender.java
@Component
public class PayInfoSender
@Autowired
private RabbitTemplate template;
public void sendMsg(String msg)
//生成一条绑定路由为RabbitMQConfig.QUEUE1_ROUTINGKEY的发往RabbitMQConfig.EXCHANGE_NAME交换机的消息
template.convertSendAndReceive(RabbitMQConfig.EXCHANGE_NAME,RabbitMQConfig.QUEUE1_ROUTINGKEY,msg);
Controller:com/test/controller/TestController.java
@RestController
@RequestMapping("rabbitmq")
public class TestController
@Autowired
PayInfoSender payInfoSender;
@RequestMapping("sendmsg")
public void test(HttpServletRequest request )
String id = request.getParameter("id");
payInfoSender.sendMsg(id);
三、具体效果
成功演示:
发送请求:http://localhost:8081/rabbitmq/sendmsg?id=2
控制台:
失败演示:
发送请求:http://localhost:8081/rabbitmq/sendmsg?id=1
控制台:
四、参考文献
http://www.rabbitmq.com/getstarted.html
https://blog.csdn.net/whitebearclimb/article/details/108959110
https://www.cnblogs.com/mfrank/p/11184929.html
转载声明:https://blog.csdn.net/weixin_45884459/article/details/111508104
以上是关于通过RabbitMQ的DIRECT模式以及死信队列实现延时任务的主要内容,如果未能解决你的问题,请参考以下文章
RabbitMQ:第二章:Spring整合RabbitMQ(简单模式,广播模式,路由模式,通配符模式,消息可靠性投递,防止消息丢失,TTL,死信队列,延迟队列,消息积压,消息幂等性)(代码