RabbitMQ 如何决定何时删除消息?

Posted

技术标签:

【中文标题】RabbitMQ 如何决定何时删除消息?【英文标题】:How does RabbitMQ decide when it is time to delete a message? 【发布时间】:2018-07-26 18:11:51 【问题描述】:

我正在尝试理解 RabbitMQ 中消息删除的逻辑。

我的目标是让消息即使没有连接的客户端读取它们也能持久保存,这样当客户端重新连接时,消息正在等待它们。我可以使用持久的惰性队列,以便将消息持久化到磁盘,并且我可以使用 HA 复制来确保多个节点获得所有排队消息的副本。

我想让消息进入两个或多个队列,使用主题或标头路由,并让一个或多个客户端读取每个队列。

我有两个队列,A 和 B,由标头交换提供。队列 A 获取所有消息。队列 B 仅获取带有“归档”标头的消息。队列 A 有 3 个消费者正在阅读。队列 B 有 1 个消费者。如果 B 的消费者死掉了,但 A 的消费者还在继续确认消息,RabbitMQ 会删除这些消息还是继续存储它们?在 B 重新启动之前,队列 B 不会有任何人使用它,我希望这些消息保持可用以供以后使用。

到目前为止,我已经阅读了大量文档,但仍然没有找到明确的答案。

【问题讨论】:

希望我能回答您的问题!如果不让我知道我可以做些什么来改进它! 【参考方案1】:

RabbitMQ 将在确认后决定何时删除消息。

假设您有一个消息发送者:

var factory = new ConnectionFactory()  HostName = "localhost", Port = 5672, UserName = "guest", Password = "guest" ;
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())

    channel.QueueDeclare(queue: "hello",
                         durable: true,
                         exclusive: false,
                         autoDelete: false,
                         arguments: null);

    string message = "Hello World!";
    var body = Encoding.UTF8.GetBytes(message);

    channel.BasicPublish(exchange: "",
                         routingKey: "hello",
                         basicProperties: null,
                         body: body);
    Console.WriteLine(" [x] Sent 0", message);

这将创建一个持久队列“hello”并发送消息“Hello World!”给它。这是向队列发送一条消息后的样子。

现在让我们设置两个消费者,一个确认收到了消息,一个没有收到。

channel.BasicConsume(queue: "hello",
                    autoAck: false,
                    consumer: consumer);

channel.BasicConsume(queue: "hello",
                    autoAck: true,
                    consumer: consumer);

如果只运行第一个消费者,则消息永远不会从队列中删除,因为消费者声明只有客户端手动确认消息才会从队列中消失:https://www.rabbitmq.com/confirms.html

然而,第二个消费者会告诉队列它可以自动/立即安全地删除它收到的所有消息。

如果您不想自动删除这些消息,则必须禁用 autoAck 并使用文档进行一些手动确认:

http://codingvision.net/tips-and-tricks/c-send-data-between-processes-w-memory-mapped-file(向下滚动到“手动确认”)。

channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);

【讨论】:

【参考方案2】:

简单的答案是,从一个队列中消耗的消息与另一个队列中的消息无关。发布消息后,代理会根据需要将副本分发到尽可能多的队列 - 但它们是消息的真实副本,并且就代理而言,从那时起绝对不相关。

排队到持久队列中的消息会一直保留,直到它们被队列中的消费者拉出,并且可以选择确认。

注意,有特定的queue-level and message-level TTL settings 可能会影响这一点。例如,如果队列有 TTL,并且消费者在过期之前没有重新连接,则队列将连同其所有消息一起消失。类似地,如果一条消息已使用特定的 TTL 排队(也可以将其设置为特定队列上所有消息的默认值),那么一旦该 TTL 通过,该消息将不会传递给消费者。

次要说明如果消息由于 TTL 在队列中过期,它实际上会保留在队列中,直到下一次被传递。

【讨论】:

【参考方案3】:

RabbitMQ 删除消息的方式有多种。 其中一些是:

消费者确认后 已达到该队列的生存时间 (TTL)。 已达到该队列上消息的生存时间 (TTL)。

最后两点表明 RabbitMQ 允许您为消息和队列设置 TTL(生存时间)。 可以通过将 x-message-ttl 参数设置为 queue.declare 或通过设置 message-ttl 策略 为给定队列设置 TTL >。 通过将 x-expires 参数设置为 queue.declare 或通过设置 expires 政策,可以为给定队列设置到期时间。

如果消息在队列中的停留时间超过配置的 TTL,则称该消息已死亡。 这里要注意的重要一点是,路由到不同队列的单个消息可能会在不同的时间死亡,或者有时永远不会在它所在的每个队列中死亡。 一个队列中的消息死亡对其他队列中同一消息的生命没有影响

【讨论】:

以上是关于RabbitMQ 如何决定何时删除消息?的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ 笔记-工作队列

干货实战-RabbitMQ的消息高可用和确认消费

干货实战-RabbitMQ的消息高可用和确认消费

分布式消息队列:如何保证消息队列的高可用

消息队列之RabbitMQ

消息中间件选型,kafka和rabbitmq如何选择