Rabbitmq小书
Posted 热爱编程的大忽悠
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Rabbitmq小书相关的知识,希望对你有一定的参考价值。
Rabbitmq小书
- RabbitMQ
RabbitMQ
安装
Docker安装
Rabbitmq - Official Image | Docker Hub
拉取镜像:
docker pull rabbitmq:3.10.1-management
运行容器:
docker run -d --hostname my-rabbit --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq镜像id(只需要填前几位,确保与其他镜像id即可识别)
hostname:容器内的主机名
通讯端口:5672
web界面端口:15672
记得开启5672和15672端口
默认用户名和密码为guest
Rabbitmq初识
1.生产者(Publisher): 发布消息到RabbitMQ中的交换机(Exchange)上
2.交换机(Exchange): 和生产者建立连接并接受生产者投递的消息
3.消费者(Consumer): 监听RabbitMQ中的Queue中的消息
4.队列(Queue): Exchange将消息分发到指定的Queue
5.路由(Routes):交换机转换消息到队列的规则
AMQP 0.9.1 协议解析
详细参考官方文档:
AMQP 0-9-1 Model Explained — RabbitMQ
AMQP协议简介
AMQP(高级消息队列协议)是一个网络协议。它支持符合要求的客户端应用(application)和消息中间件代理(messaging middleware broker)之间进行通信。
消息代理和他们所扮演的角色
消息代理(message brokers)从发布者(publishers)亦称生产者(producers)那儿接收消息,并根据既定的路由规则把接收到的消息发送给处理消息的消费者(consumers)。
由于AMQP是一个网络协议,所以这个过程中的发布者,消费者,消息代理 可以存在于不同的设备上。
AMQP 0-9-1 模型简介
AMQP 0-9-1的工作过程如下图:消息(message)被发布者(publisher)发送给交换机(exchange),交换机常常被比喻成邮局或者邮箱。然后交换机将收到的消息根据路由规则分发给绑定的队列(queue)。最后AMQP代理会将消息投递给订阅了此队列的消费者,或者消费者按照需求自行获取。
发布者(publisher)发布消息时可以给消息指定各种消息属性(message meta-data)。有些属性有可能会被消息代理(brokers)使用,然而其他的属性则是完全不透明的,它们只能被接收消息的应用所使用。
从安全角度考虑,网络是不可靠的,接收消息的应用也有可能在处理消息的时候失败。基于此原因,AMQP模块包含了一个消息确认(message acknowledgements)的概念:当一个消息从队列中投递给消费者后(consumer),消费者会通知一下消息代理(broker),这个可以是自动的也可以由处理消息的应用的开发者执行。当“消息确认”被启用的时候,消息代理不会完全将消息从队列中删除,直到它收到来自消费者的确认回执(acknowledgement)。
在某些情况下,例如当一个消息无法被成功路由时,消息或许会被返回给发布者并被丢弃。或者,如果消息代理执行了延期操作,消息会被放入一个所谓的死信队列中。此时,消息发布者可以选择某些参数来处理这些特殊情况。
队列,交换机和绑定统称为AMQP实体(AMQP entities)。
AMQP是一个可编程的协议
AMQP 0-9-1是一个可编程协议,某种意义上说AMQP的实体和路由规则是由应用本身定义的,而不是由消息代理定义。包括像声明队列和交换机,定义他们之间的绑定,订阅队列等等关于协议本身的操作。
这虽然能让开发人员自由发挥,但也需要他们注意潜在的定义冲突。当然这在实践中很少会发生,如果发生,会以配置错误(misconfiguration)的形式表现出来。
应用程序(Applications)声明AMQP实体,定义需要的路由方案,或者删除不再需要的AMQP实体。
交换机和交换机类型
交换机是用来发送消息的AMQP实体。交换机拿到一个消息之后将它路由给一个或零个队列。它使用哪种路由算法是由交换机类型和被称作绑定(bindings)的规则所决定的。AMQP 0-9-1的代理提供了四种交换机
Name(交换机类型) | Default pre-declared names(预声明的默认名称) |
---|---|
Direct exchange(直连交换机) | (Empty string) and amq.direct |
Fanout exchange(扇型交换机) | amq.fanout |
Topic exchange(主题交换机) | amq.topic |
Headers exchange(头交换机) | amq.match (and amq.headers in RabbitMQ) |
除交换机类型外,在声明交换机时还可以附带许多其他的属性,其中最重要的几个分别是:
- Name
- Durability (消息代理重启后,交换机是否还存在)
- Auto-delete (当所有与之绑定的消息队列都完成了对此交换机的使用后,删掉它)
- Arguments(依赖代理本身)
交换机可以有两个状态:持久(durable)、暂存(transient)。持久化的交换机会在消息代理(broker)重启后依旧存在,而暂存的交换机则不会(它们需要在代理再次上线后重新被声明)。然而并不是所有的应用场景都需要持久化的交换机。
默认交换机
默认交换机(default exchange)实际上是一个由消息代理预先声明好的没有名字(名字为空字符串)的直连交换机(direct exchange)。它有一个特殊的属性使得它对于简单应用特别有用处:那就是每个新建队列(queue)都会自动绑定到默认交换机上,绑定的路由键(routing key)名称与队列名称相同。
For example:当你声明了一个名为"search-indexing-online"的队列,AMQP代理会自动将其绑定到默认交换机上,绑定(binding)的路由键名称也是为"search-indexing-online"。因此,当携带着名为"search-indexing-online"的路由键的消息被发送到默认交换机的时候,此消息会被默认交换机路由至名为"search-indexing-online"的队列中。换句话说,默认交换机看起来貌似能够直接将消息投递给队列,尽管技术上并没有做相关的操作。
直连交换机
直连型交换机(direct exchange)是根据消息携带的路由键(routing key)将消息投递给对应队列的。直连交换机用来处理消息的单播路由(unicast routing)(尽管它也可以处理多播路由)。下边介绍它是如何工作的:
- 将一个队列绑定到某个交换机上,同时赋予该绑定一个路由键(routing key)
- 当一个携带着路由键为
R
的消息被发送给直连交换机时,交换机会把它路由给绑定值同样为R
的队列。
直连交换机经常用来循环分发任务给多个工作者(workers)。当这样做的时候,我们需要明白一点,在AMQP 0-9-1中,消息的负载均衡是发生在消费者(consumer)之间的,而不是队列(queue)之间。
直连型交换机图例:
扇型交换机
扇型交换机(funout exchange)将消息路由给绑定到它身上的所有队列,而不理会绑定的路由键。如果N个队列绑定到某个扇型交换机上,当有消息发送给此扇型交换机时,交换机会将消息的拷贝分别发送给这所有的N个队列。扇型用来交换机处理消息的广播路由(broadcast routing)。
因为扇型交换机投递消息的拷贝到所有绑定到它的队列,所以他的应用案例都极其相似:
- 大规模多用户在线(MMO)游戏可以使用它来处理排行榜更新等全局事件
- 体育新闻网站可以用它来近乎实时地将比分更新分发给移动客户端
- 分发系统使用它来广播各种状态和配置更新
- 在群聊的时候,它被用来分发消息给参与群聊的用户。(AMQP没有内置presence的概念,因此XMPP可能会是个更好的选择)
扇型交换机图例:
主题交换机
主题交换机(topic exchanges)通过对消息的路由键和队列到交换机的绑定模式之间的匹配,将消息路由给一个或多个队列。主题交换机经常用来实现各种分发/订阅模式及其变种。主题交换机通常用来实现消息的多播路由(multicast routing)。
主题交换机拥有非常广泛的用户案例。无论何时,当一个问题涉及到那些想要有针对性的选择需要接收消息的 多消费者/多应用(multiple consumers/applications) 的时候,主题交换机都可以被列入考虑范围。
使用案例:
- 分发有关于特定地理位置的数据,例如销售点
- 由多个工作者(workers)完成的后台任务,每个工作者负责处理某些特定的任务
- 股票价格更新(以及其他类型的金融数据更新)
- 涉及到分类或者标签的新闻更新(例如,针对特定的运动项目或者队伍)
- 云端的不同种类服务的协调
- 分布式架构/基于系统的软件封装,其中每个构建者仅能处理一个特定的架构或者系统。
什么是绑定模式,后面我们讲到RabbitMQ对AMQP协议具体实现的时候会看到
头交换机
有时消息的路由操作会涉及到多个属性,此时使用消息头就比用路由键更容易表达,头交换机(headers exchange)就是为此而生的。头交换机使用多个消息属性来代替路由键建立路由规则。通过判断消息头的值能否与指定的绑定相匹配来确立路由规则。
我们可以绑定一个队列到头交换机上,并给他们之间的绑定使用多个用于匹配的头(header)。这个案例中,消息代理得从应用开发者那儿取到更多一段信息,换句话说,它需要考虑某条消息(message)是需要部分匹配还是全部匹配。上边说的“更多一段消息”就是"x-match"参数。当"x-match"设置为“any”时,消息头的任意一个值被匹配就可以满足条件,而当"x-match"设置为“all”的时候,就需要消息头的所有值都匹配成功。
头交换机可以视为直连交换机的另一种表现形式。头交换机能够像直连交换机一样工作,不同之处在于头交换机的路由规则是建立在头属性值之上,而不是路由键。路由键必须是一个字符串,而头属性值则没有这个约束,它们甚至可以是整数或者哈希值(字典)等。
队列
AMQP中的队列(queue)跟其他消息队列或任务队列中的队列是很相似的:它们存储着即将被应用消费掉的消息。队列跟交换机共享某些属性,但是队列也有一些另外的属性。
- Name
- Durable(消息代理重启后,队列依旧存在)
- Exclusive(只被一个连接(connection)使用,而且当连接关闭后队列即被删除)
- Auto-delete(当最后一个消费者退订后即被删除)
- Arguments(一些消息代理用他来完成类似与TTL的某些额外功能)
队列在声明(declare)后才能被使用。如果一个队列尚不存在,声明一个队列会创建它。如果声明的队列已经存在,并且属性完全相同,那么此次声明不会对原有队列产生任何影响。如果声明中的属性与已存在队列的属性有差异,那么一个错误代码为406的通道级异常就会被抛出。
队列名称
队列的名字可以由应用(application)来取,也可以让消息代理(broker)直接生成一个。队列的名字可以是最多255字节的一个utf-8字符串。若希望AMQP消息代理生成队列名,需要给队列的name参数赋值一个空字符串:在同一个通道(channel)的后续的方法(method)中,我们可以使用空字符串来表示之前生成的队列名称。之所以之后的方法可以获取正确的队列名是因为通道可以默默地记住消息代理最后一次生成的队列名称。
以"amq."开始的队列名称被预留做消息代理内部使用。如果试图在队列声明时打破这一规则的话,一个通道级的403 (ACCESS_REFUSED)错误会被抛出。
队列持久化
持久化队列(Durable queues)会被存储在磁盘上,当消息代理(broker)重启的时候,它依旧存在。没有被持久化的队列称作暂存队列(Transient queues)。并不是所有的场景和案例都需要将队列持久化。
持久化的队列并不会使得路由到它的消息也具有持久性。倘若消息代理挂掉了,重新启动,那么在重启的过程中持久化队列会被重新声明,无论怎样,只有经过持久化的消息才能被重新恢复。
绑定
绑定(Binding)是交换机(exchange)将消息(message)路由给队列(queue)所需遵循的规则。如果要指示交换机“E”将消息路由给队列“Q”,那么“Q”就需要与“E”进行绑定。绑定操作需要定义一个可选的路由键(routing key)属性给某些类型的交换机。路由键的意义在于从发送给交换机的众多消息中选择出某些消息,将其路由给绑定的队列。
拥有了交换机这个中间层,很多由发布者直接到队列难以实现的路由方案能够得以实现,并且避免了应用开发者的许多重复劳动。
如果AMQP的消息无法路由到队列(例如,发送到的交换机没有绑定队列),消息会被就地销毁或者返还给发布者。如何处理取决于发布者设置的消息属性。
消费者
消息如果只是存储在队列里是没有任何用处的。被应用消费掉,消息的价值才能够体现。在AMQP 0-9-1 模型中,有两种途径可以达到此目的:
- 将消息投递给应用 (“push API”)
- 应用根据需要主动获取消息 (“pull API”)
使用push API,应用(application)需要明确表示出它在某个特定队列里所感兴趣的,想要消费的消息。如是,我们可以说应用注册了一个消费者,或者说订阅了一个队列。一个队列可以注册多个消费者,也可以注册一个独享的消费者(当独享消费者存在时,其他消费者即被排除在外)。
每个消费者(订阅者)都有一个叫做消费者标签的标识符。它可以被用来退订消息。消费者标签实际上是一个字符串。
消息确认
消费者应用(Consumer applications) - 用来接受和处理消息的应用 - 在处理消息的时候偶尔会失败或者有时会直接崩溃掉。而且网络原因也有可能引起各种问题。这就给我们出了个难题,AMQP代理在什么时候删除消息才是正确的?AMQP 0-9-1 规范给我们两种建议:
- 当消息代理(broker)将消息发送给应用后立即删除。(使用AMQP方法:basic.deliver或basic.get-ok)
- 待应用(application)发送一个确认回执(acknowledgement)后再删除消息。(使用AMQP方法:basic.ack)
前者被称作自动确认模式(automatic acknowledgement model),后者被称作显式确认模式(explicit acknowledgement model)。在显式模式下,由消费者应用来选择什么时候发送确认回执(acknowledgement)。应用可以在收到消息后立即发送,或将未处理的消息存储后发送,或等到消息被处理完毕后再发送确认回执。
如果一个消费者在尚未发送确认回执的情况下挂掉了,那AMQP代理会将消息重新投递给另一个消费者。如果当时没有可用的消费者了,消息代理会死等下一个注册到此队列的消费者,然后再次尝试投递。
拒绝消息
当一个消费者接收到某条消息后,处理过程有可能成功,有可能失败。应用可以向消息代理表明,本条消息由于“拒绝消息(Rejecting Messages)”的原因处理失败了(或者未能在此时完成)。当拒绝某条消息时,应用可以告诉消息代理如何处理这条消息——销毁它或者重新放入队列。当此队列只有一个消费者时,请确认不要由于拒绝消息并且选择了重新放入队列的行为而引起消息在同一个消费者身上无限循环的情况发生。
Negative Acknowledgements
在多个消费者共享一个队列的案例中,明确指定在收到下一个确认回执前每个消费者一次可以接受多少条消息是非常有用的。这可以在试图批量发布消息的时候起到简单的负载均衡和提高消息吞吐量的作用。
注意,RabbitMQ只支持通道级的预取计数,而不是连接级的或者基于大小的预取。
消息属性和有效载荷(消息主体)
AMQP模型中的消息(Message)对象是带有属性(Attributes)的。有些属性及其常见,以至于AMQP 0-9-1 明确的定义了它们,并且应用开发者们无需费心思思考这些属性名字所代表的具体含义。例如:
- Content type(内容类型)
- Content encoding(内容编码)
- Routing key(路由键)
- Delivery mode (persistent or not)
投递模式(持久化 或 非持久化) - Message priority(消息优先权)
- Message publishing timestamp(消息发布的时间戳)
- Expiration period(消息有效期)
- Publisher application id(发布应用的ID)
有些属性是被AMQP代理所使用的,但是大多数是开放给接收它们的应用解释器用的。有些属性是可选的也被称作消息头(headers)。他们跟HTTP协议的X-Headers很相似。消息属性需要在消息被发布的时候定义。
AMQP的消息除属性外,也含有一个有效载荷 - Payload(消息实际携带的数据),它被AMQP代理当作不透明的字节数组来对待。消息代理不会检查或者修改有效载荷。消息可以只包含属性而不携带有效载荷。它通常会使用类似JSON这种序列化的格式数据,为了节省,协议缓冲器和MessagePack将结构化数据序列化,以便以消息的有效载荷的形式发布。AMQP及其同行者们通常使用"content-type" 和 “content-encoding” 这两个字段来与消息沟通进行有效载荷的辨识工作,但这仅仅是基于约定而已。
消息能够以持久化的方式发布,AMQP代理会将此消息存储在磁盘上。如果服务器重启,系统会确认收到的持久化消息未丢失。简单地将消息发送给一个持久化的交换机或者路由给一个持久化的队列,并不会使得此消息具有持久化性质:它完全取决与消息本身的持久模式(persistence mode)。将消息以持久化方式发布时,会对性能造成一定的影响(就像数据库操作一样,健壮性的存在必定造成一些性能牺牲)。
消息确认
由于网络的不确定性和应用失败的可能性,处理确认回执(acknowledgement)就变的十分重要。有时我们确认消费者收到消息就可以了,有时确认回执意味着消息已被验证并且处理完毕,例如对某些数据已经验证完毕并且进行了数据存储或者索引操作。
这种情形很常见,所以 AMQP 0-9-1 内置了一个功能叫做 消息确认(message acknowledgements),消费者用它来确认消息已经被接收或者处理。如果一个应用崩溃掉(此时连接会断掉,所以AMQP代理亦会得知),而且消息的确认回执功能已经被开启,但是消息代理尚未获得确认回执,那么消息会被从新放入队列(并且在还有还有其他消费者存在于此队列的前提下,立即投递给另外一个消费者)。
协议内置的消息确认功能将帮助开发者建立强大的软件。
AMQP 0-9-1 方法
AMQP 0-9-1由许多方法(methods)构成。方法即是操作,这跟面向对象编程中的方法没半毛钱关系。AMQP的方法被分组在类(class)中。这里的类仅仅是对AMQP方法的逻辑分组而已。在 AMQP 0-9-1参考 中有对AMQP方法的详细介绍。
让我们来看看交换机类,有一组方法被关联到了交换机的操作上。这些方法如下所示:
- exchange.declare
- exchange.declare-ok
- exchange.delete
- exchange.delete-ok
以上的操作来自逻辑上的配对:exchange.declare 和 exchange.declare-ok,exchange.delete 和 exchange.delete-ok. 这些操作分为“请求 - requests”(由客户端发送)和“响应 - responses”(由代理发送,用来回应之前提到的“请求”操作)。
如下的例子:客户端要求消息代理使用exchange.declare方法声明一个新的交换机:
如上图所示,exchange.declare方法携带了好几个参数。这些参数可以允许客户端指定交换机名称、类型、是否持久化等等。
操作成功后,消息代理使用exchange.declare-ok方法进行回应:
exchange.declare-ok方法除了通道号之外没有携带任何其他参数。
AMQP队列类的配对方法 - queue.declare方法 和 queue.declare-ok有着与其他配对方法非常相似的一系列事件:
不是所有的AMQP方法都有与其配对的“另一半”。许多(basic.publish是最被广泛使用的)都没有相对应的“响应”方法,另外一些(如basic.get)有着一种以上与之对应的“响应”方法。
连接
AMQP连接通常是长连接。AMQP是一个使用TCP提供可靠投递的应用层协议。AMQP使用认证机制并且提供TLS(SSL)保护。当一个应用不再需要连接到AMQP代理的时候,需要优雅的释放掉AMQP连接,而不是直接将TCP连接关闭。
通道
有些应用需要与AMQP代理建立多个连接。无论怎样,同时开启多个TCP连接都是不合适的,因为这样做会消耗掉过多的系统资源并且使得防火墙的配置更加困难。AMQP 0-9-1提供了通道(channels)来处理多连接,可以把通道理解成共享一个TCP连接的多个轻量化连接。
在涉及多线程/进程的应用中,为每个线程/进程开启一个通道(channel)是很常见的,并且这些通道不能被线程/进程共享。
一个特定通道上的通讯与其他通道上的通讯是完全隔离的,因此每个AMQP方法都需要携带一个通道号,这样客户端就可以指定此方法是为哪个通道准备的。
虚拟主机
为了在一个单独的代理上实现多个隔离的环境(用户、用户组、交换机、队列 等),AMQP提供了一个虚拟主机(virtual hosts - vhosts)的概念。这跟Web servers虚拟主机概念非常相似,这为AMQP实体提供了完全隔离的环境。当连接被建立的时候,AMQP客户端来指定使用哪个虚拟主机。
Java客户端开发指南
详细参考官方文档
Java Client API Guide — RabbitMQ
RabbitMQ Java 客户端使用com.rabbitmq.client
作为它的顶级包。关键的类和接口有:
- Channel: 代表 AMQP 0-9-1通道,并提供了大多数操作(协议方法)。
- Connection: 代表 AMQP 0-9-1 连接
- ConnectionFactory: 构建
Connection
实例 - Consumer: 代表消息的消费者
- DefaultConsumer: 消费者通用的基类
- BasicProperties: 消息的属性(元信息)
- BasicProperties.Builder:
BasicProperties
的构建器
通过Channel
(通道)的接口可以对协议进行操作。Connection
(连接)用于开启通道,注册连接的生命周期内的处理事件,并且关闭不再需要的连接。ConnectionFactory
用于实例化Connection
对象,并且可以通过ConnectionFactory
来进行诸如vhost、username等属性的设置。
初始化
建立连接
相关源码全部发布在下面的仓库中
- RabbitmqClient封装建立连接用的相关属性
@Builder
public class RabbitmqClient
private String userName;
private String password;
private String virtualHost;
private String host;
private Integer port;
public Connection getConnection() throws IOException, TimeoutException
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setUsername(userName);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);
connectionFactory.setHost(host);
connectionFactory.setPort(port);
return connectionFactory.newConnection();
- 工具类
public class RabbitmqUtil
private final String keyPrefix="spring.rabbitmq.";
private final YamlUtil yamlUtil;
private final RabbitmqClient rabbitmqClient;
public RabbitmqUtil(String ymlPath)
this.yamlUtil =new YamlUtil(ymlPath);
this.rabbitmqClient=RabbitmqClient.builder()
.userName(yamlUtil.get(keyPrefix+"username"))
.password(yamlUtil.get(keyPrefix+"password"))
.host(yamlUtil.get(keyPrefix+"host"))
.port(Integer.valueOf(yamlUtil.get(keyPrefix+"port")))
.virtualHost(yamlUtil.get(keyPrefix+"virtual-host"))
.build();
public Connection getConnection() throws IOException, TimeoutException
return rabbitmqClient.getConnection();
- 测试
RabbitmqUtil rabbitmqUtil = new RabbitmqUtil("application.yml");
Connection connection = rabbitmqUtil.getConnection();
对于运行在本地的RabbitMQ节点而言,这些参数都有合适的默认值。
如果在创建连接前没有指定参数值,则会使用默认参数:
Property | Default Value |
---|---|
Username | “guest” |
Password | “guest” |
Virtual host | “/” |
Hostname | “localhost” |
port | 5672正常通信端口,5671用于SSL加密通信 |
需要注意的是,默认情况下guest(来宾)用户只能用本地进行连接。目的是为了限制已知凭证在生产系统中的使用。
#在配置文件中设置loopback_users为none,那么guest账号就可以进行远程连接了
loopback_users = none
创建通道
public Channel createChannel() throws IOException
log.info("通道创建中...");
return connection.createChannel();
关闭Rabbitmq连接
通过简单的对通道和连接进行关闭即可关闭掉RabbitMQ的连接:
public void close()
log.info("关闭rabbitmq连接中...");
try
//channel.close(); 非必须
connection.close();
catch (IOException e)
log.error("关闭连接过程中出现错误: ",e);
需要注意的是,将通道关闭掉不是必须的操作。因为无论何种情况,通道都会在底层的连接关闭时自动关闭掉。
连接和通道的寿命
客户端connections是长连接。底层协议的设计和优化都考虑到了长连接的需求。这意味着对诸如消息发送之类的每个操作都建立一个连接的形式是极其不推荐的,那样做会产生大量的网络往返和开销。
Channels 虽然也是长期存活的,但是由于有大量的可恢复的协议错误会导致通道关闭,通道的存活期会比连接短一些。虽然每个操作都打开和关闭一个通道不是必须的操作,但是也不是不可行。有的选的情况下,还是优先考虑通道的复用为好。
类似于尝试从一个不存在的队列里消费消息这种 通道级别的异常 会导致通道关闭。已经关闭的通道不可以再被使用,也不会再接收到如消息投递之类的服务器事件。RabbitMQ会记录下通道级别的异常,并且会为通道初始化一个关闭顺序
提供本次连接的标记名称
RabbitMQ 节点可以持有有限的的客户端信息:
- 客户端的TCP节点(来源IP地址和端口)
- 使用的凭证
包括RabbitMQ Java客户端在内的AMQP 0-9-1客户端链接可以提供一个自定义标识符,一遍在服务器日志 和管理界面中方便地对客户端进行区分。设置好后,日志内容管理界面中便会对标识符有所体现。标识符即为客户端提供的连接名称。名称可以用于标识应用或应用中特定的组件。虽然名称是可选的,但是强烈建议提供一个,这将大大简化某些操作任务。
newConnection方法提供了很多重载方法,其中一部分提供了此次连接名称的设置
public Connection getConnection(String connectionName) throws IOException, TimeoutException
connectionFactory.setUsername(userName);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);
connectionFactory.setHost(host);
connectionFactory.setPort(port);
return connectionFactory.newConnection(connectionName);
交换机和队列
交换机和队列在使用事前必须对他们进行声明。
简单来讲,对任何一种对象类型进行声明的目的是为了确保它们已经存在,并在需要的时候对其进行创建。
客户端独占队列
以下代码声明了一个交换机以及一个服务端命名的队列,然后将它们绑定到一起
channel.exchangeDeclare("dhy-exchange", "direct", true);
//queueDeclare创建的队列名为""
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "dhy-exchange", "dhy");
这将会主动声明以下对象,这两个对象都可以使用附加参数进行自定义。但在这里,没有给他们俩定义特殊的参数。
- 持久化、非自动删除的“直连”形交换机
- 具有系统生成的名称的,非持久化、独占、自动删除的队列
注意,当只有一个客户端打算独占队列时,这是一个典型的队列声明方式。队列不需要既定的名称,没有其他客户端使用此队列(独占),队列会被自动清理掉(自动删除)。如果有多个客户端消费打算消费一个既定名称的队列,一下代码更为合适:
channel.exchangeDeclare(exchangeName, "direct", true);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
这将会主动进行以下声明:
- 持久化、非自动删除的“直连”交换机
- 拥有既定名称的,持久化、非独占、非自动删除的队列
许多Channel
接口方法都是被重载的。这里用到的关于 exchangeDeclare
, queueDeclare
和 queueBind
的短结构的重载方法使用了合适的默认值,更易于使用。当然也有更多参数的长结构的重载方法,使用那些方法可以将一些必要的默认参数进行重写,进行更全面的控制。
队列和交换机的被动声明
队列和交换机可以被动地进行声明。被动声明会简单地检查提供的名称所对应的实体是否存在。对成功检测到的队列来说,被动声明会返回跟非被动声明同样的信息,即队列中处于就绪状态的消费者和消息数量。
如果对应的实体不存在,操作会抛出一个通道级别的异常。然后通道就不可以继续使用了,需要打开一个新的通道。通常在进行被动声明的时候使用临时的一次性通道。
Channel#queueDeclarePassive
和 Channel#exchangeDeclarePassive
方法被用来进行被动声明。下边演示Channel#queueDeclarePassive
的使用:
AMQP.Queue.DeclareOk response = channel.queueDeclarePassive("queue-name");
response.getMessageCount();
response.getConsumerCount();
Channel#exchangeDeclarePassive
方法的返回值没包含什么有用的信息。只要方法正确返回,并且没有通道异常发生,就意味着交换机已经存在了。
不等待服务器响应
一些常见的操作还带有“非等待”版本,这种版本的操作不会等待服务器的响应。例如,以下方法会声明一个队列并且通知服务器不要发送任何响应
channel.queueDeclareNoWait(queueName, true, false, false, null);
“非等待”版本的操作会更具效率,但是安全保障较低,例如,它们更依赖心跳机制去检测失败的操作。如果不确定,就从标准版本的操作用起。“非等待”版本只是在高级拓扑结构(队列、绑定)的情况下需要。
实体和消息的清除
可以显示地将队列和交换机删除:
channel.queueDelete("queue-name")
也可以做到当队列为空时对其进行删除:
channel.queueDelete("queue-name", false, true)
或者当它不再被使用的时候(没有任何消费者对其进行消费):
channel.queueDelete("queue-name", true, false)
队列可以被清除(删除里边的所有消息):
channel.queuePurge("queue-name")
发布消息
//发布消息到交换机
channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY,null,"hello rabbitmq".getBytes(StandardCharsets.UTF_8));
想要实现更完善的控制,可以使用重载的变体来指定mandatory
标识,或者发送预设好消息属性的消息。
channel.basicPublish(exchangeName, routingKey, mandatory,
MessageProperties.PERSISTENT_TEXT_PLAIN,
messageBodyBytes);
mandatory是强制的意思
当我们向某个交换机发送消息后,交换机发现消息无法被路由到任何一个绑定到该交换机的队列上,那么如果publiher发送消息时,将mandatory属性设置为了false(默认就是false),那么消息会被转交给alternate exchange兜底交换机,前提是该交换机存在,不存在会记录警告日志。
当我们向某个交换机发送消息后,交换机发现消息无法被路由到任何一个绑定到该交换机的队列上,那么如果publiher发送消息时,将mandatory属性设置为了true,该消息会被返回给publisher,对应的消息发送方需要提供一个处理回退消息的回调接口,可以通过该接口完成对路由失败消息的记录或者尝试将其转交给其他交换机发送。
mandatory属性具体说明大家也可以参考官方文档:
兜底交换机具体大家可以参考官方文档:
Alternate Exchanges — RabbitMQ
以下示例发送消息的时候会指定投递模式为2(持久化),优先级为1并且消息体类型(content-type)为"text/plain"。使用Builder
类去创建一个需要指定多个属性的消息属性对象,例如:
//发布消息到交换机
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY,
new AMQP.BasicProperties.Builder()
.contentType("text/plain")
.deliveryMode(以上是关于Rabbitmq小书的主要内容,如果未能解决你的问题,请参考以下文章