RabbitMQ入门教程(安装,管理插件,Publisher/Consumer/交换机/路由/队列/绑定关系,及如何保证100%投递等)
Posted 程序媛一枚~
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ入门教程(安装,管理插件,Publisher/Consumer/交换机/路由/队列/绑定关系,及如何保证100%投递等)相关的知识,希望对你有一定的参考价值。
RabbitMQ入门教程(安装,管理插件,Publisher/Consumer/交换机/路由/队列/绑定关系,及如何保证100%投递等)
1. RabbitMQ简介及AMQP协议
- 开源的消息代理和队列服务器。基于AMQP(Advanced Message Queuing Protocol 高级消息队列协议)。
- 底层基于Erlang语言编写;
- 开源,性能优秀,稳定性保障;
- 与SpringAMQP完美的整合,API丰富
- 集群模式丰富,表达式配置,HA模式(负载均衡),镜像队列模型
- 保证数据不丢失的前提做到高可靠性、可用性
Publisher
Server
Virtual Host
Exchange
Queue
Consumer
2. RabbitMQ安装及使用
启动
rabbitmq-server start &
rabbitmq 默认端口号5672,查看是否启动
lsof -i:5672
安装插件rabbitmq-management后才能登录
查看插件列表
rabbitmq-plugins list
启动插件
rabbitmq-plugins enable rabbitmq-management
rabbit-management管控台的端口号15672
浏览器访问:http://localhost:15672/
3. RabbitMQ核心概念
Producer
Consumer
Exchange
AMQP核心概念
- Server:又称Broker,接受客户端的了解,实现AMQP实体服务;
- Connection:连接,应用程序与Broker的网络连接;类似pg的jdbcTemplate
- Channel:网络信道,几乎所有的操作都在Channel中进行。Channel是进行消息读写的通道。客户端可建立多个Channel,每个Channel代表一个会话任务。类似pg的Session。
- Message:消息,服务器和应用程序之间传送的数据,由Properties和Body组成。Properties可以对消息进行修饰,比如消息的优先级,延迟等高等特性;Body则就是消息体内容。
- Virtual host:虚拟地址,用于进行逻辑分离,最上层的消息路由。 一个Virtual Host里面可以有若干个Exchange和Queue,同一个Virtual Host里面不能有相同名称的Exchange和Queue。每一个Virtual Host是独立的单元,用于隔离不同的项目应用;
- Exchange:交换机,接受生产者的消息,根据路由键转发消息到绑定的队列;
- Binding:Exchange和Queue之间的虚拟连接,bingding中可以包含routing key;
- Routing key:一个路由规则,虚拟机可用它来确定如何路由一个特定消息;
- Queue:也成为Message Queue,消息队列,保存消息并将它们转发给消费者;
4. 与SpringBoot2.x整合-急速入门
1. 引入相关pom
2. 配置application.yml
3. 对象Message 实现接口 implements Serilizable
4. Producer,RabbitTemplate自动装配@Autowired
消费者自动监听 @RabbitHandler @RabbitListener
5. 生产者发消息:converAndSend(exchange, routeKey, MessageObject, CorrelationData); 交换机,路由key,消息体内容,消息唯一id
消费者监听消息:
@RabbitListener指定bindings,queue,exchange,routingKey. 这个注解强大的地方是当交换机,队列,绑定关系,路由键不存在时自动创建;
@RabbitListener(bindings= @QueueBinding(
value=@Queue(value="order-queue",durable="true"),
exchange= @Exchange(name="order-exchange",durable="true",type="topic"),
key="order.*")
)
@RabbitHandler
public void onOrderMessage(@Payload Order order, @Headers Map<String,Object> headers, Channel channel) #**消息体,headers,通道**
// 消费者操作
// ...
Long deliveryTag = (Long)headers.get(AmqpHdears.DELIVEY_TAG);
// 当配置为手动签收时,必须要手动ACK,否则管理控制台即使消费了还是显示队列有数据。
channel.basicAck(deliveryTag,false); // 手动确认提交
可以在管理界面:http://xx.xx.xx.xx:15672/ 创建交换机,队列,交换机和队列进行绑定,或者删除;
路由key RoutingKey: order.*, order.#匹配差别;
order.#: order.111 order.111.abcd 只能匹配前者;
order.#: order.111 order.111.abcd 都能匹配;
<!--rabbitmq依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
# springboot整合rabbitmq基本配置
spring.rabbitmq.addresses=xx.xx.xx.xx:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000
# 保证100%投递模式
# 发送消息后等待消息确认
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true
# springboot整合rabbitmq消费端配置
# 并发数
spring.rabbitmq.listener.simple.concurrency=5
# 自动签收 NONE AUTO manual
spring.rabbitmq.listener.simple.acknowledge-model=AUTO
spring.rabbitmq.listener.simple.max-concurrency=15
spring.rabbitmq.listener.simple.idle-event-interval=10000
spring.rabbitmq.listener.simple.retry.enabled=true
# 高峰期海量数据压过来,则进行限流,表示最大100条
#spring.rabbitmq.listener.simple.prefetch=100
# 不建议事务,性能太差了
#spring.rabbitmq.listener.simple.transaction-size=1
5. 保证100%的消息可靠性投递方案落地实现
蓝色框为生产者
红色框为消费者
为保证100%生产者消息投递成功,当Sender在收到需要发送的消息时
- 先存储到业务数据库Biz DB;
- 存储到消息数据库 MSG DB,修改状态为0;
- 发送消息到Broker
- 等待Broker返回确认收到的状态
- 更新MSG DB中消息状态为1;
- 对于网络闪断/超时等长时间未收到状态返回,则更新状态为失败-1;多次时可更新状态为2进行人工确认等。
- 定时任务从MSG DB查询对失败的消息-1进行重新发送;
这种情况只能保证100%发送到队列,但可能会重复推送,需要消费者业务端自动去确认收到重复消息的处理。
3.2 pg 预先把消息存进message记录表
记录消息体,重试次数,投递状态,下一次投递时间或者超时时间,下次重试投递时间;
3.3 定时任务补救重发
3.4 RabbitMQOrderSender 发送消息确认
3.5
参考
- http://www.rabbitmq.com
- https://www.imooc.com/video/17851
- https://github.com/niezhiliang/springboot-rabbitmq
以上是关于RabbitMQ入门教程(安装,管理插件,Publisher/Consumer/交换机/路由/队列/绑定关系,及如何保证100%投递等)的主要内容,如果未能解决你的问题,请参考以下文章
转 RabbitMQ 入门教程(PHP版) 使用rabbitmq-delayed-message-exchange插件实现延迟功能