服务异步通信RabbitMQ
Posted youwhua
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了服务异步通信RabbitMQ相关的知识,希望对你有一定的参考价值。
服务异步通信
目录
1. 初识MQ
- 同步通讯
直播、微服务间基于Feign的调用
问题:耦合高、性能下降(调用耗时、等待响应(资源浪费)、吞吐量下降、服务阻塞、级联失败) - 异步通讯
微信聊天
事件驱动模式
优势一:服务解耦
优势二:性能提升,吞吐量提高
优势三:服务没有强依赖,解决级联失败问题
优势四:流量消峰
问题一:依赖于Broker的可靠性、安全性、吞吐能力
问题二:架构复杂了,业务没有明显的流水线,不好追踪管理 - 异步通讯框架
MQ,存放消息的队列,也就是事件驱动架构中的Broker。
2. RabbitMQ快速入门
- RabbitMQ概述与安装
基于Erlang语言开发的开源消息通信中间件
使用docker安装
1.1拉取
docker pull rabbitmq:3-management
1.2导入后加载镜像
docker load -i mq.tar
2 启动MQ
docker run \\
-e RABBITMQ_DEFAULT_USER=admin \\
-e RABBITMQ_DEFAULT_PASS=admin \\
--name mq \\
--hostname mq1 \\
-p 15672:15672 \\
-p 5672:5672 \\
-d \\
rabbitmq:3-managment
登录管理界面
- 常见消息模型
publisher:消息发布者,将消息发送到队列queue
queue:消息队列,负责接收并缓存消息
consumer:订阅队列,处理队列中的消息
subscribe:订阅
- 基本消息队列 BasicQueue
HelloWorld
- 工作消息队列 WorkQueue
可以提高消息处理速度,避免队列消息堆积
消息预取机制:在处理消息之前,取出所有消息
解决:每次只能获取一条消息,处理完成才能获取下一个消息
spring:
rabbitmq:
listener:
simple:
prefetch: 1
- 发布订阅
允许将同一消息发送给多个消费者。
Exchange:只负责消息的转发
广播 Fanout Exchange
将收到的消息路由到每一个跟其绑定的queue
publisher
//在publisher服务中利用RabbitTemplate发送消息到队列
@Aoutwired
private RabbitTemplate rabbitTemplate;
......
String exchangeName = "name.fanout";
String message = "Hello World";
rabbitTemplate.convertAndSend(exchangeName, "", message);
......
consumer
@Configration
public class FanoutConfig
@Bean
public FanoutExchange fanoutExchange()
return new FanoutExchange("name.fanout");
@Bean
public Queue fanoutQueue1()
return new Queue("name.queue1");
@Bean
public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange)
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
@Bean
public Queue fanoutQueue2()
return new Queue("name.queue2");
@Bean
public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange)
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
路由 Direct Exchange
将接收到的消息根据规则路由到指定的Queue。
每个Queue都与Exchange设置一个BindingKey
发布者发送消息时,指定消息的RoutingKey
Exchange将消息路由到BindingKey与消息RoutingKey一致的队列
BindingKey可以相同
consumer
//在consumer服务中编写消费者逻辑,绑定队列
@Component
public class SpringRabbitListener
@RabbitListener(binding= @QueueBinding(value = @Queue(name = "队列名称"), exchange = @Exchange(name = "交换机名称1", type = ExchangeTypes.DIRECT), key = "key1","key2"))
public void listenerDirectQueue1(String msg)
// 业务逻辑
@RabbitListener(binding= @QueueBinding(value = @Queue(name = "队列名称2"), exchange = @Exchange(name = "交换机名称", type = ExchangeTypes.DIRECT), key = "key1","key2"))
public void listenerDirectQueue2(String msg)
// 业务逻辑
publisher
//在publisher服务中利用RabbitTemplate发送消息到队列
@Aoutwired
private RabbitTemplate rabbitTemplate;
......
String exchangeName = "name.direct";
String message = "Hello World";
String routingKey = "key1";
rabbitTemplate.convertAndSend(exchangeName, routingKey, message);
......
主题 Topic Exchange
与DirectExchange类似,但Topic Exchange 的RoutingKey必须时多个单词列表,并且以 . 分割
Queue与Exchange指定BindingKey时可以使用通配符:
#:代指0个或多个单词
*:代指一个单词
consumer
//在consumer服务中编写消费者逻辑,绑定队列
@Component
public class SpringRabbitListener
@RabbitListener(bindings= @QueueBinding(value = @Queue(name = "队列名称2"), exchange = @Exchange(name = "交换机名称", type = ExchangeTypes.TOPIC), key = "xxx.#"))
public void listenerTopicQueue2(String msg)
// 业务逻辑
publisher
//在publisher服务中利用RabbitTemplate发送消息到队列
@Aoutwired
private RabbitTemplate rabbitTemplate;
......
String exchangeName = "name.direct";
String message = "Hello World";
String routingKey = "xxx.aaa";
rabbitTemplate.convertAndSend(exchangeName, routingKey, message);
......
- 快速入门
# publisher
// 建立连接
ConnecationFactory factory = new ConnectionFactory();
// 设置连接参数 主机名、端口号、vhost、用户名、密码
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("admin");
// 建立连接
Connaction connetion = factory.newConnection();
// 创建通道Channel
Channel channel = connetion.createChannel();
// 创建队列Queues
String queueName = "one.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 发送消息
String message = "Hello World";
channel.basicPubulsh("", queueName, null, message.getBytes);
// 关闭通道、连接
channel.close();
connetion.close();
# consumer
// 建立连接
ConnecationFactory factory = new ConnectionFactory();
// 设置连接参数 主机名、端口号、vhost、用户名、密码
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("admin");
// 建立连接
Connaction connetion = factory.newConnection();
// 创建通道Channel
Channel channel = connetion.createChannel();
// 创建队列Queues
String queueName = "one.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 订阅消息
String message = "Hello World";
channel.basicPubulsh(queueName, true, new DefaultConsumer()channel
@Override
pubilc void handleDelivery(String consumerTag, Envelope envelpoe, AMQP.BasicProperties properties, byte[] body) throe IOException
// 处理消息
);
// 关闭通道、连接
channel.close();
connetion.close();
3. SpringAMQP
-
AMQP,应用程序之间传递消息的协议,与平台无关。
-
SpringAMQP,基于AMQP协议定义的一套API。spring-amqp是基础抽象,spring-rabbit是底层的默认实现。
监听器容器,用于异步处理入站消息
用于发送和接收消息的RabbitTemplate
RabbitAdmin用于自动声明队列,交换和绑定
注意:消息一旦消费就会从队列删除,RabbitMQ没有消息回溯功能
- 引入依赖
<!-- 父工程中引入spring-amqp的依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- publisher 发送消息
spring:
rabbitmq:
host: 127.0.0.1 # rabbitMQ ip地址
port: 5672
virtual-host: /
username: admin
password: admin
//在publisher服务中利用RabbitTemplate发送消息到队列
@Aoutwired
private RabbitTemplate rabbitTemplate;
......
String queueName = "queueName";
String message = "Hello World";
rabbitTemplate.convertAndSend(queueName, message);
......
- consumer 监听消息
spring:
rabbitmq:
host: 127.0.0.1 # 主机名
port: 5672
virtual-host: /
username: admin
password: admin
//在consumer服务中编写消费者逻辑,绑定队列
@Component
public class SpringRabbitListener
@RabbitListener(queue = "队列名称")
public void listenerSimpleQueue(String msg)
// 业务逻辑
消息转换器
RabbitTemplate ,将message对象序列化为字节
修改序列化方式
父工程
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
publisher
@Configration
public class SpringRabbitListener
@Bean
public MessageConverter messageConverter()
return new Jackson2JsonMessageConverter();
最后
以上是学习 黑马程序员《微服务技术全栈教程》的学习笔记
以上是关于服务异步通信RabbitMQ的主要内容,如果未能解决你的问题,请参考以下文章
Python开发项目:RPC异步执行命令(RabbitMQ双向通信)
RabbitMQ 服务异步通信 -- 入门案例(消息预存机制)SpringAMQP发布订阅模式(FanoutExchangeDirectExchangeTopicExchange)消息转换器