02 RabbitMQ基础
Posted IT BOY
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了02 RabbitMQ基础相关的知识,希望对你有一定的参考价值。
目录
02 RabbitMQ基础
RabbitMQ是一套开源(MPL)的消息队列服务软件,他是基于AMQP协议的开源实现,由以高性能、健壮以及可伸缩性出名的 Erlang 语言编写而成。除了AMQP协议外,RabbitMQ还支持STOMP、MQTT、HTTP和WebSockets等协议。
Pt1 工作模型
由于RabbitMQ是基于AMQP的实现,服务端的工作模型也是基于AMQP的工作模型,先来看看RabbitMQ的工作模型(如下图),再来介绍RabbitMQ各个模块。
在整个工作模型中,包含以下几个组件:
-
Broker:主机
-
Exchange:交换机
-
Queue:队列
-
Connection:TCP长连接
-
Message:消息
-
Channel:消息信道
-
VHost:虚拟主机
-
Consumer:消费者
-
Producer:生产者
Pt1.1 Broker
Broker翻译成中文意思是经纪人、协商人,而MQ正是在生产者和消费者之间提供消息的转发、代理工作。RabbitMQ需要安装一个独立运行的服务,服务器节点就称为Broker,默认端口是5672。
Pt1.2 Exchange
Exchange的作用是接收生产者发送的消息,并根据Binding规则将消息路由给服务器中的队列。
通常来说,生产者发送消息给队列,然后被某个消费者消费。但是如果消费者比较多呢,比如广播消息的场景,生产者需要将消息发送给多个队列,然后再由多个消费者进行消费。
这个生产者看起来压力就很大啊,需要将同一条消息多次调用发送给每个队列。先不去考虑这么多次消息发送的一致性问题(要发送10个队列,9个成功,1个失败的场景下,是不断重试?还是要回滚?这时候消息发送的状态是算成功?部分成功?还是失败呢?想想如果你再发送MQ消息的时候,还要让你管理这么多复杂的问题,头大不大?),光是这么多队列的消息发送,对于生产者客户端的编码人员就是一种痛苦,要维护这么长的发送清单,并且每次新增和修改队列,都要变更代码,这还只是一条消息的发送,你说开发人员难不难?
RabbitMQ已经考虑到这一点。
RabbitMQ提供了Exchange的组件,负责消息的路由。
也就是说,一条消息不管需要发送给多少个队列,多少个消费者,生产者只需要将消息发送到Exchange上即可(一次发送),由Exchange负责消息的转发。Exchange会跟一个或者多个Queue之间建立绑定关系,它本身不会存储消息,收到生产者发送的消息后,会根据绑定关系发送到对应的队列上,然后由该队列绑定的消费者完成消费。
Exchange和队列是多对多的绑定关系。建立绑定关系后,生产者发送到Exchange的消息中会携带标识,当队列的绑定关系跟标识匹配时,消息就会发送给队列。
可以使用API或者MQ控台声明Exchange。
ExchangeDeclare(exchange, type, durable, autoDelete, internal, arguments);
-
exchange:名称;
-
type:类型,一共有3种,后面会介绍。
-
durable:元数据是否持久化,RabbitMQ关闭后,没有持久化的Exchange将被清除(因为没有持久化是保存在内存中的,关闭后内存数据丢失);
-
autoDelete:是否自动删除,如果没有与之绑定的Queue,直接删除;
-
internal:是否内置的,如果为true,只能通过Exchange到Exchange;
-
arguments:结构化参数
-
Alternate exchange:备用交换机,发送消息的时候根据route key并没有把消息路由到队列中去,这就会将此消息路由到Alternate Exchange属性指定的Exchange上。
-
ExchangeType决定了Exchange路由消息的行为,在RabbitMQ中,ExchangeType有direct、Fanout和Topic三种,不同类型的Exchange路由的行为是不一样的。
(1) Direct 直连模式
Exchange和队列使用明确地绑定键(binding key)完成绑定,生产者发送消息时会携带一个路由键,当消息的路由键和某个队列的绑定键完全匹配时,这条消息才会从交换机路由到这个队列上。
多个队列可以使用相同的绑定键。
直连类型的交换机,适用于一些业务明确的消息,建立明确的绑定键。
代码示例:参照[Pt3.2]
(2) Topic 主题模式
主题模式Exchange和队列绑定时,可以在绑定键中使用通配符,RabbitMQ支持两个通配符:
-
# 代表0个或者多个单词
-
* 代表一个单词,不多不少一个单词
单词(word)指的是使用英文的点“.”隔开的字符。例如,how.are.you就是3个单词,im.fine是2个单词。
主体模式的交换机适用于一些根据业务主题或者消息等级过滤消息的场景,比如,一条消息可能跟资金相关、跟风控相关、又跟账户相关,那就让这个消息指定一个多级的路由键,第一个单词代表资金,第二个代表风控,以此类推等,让每个消费者的队列都能根据不同的绑定键接收到这个主题的消息。
代码示例:参考[Pt3.3]
(3) Fanout广播模式
广播模式的Exchange与队列绑定时,不需要指定绑定键。生产者发送消息到广播Exchange上,也不需要携带路由键。
消息达到Exchange之后,所有与Exchange绑定的队列,都会受到相同消息的副本。
广播模式的Exchange适用于一些通用的业务消息(通知类型)。比如,当交易数据发生变化时,需要通知上层应用,数据中心和各个数据应用系统(反洗钱、反欺诈、风控、合规做数据归集和风险缝隙),就可以使用广播模式的Exchange,将数据变化广播到各个系统,由各个系统的消费者来消费。
代码示例:参照[Pt3.4]
Pt1.3 Queue
FIFO的消息队列,用于存储还未被消费者消费的消息。
队列是生产者和消费者之间的纽带,队列的消息被持久化到一个叫做Mnesia的数据库中,存放在磁盘上。
可以使用API或者在MQ控台声明一个队列:
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments);
-
queue: 队列名称;
-
durable: 是否持久化, 队列的声明默认是存放到内存中的,如果rabbitmq重启会丢失,如果想重启之后还存在就要使队列持久化,保存到Erlang自带的Mnesia数据库中,当rabbitmq重启之后会读取该数据库;
-
exclusive:是否排外的,有两个作用,一:当连接关闭时connection.close()该队列是否会自动删除;二:该队列是否是私有的private,如果不是排外的,可以使用两个消费者都访问同一个队列,没有任何问题,如果是排外的,会对当前队列加锁,其他通道channel是不能访问的,如果强制访问会报异常:com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'queue_name' in vhost '/', class-id=50, method-id=20)一般等于true的话用于一个队列只能有一个消费者来消费的场景;
-
autoDelete:是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除,可以通过RabbitMQ Management,查看某个队列的消费者数量,当consumers = 0时队列就会自动删除;
-
arguments:队列中消息处理参数。
-
Message TTL(x-message-ttl):设置队列中的所有消息的生存周期(统一为整个队列的所有消息设置生命周期), 也可以在发布消息的时候单独为某个消息指定剩余生存时间,单位毫秒, 类似于redis中的ttl。生存时间到了,消息会被从队里中删除,注意是消息被删除,而不是队列被删除。 特性Features=TTL, 单独为某条消息设置过期时间。
AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder().expiration(“6000”); channel.basicPublish(EXCHANGE_NAME, “”, properties.build(), message.getBytes(“UTF-8”));
-
Auto Expire(x-expires): 当队列在指定的时间没有被访问(consume, basicGet, queueDeclare…)就会被删除;
-
Max Length(x-max-length): 限定队列的消息的最大值长度,超过指定长度将会把最早的几条删除掉, 类似于mongodb中的固定集合,例如保存最新的100条消息;
-
Max Length Bytes(x-max-length-bytes): 限定队列最大占用的空间大小, 一般受限于内存、磁盘的大小;
-
Overflow Behanviour:设置队列溢出行为。这决定了当达到队列的最大长度时,消息会发生什么。有效值为Drop Head或Reject Publish;
-
Dead letter exchange(x-dead-letter-exchange): 当队列消息长度大于最大长度、或者过期的等,将从队列中删除的消息推送到指定的交换机中去而不是丢弃掉;
-
Dead letter routing key(x-dead-letter-routing-key):将删除的消息推送到指定交换机的指定路由键的队列中去;
-
Maximum priority(x-max-priority):优先级队列,声明队列时先定义最大优先级值(定义最大值一般不要太大),在发布消息的时候指定该消息的优先级, 优先级更高(数值更大的)的消息先被消费;
-
Lazy mode(x-queue-mode=lazy): Lazy Queues,先将消息保存到磁盘上,不放在内存中,当消费者开始消费的时候才加载到内存中;
-
Master locator(x-queue-master-locator):将队列设置为主位置模式,确定在节点集群上声明时队列主位置所依据的规则。
-
声明了Exchange和Queue之后,接下来就是绑定二者之间的关系,同样可以基于API和MQ控台进行操作:
channel.QueueBind(queue, exchange, routingKey, arguments);
-
queue:队列名称
-
exchange:交换机名称
-
routingKey:绑定键
-
arguments:结构化参数。
Pt1.4 Message
由Header和Body组成,Header是由生产者添加的各种属性的集合,包括Message是否被持久化、由哪个Message Queue接受、优先级是多少等。Body是真正需要传输的APP数据。
Pt1.5 Connection
无论是生产者还是消费者,在收发消息前,都要和Broker建立一个TCP的长连接。
Pt1.6 Channel
Channel是我们与RabbitMQ打交道的最重要的一个接口,我们大部分的业务操作是在Channel这个接口中完成的,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息等。
如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销将是巨大的,效率也较低。Channel是在connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯,AMQP method包含了channel id帮助客户端和message broker识别channel,所以channel之间是完全隔离的。Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销
Pt1.7 VHost
VHost其实是一个虚拟概念,类似于权限控制组,一个Virtual Host里面可以有若干个Exchange和Queue,但是权限控制的最小粒度是Virtual Host。
每一个RabbitMQ服务器都能创建虚拟消息服务器,我们称之为虚拟主机VHost。每一个vhost本质上是一个mini版的RabbitMQ服务器,拥有自己的交换机、队列、绑定关系等,拥有自己的权限机制。vhost之于Rabbit就像虚拟机之于物理机一样。他们通过在各个实例间提供逻辑上分离,允许为不同的应用程序安全保密的运行数据,这就是经常被提到的多租户概念。
多租户技术
多租户技术(英语:multi-tenancy technology)或称多重租赁技术,是一种软件架构技术,它是在探讨与实现如何于多用户的环境下共用相同的系统或程序组件,并且仍可确保各用户间数据的隔离性。
多租户简单来说是指一个单独的实例可以为多个组织服务。
多租户技术为共用的数据中心内,如何以单一系统架构与服务,来提供多个客户端相同甚至可定制化的服务,并且仍然可以保障客户的数据隔离。一个支持多租户技术的系统需要在设计上对它的数据和配置进行虚拟分区,从而使系统的每个租户或称组织都能够使用一个单独的系统实例,并且每个租户都可以根据自己的需求对租用的系统实例进行个性化配置。
多租户技术可以实现多个租户之间共享系统实例,同时又可以实现租户的系统实例的个性化定制。通过使用多租户技术可以保证系统共性的部分被共享,个性的部分被单独隔离。通过在多个租户之间的资源复用,运营管理维护资源,有效节省开发应用的成本。而且,在租户之间共享应用程序的单个实例,可以实现当应用程序升级时,所有租户可以同时升级。同时,因为多个租户共享一份系统的核心代码,因此当系统升级时,只需要升级相同的核心代码即可。
VHost既能将同一个Rabbit的众多客户区分开来,又可以避免队列和交换器的命名冲突。RabbitMQ提供了开箱即用的默认的虚拟主机“/”,如果不需要多个vhost可以直接使用这个默认的vhost,通过使用缺省的guest用户名和guest密码来访问默认的vhost。
Pt1.8 Consumer
Consumer有两种消费模式。
-
Pull模式:消息存放在MQ服务端,消费者通过主动获取的方式从MQ获取消息进行消费。这样可以有效的保护消费者,根据自己的消费能力决定获取消息的频率,避免消息在Consumer端产生积压造成服务不可用。但是如果消费者消费能力比较差,发送到MQ的消息按照消费者自身的消费节奏,需要等待一段时间才能被消费,会降低消息的实时性。
RabbitMQ中使用basicGet()拉取数据。
-
Push模式:只要生产者发送消息到MQ服务器,就立马推送给消费者,消息保存在消费者端,保证消息的实时性。但是如果消费者自身消费能力不够,可能会造成消息在客户端积压。Spring AMQP采取的就是push模式,通过事件机制对队列进行监听,只要有消息达到队列,就会触发消费消息的方法。
RabbitMQ中使用basicConsume()推送消息。
RabbitMQ中Pull和Push模式都有实现,Kafka和RocketMQ只有Pull模式。
由于队列的FIFO特性,只有确定消息被消费者接收之后,Broker才会把消息从数据库删除,继续投递下一条消息。关于MQ如何确认消费者已经收到消息的机制,在后面可靠性投递部分会详细说明。
一个消费这可以监听多个队列,一个队列也可以被多个消费者监听。在生产环境中,一般建议一个消费者只处理一条队列的消息,如果需要提升队列的消费能力,可以增加多个消费者,MQ会以轮训的方式将消息转发到每个消费者上。
通过basicConsume可以从MQ获取消息进行消费。
channel.basicConsume(queue, true, new DefaultConsumer(channel) @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException String message = new String(body, "UTF-8"); ;);
-
queue:要消费的队列名称
-
body:消息主体
Pt1.9 Producer
Producer就是消息的生产者,这个实在没什么好介绍的。
通过BasicPublish可以发送消息:
channel.BasicPublish(exchange, routingKey, basicProperties, messageBody);
-
exchange:发送的交换机
-
routingKey:消息的路由键
-
basicProperties:设置消息的Header属性,比如ttl等;
-
messageBody:消息的内容;
Pt2 安装操作
Pt2.1 Windows安装RabbitMQ
(1) Erlang环境说明
RabbitMQ依赖Erlang语言,所以要先安装Erlang环境。需要注意的是,RabbitMQ和Erlang有版本对应关系,否则可能产生奇怪的异常。
具体对应关系可以参照官方链接:https://www.rabbitmq.com/which-erlang.html
我们选择安装最新版RabbitMQ3.8.16,对应的Erlang环境选择24.0。
(2) 下载安装Erlang
官方链接:https://www.erlang.org/downloads/24.0
因为是Windows环境下,选择exe文件下载,然后一直next安装即可。
安装完成后,配置Erlang环境变量:
ERLANG_HOME=D:\\Program Files\\erl-24.0 path中添加:%ERLANG_HOME%\\bin;
配置完成后,cmd下输入erl,查看安装是否正确:
C:\\Users\\lucas>erl Eshell V12.0 (abort with ^G) 1>
(3) 下载安装RabbitMQ
官方下载链接:https://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.8.16
下载rabbitmq-server-3.8.16.exe,傻瓜式安装。
配置RabbitMQ环境变量:
RABBITMQ_SERVER=D:\\Program Files\\RabbitMQ Server\\rabbitmq_server-3.8.16 path中添加:%RABBITMQ_SERVER%\\sbin
(4) 启用RabbitMQ插件
在cmd中执行命令启用RabbitMQ插件功能,进入%RABBITMQ_SERVER%\\sbin目录,执行以下命令:
rabbitmq-plugins.bat enable rabbitmq_management
C:\\Users\\lucas>"G:\\Software2018\\Developer\\rabbitmq_server-3.8.16\\sbin\\rabbitmq-plugins.bat" enable rabbitmq_management Enabling plugins on node rabbit@lucas-PC: rabbitmq_management The following plugins have been configured: rabbitmq_management rabbitmq_management_agent rabbitmq_web_dispatch Applying plugin configuration to rabbit@lucas-PC... The following plugins have been enabled: rabbitmq_management rabbitmq_management_agent rabbitmq_web_dispatch set 3 plugins. Offline change; changes will take effect at broker restart.
(5) 启动RabbitMQ
# 启动RabbitMQ在cmd下输入: net start RabbitMQ # 关闭RabbitMQ在cmd下输入: net stop RabbitMQ
如果启动MQ报错,则可能是安装后MQ已经启动了,可以执行关闭命令后在重新尝试启动。
如果提示[发生系统错误 5。拒绝访问],则是需要以管理员身份运行cmd才可以。
(6) RabbitMQ管理控台
RabbitMQ启动后,访问管理控台http://localhost:15672/,默认用户名密码guest/guest。
默认配置文件路径:C:\\Users\\用户\\AppData\\Roaming\\RabbitMQ\\advanced.config
默认数据库路径:C:\\Users\\用户\\AppData\\Roaming\\RabbitMQ\\db\\rabbit@用户-PC-mnesia
(7) 初始化RabbitMQ
如果要初始化RabbitMQ,移除全部数据,可以执行以下命令(谨慎执行):
rabbitmq-service stop rabbitmq-service remove rabbitmq-service install rabbitmq-service start
Pt2.2 CentOS7安装RabbitMQ集群
我之前买过一个云服务器,我准备在上面搭建一个3节点的集群,没有云服务器的可以用VM本地搭建一个。
(1) 安装Docker环境
(1) 更新yum源
sudo yum update
(2) 添加仓库
sudo yum-config-manager \\
--add-repo \\
https://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
(3) 查看最新版本
如果之前安装了docker,需要卸载旧版本
yum list docker-ce --showduplicates | sort -r
(4) 安装Docker CE版本
yum install docker-ce -y
(2) 安装RabbitMQ
(1) 拉取RabbitMQ镜像(最新版)
docker pull rabbitmq
(2) 创建docker网络(让容器可以和主机通信)
docker network create rabbitmqnet
(3) 创建三个容器,端口分别是 5673 5674 5675 ,管理端口是 15673 15674 15675
docker run -d --name=rabbitmq1 -p 5673:5672 -p 15673:15672 -e RABBITMQ_NODENAME=rabbitmq1 -e RABBITMQ_ERLANG_COOKIE='GUPAOEDUFORBETTERYOU' -h rabbitmq1 --net=rabbitmqnet rabbitmq:management
docker run -d --name=rabbitmq2 -p 5674:5672 -p 15674:15672 -e RABBITMQ_NODENAME=rabbitmq1 -e RABBITMQ_ERLANG_COOKIE='GUPAOEDUFORBETTERYOU' -h rabbitmq2 --net=rabbitmqnet rabbitmq:management
docker run -d --name=rabbitmq3 -p 5675:5672 -p 15675:15672 -e RABBITMQ_NODENAME=rabbitmq1 -e RABBITMQ_ERLANG_COOKIE='GUPAOEDUFORBETTERYOU' -h rabbitmq3 --net=rabbitmqnet rabbitmq:management
(4) 后两个节点作为内存节点加入集群
docker exec -it rabbitmq2 /bin/bash
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster --ram rabbitmq1@rabbitmq1
rabbitmqctl start_app
docker exec -it rabbitmq3 /bin/bash
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster --ram rabbitmq1@rabbitmq1
rabbitmqctl start_app
结果如下:
[root@VM-0-17-centos ~]# docker exec -it rabbitmq3 /bin/bash root@rabbitmq3:/# rabbitmqctl stop_app RABBITMQ_ERLANG_COOKIE env variable support is deprecated and will be REMOVED in a future version. Use the $HOME/.erlang.cookie file or the --erlang-cookie switch instead. Stopping rabbit application on node rabbitmq1@rabbitmq3 ... root@rabbitmq3:/# rabbitmqctl reset RABBITMQ_ERLANG_COOKIE env variable support is deprecated and will be REMOVED in a future version. Use the $HOME/.erlang.cookie file or the --erlang-cookie switch instead. Resetting node rabbitmq1@rabbitmq3 ... root@rabbitmq3:/# rabbitmqctl join_cluster --ram rabbitmq1@rabbitmq1 RABBITMQ_ERLANG_COOKIE env variable support is deprecated and will be REMOVED in a future version. Use the $HOME/.erlang.cookie file or the --erlang-cookie switch instead. Clustering node rabbitmq1@rabbitmq3 with rabbitmq1@rabbitmq1 root@rabbitmq3:/# rabbitmqctl start_app RABBITMQ_ERLANG_COOKIE env variable support is deprecated and will be REMOVED in a future version. Use the $HOME/.erlang.cookie file or the --erlang-cookie switch instead. Starting node rabbitmq1@rabbitmq3 ...
(5) 访问控制台
通过http://ip:15673/访问RabbitMQ控台,默认用户名密码guest/guest
Pt3 基本使用
Pt3.1 基于JAVAAPI示例
(1) 创建Maven项目,引入RabbitMQ依赖:
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.12.0</version>
</dependency>
(2) 在RabbitMQ控台创建VHost、Exchange、Queue,以及建立二者绑定关系,如下图所示:
-
添加VHost
-
添加Exchange
-
添加Queue
-
绑定Exchange和Queue
(3) 编写生产者代码
public class FirstProducer
private static final String FIRST_MSG = "This is my first rabbitmq message.";
public static void main(String[] args)
Connection connection = null;
Channel channel = null;
try
// 配置连接工厂类参数
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(RabbitMQConstants.HOST);
factory.setPort(RabbitMQConstants.PORT);
factory.setVirtualHost(RabbitMQConstants.VHOST);
factory.setUsername(RabbitMQConstants.USERNAME);
factory.setPassword(RabbitMQConstants.PASSWORD);
// 与RabbitMQ服务器建立连接
connection = factory.newConnection();
channel = connection.createChannel();
// 发送消息
// RabbitMQ的API都是基于channel的
channel.basicPublish(RabbitMQConstants.EXCH_FIRST_EXCHANGE, "first", null, FIRST_MSG.getBytes());
System.out.println("消息发送成功。");
catch (Exception ex)
ex.printStackTrace();
finally
try
channel.close();
connection.close();
catch (Exception ex)
ex.printStackTrace();
运行生产者,可以看到队列中已经收到了消息,但是我们此时还没有建立消费者,所以消息保存在内存中没有被消费。
(4) 编写消费者代码
public class FirstConsumer
public static void main(String[] args)
Connection connection = null;
Channel channel = null;
try
// 配置连接工厂类参数
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(RabbitMQConstants.HOST);
factory.setPort(RabbitMQConstants.PORT);
factory.setVirtualHost(RabbitMQConstants.VHOST);
factory.setUsername(RabbitMQConstants.USERNAME);
factory.setPassword(RabbitMQConstants.PASSWORD);
// 与RabbitMQ服务器建立连接
connection = factory.newConnection();
channel = connection.createChannel();
// 前面已经在控台绑定了Exchange和Queue,这里不需要重复操作。如果是不操作控台,只是用API,可以使用下列API创建Queue并绑定Exchange
// 声明Exchange
// channel.exchangeDeclare(RabbitMQConstants.EXCH_FIRST_EXCHANGE, "direct", false, false, null);
// 声明队列
// channel.queueDeclare(RabbitMQConstants.QUEUE_FIRST_QUEUE, false, false, false, null);
// 绑定队列和交换机
// channel.queueBind(RabbitMQConstants.QUEUE_FIRST_QUEUE, RabbitMQConstants.EXCH_FIRST_EXCHANGE, "first");
// 创建消费者,绑定队列
Consumer consumer = new DefaultConsumer(channel)
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException
String message = new String(body, "UTF-8");
System.out.println("收到消息:" + message);
;
while (true)
// 消费者接受Queue消息
channel.basicConsume(RabbitMQConstants.QUEUE_FIRST_QUEUE, true, consumer);
catch (Exception ex)
ex.printStackTrace();
finally
try
channel.close();
connection.close();
catch (Exception ex)
ex.printStackTrace();
测试结果:
收到消息:This is my first rabbitmq message. 收到消息:This is my first rabbitmq message.
Pt3.2 Direct模式
在示例中,Exchange、Queue和绑定关系都是通过API来完成,在实际生产应用中,建议使用控台,这样可以有效进行权限控制,并且可以增加审核机制,避免硬代码。
// 创建生产者
public class DirectProducer
public static void main(String[] args)
Connection connection = null;
Channel channel = null;
try
// 配置连接工厂类参数
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(RabbitMQConstants.HOST);
factory.setPort(RabbitMQConstants.PORT);
factory.setVirtualHost(RabbitMQConstants.VHOST);
factory.setUsername(RabbitMQConstants.USERNAME);
factory.setPassword(RabbitMQConstants.PASSWORD);
// 与RabbitMQ服务器建立连接
connection = factory.newConnection();
channel = connection.createChannel();
// 声明Exchange
channel.exchangeDeclare("DIRECT_EXCHANGE", "direct", false, false, null);
// 发送消息,路由键不同
channel.basicPublish("DIRECT_EXCHANGE", "first", null, "first message".getBytes());
channel.basicPublish("DIRECT_EXCHANGE", "second", null, "second message".getBytes());
channel.basicPublish("DIRECT_EXCHANGE", "third", null, "third message".getBytes());
System.out.println("消息发送成功。");
catch (Exception ex)
ex.printStackTrace();
finally
try
channel.close();
connection.close();
catch (Exception ex)
ex.printStackTrace();
// 创建消费者:一共创建3个消费者,绑定同一个Exchange,绑定键不同。
public class DirectConsumer1
public static void main(String[] args)
Connection connection = null;
Channel channel = null;
try
// 配置连接工厂类参数
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(RabbitMQConstants.HOST);
factory.setPort(RabbitMQConstants.PORT);
factory.setVirtualHost(RabbitMQConstants.VHOST);
factory.setUsername(RabbitMQConstants.USERNAME);
factory.setPassword(RabbitMQConstants.PASSWORD);
// 与RabbitMQ服务器建立连接
connection = factory.newConnection();
channel = connection.createChannel();
// 声明队列
channel.queueDeclare("DIRECT_FIRST_QUEUE", false, false, false, null);
// 绑定队列和交换机
channel.queueBind("DIRECT_FIRST_QUEUE", "DIRECT_EXCHANGE", "first");
// 创建消费者,绑定队列
Consumer consumer = new DefaultConsumer(channel)
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException
String message = new String(body, "UTF-8");
System.out.println("消费者1收到消息:" + message);
;
while (true)
// 消费者接受Queue消息
channel.basicConsume("DIRECT_FIRST_QUEUE", true, consumer);
catch (Exception ex)
ex.printStackTrace();
finally
try
channel.close();
connection.close();
catch (Exception ex)
ex.printStackTrace();
public class DirectConsumer2
public static void main(String[] args)
Connection connection = null;
Channel channel = null;
try
// 配置连接工厂类参数
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(RabbitMQConstants.HOST);
factory.setPort(RabbitMQConstants.PORT);
factory.setVirtualHost(RabbitMQConstants.VHOST);
factory.setUsername(RabbitMQConstants.USERNAME);
factory.setPassword(RabbitMQConstants.PASSWORD);
// 与RabbitMQ服务器建立连接
connection = factory.newConnection();
channel = connection.createChannel();
// 声明队列
channel.queueDeclare("DIRECT_SECOND_QUEUE", false, false, false, null);
// 绑定队列和交换机
channel.queueBind("DIRECT_SECOND_QUEUE", "DIRECT_EXCHANGE", "second");
// 创建消费者,绑定队列
Consumer consumer = new DefaultConsumer(channel)
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException
String message = new String(body, "UTF-8");
System.out.println("消费者2收到消息:" + message);
;
while (true)
// 消费者接受Queue消息
channel.basicConsume("DIRECT_SECOND_QUEUE", true, consumer);
catch (Exception ex)
ex.printStackTrace();
finally
try
channel.close();
connection.close();
catch (Exception ex)
ex.printStackTrace();
public class DirectConsumer3
public static void main(String[] args)
Connection connection = null;
Channel channel = null;
try
// 配置连接工厂类参数
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(RabbitMQConstants.HOST);
factory.setPort(RabbitMQConstants.PORT);
factory.setVirtualHost(RabbitMQConstants.VHOST);
factory.setUsername(RabbitMQConstants.USERNAME);
factory.setPassword(RabbitMQConstants.PASSWORD);
// 与RabbitMQ服务器建立连接
connection = factory.newConnection();
channel = connection.createChannel();
// 声明队列
channel.queueDeclare("DIRECT_THIRD_QUEUE", false, false, false, null);
// 绑定队列和交换机
channel.queueBind("DIRECT_THIRD_QUEUE", "DIRECT_EXCHANGE", "third");
// 创建消费者,绑定队列
Consumer consumer = new DefaultConsumer(channel)
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException
String message = new String(body, "UTF-8");
System.out.println("消费者3收到消息:" + message);
;
while (true)
// 消费者接受Queue消息
channel.basicConsume("DIRECT_THIRD_QUEUE", true, consumer);
catch (Exception ex)
ex.printStackTrace();
finally
try
channel.close();
connection.close();
catch (Exception ex)
ex.printStackTrace();
分别运行生产者和消费者:
// 生产和 消息发送成功。 // 消费者1 消费者1收到消息:first message // 消费者2 消费者2收到消息:second message // 消费者3 消费者3收到消息:third message
Pt3.3 Topic模式
在示例中,Exchange、Queue和绑定关系都是通过API来完成,在实际生产应用中,建议使用控台,这样可以有效进行权限控制,并且可以增加审核机制,避免硬代码。
// 创建生产者
public class TopicProducer
public static void main(String[] args)
Connection connection = null;
Channel channel = null;
try
// 配置连接工厂类参数
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(RabbitMQConstants.HOST);
factory.setPort(RabbitMQConstants.PORT);
factory.setVirtualHost(RabbitMQConstants.VHOST);
factory.setUsername(RabbitMQConstants.USERNAME);
factory.setPassword(RabbitMQConstants.PASSWORD);
// 与RabbitMQ服务器建立连接
connection = factory.newConnection();
channel = connection.createChannel();
// 声明Exchange
channel.exchangeDeclare("TOPIC_EXCHANGE", "topic", false, false, null);
// 发送消息:Topic类型不用路由键
channel.basicPublish("TOPIC_EXCHANGE", "money.risk", null, "money.risk".getBytes());
channel.basicPublish("TOPIC_EXCHANGE", "money.risk.account", null, "money.risk.account".getBytes());
channel.basicPublish("TOPIC_EXCHANGE", "risk.account", null, "risk.account".getBytes());
channel.basicPublish("TOPIC_EXCHANGE", "risk.account.order", null, "risk.account".getBytes());
System.out.println("消息发送成功。");
catch (Exception ex)
ex.printStackTrace();
finally
try
channel.close();
connection.close();
catch (Exception ex)
ex.printStackTrace();
// 创建消费者
public class TopicConsumer1
public static void main(String[] args)
Connection connection = null;
Channel channel = null;
try
// 配置连接工厂类参数
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(RabbitMQConstants.HOST);
factory.setPort(RabbitMQConstants.PORT);
factory.setVirtualHost(RabbitMQConstants.VHOST);
factory.setUsername(RabbitMQConstants.USERNAME);
factory.setPassword(RabbitMQConstants.PASSWORD);
// 与RabbitMQ服务器建立连接
connection = factory.newConnection();
channel = connection.createChannel();
// 声明队列
channel.queueDeclare("TOPIC_FIRST_QUEUE", false, false, false, null);
// 绑定队列和交换机
channel.queueBind("TOPIC_FIRST_QUEUE", "TOPIC_EXCHANGE", "money.#");
// 创建消费者,绑定队列
Consumer consumer = new DefaultConsumer(channel)
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException
String message = new String(body, "UTF-8");
System.out.println("消费者1收到消息:" + message);
;
while (true)
// 消费者接受Queue消息
channel.basicConsume("TOPIC_FIRST_QUEUE", true, consumer);
catch (Exception ex)
ex.printStackTrace();
finally
try
channel.close();
connection.close();
catch (Exception ex)
ex.printStackTrace();
public class TopicConsumer2
public static void main(String[] args)
Connection connection = null;
Channel channel = null;
try
// 配置连接工厂类参数
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(RabbitMQConstants.HOST);
factory.setPort(RabbitMQConstants.PORT);
factory.setVirtualHost(RabbitMQConstants.VHOST);
factory.setUsername(RabbitMQConstants.USERNAME);
factory.setPassword(RabbitMQConstants.PASSWORD);
// 与RabbitMQ服务器建立连接
connection = factory.newConnection();
channel = connection.createChannel();
// 声明队列
channel.queueDeclare("TOPIC_SECOND_QUEUE", false, false, false, null);
// 绑定队列和交换机
channel.queueBind("TOPIC_SECOND_QUEUE", "TOPIC_EXCHANGE", "*.risk.#");
// 创建消费者,绑定队列
Consumer consumer = new DefaultConsumer(channel)
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException
String message = new String(body, "UTF-8");
System.out.println("消费者2收到消息:" + message);
;
while (true)
// 消费者接受Queue消息
channel.basicConsume("TOPIC_SECOND_QUEUE", true, consumer);
catch (Exception ex)
ex.printStackTrace();
finally
try
channel.close();
connection.close();
catch (Exception ex)
ex.printStackTrace();
public class TopicConsumer3
public static void main(String[] args)
Connection connection = null;
Channel channel = null;
try
// 配置连接工厂类参数
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(RabbitMQConstants.HOST);
factory.setPort(RabbitMQConstants.PORT);
factory.setVirtualHost(RabbitMQConstants.VHOST);
factory.setUsername(RabbitMQConstants.USERNAME);
factory.setPassword(RabbitMQConstants.PASSWORD);
// 与RabbitMQ服务器建立连接
connection = factory.newConnection();
channel = connection.createChannel();
// 声明队列
channel.queueDeclare("TOPIC_THIRD_QUEUE", false, false, false, null);
// 绑定队列和交换机
channel.queueBind("TOPIC_THIRD_QUEUE", "TOPIC_EXCHANGE", "#.account");
// 创建消费者,绑定队列
Consumer consumer = new DefaultConsumer(channel)
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException
String message = new String(body, "UTF-8");
System.out.println("消费者3收到消息:" + message);
;
while (true)
// 消费者接受Queue消息
channel.basicConsume("TOPIC_THIRD_QUEUE", true, consumer);
catch (Exception ex)
ex.printStackTrace();
finally
try
channel.close();
connection.close();
catch (Exception ex)
ex.printStackTrace();
运行结果:
// 生产者输出 消息发送成功。 // 消费者1输出 消费者1收到消息:money.risk 消费者1收到消息:money.risk.account // 消费者2输出 消费者2收到消息:money.risk 消费者2收到消息:money.risk.account // 消费者3输出 消费者3收到消息:money.risk.account 消费者3收到消息:risk.account
Pt3.4 Fanout模式
在示例中,Exchange、Queue和绑定关系都是通过API来完成,在实际生产应用中,建议使用控台,这样可以有效进行权限控制,并且可以增加审核机制,避免硬代码。
// 创建生产者
public class FanoutProducer
public static void main(String[] args)
Connection connection = null;
Channel channel = null;
try
// 配置连接工厂类参数
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(RabbitMQConstants.HOST);
factory.setPort(RabbitMQConstants.PORT);
factory.setVirtualHost(RabbitMQConstants.VHOST);
factory.setUsername(RabbitMQConstants.USERNAME);
factory.setPassword(RabbitMQConstants.PASSWORD);
// 与RabbitMQ服务器建立连接
connection = factory.newConnection();
channel = connection.createChannel();
// 声明Exchange
channel.exchangeDeclare("FANOUT_EXCHANGE", "fanout", false, false, null);
// 发送消息:FANOUT类型不用路由键
channel.basicPublish("FANOUT_EXCHANGE", "", null, "first message".getBytes());
channel.basicPublish("FANOUT_EXCHANGE", "", null, "second message".getBytes());
channel.basicPublish("FANOUT_EXCHANGE", "", null, "third message".getBytes());
System.out.println("消息发送成功。");
catch (Exception ex)
ex.printStackTrace();
finally
try
channel.close();
connection.close();
catch (Exception ex)
ex.printStackTrace();
// 创建消费者
public class FanoutConsumer1
public static void main(String[] args)以上是关于02 RabbitMQ基础的主要内容,如果未能解决你的问题,请参考以下文章
RabbitMQphp语言对RabbitMQ四种工作模式的基础实现
RabbitMQ基础组件和SpringBoot整合RabbitMQ简单示例