RabbitMQ的动态创建交换机、队列、绑定、死信队列,延迟队列代码实现
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ的动态创建交换机、队列、绑定、死信队列,延迟队列代码实现相关的知识,希望对你有一定的参考价值。
参考技术A 这次我使用的是RabbitTemplate为什么Template不需要定义configuration文件来接收yml文件的参数?
这是个常识问题,我这里做个记录。。。
我都能忘记昨天吃了东西的,好在我喜欢做笔记。。。
我们的demo都是基于RabbitTemplate来写。。。
通过枚举ExchangeEnum、QueueEnum、BindingEnum动态维护和创建
1.初始化交换机
2.初始化队列
3.交换机和队列绑定
1.定义队列
2.定义交换机
3.交换机和队列绑定
4.不定义超时队列的@RabbitListener,只定义超时接收队列的@RabbitListener
测试:
1.AsyncRabbitTemplate定义
2.测试
3.监听方法
1.消息回退:
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
2.拒绝消息
void basicReject(long deliveryTag, boolean requeue) throws IOException;
3.确认ack
void basicAck(long deliveryTag, boolean multiple) throws IOException;
4.创建一个队列
5.启动一个消费者,并返回服务端生成的消费者标识
6.取消消费者订阅
7.主动拉取队列中的一条消息
1.队列参数
2.消息参数
RabbitMQ死信交换永远不会得到消息
我正在尝试设置我的第一个RabbitMQ死信交换,这是我通过Web管理界面使用的步骤:
- 创建名为“dead.letter.test”的新DIRECT交换
- 创建新队列“dead.letter.queue”
- 将“dead.letter.queue”绑定到“dead.letter.test”
- 创建新队列“test1”,将死信交换设置为“dead.letter.test”
- 发送消息到“test1”
- Nack(带有requeue = false)“test1”中的消息
我期待这些步骤应该通过“dead.letter.test”交换记录到“dead.letter.queue”。这不会发生。
我可以手动将消息放入“dead.letter.test”交换中,它显示在“dead.letter.queue”中,所以我知道这很好。
当我查看管理UI时,它显示在队列“test1”上设置了DLX参数。
我哪里错了?
Gentilissimo Signore非常友好地在Twitter上回答我的问题。问题是,如果您的死信交换设置为DIRECT,您必须指定一个死信路由密钥。如果您只是希望所有NACKed消息进入死信桶以供以后调查(就像我一样)那么您的死信交换应设置为FANOUT。
以下是有效的更新步骤:
- 创建名为“dead.letter.test”的新FANOUT交换
- 创建新队列“dead.letter.queue”
- 将“dead.letter.queue”绑定到“dead.letter.test”
- 创建新队列“test1”,将死信交换设置为“dead.letter.test”
- 发送消息到“test1”
- Nack(带有requeue = false)“test1”中的消息
死信交换没有路由密钥和直接交换
Follow the steps these will work for sure:-
1. Create a new queue named 'dead_queue'.
2. Create an exchange named 'dead_exchange' and type of exchange should be 'direct'.
3. Bind 'dead_queue' and 'dead_exchange' without routing key.
4. Create a new queue named 'test_queue' and set its 'x-dead-letter-exchange' name as 'dead_exchange'
5. Create an exchange named 'test_exchange' and type of exchange should be 'direct'
6. Bind 'test_exchange' and 'test_queue' without routing key.
最后我们将检查它。为此,在'test_exchange'上发布一些参数'expiration'设置为10000.在此之后,当'test_exchange'上发布消息时,它将转到'test_queue',当消息在队列中过期时,它将查找DLX参数(死信交换名称)那条消息找到名称'dead_exchange'然后该消息将达到'dead_exchange'将其传递给'死队列'..如果你仍然有任何问题,如果我错过了解你的问题..写下你的问题我一定会看看它...谢谢..
注意:必须在'test_exchange'上发布消息,因为test_queue和test_exchange绑定没有路由密钥,它会正常工作但如果你在'test_queue'上发布消息,将使用默认的交换和路由密钥。然后在消息队列尝试到期后使用某个默认路由密钥将该死信息传递给dead_exchange,并且消息不会进入该队列。
如果要在死信交换上使用自定义路由密钥,则必须在声明工作队列时设置x-dead-letter-routing-key
(在您的情况下为test1
),否则将使用默认路由密钥。在您的情况下,RabbitMQ代理检测循环并简单地丢弃被拒绝的消息。
你需要的是在x-dead-letter-exchange=dead.letter.test
队列上设置x-dead-letter-routing-key=dead.letter.queue
和test1
参数。
如果您希望所有队列都具有相同的死信交换,则更容易设置一般策略:
sudo rabbitmqctl -p /my/vhost/path set_policy DLX ".*" '{"dead-letter-exchange":"MyExchange.DEAD"}' --apply-to queues
如果不是强制性的,则不需要创建FANOUT交换。
您可以使用已用于其他交换的相同路由密钥创建DIRECT交换。并且也不需要为新交换创建新队列。您可以使用现有队列进行新交换。您只需要将新交换绑定到队列。
这是我的receive.js文件:
var amqp = require("amqplib/callback_api");
var crontab = require('node-crontab');
amqp.connect("amqp://localhost", function (err, conn) {
conn.createChannel(function (err, ch) {
var ex = 'direct_logs';
var ex2 = 'dead-letter-test';
var severity = 'enterprise-1-key';
//assert "direct" exchange
ch.assertExchange(ex, 'direct', { durable: true });
//assert "dead-letter-test" exchange
ch.assertExchange(ex2, 'direct', { durable: true });
//if acknowledgement is nack() then message will be stored in second exchange i.e. ex2="dead-letter-test"
ch.assertQueue('enterprise-11', { exclusive: false, deadLetterExchange: ex2 }, function (err, q) {
var n = 0;
console.log(' [*] Waiting for logs. To exit press CTRL+C');
console.log(q);
//Binding queue with "direct_logs" exchange
ch.bindQueue(q.queue, ex, severity);
//Binding the same queue with "dead-letter-test"
ch.bindQueue(q.queue, ex2, severity);
ch.consume(q.queue, function (msg) {
// consume messages via "dead-letter-exchange" exchange at every second.
if (msg.fields.exchange === ex2) {
crontab.scheduleJob("* * * * * *", function () {
console.log("Received by latest exchange %s", msg.fields.routingKey, msg.content.toString());
});
} else {
console.log("Received %s", msg.fields.routingKey, msg.content.toString());
}
if (n < 1) {
// this will executes first time only. Here I'm sending nack() so message will be stored in "deadLetterExchange"
ch.nack(msg, false, false);
n += 1;
} else {
ch.ack(msg)
n = 0
}
}, { noAck: false });
});
});
});
创建名为“dead.letter.test”的新DIRECT交换
正确
创建新队列“dead.letter.queue”
正确
将“dead.letter.queue”绑定到“dead.letter.test”
正确
创建新队列“test1”,将死信交换设置为“dead.letter.test”
我假设您正在创建test1队列并将其绑定到dead.letter.test exchange
发送消息到“test1”
如果你希望你的消息被dead.letter.queue收到,你必须在发送消息时提供路由密钥,而使用dead.letter.queue的客户端也应该使用相同的路由密钥
如果您在没有路由密钥的情况下发布,则只有订阅测试1的客户端才会收到该消息。
如果您将消息发布到direct.letter.test exchange,则所有队列都将收到该消息。它会像扇出交换一样工作
因此,如果您希望dead.letter.queue接收消息,您将不得不在该队列中发布消息,或者您必须在发布和订阅时使用相同的路由密钥并将消息发布到交换
以上是关于RabbitMQ的动态创建交换机、队列、绑定、死信队列,延迟队列代码实现的主要内容,如果未能解决你的问题,请参考以下文章