RabbitMQ入门教程(安装,管理插件,Publisher/Consumer/交换机/路由/队列/绑定关系,及如何保证100%投递等)

Posted 程序媛一枚~

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ入门教程(安装,管理插件,Publisher/Consumer/交换机/路由/队列/绑定关系,及如何保证100%投递等)相关的知识,希望对你有一定的参考价值。

RabbitMQ入门教程(安装,管理插件,Publisher/Consumer/交换机/路由/队列/绑定关系,及如何保证100%投递等)

1. RabbitMQ简介及AMQP协议

  • 开源的消息代理和队列服务器。基于AMQP(Advanced Message Queuing Protocol 高级消息队列协议)。
  • 底层基于Erlang语言编写;
  • 开源,性能优秀,稳定性保障;
  • 与SpringAMQP完美的整合,API丰富
  • 集群模式丰富,表达式配置,HA模式(负载均衡),镜像队列模型
  • 保证数据不丢失的前提做到高可靠性、可用性

Publisher
Server
Virtual Host
Exchange
Queue
Consumer

2. RabbitMQ安装及使用

启动 
rabbitmq-server start &
rabbitmq 默认端口号5672,查看是否启动
lsof -i:5672
安装插件rabbitmq-management后才能登录
查看插件列表
rabbitmq-plugins list
启动插件
rabbitmq-plugins enable rabbitmq-management
rabbit-management管控台的端口号15672
浏览器访问:http://localhost:15672/

3. RabbitMQ核心概念

Producer
Consumer
Exchange

AMQP核心概念

  • Server:又称Broker,接受客户端的了解,实现AMQP实体服务;
  • Connection:连接,应用程序与Broker的网络连接;类似pg的jdbcTemplate
  • Channel:网络信道,几乎所有的操作都在Channel中进行。Channel是进行消息读写的通道。客户端可建立多个Channel,每个Channel代表一个会话任务。类似pg的Session。
  • Message:消息,服务器和应用程序之间传送的数据,由Properties和Body组成。Properties可以对消息进行修饰,比如消息的优先级,延迟等高等特性;Body则就是消息体内容。
  • Virtual host:虚拟地址,用于进行逻辑分离,最上层的消息路由。 一个Virtual Host里面可以有若干个Exchange和Queue,同一个Virtual Host里面不能有相同名称的Exchange和Queue。每一个Virtual Host是独立的单元,用于隔离不同的项目应用;
  • Exchange:交换机,接受生产者的消息,根据路由键转发消息到绑定的队列;
  • Binding:Exchange和Queue之间的虚拟连接,bingding中可以包含routing key;
  • Routing key:一个路由规则,虚拟机可用它来确定如何路由一个特定消息;
  • Queue:也成为Message Queue,消息队列,保存消息并将它们转发给消费者;

4. 与SpringBoot2.x整合-急速入门

1. 引入相关pom
2. 配置application.yml

3. 对象Message 实现接口 implements Serilizable
4. Producer,RabbitTemplate自动装配@Autowired
消费者自动监听 @RabbitHandler @RabbitListener
5. 生产者发消息:converAndSend(exchange, routeKey, MessageObject, CorrelationData); 交换机,路由key,消息体内容,消息唯一id

消费者监听消息:
@RabbitListener指定bindings,queue,exchange,routingKey. 这个注解强大的地方是当交换机,队列,绑定关系,路由键不存在时自动创建;

	@RabbitListener(bindings= @QueueBinding(
	value=@Queue(value="order-queue",durable="true"),
	exchange= @Exchange(name="order-exchange",durable="true",type="topic",
	key="order.*")
	)
	@RabbitHandler
	public void onOrderMessage(@Payload Order order, @Headers Map<String,Object> headers, Channel channel) #**消息体,headers,通道**
		// 消费者操作
		// ...
		Long deliveryTag = (Long)headers.get(AmqpHdears.DELIVEY_TAG);
		// 当配置为手动签收时,必须要手动ACK,否则管理控制台即使消费了还是显示队列有数据。
		channel.basicAck(deliveryTag,false); // 手动确认提交
	

可以在管理界面:http://xx.xx.xx.xx:15672/ 创建交换机,队列,交换机和队列进行绑定,或者删除;

路由key RoutingKey: order.*, order.#匹配差别;
order.#: order.111 order.111.abcd 只能匹配前者;
order.#: order.111 order.111.abcd 都能匹配;

<!--rabbitmq依赖-->
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
# springboot整合rabbitmq基本配置
spring.rabbitmq.addresses=xx.xx.xx.xx:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000

# 保证100%投递模式
# 发送消息后等待消息确认
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true

# springboot整合rabbitmq消费端配置
# 并发数
spring.rabbitmq.listener.simple.concurrency=5
# 自动签收 NONE AUTO manual
spring.rabbitmq.listener.simple.acknowledge-model=AUTO
spring.rabbitmq.listener.simple.max-concurrency=15
spring.rabbitmq.listener.simple.idle-event-interval=10000
spring.rabbitmq.listener.simple.retry.enabled=true
# 高峰期海量数据压过来,则进行限流,表示最大100条
#spring.rabbitmq.listener.simple.prefetch=100
# 不建议事务,性能太差了
#spring.rabbitmq.listener.simple.transaction-size=1

5. 保证100%的消息可靠性投递方案落地实现

蓝色框为生产者
红色框为消费者
为保证100%生产者消息投递成功,当Sender在收到需要发送的消息时

  1. 先存储到业务数据库Biz DB;
  2. 存储到消息数据库 MSG DB,修改状态为0;
  3. 发送消息到Broker
  4. 等待Broker返回确认收到的状态
  5. 更新MSG DB中消息状态为1;
  6. 对于网络闪断/超时等长时间未收到状态返回,则更新状态为失败-1;多次时可更新状态为2进行人工确认等。
  7. 定时任务从MSG DB查询对失败的消息-1进行重新发送;

这种情况只能保证100%发送到队列,但可能会重复推送,需要消费者业务端自动去确认收到重复消息的处理。

3.2 pg 预先把消息存进message记录表

记录消息体,重试次数,投递状态,下一次投递时间或者超时时间,下次重试投递时间;

3.3 定时任务补救重发

3.4 RabbitMQOrderSender 发送消息确认


3.5

参考

以上是关于RabbitMQ入门教程(安装,管理插件,Publisher/Consumer/交换机/路由/队列/绑定关系,及如何保证100%投递等)的主要内容,如果未能解决你的问题,请参考以下文章

消息队列:RabbitMQ安装和快速入门

安装Erlang和RabbitMQ详细教程

启用 RabbitMQ 管理插件失败

ubuntu16.04 安装rabbitmq

转 RabbitMQ 入门教程(PHP版) 使用rabbitmq-delayed-message-exchange插件实现延迟功能

CentOS7下安装RabbitMQ,并使用Spring Boot实现一个简单的延迟队列(小白教程,附源码)