RabbitMQ原理与相关操作消息持久化

Posted exman

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ原理与相关操作消息持久化相关的知识,希望对你有一定的参考价值。

现在聊一下RabbitMQ消息持久化:

问题及方案描述

1.当有多个消费者同时收取消息,且每个消费者在接收消息的同时,还要处理其它的事情,且会消耗很长的时间。在此过程中可能会出现一些意外,比如消息接收到一半的时候,一个消费者死掉了。

这种情况要使用消息接收确认机制,可以执行上次宕机的消费者没有完成的事情。

2.在默认情况下,我们程序创建的消息队列以及存放在队列里面的消息,都是非持久化的。当RabbitMQ死掉了或者重启了,上次创建的队列、消息都不会保存。

这种情况可以使用RabbitMQ提供的消息队列的持久化机制。

相关理论描述

RabbitMQ支持消息的持久化,也就是数据写在磁盘上,为了数据安全考虑,我个人觉得大多数开发人员都会选择持久化。

队列和交换机有一个创建时候指定的标志durabledurable的唯一含义就是具有这个标志的队列和交换机会在重启之后重新建立,它不表示说在队列当中的消息会在重启后恢复。

消息队列持久化包括3个部分:

1、exchange持久化,在声明时指定durable => true
2、queue持久化,在声明时指定durable => true
3、消息持久化,在投递时指定delivery_mode=> 2(1是非持久化)

如果exchange和queue都是持久化的,那么它们之间的binding也是持久化的。如果exchange和queue两者之间有一个持久化,一个非持久化,就不允许建立绑定。

注意:一旦创建了队列和交换机,就不能修改其标志了。例如,如果创建了一个non-durable的队列,然后想把它改变成durable的,唯一的办法就是删除这个队列然后重现创建

程序示例

生产者

技术分享图片 View Code

注:ack是 acknowledgments 的缩写,noAck 是("no manual acks")

因为我前段时间换了笔记本,所以用户的“eric”的操作出踩了个坑,下面进行介绍下:

如果调试运行时报错:None of the specified endpoints were reachable

innerException是:

技术分享图片
{"The AMQP operation was interrupted: AMQP close-reason, initiated by Library, code=541, text="Unexpected Exception", classId=0, methodId=0, cause=System.IO.IOException: 无法从传输连接中读取数据: 远程主机强迫关闭了一个现有的连接。。 ---> System.Net.Sockets.SocketException: 远程主机强迫关闭了一个现有的连接。
   在 System.Net.Sockets.Socket.Receive(Byte[] buffer, Int32 offset, Int32 size, SocketFlags socketFlags)
   在 System.Net.Sockets.NetworkStream.Read(Byte[] buffer, Int32 offset, Int32 size)
   --- 内部异常堆栈跟踪的结尾 ---
   在 RabbitMQ.Client.Impl.Frame.ReadFrom(NetworkBinaryReader reader)
   在 RabbitMQ.Client.Impl.SocketFrameHandler.ReadFrame()
   在 RabbitMQ.Client.Framing.Impl.Connection.MainLoopIteration()
   在 RabbitMQ.Client.Framing.Impl.Connection.MainLoop()"}
技术分享图片

这说明我们使用的用户 不是 系统默认的 guest 而是我们自己创建的用户,但是没有足够的权限进行操作。

解决办法:

rabbitmqctl set_user_tags username administrator
rabbitmqctl set_permissions -p / username ".*" ".*" ".*"

执行结果:

技术分享图片

相关其他操作见:windows下 安装 rabbitMQ 及操作常用命令

程序运行结果:

技术分享图片

 消费者

技术分享图片 View Code

接受消息还有一种方法,就是通过基于推送事件订阅可以使用内置的 QueueingBasicConsumer 提供简化编程模型允许共享队列阻塞直到收到条消息。

技术分享图片 View Code

程序运行结果:

技术分享图片

 

以上是关于RabbitMQ原理与相关操作消息持久化的主要内容,如果未能解决你的问题,请参考以下文章

SpringCloud系列十一:SpringCloudStream(SpringCloudStream 简介创建消息生产者创建消息消费者自定义消息通道分组与持久化设置 RoutingKey)(代码片段

RabbitMQ 消息队列学习

RabbitMq-confirm发送消息确认深入探讨

RabbitMQ 知识点总结

RabbitMQ事务和Confirm发送方消息确认——深入解读

4RabbitMQ-消息应答与消息持久化