# 技术栈知识点巩固——消息队列
Posted 爱码代码的喵
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了# 技术栈知识点巩固——消息队列相关的知识,希望对你有一定的参考价值。
技术栈知识点巩固——消息队列
MQ 的优点
- 异步处理:相比于传统的串行、并行方式,提高了系统吞吐量
- 应用解耦:系统间通过消息通信,不用关心其它系统的处理
- 流量削锋:可以通过消息队列长度控制请求量;可以缓解短时间内的高并发请求。
- 日志处理:解决大量日志传输。
MQ 的缺点
- 系统可用性降低
- 系统复杂度提高:加入了消息队列,要多考虑很多方面的问题
- 一致性问题:复杂度提升
RabbitMq工作模式
-
简单模式(
Simple
):消息队列中的消息被消费后,消息就拿不到了。
-
工作模式(
Work
):多个消费者消费同一个队列中的消息,队列采用轮询的方式将消息平均发送给消费者。谁先拿到谁先消费。
-
发布订阅模式(
Publish/Subscirbe
):每个消费者监听自己的队列,生产者将消息由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息。
-
路由模式(
Routing
):每个消费者监听自己的队列,并且设置routingkey
生产者将消息发给交换机,由交换机根据routingkey
来转发消息到指定的队列;
-
topic
主题模式(路由模式的一种),消息产生者产生消息,把消息交给交换机,交换机根据key
的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费
RabbitMq有可能发生重复消费,如何避免,做到幂等
- 每条消息有唯一的编号,消费完之后存到消费日志表中,下次拿到消息后,先去日志表中查找一下是否有这条记录,如果有那么就不消费了,直接丢弃或者进行更新操作。
集群消费和广播消费
集群消费
- 当使用集群消费模式时,消息队列
RocketMQ
认为任意一条消息只需要被集群内的任意一个消费者处理即可。使用与消费之集群部署,每条消息只处理一次。
特点
- 集群消费模式下,每一条消息都只会被分发到一台机器上处理。如果需要被集群下的每一台机器都处理,请使用广播模式。
- 集群消费模式下,不保证每一次失败重投的消息路由到同一台机器上。
广播消费
- 当使用广播消费模式时,消息队列
RocketMQ
会将每条消息推送给集群内所有的消费者,保证消息至少被每个消费者消费一次。
特点
- 不支持顺序消息。
- 不支持重置消费位点。
- 不支持线下联调分组消息。
- 每条消息都需要被相同订阅逻辑的多台机器处理。
- 消费进度在客户端维护,出现重复消费的概率稍大于集群模式。
- 广播模式下,消息队列保证每条消息至少被每台客户端消费一次,但是并不会重投消费失败的消息,因此业务方需要关注消费失败的情况。
- 广播模式下,客户端每一次重启都会从最新消息消费。客户端在被停止期间发送至服务端的消息将会被自动跳过,请谨慎选择。
- 广播模式下,每条消息都会被大量的客户端重复处理,因此推荐尽可能使用集群模式。
- 广播模式下服务端不维护消费进度,所以消息队列控制台不支持消息堆积查询、消息堆积报警和订阅关系查询功能。
消息队列有哪些使用场景
- 异步处理:异步发送消息、发送短信
- 应用解耦:基于线程的异步处理,能确保用户体验,但是极端情况下可能会出现异常,影响系统的稳定性,而同步调用很多时候无法保证理想的性能,那么我们就可以用
MQ
来进行处理。 - 日志处理
- 流量削锋:面对大量的请求,可以使用消息队列进行流量削锋
消息中间件如何解决消息丢失问题
生产端开启事务机制
-
用
RabbitMQ
提供的事务功能,生产者发送数据之前开启RabbitMQ
事务channel.txSelect
,然后发送消息,如果消息没有成功被RabbitMQ
接收到,那么生产者会收到异常报错,此时就可以回滚事务channel.txRollback
,然后重试发送消息;如果收到了消息,那么可以提交事务channel.txCommit
。 -
事务机制会对吞吐量有一定的影响,开启
confirm
模式,在生产者那里设置开启confirm
模式之后,每次写的消息都会分配一个唯一的id
,然后如果写入了RabbitMQ
中,RabbitMQ
会给你回传一个ack
消息,告诉你说这个消息ok
了。如果RabbitMQ
没能处理这个消息,会回调你的一个nack
接口,告诉你这个消息接收失败,你可以重试。 -
事务机制和
cnofirm
机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是confirm
机制是异步的
消费端消息丢失
- 用
RabbitMQ
提供的ack
机制 - 通过一个 api 来调用就行,然后每次你自己代码里确保处理完的时候,再在程序里
ack
一把。这样的话,如果你还没处理完,不就没有ack
了?那 RabbitMQ 就认为你还没处理完,这个时候 RabbitMQ 会把这个消费分配给别的 consumer 去处理,消息是不会丢的。
Rabbit消息持久化
- 开启
Rabbitmq
消息持久化 - 消息写入之后会持久化到磁盘,哪怕是
RabbitMQ
自己挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢。除非极其罕见的是,RabbitMQ
还没持久化,自己就挂了,可能导致少量数据丢失,但是这个概率较小。 - 将队列设置为持久化
- 发送消息的时候将消息
deliveryMode
设置为2
Rabbitmq 有几种广播类型
direct
(默认模式):发送方把消息发送给订阅方,如果有多个订阅者,默认采取轮询的方式进行消息发送
dequeOneConsumeOne received message: hello world
dequeOneConsumeTwo received message: hello world
dequeOneConsumeOne received message: hello world
dequeOneConsumeTwo received message: hello world
dequeOneConsumeOne received message: hello world
dequeOneConsumeTwo received message: hello world
headers
:与direct
类似,但是性能差,基本不用fanout
:分发模式,将消息发送给所有订阅者
: dequeOne consume one received message: 我是交换机发出的消息
dequeTwo consume two message: (Body:'我是交换机发出的消息' MessageProperties [headers=, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=ExchangeOne, receivedRoutingKey=, deliveryTag=10, consumerTag=amq.ctag-H6qjuEJTupM-ihNzwi-dnA, consumerQueue=TestDequeueTwo])
topic
:匹配订阅模式,使用正则匹配到消息队列,能匹配到的都能接收到
RabbitMQ 有哪些重要组件
-
ConnectionFactory
(连接管理器):应用程序与Rabbit
之间建立连接的管理器,程序代码中使用。 -
Channel
(信道):消息推送使用的通道。 -
Exchange
(交换器):用于接受、分配消息。 -
Queue
(队列):用于存储生产者的消息。 -
RoutingKey
(路由键):用于把生成者的数据分配到交换器上。 -
BindingKey
(绑定键):用于把交换器的消息绑定到队列上。 -
Virtual host
:用于消息隔离(类似Redis
16
个db
这种概念),最上层的消息路由,一个包含若干Exchange
和Queue
,同一个里面Exchange
Mq 消息确认机制
生产者确认机制
- 生产者使用的机制,用来确认消息是否被成功消费。
消费者接受确认 ConfirmCallback
yml
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
# 发布消息成功到交换器后会触发回调方法
publisher-confirm-type: correlated
# 消息成功确认
publisher-confirms: true
# 指定消息确认模式为手动确认
template:
mandatory: true # 手动签收机制
# 消息失败确认
publisher-returns: true
publisher-confirm-type
:NONE
值是禁用发布确认模式,是默认值,CORRELATED
值是发布消息成功到交换器后会触发回调方法。SIMPLE
值经测试有两种效果,其一效果和CORRELATED
值一样会触发回调方法,其二在发布消息成功后使用rabbitTemplate
调用waitForConfirms
或waitForConfirmsOrDie
方法等待broker
节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie
方法如果返回 false 则会关闭channel
,则接下来无法发送消息到broker
;RabbitTemplate
只允许设置一个callback
方法,你可以将RabbitTemplate
的bean
设为单例然后设置回调,在controller、service
都设置为protype
// 设置 bena 为多例
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
- 注册
noSingleRabbitTemplate bean
/**
* 注册多例 bean
*
* @param connectionFactory connectionFactory
* @return RabbitTemplate
*/
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public RabbitTemplate noSingleRabbitTemplate(ConnectionFactory connectionFactory)
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMandatory(true);
return template;
test.java
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@Service
public class RabbitMqServiceImpl implements RabbitMqService
@Resource(name = "noSingleRabbitTemplate")
private RabbitTemplate rabbitTemplate;
@Override
public String testOne()
RabbitMqProduceConfirm confirm = new RabbitMqProduceConfirm();
RabbitMqProduceReturnBack callback = new RabbitMqProduceReturnBack();
rabbitTemplate.convertAndSend(RabbitMqConst.DEQUE_ONE, "hello world");
// 生产者拿到没有到达消费者的消息
rabbitTemplate.setReturnsCallback(callback);
// 生产者得到消费者拿到消息的确认
rabbitTemplate.setConfirmCallback(confirm);
return null;
RabbitMqProduceConfirm.java
public class RabbitMqProduceConfirm implements RabbitTemplate.ConfirmCallback
private static final Logger logger = LoggerFactory.getLogger(RabbitMqProduceConfirm.class);
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause)
if (ack)
logger.info("ACK : ", JSON.toJSONString(correlationData));
else
logger.info("ACK error");
将不可达目的地消息返回给生产者 ReturnsCallback
yml
# 消息失败确认
publisher-returns: true
RabbitMqProduceReturnBack.java
public class RabbitMqProduceReturnBack implements RabbitTemplate.ReturnsCallback
private static final Logger logger = LoggerFactory.getLogger(RabbitMqProduceReturnBack.class);
@Override
public void returnedMessage(ReturnedMessage returned)
logger.info("Confirmed RabbitMQ");
消费者确认
自动确认
RabbitMQ
消费者默认为自动确认,不会管消费者是否成功消费/处理了消息
根据情况确认
- 如果消息成功被消费(成功的意思是在消费的过程中没有抛出异常),则自动确认
- 当抛出
AmqpRejectAndDontRequeueException
异常的时候,则消息会被拒绝,且requeue = false
(不重新入队列) - 当抛出
ImmediateAcknowledgeAmqpException
异常,则消费者会被确认 - 其他的异常,则消息会被拒绝,且
requeue = true
(如果此时只有一个消费者监听该队列,则有发生死循环的风险,多消费端也会造成资源的极大浪费,这个在开发过程中一定要避免的)。可以通过setDefaultRequeueRejected
(默认是true
)去设置
手动确认
- 消费者收到消息后,手动对消息进行处理,完成消费
Basic.Ack
:用于确认当前消息Basic.Nack
:用于否定当前消息Basic.Reject
:用于拒绝当前消息
RabbitMQ broker
- 如果要使用
RabbitMQ
,必须先要安装一个RabbitMQ
服务。这个服务就是Broker,中文叫做代理,因为MQ
服务器帮我们对消息做了存储和转发。一般情况下为了保证服务的高可用,需要多个Broker
。 - 客户端和服务端之间消息传递中介
RabbitMQ 如何确保消息接收方消费了消息
- 发送方进行
Confirm
确认和消息回退设置 - 消费方进行消息接收确认
RabbitMQ 消息基于什么传输
RabbitMQ
是基于信道Channel
的方式来传输数据,排除了使用TCP
链接来进行数据的传输,因为TCP
链接创建和销毁对于系统性能的开销比较大,且并发能力受系统资源的限制,这样很容易造成rabbitMQ
的性能瓶颈。- 消费者链接
RabbitMQ
其实就是一个TCP
链接,一旦链接创建成功之后,就会基于链接创建Channel
,每个线程把持一个Channel
,Channel
复用TCP
链接,减少了系统创建和销毁链接的消耗,提高了性能
以上是关于# 技术栈知识点巩固——消息队列的主要内容,如果未能解决你的问题,请参考以下文章
消息队列高手课——开篇词 | 优秀的程序员,你的技术栈中不能只有“增删改查”