RabbitMQ-安装及五种消息模型
Posted PaoShan
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ-安装及五种消息模型相关的知识,希望对你有一定的参考价值。
本文包含了安装及其五种模式
RabbitMQ中的一些角色:
publisher:生产者
consumer:消费者
exchange个:交换机,负责消息路由
queue:队列,存储消息
virtualHost:虚拟主机,隔离不同租户的exchange、queue、消息的隔离
什么是SpringAMQP?
AMQP:是用于在应用程序之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求。
Spring AMQP:是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。
Spring AMQP:模板底层是基于RabbitMQ封装,并且还利用SpringBoot对其实现了自动装配,使用起来非常方便。
SpringAmqp的官方地址:https://spring.io/projects/spring-amqp
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
我们在Centos7虚拟机中使用Docker来安装
docker pull rabbitmq:3.8-management
docker run \\
-v mq-plugins:/plugins \\
--name=mq \\
-p 15672:15672 \\
-p 5672:5672 \\
-d \\
rabbitmq:3.8-management
http://192.168.136.131:15672/浏览器访问
Basic Queue
生产者和消费者都要先配置MQ地址,在publisher和consumer服务的application.yml中添加配置:
spring:
rabbitmq:
host: 192.168.136.131 # 主机名
port: 5672 # 端口
virtual-host: /quick # 虚拟主机
username: paoshan # 用户名
password: 123456 # 密码
publisher服务中编写测试类SpringAmqpTest,并利用RabbitTemplate实现消息发送:
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest
@Autowired
private RabbitTemplate rabbitTemplate;
// 发送 simple queue
@Test
public void testSimpleQueue() throws Exception
// 队列名称
String queueName = "simple.queue";
// 消息内容
String msg = "hello,spring amqp!";
// 发送消息
rabbitTemplate.convertAndSend(queueName, msg);
public class PublisherTest
@Test
public void testSendMessage() throws IOException, TimeoutException
// 1.建立连接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("192.168.136.128");
factory.setPort(5672);
factory.setVirtualHost("/quick");
factory.setUsername("paosahn");
factory.setPassword("123456");
// 1.2.建立连接
Connection connection = factory.newConnection();
// 2.创建通道Channel
Channel channel = connection.createChannel();
// 3.创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.发送消息
String message = "hello, rabbitmq!";
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println("发送消息成功:【" + message + "】");
// 5.关闭通道和连接
channel.close();
connection.close();
在consumer服务的包中建立类SpringRabbitListener
package com.itheima.mq;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@Component
public class SpringRabbitListener
@RabbitListener(queuesToDeclare = @Queue("simple.queues"))
public void ListenSimpleQuessMessage(String msg)
System.out.println("接受简单参数:"+msg);
Work Queue
当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用work 模型,多个消费者共同处理消息处理,速度就能大大提高了。
多个消费者绑定到一个队列,同一条消息只会被一个消费者处理
通过设置prefetch来控制消费者预取的消息数量
spring:
rabbitmq:
listener:
simple:
prefetch: 1 # 消费者一次处理一条消息,处理完毕后再从MQ中获取
这次我们循环发送,模拟大量消息堆积现象。
在publisher服务中的SpringAmqpTest类中添加一个测试方法:
/**
* workQueue
* 向队列中不停发送消息,模拟消息堆积。
*/
@Test
public void testWorkQueue() throws InterruptedException
// 队列名称
String queueName = "simple.queue";
// 消息
String message = "hello, message_";
for (int i = 0; i < 50; i++)
// 发送消息
rabbitTemplate.convertAndSend(queueName, message + i);
消费者:
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue1(String msg) throws InterruptedException
System.out.println("消费者1接收到消息:【" + msg + "】");
Thread.sleep(20);
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue2(String msg) throws InterruptedException
System.err.println("消费者2........接收到消息:【" + msg + "】");
Thread.sleep(200);
运行结果
Fanout Exchange广播模式
由于在生产者声明的交换机和队列,只有在第一次发送消息时,才会完成创建工作
FanoutExchange:交换机
Queue:队列
Binding:绑定交换机
@Configuration
public class FanoutConfiguration
//声明一个交换机
@Bean
public FanoutExchange fanoutExchange()
return new FanoutExchange("fanout.exchange");
//声明一个队列
@Bean
public Queue queue1()
return new Queue("fanout.queue1");
//声明两个队列
@Bean
public Queue queue2()
return new Queue("fanout.queue2");
//绑定第一个队列
@Bean
public Binding binding1(FanoutExchange fanoutExchange,Queue queue1)
return BindingBuilder.bind(queue1).to(fanoutExchange);
//绑定第二个队列
@Bean
public Binding binding2(FanoutExchange fanoutExchange ,Queue queue2)
return BindingBuilder.bind(queue2).to(fanoutExchange);
生产者
@Test
public void test01()throws Exception
//交换机参数名
String exChange = "fanout.exchange";
//消息内容
String msg = "订单号:1001";
//发送消息,没填路由器,暂时为null
rabbitTemplate.convertAndSend(exChange,null,msg);
消费者
//监听并接收fanout消息
@RabbitListener(queues = "fanout.queue1")
public void listEnFanoutQueues1(String msg)
System.out.println("仓储微服务接收消息:"+msg);
@RabbitListener(queues = "fanout.queue2")
public void listEnFanoutQueue2(String msg)
System.out.println("短信服务接收消息:"+msg);
运行结果
Direct Exchange(交换机)
在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange
基于@Bean的方式声明队列和交换机比较麻烦,Spring还提供了基于注解方式来声明。
在consumer的SpringRabbitListener中添加两个消费者,同时基于注解来声明队列和交换机:
在消费者上直接添加注解
//监听并且接受direct消息
@RabbitListener(
bindings = @QueueBinding(//交换机绑定队列
value = @Queue("direct.queue1"),//创建队列
exchange = @Exchange(value = "direct.exchange",type = ExchangeTypes.DIRECT),//创建交换机
key = "vip","base"//指定路由规则
))
public void listEnQueue1(String msg)
System.out.println("vip用户接收:"+msg);
//监听并且接受direct消息
@RabbitListener(
bindings = @QueueBinding(//交换机绑定队列
value = @Queue("direct.queue2"),//创建队列
exchange = @Exchange(value = "direct.exchange",type = ExchangeTypes.DIRECT),//创建交换机
key = "base"//指定路由规则
))
public void listEnQueue2(String msg)
System.out.println("普通用户接收:"+msg);
生产者:
@Test
public void test02()throws Exception
//交换机名
String exchange = "direct.exchange";
//路由名
String routingKey = "base";
//消息内容
String msg = "少林足球";
//发送消息
rabbitTemplate.convertAndSend(exchange,routingKey,msg);
Topic Exchange
Topic类型的类型Exchange与Direct相比,可以让队列在绑定Routing key 的时候使用通配符!
Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: china.news
通配符规则:
#:代指0个或多个单词
*:代指一个单词
消费者
// 监听topic消息
@RabbitListener(bindings = @QueueBinding(
value = @Queue("topic.queue1"),
exchange = @Exchange(value = "topic.exchange", type = ExchangeTypes.TOPIC),
key = "china.#"
))
public void listenTopicQueue1(String msg)
System.out.println("国内消息:" + msg);
@RabbitListener(bindings = @QueueBinding(
value = @Queue("topic.queue2"),
exchange = @Exchange(value = "topic.exchange", type = ExchangeTypes.TOPIC),
key = "#.news"
))
public void listenTopicQueue2(String msg)
System.out.println("国际消息:" + msg);
生产者
// 发送topic消息
@Test
public void test05() throws Exception
// 1.交换机名
String exchange = "topic.exchange";
// 2.路由名
String routingKey = "china.news";
// 3.消息内容
String msg = "2023年充满希望的一年";
// 发送消息
rabbitTemplate.convertAndSend(exchange, routingKey, msg);
小结:
Topic交换机接收的消息RoutingKey必须是多个单词,以 xx.xx 分割
Topic交换机与队列绑定时的bindingKey可以指定通配符
#:代表0个或多个词
*:代表1个词
配置JSON转换器
在父工程中引入依赖
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.10.3</version>
</dependency>
在服务启动类中添加json转换器
@Bean
public MessageConverter jsonMessageConverter()
return new Jackson2JsonMessageConverter();
以上是关于RabbitMQ-安装及五种消息模型的主要内容,如果未能解决你的问题,请参考以下文章