消息中间件RabbitMQ
Posted IT__LS
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了消息中间件RabbitMQ相关的知识,希望对你有一定的参考价值。
1.消息队列
1.1. 消息队列MQ
MQ全称为Message Queue,消息队列是应用程序和应用程序之间的通信方法。
在项目中,可将一些无需即时返回且耗时的操作提取出来,进行异步处理,而这种异步处理的方式大大的节省 了服务器的请求响应时间,从而提高了系统的吞吐量。
开发中消息队列通常有如下应用场景:
- 任务异步处理 将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。提高了应用程序的响应时间。(不需同步耗时任务往后稍稍,异步处理)
- 应用程序解耦合 MQ相当于一个中介,生产方通过MQ与消费方交互,它将应用程序进行解耦合。(中间商交互解耦)
1.2. AMQP 和 JMS
MQ是消息通信的模型;实现MQ的大致有两种主流方式:AMQP、JMS。
1.2.1. AMQP
AMQP是一种协议,更准确的说是一种binary wire-level protocol(链接协议)。这是其和JMS的本质差别,AMQP 不从API层进行限定,而是直接定义网络交换的数据格式。
1.2.2. JMS
JMS即Java消息服务(JavaMessage Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的 API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。
1.2.3. AMQP 与 JMS 区别
- JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式.
- JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的.
- JMS规定了两种消息模式;而AMQP的消息模式更加丰富.
1.3. 消息队列产品
市场上常见的消息队列有如下:
- ActiveMQ:基于JMS
- ZeroMQ:基于C语言开发
- RabbitMQ:基于AMQP协议,erlang语言开发,稳定性好
- RocketMQ:基于JMS,阿里巴巴产品
- Kafka:类似MQ的产品;分布式消息系统,高吞吐量
1.4. RabbitMQ
RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队 列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。
RabbitMQ官方地址:http://www.rabbitmq.com/
RabbitMQ提供了6种模式:简单模式,work模式,Publish/Subscribe发布与订阅模式,Routing路由模式,Topics 主题模式,RPC远程调用模式(远程调用,不太算MQ;不作介绍);
官网对应模式介绍:https://www.rabbitmq.com/getstarted.html
2.配置RabbitMQ
2.1 安装
安装erlang http://erlang.org/download/
erlang安装完成需要配置erlang 系统环境变量:
ERLANG_HOME=D:\\Program Files\\erl9.3
在path中添 加%ERLANG_HOME%\\bin;
安装完rabbitMQ后检查服务
配置插件
http://localhost:15672/ guest guest
2.2. 用户以及Virtual Hosts配置
2.2.1. 用户角色
RabbitMQ在安装好后,可以访问http://localhost:15672 ;其自带了guest/guest的用户名和密码;如果需要创建自定义用户;那么也可以登录管理界面后,如下操作:
2.2.2. Virtual Hosts配置
像mysql拥有数据库的概念并且可以指定用户对库和表等操作的权限。RabbitMQ也有类似的权限管理;在RabbitMQ 中可以虚拟消息服务器Virtual Host,每个Virtual Hosts相当于一个相对独立的RabbitMQ服务器,每个VirtualHost 之间是相互隔离的。exchange、queue、message不能互通。 相当于mysql的db。Virtual Name一般以/开头.
创建Virtual Hosts 设置Virtual Hosts权限
3.编写RabbitMQ的入门程序
3.1 依赖
3.2 生产者
package com.test.rabbitmq.simple;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @Description TODO 简单模式:发送消息
**/
public class Producer {
static final String QUEUE_NAME = "simple_queue";
public static void main(String[] args) throws Exception {
//1. 创建连接工厂(设置RabbitMQ的连接参数);
ConnectionFactory connectionFactory = new ConnectionFactory();
//主机;默认localhost
connectionFactory.setHost("localhost");
//连接端口;默认5672
connectionFactory.setPort(5672);
//虚拟主机;默认/
connectionFactory.setVirtualHost("/test");
//用户名;默认guest
connectionFactory.setUsername("test");
//密码;默认guest
connectionFactory.setPassword("test");
//2. 创建连接;
Connection connection = connectionFactory.newConnection();
//3. 创建频道;
Channel channel = connection.createChannel();
//4. 声明队列;
/**
* 参数1:队列名称
* 参数2:是否定义持久化队列(消息会持久化保存在服务器上)
* 参数3:是否独占本连接
* 参数4:是否在不使用的时候队列自动删除
* 参数5:其它参数
*/
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//5. 发送消息;
String message = "你好!小兔纸。";
/**
* 参数1:交换机名称;如果没有则指定空字符串(表示使用默认的交换机)
* 参数2:路由key,简单模式中可以使用队列名称
* 参数3:消息其它属性
* 参数4:消息内容
*/
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("已发送消息:" + message);
//6. 关闭资源
channel.close();
connection.close();
}
}
3.3 消费者
package com.test.rabbitmq.util;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class ConnectionUtil {
public static Connection getConnection() throws Exception {
//1. 创建连接工厂(设置RabbitMQ的连接参数);
ConnectionFactory connectionFactory = new ConnectionFactory();
//主机;默认localhost
connectionFactory.setHost("localhost");
//连接端口;默认5672
connectionFactory.setPort(5672);
//虚拟主机;默认/
connectionFactory.setVirtualHost("/test");
//用户名;默认guest
connectionFactory.setUsername("w12777");
//密码;默认guest
connectionFactory.setPassword("w12777");
//2. 创建连接;
return connectionFactory.newConnection();
}
}
package com.test.rabbitmq.simple;
import com.test.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @Description TODO 简单模式;消费者接收消息
**/
public class Consumer {
public static void main(String[] args) throws Exception {
//1. 创建连接工厂;
//2. 创建连接;(抽取一个获取连接的工具类)
Connection connection = ConnectionUtil.getConnection();
//3. 创建频道;
Channel channel = connection.createChannel();
//4. 声明队列;
/**
* 参数1:队列名称
* 参数2:是否定义持久化队列(消息会持久化保存在服务器上)
* 参数3:是否独占本连接
* 参数4:是否在不使用的时候队列自动删除
* 参数5:其它参数
*/
channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null);
//5. 创建消费者(接收消息并处理消息);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//路由key
System.out.println("路由key为:" + envelope.getRoutingKey());
//交换机
System.out.println("交换机为:" + envelope.getExchange());
//消息id
System.out.println("消息id为:" + envelope.getDeliveryTag());
//接收到的消息
System.out.println("接收到的消息为:" + new String(body, "utf-8"));
}
};
//6. 监听队列
/**
* 参数1:队列名
* 参数2:是否要自动确认;设置为true表示消息接收到自动向MQ回复接收到了,MQ则会将消息从队列中删除;
* 如果设置为false则需要手动确认
* 参数3:消费者
*/
channel.basicConsume(Producer.QUEUE_NAME, true, defaultConsumer);
}
}
3.4 小结
4.RabbitMQ的5种模式特征
4.1. Work queues工作队列模式
Work Queues与入门程序的简单模式的代码是几乎一样的;可以完全复制,并复制多一个消费者进行多个消费者同 时消费消息的测试。
生产者
package com.test.rabbitmq.work;
import com.test.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* work工作队列模式:发送消息
*/
public class Producer {
static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws Exception {
//1. 创建连接工厂(设置RabbitMQ的连接参数);
//2. 创建连接;
Connection connection = ConnectionUtil.getConnection();
//3. 创建频道;
Channel channel = connection.createChannel();
//4. 声明队列;
/**
* 参数1:队列名称
* 参数2:是否定义持久化队列(消息会持久化保存在服务器上)
* 参数3:是否独占本连接
* 参数4:是否在不使用的时候队列自动删除
* 参数5:其它参数
*/
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
for(int i = 1; i <= 30; i++) {
//5. 发送消息;
String message = "你好!小兔纸。work模式 --- " + i;
/**
* 参数1:交换机名称;如果没有则指定空字符串(表示使用默认的交换机)
* 参数2:路由key,简单模式中可以使用队列名称
* 参数3:消息其它属性
* 参数4:消息内容
*/
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("已发送消息:" + message);
}
//6. 关闭资源
channel.close();
connection.close();
}
}
消费者1
package com.test.rabbitmq.work;
import com.test.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* work模式;消费者接收消息
*/
public class Consumer1 {
public static void main(String[] args) throws Exception {
//1. 创建连接工厂;
//2. 创建连接;(抽取一个获取连接的工具类)
Connection connection = ConnectionUtil.getConnection();
//3. 创建频道;
final Channel channel = connection.createChannel();
//4. 声明队列;
/**
* 参数1:队列名称
* 参数2:是否定义持久化队列(消息会持久化保存在服务器上)
* 参数3:是否独占本连接
* 参数4:是否在不使用的时候队列自动删除
* 参数5:其它参数
*/
channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null);
//每次可以预取多少个消息
channel.basicQos(1);
//5. 创建消费者(接收消息并处理消息);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
//路由key
System.out.println("路由key为:" + envelope.getRoutingKey());
//交换机
System.out.println("交换机为:" + envelope.getExchange());
//消息id
System.out.println("消息id为:" + envelope.getDeliveryTag());
//接收到的消息
System.out.println("消费者1---接收到的消息为:" + new String(body, "utf-8"));
Thread.sleep(1000);
//确认消息
/**
* 参数1:消息id
* 参数2:false表示只有当前这条被处理
*/
channel.basicAck(envelope.getDeliveryTag(), false);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
//6. 监听队列
/**
* 参数1:队列名
* 参数2:是否要自动确认;设置为true表示消息接收到自动向MQ回复接收到了,MQ则会将消息从队列中删除;
* 如果设置为false则需要手动确认
* 参数3:消费者
*/
channel.basicConsume(Producer.QUEUE_NAME, true, defaultConsumer);
}
}
消费者2
package com.test.rabbitmq.work;
import com.test.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* work模式;消费者接收消息
*/
public class Consumer2 {
public static void main(String[] args) throws Exception {
//1. 创建连接工厂;
//2. 创建连接;(抽取一个获取连接的工具类)
Connection connection = ConnectionUtil.getConnection();
//3. 创建频道;
final Channel channel = connection.createChannel();
//4. 声明队列;
/**
* 参数1:队列名称
* 参数2:是否定义持久化队列(消息会持久化保存在服务器上)
* 参数3:是否独占本连接
* 参数4:是否在不使用的时候队列自动删除
* 参数5:其它参数
*/
channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null);
//每次可以预取多少个消息
channel.basicQos(1);
//5. 创建消费者(接收消息并处理消息);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
//路由key
System.out.println("路由key为:" + envelope.getRoutingKey());
//交换机
System.out.println("交换机为:" + envelope.getExchange());
//消息id
System.out.println("消息id为:" + envelope.getDeliveryTag());
//接收到的消息
System.out.println("消费者2---接收到的消息为:" + new String(body, "utf-8"));
Thread.sleep(1000);
//确认消息
/**
* 参数1:消息id
* 参数2:false表示只有当前这条被处理
*/
channel.basicAck(envelope.getDeliveryTag(), false);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
//6. 监听队列
/**
* 参数1:队列名
* 参数2:是否要自动确认;设置为true表示消息接收到自动向MQ回复接收到了,MQ则会将消息从队列中删除;
* 如果设置为false则需要手动确认
* 参数3:消费者
*/
channel.basicConsume(Producer.QUEUE_NAME, true, defaultConsumer);
}
}
启动两个消费者,然后再启动生产者发送消息;到IDEA的两个消费者对应的控制台查看是否竞争性的接收到消息。
在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系。
4.2 订阅模式类型
4.3. Publish/Subscribe发布与订阅模式
生产者
package com.test.rabbitmq.ps;
import com.test.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* 发布与订阅模式:发送消息
*/
public class Producer {
//交换机名称
static final String FANOUT_EXCHANGE = "fanout_exchange";
//队列名称
static final String FANOUT_QUEUE_1 = "fanout_queue_1";
//队列名称
static final String FANOUT_QUEUE_2 = "fanout_queue_2";
public static void main(String[] args) throws Exception {
//1. 创建连接;
Connection connection = ConnectionUtil.getConnection();
//2. 创建频道;
Channel channel = connection.createChannel();
//3. 声明交换机;参数1:交换机名称,参数2:交换机类型(fanout,direct,topic)
channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);
//4. 声明队列;
/**
* 参数1:队列名称
* 参数2:是否定义持久化队列(消息会持久化保存在服务器上)
* 参数3:是否独占本连接
* 参数4:是否在不使用的时候队列自动删除
* 参数5:其它参数
*/
channel.queueDeclare(FANOUT_QUEUE_1, true, false, false, null);
channel.queueDeclare(FANOUT_QUEUE_2, true, false, false, null);
//5. 队列绑定到交换机;参数1:队列名称,参数2:交换机名称,参数3:路由key
channel.queueBind(FANOUT_QUEUE_1, FANOUT_EXCHANGE, "");
channel.queueBind(FANOUT_QUEUE_2, FANOUT_EXCHANGE, "");
//6. 发送消息;
for(int i = 1; i<=10; i++) {
String message = "你好!小兔纸。发布订阅模式 --- " + i;
/**
* 参数1:交换机名称;如果没有则指定空字符串(表示使用默认的交换机)
* 参数2:路由key,简单模式中可以使用队列名称
* 参数3:消息其它属性
* 参数4:消息内容
*/
channel.basicPublish(FANOUT_EXCHANGE, "", null, message.getBytes());
System.out.println("已发送消息:" + message);
}
//6. 关闭资源
channel.close();
connection.close();
}
}
消费者1
package com.test.rabbitmq.ps;
import com.test.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* 发布与订阅模式;消费者接收消息
*/
public class Consumer1 {
public static void main(String[] args) throws Exception {
//1. 创建连接;(抽取一个获取连接的工具类)
Connection connection = ConnectionUtil.getConnection();
//2. 创建频道;
Channel channel = connection.createChannel();
//3. 声明交换机
channel.exchangeDeclare(Producer.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);
//4. 声明队列;
/**
* 参数1:队列名称
* 参数2:是否定义持久化队列(消息会持久化保存在服务器上)
* 参数3:是否独占本连接
* 参数4:是否在不使用的时候队列自动删除
* 参数5:其它参数
*/
channel.queueDeclare(Producer.FANOUT_QUEUE_1, true, false, false, null);
//5. 队列绑定到交换机上
channel.queueBind(Producer.FANOUT_QUEUE_1, Producer.FANOUT_EXCHANGE, "");
//6. 创建消费者(接收消息并处理消息);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//路由key
System.out.println("路由key为:" + envelope.getRoutingKey());
//交换机
System.out.println("交换机为:" + envelope.getExchange());
//消息id
System.out.println("消息id为:" + envelope.getDeliveryTag());
//接收到的消息
System.out.println("消费者1 --- 接收到的消息为:" + new String(body, "utf-8"));
}
};
//6. 监听队列
/**
* 参数1:队列名
* 参数2:是否要自动确认;设置为true表示消息接收到自动向MQ回复接收到了,MQ则会将消息从队列中删除;
* 如果设置为false则需要手动确认
* 参数3:消费者
*/
channel.basicConsume(Producer.FANOUT_QUEUE_1, true, defaultConsumer);
}
}
消费者2
package com.test.rabbitmq.ps;
import com.test.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* 发布与订阅模式;消费者接收消息
*/
public class Consumer2 {
public static void main(String[] args) throws Exception {
//1. 创建连接;(抽取一个获取连接的工具类)
Connection connection = ConnectionUtil.getConnection();
//2. 创建频道;
Channel channel = connection.createChannel();
//3. 声明交换机
channel.exchangeDeclare(Producer.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);
//4. 声明队列;
/**
* 参数1:队列名称
* 参数2:是否定义持久化队列(消息会持久化保存在服务器上)
* 参数3:是否独占本连接
* 参数4:是否在不使用的时候队列自动删除
* 参数5:其它参数
*/
channel.queueDeclare(Producer.FANOUT_QUEUE_2, true, false, false, null);
//5. 队列绑定到交换机上
channel.queueBind(Producer.FANOUT_QUEUE_2, Producer.FANOUT_EXCHANGE, "");
//6. 创建消费者(接收消息并处理消息);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//路由key
System.out.println("路由key为:" + envelope.getRoutingKey());
//交换机
System.out.println("交换机为:" + envelope.getExchange());
//消息id
System.out.println("消息id为:" + envelope.getDeliveryTag());
//接收到的消息
System.out.println("消费者2 --- 接收到的消息为:" + new String(body, "utf-8"));
}
};
//6. 监听队列
/**
* 参数1:队列名
* 参数2:是否要自动确认;设置为true表示消息接收到自动向MQ回复接收到了,MQ则会将消息从队列中删除;
* 如果设置为false则需要手动确认
* 参数3:消费者
*/
channel.basicConsume(Producer.FANOUT_QUEUE_2, true, defaultConsumer);
}
}
启动所有消费者,然后使用生产者发送消息;在每个消费者对应的控制台可以查看到生产者发送的所有消息;到达广播的效果。
消费者1和2都是1-10
在执行完测试代码后,其实到RabbitMQ的管理后台找到Exchanges选项卡,点击 fanout_exchange 的交换机,可 以查看到如下的绑定:
交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到。
4.4. Routing路由模式
在编码上与 Publish/Subscribe发布与订阅模式 的区别是交换机的类型为:Direct,还有队列绑定交换机的时候需 要指定routing key。
生产者
package com.test.rabbitmq.routing;
import com.test.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* 路由模式:发送消息
*/
public class Producer {
//交换机名称
static final String DIRECT_EXCHANGE = "direct_exchange";
//队列名称
static final String DIRECT_QUEUE_INSERT = "direct_queue_insert";
//队列名称
static final String DIRECT_QUEUE_UPDATE = "direct_queue_update";
public static void main(String[] args) throws Exception {
//1. 创建连接;
Connection connection = ConnectionUtil.getConnection();
//2. 创建频道;
Channel channel = connection.createChannel();
//3. 声明交换机;参数1:交换机名称,参数2:交换机类型(fanout,direct,topic)
channel.exchangeDeclare(DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT);
//4. 声明队列;
/**
* 参数1:队列名称
* 参数2:是否定义持久化队列(消息会持久化保存在服务器上)
* 参数3:是否独占本连接
* 参数4:是否在不使用的时候队列自动删除
* 参数5:其它参数
*/
channel.queueDeclare(DIRECT_QUEUE_INSERT, true, false, false, null);
channel.queueDeclare(DIRECT_QUEUE_UPDATE, true, false, false, null);
//5. 队列绑定到交换机;参数1:队列名称,参数2:交换机名称,参数3:路由key
channel.queueBind(DIRECT_QUEUE_INSERT, DIRECT_EXCHANGE, "insert");
channel.queueBind(DIRECT_QUEUE_UPDATE, DIRECT_EXCHANGE, "update");
//6. 发送消息;
String message = "你好!小兔纸。路由模式 ;routing key 为 insert ";
/**
* 参数1:交换机名称;如果没有则指定空字符串(表示使用默认的交换机)
* 参数2:路由key,简单模式中可以使用队列名称
* 参数3:消息其它属性
* 参数4:消息内容
*/
channel.basicPublish(DIRECT_EXCHANGE, "insert", null, message.getBytes());
System.out.println("已发送消息:" + message);
message = "你好!小兔纸。路由模式 ;routing key 为 update ";
/**
* 参数1:交换机名称;如果没有则指定空字符串(表示使用默认的交换机)
* 参数2:路由key,简单模式中可以使用队列名称
* 参数3:消息其它属性
* 参数4:消息内容
*/
channel.basicPublish(DIRECT_EXCHANGE, "update", null, message.getBytes());
System.out.println("已发送消息:" + message);
//6. 关闭资源
channel.close();
connection.close();
}
}
消费者1
package com.test.rabbitmq.routing;
import com.test.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* 路由模式;消费者接收消息
*/
public class Consumer1 {
public static void main(String[] args) throws Exception {
//1. 创建连接;(抽取一个获取连接的工具类)
Connection connection = ConnectionUtil.getConnection();
//2. 创建频道;
Channel channel = connection.createChannel();
//3. 声明交换机
channel.exchangeDeclare(Producer.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT);
//4. 声明队列;
/**
* 参数1:队列名称
* 参数2:是否定义持久化队列(消息会持久化保存在服务器上)
* 参数3:是否独占本连接
* 参数4:是否在不使用的时候队列自动删除
* 参数5:其它参数
*/
channel.queueDeclare(Producer.DIRECT_QUEUE_INSERT, true, false, false, null);
//5. 队列绑定到交换机上
channel.queueBind(Producer.DIRECT_QUEUE_INSERT, Producer.DIRECT_EXCHANGE, "insert");
//6. 创建消费者(接收消息并处理消息);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//路由key
System.out.println("路由key为:" + envelope.getRoutingKey());
//交换机
System.out.println("交换机为:" + envelope.getExchange());
//消息id
System.out.println("消息id为:" + envelope.getDeliveryTag());
//接收到的消息
System.out.println("消费者1 --- 接收到的消息为:" + new String(body, "utf-8"));
}
};
//6. 监听队列
/**
* 参数1:队列名
* 参数2:是否要自动确认;设置为true表示消息接收到自动向MQ回复接收到了,MQ则会将消息从队列中删除;
* 如果设置为false则需要手动确认
* 参数3:消费者
*/
channel.basicConsume(Producer.DIRECT_QUEUE_INSERT, true, defaultConsumer);
}
}
消费者2
package com.test.rabbitmq.routing;
import com.test.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* 路由模式;消费者接收消息
*/
public class Consumer2 {
public static void main(String[] args) throws Exception {
//1. 创建连接;(抽取一个获取连接的工具类)
Connection connection = ConnectionUtil.getConnection();
//2. 创建频道;
Channel channel = connection.createChannel();
//3. 声明交换机
channel.exchangeDeclare(Producer.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT);
//4. 声明队列;
/**
* 参数1:队列名称
* 参数2:是否定义持久化队列(消息会持久化保存在服务器上)
* 参数3:是否独占本连接
* 参数4:是否在不使用的时候队列自动删除
* 参数5:其它参数
*/
channel.queueDeclare(Producer.DIRECT_QUEUE_UPDATE, true, false, false, null);
//5. 队列绑定到交换机上
channel.queueBind(Producer.DIRECT_QUEUE_UPDATE, Producer.DIRECT_EXCHANGE, "update");
//6. 创建消费者(接收消息并处理消息);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//路由key
System.out.println("路由key为:" + envelope.getRoutingKey());
//交换机
System.out.println("交换机为:" + envelope.getExchange());
//消息id
System.out.println("消息id为:" + envelope.getDeliveryTag());
//接收到的消息
System.out.println("消费者1 --- 接收到的消息为:" + new String(body, "utf-8"));
}
};
//6. 监听队列
/**
* 参数1:队列名
* 参数2:是否要自动确认;设置为true表示消息接收到自动向MQ回复接收到了,MQ则会将消息从队列中删除;
* 如果设置为false则需要手动确认
* 参数3:消费者
*/
channel.basicConsume(Producer.DIRECT_QUEUE_UPDATE, true, defaultConsumer);
}
}
启动所有消费者,然后使用生产者发送消息;在消费者对应的控制台可以查看到生产者发送对应routing key对应队列的消息;到达按照需要接收的效果。
Routing模式要求队列绑定交换机时要指定routing key,消息会转发到符合routing key的队列。
在执行完测试代码后,其实到RabbitMQ的管理后台找到Exchanges选项卡,点击 direct_exchange 的交换机,可 以查看到如下的绑定:
4.5. Topics通配符模式
Topic类型与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可 以让队列在绑定Routing key 的时候使用通配符!
Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
通配符规则:
#:匹配一个或多个词
*:匹配不多不少恰好1个词
举例:
item.#:能够匹配item.insert.abc 或者 item.insert
item.*:只能匹配item.insert
红色Queue:绑定的是usa.# ,因此凡是以 usa.开头的routing key 都会被匹配到黄色Queue:绑定的是#.news ,因此凡是以 .news结尾的 routing key 都会被匹配
生产者
package com.test.rabbitmq.topic;
import com.test.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* 通配符模式:发送消息
*/
public class Producer {
//交换机名称
static final String TOPIC_EXCHAGE = "topic_exchage";
//队列名称
static final String TOPIC_QUEUE_1 = "topic_queue_1";
//队列名称
static final String TOPIC_QUEUE_2 = "topic_queue_2";
public static void main(String[] args) throws Exception {
//1. 创建连接;
Connection connection = ConnectionUtil.getConnection();
//2. 创建频道;
Channel channel = connection.createChannel();
//3. 声明交换机;参数1:交换机名称,参数2:交换机类型(fanout,direct,topic)
channel.exchangeDeclare(TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);
//6. 发送消息;
String message = "商品新增。通配符模式 ;routing key 为 item.insert ";
/**
* 参数1:交换机名称;如果没有则指定空字符串(表示使用默认的交换机)
* 参数2:路由key,简单模式中可以使用队列名称
* 参数3:消息其它属性
* 参数4:消息内容
*/
channel.basicPublish(TOPIC_EXCHAGE, "item.insert", null, message.getBytes());
System.out.println("已发送消息:" + message);
message = "商品修改。通配符模式 ;routing key 为 item.update ";
/**
* 参数1:交换机名称;如果没有则指定空字符串(表示使用默认的交换机)
* 参数2:路由key,简单模式中可以使用队列名称
* 参数3:消息其它属性
* 参数4:消息内容
*/
channel.basicPublish(TOPIC_EXCHAGE, "item.update", null, message.getBytes());
System.out.println("已发送消息:" + message);
message = "商品删除。通配符模式 ;routing key 为 item.delete ";
/**
* 参数1:交换机名称;如果没有则指定空字符串(表示使用默认的交换机)
* 参数2:路由key,简单模式中可以使用队列名称
* 参数3:消息其它属性
* 参数4:消息内容
*/
channel.basicPublish(TOPIC_EXCHAGE, "item.delete", null, message.getBytes());
System.out.println("已发送消息:" + message);
//6. 关闭资源
channel.close();
connection.close();
}
}
消费者1
package com.test.rabbitmq.topic;
import com.test.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* 通配符模式;消费者接收消息
*/
public class Consumer1 {
public static void main(String[] args) throws Exception {
//1. 创建连接;(抽取一个获取连接的工具类)
Connection connection = ConnectionUtil.getConnection();
//2. 创建频道;
Channel channel = connection.createChannel();
//3. 声明交换机
channel.exchangeDeclare(Producer.TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);
//4. 声明队列;
/**
* 参数1:队列名称
* 参数2:是否定义持久化队列(消息会持久化保存在服务器上)
* 参数3:是否独占本连接
* 参数4:是否在不使用的时候队列自动删除
* 参数5:其它参数
*/
channel.queueDeclare(Producer.TOPIC_QUEUE_1, true, false, false, null);
//5. 队列绑定到交换机上
channel.queueBind(Producer.TOPIC_QUEUE_1, Producer.TOPIC_EXCHAGE, "item.update");
channel.queueBind(Producer.TOPIC_QUEUE_1, Producer.TOPIC_EXCHAGE, "item.delete");
//6. 创建消费者(接收消息并处理消息);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//路由key
System.out.println("路由key为:" + envelope.getRoutingKey());
//交换机
System.out.println("交换机为:" + envelope.getExchange());
//消息id
System.out.println("消息id为:" + envelope.getDeliveryTag());
//接收到的消息
System.out.println("消费者1 --- 接收到的消息为:" + new String(body, "utf-8"));
}
};
//6. 监听队列
/**
* 参数1:队列名
* 参数2:是否要自动确认;设置为true表示消息接收到自动向MQ回复接收到了,MQ则会将消息从队列中删除;
* 如果设置为false则需要手动确认
* 参数3:消费者
*/
channel.basicConsume(Producer.TOPIC_QUEUE_1, true, defaultConsumer);
}
}
消费者2
package com.test.rabbitmq.topic;
import com.test.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* 通配符模式;消费者接收消息
*/
public class Consumer2 {
public static void main(String[] args) throws Exception {
//1. 创建连接;(抽取一个获取连接的工具类)
Connection connection = ConnectionUtil.getConnection();
//2. 创建频道;
Channel channel = connection.createChannel();
//3. 声明交换机
channel.exchangeDeclare(Producer.TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);
//4. 声明队列;
/**
* 参数1:队列名称
* 参数2:是否定义持久化队列(消息会持久化保存在服务器上)
* 参数3:是否独占本连接
* 参数4:是否在不使用的时候队列自动删除
* 参数5:其它参数
*/
channel.queueDeclare(Producer.TOPIC_QUEUE_2, true, false, false, null);
//5. 队列绑定到交换机上
channel.queueBind(Producer.TOPIC_QUEUE_2, Producer.TOPIC_EXCHAGE, "item.*");
//6. 创建消费者(接收消息并处理消息);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//路由key
System.out.println("路由key为:" + envelope.getRoutingKey());
//交换机
System.out.println("交换机为:" + envelope.getExchange());
//消息id
System.out.println("消息id为:" + envelope.getDeliveryTag());
//接收到的消息
System.out.println("消费者1 --- 接收到的消息为:" + new String(body, "utf-8"));
}
};
//6. 监听队列
/**
* 参数1:队列名
* 参数2:是否要自动确认;设置为true表示消息接收到自动向MQ回复接收到了,MQ则会将消息从队列中删除;
* 如果设置为false则需要手动确认
* 参数3:消费者
*/
channel.basicConsume(Producer.TOPIC_QUEUE_2, true, defaultConsumer);
}
}
启动所有消费者,然后使用生产者发送消息;在消费者对应的控制台可以查看到生产者发送对应routing key对应队 列的消息;到达按照需要接收的效果;并且这些routing key可以使用通配符。
Topic主题模式可以实现 Publish/Subscribe发布与订阅模式 和 Routing路由模式的功能;
只是Topic在配置routing key 的时候可以使用通配符,显得更加灵活。
4.6 模式总结
5.使用SpringBoot整合RabbitMQ
在Spring项目中,可以使用Spring-Rabbit去操作RabbitMQ
https://github.com/spring-projects/spring-amqp
尤其是在spring boot项目中只需要引入对应的amqp启动器依赖即可,方便的使用RabbitTemplate发送消息,使用注解接收消息。
5.1 生产者
依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.5.RELEASE</version>
</parent>
<groupId>org.example</groupId>
<artifactId>springboot-rabbitmq-producer</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.5.3</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-test</artifactId>
<version>2.0.2.RELEASE</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>5.0.2.RELEASE</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
启动类
package com.test.rabbitmq;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class ProducerApplication {
public static void main(String[] args) {
SpringApplication.run(ProducerApplication.class, args);
}
}
绑定交换机和队列
package com.test.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
//交换机名称
public static final String ITEM_TOPIC_EXCHANGE = "item_topic_exchange";
//队列名称
public static final String ITEM_QUEUE = "item_queue";
//声明交换机
@Bean("itemTopicExchange")
public Exchange topicExchange(){
return ExchangeBuilder.topicExchange(ITEM_TOPIC_EXCHANGE).durable(true).build();
}
//声明队列
@Bean("itemQueue")
public Queue itemQueue(){
return QueueBuilder.durable(ITEM_QUEUE).build();
}
//将队列绑定到交换机
@Bean
public Binding itemQueueExchange(@Qualifier("itemQueue") Queue queue,
@Qualifier("itemTopicExchange")Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("item.#").noargs();
}
}
5.2 消费者
配置RabbitMQ
spring:
rabbitmq:
host: localhost
port: 5672
virtual-host: /test
username: w12777
password: w12777
消息监听处理类
package com.test.rabbitmq.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class MyListener {
/**
* 接收队列消息
* @param message 接收到的消息
*/
@RabbitListener(queues = "item_queue")
public void myListener1(String message){
System.out.println("消费者接收到消息:" + message);
}
}
5.3 测试
package com.test.rabbitmq;
import com.test.rabbitmq.config.RabbitMQConfig;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitMQTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void test(){
rabbitTemplate.convertAndSend(RabbitMQConfig.ITEM_TOPIC_EXCHANGE,
"item.insert", "商品新增,路由Key为item.insert");
rabbitTemplate.convertAndSend(RabbitMQConfig.ITEM_TOPIC_EXCHANGE,
"item.update", "商品新增,路由Key为item.update");
rabbitTemplate.convertAndSend(RabbitMQConfig.ITEM_TOPIC_EXCHANGE,
"item.delete", "商品新增,路由Key为item.delete");
rabbitTemplate.convertAndSend(RabbitMQConfig.ITEM_TOPIC_EXCHANGE,
"a.item.delete", "商品新增,路由Key为a.item.delete");
}
}
先运行上述测试程序(交换机和队列才能先被声明和绑定),然后启动消费者;在消费者工程springboot-rabbitmqconsumer中控制台查看是否接收到对应消息。
以上是关于消息中间件RabbitMQ的主要内容,如果未能解决你的问题,请参考以下文章