# 技术栈知识点巩固——消息队列

Posted 爱码代码的喵

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了# 技术栈知识点巩固——消息队列相关的知识,希望对你有一定的参考价值。

技术栈知识点巩固——消息队列

MQ 的优点

  • 异步处理:相比于传统的串行、并行方式,提高了系统吞吐量
  • 应用解耦:系统间通过消息通信,不用关心其它系统的处理
  • 流量削锋:可以通过消息队列长度控制请求量;可以缓解短时间内的高并发请求。
  • 日志处理:解决大量日志传输。

MQ 的缺点

  • 系统可用性降低
  • 系统复杂度提高:加入了消息队列,要多考虑很多方面的问题
  • 一致性问题:复杂度提升

RabbitMq工作模式

  • 简单模式(Simple):消息队列中的消息被消费后,消息就拿不到了。

  • 工作模式(Work):多个消费者消费同一个队列中的消息,队列采用轮询的方式将消息平均发送给消费者。谁先拿到谁先消费。

  • 发布订阅模式(Publish/Subscirbe):每个消费者监听自己的队列,生产者将消息由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息。

  • 路由模式(Routing):每个消费者监听自己的队列,并且设置routingkey生产者将消息发给交换机,由交换机根据routingkey来转发消息到指定的队列;

  • topic 主题模式(路由模式的一种),消息产生者产生消息,把消息交给交换机,交换机根据key的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费


RabbitMq有可能发生重复消费,如何避免,做到幂等

  • 每条消息有唯一的编号,消费完之后存到消费日志表中,下次拿到消息后,先去日志表中查找一下是否有这条记录,如果有那么就不消费了,直接丢弃或者进行更新操作。

集群消费和广播消费

集群消费

  • 当使用集群消费模式时,消息队列RocketMQ认为任意一条消息只需要被集群内的任意一个消费者处理即可。使用与消费之集群部署,每条消息只处理一次。

特点

  • 集群消费模式下,每一条消息都只会被分发到一台机器上处理。如果需要被集群下的每一台机器都处理,请使用广播模式。
  • 集群消费模式下,不保证每一次失败重投的消息路由到同一台机器上。

广播消费

  • 当使用广播消费模式时,消息队列RocketMQ会将每条消息推送给集群内所有的消费者,保证消息至少被每个消费者消费一次。

特点

  • 不支持顺序消息。
  • 不支持重置消费位点。
  • 不支持线下联调分组消息。
  • 每条消息都需要被相同订阅逻辑的多台机器处理。
  • 消费进度在客户端维护,出现重复消费的概率稍大于集群模式。
  • 广播模式下,消息队列保证每条消息至少被每台客户端消费一次,但是并不会重投消费失败的消息,因此业务方需要关注消费失败的情况。
  • 广播模式下,客户端每一次重启都会从最新消息消费。客户端在被停止期间发送至服务端的消息将会被自动跳过,请谨慎选择。
  • 广播模式下,每条消息都会被大量的客户端重复处理,因此推荐尽可能使用集群模式。
  • 广播模式下服务端不维护消费进度,所以消息队列控制台不支持消息堆积查询、消息堆积报警和订阅关系查询功能。

消息队列有哪些使用场景

  • 异步处理:异步发送消息、发送短信
  • 应用解耦:基于线程的异步处理,能确保用户体验,但是极端情况下可能会出现异常,影响系统的稳定性,而同步调用很多时候无法保证理想的性能,那么我们就可以用MQ来进行处理。
  • 日志处理
  • 流量削锋:面对大量的请求,可以使用消息队列进行流量削锋

消息中间件如何解决消息丢失问题

生产端开启事务机制

  • RabbitMQ 提供的事务功能,生产者发送数据之前开启 RabbitMQ 事务channel.txSelect,然后发送消息,如果消息没有成功被 RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务channel.txRollback,然后重试发送消息;如果收到了消息,那么可以提交事务channel.txCommit

  • 事务机制会对吞吐量有一定的影响,开启 confirm 模式,在生产者那里设置开启 confirm 模式之后,每次写的消息都会分配一个唯一的 id,然后如果写入了 RabbitMQ 中,RabbitMQ 会给你回传一个 ack 消息,告诉你说这个消息 ok 了。如果 RabbitMQ 没能处理这个消息,会回调你的一个 nack 接口,告诉你这个消息接收失败,你可以重试。

  • 事务机制和 cnofirm 机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是 confirm 机制是异步的

消费端消息丢失

  • RabbitMQ 提供的 ack 机制
  • 通过一个 api 来调用就行,然后每次你自己代码里确保处理完的时候,再在程序里 ack 一把。这样的话,如果你还没处理完,不就没有 ack 了?那 RabbitMQ 就认为你还没处理完,这个时候 RabbitMQ 会把这个消费分配给别的 consumer 去处理,消息是不会丢的。

Rabbit消息持久化

  • 开启Rabbitmq消息持久化
  • 消息写入之后会持久化到磁盘,哪怕是 RabbitMQ 自己挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢。除非极其罕见的是,RabbitMQ 还没持久化,自己就挂了,可能导致少量数据丢失,但是这个概率较小。
  • 将队列设置为持久化
  • 发送消息的时候将消息deliveryMode 设置为 2

Rabbitmq 有几种广播类型

  • direct(默认模式):发送方把消息发送给订阅方,如果有多个订阅者,默认采取轮询的方式进行消息发送
dequeOneConsumeOne received message: hello world
dequeOneConsumeTwo received message: hello world
dequeOneConsumeOne received message: hello world
dequeOneConsumeTwo received message: hello world
dequeOneConsumeOne received message: hello world
dequeOneConsumeTwo received message: hello world
  • headers:与direct类似,但是性能差,基本不用
  • fanout:分发模式,将消息发送给所有订阅者
 : dequeOne consume one received message: 我是交换机发出的消息
dequeTwo consume two message: (Body:'我是交换机发出的消息' MessageProperties [headers=, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=ExchangeOne, receivedRoutingKey=, deliveryTag=10, consumerTag=amq.ctag-H6qjuEJTupM-ihNzwi-dnA, consumerQueue=TestDequeueTwo])
  • topic:匹配订阅模式,使用正则匹配到消息队列,能匹配到的都能接收到

RabbitMQ 有哪些重要组件

  • ConnectionFactory(连接管理器):应用程序与Rabbit之间建立连接的管理器,程序代码中使用。

  • Channel(信道):消息推送使用的通道。

  • Exchange(交换器):用于接受、分配消息。

  • Queue(队列):用于存储生产者的消息。

  • RoutingKey(路由键):用于把生成者的数据分配到交换器上。

  • BindingKey(绑定键):用于把交换器的消息绑定到队列上。

  • Virtual host:用于消息隔离(类似Redis 16db这种概念),最上层的消息路由,一个包含若干ExchangeQueue,同一个里面Exchange

Mq 消息确认机制

生产者确认机制

  • 生产者使用的机制,用来确认消息是否被成功消费。

消费者接受确认 ConfirmCallback

  • yml
rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    # 发布消息成功到交换器后会触发回调方法
    publisher-confirm-type: correlated
    # 消息成功确认
    publisher-confirms: true
    # 指定消息确认模式为手动确认
    template:
      mandatory: true # 手动签收机制
    # 消息失败确认
    publisher-returns: true
  • publisher-confirm-typeNONE 值是禁用发布确认模式,是默认值,CORRELATED 值是发布消息成功到交换器后会触发回调方法。SIMPLE值经测试有两种效果,其一效果和 CORRELATED 值一样会触发回调方法,其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirmswaitForConfirmsOrDie 方法等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是 waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker;
  • RabbitTemplate只允许设置一个callback方法,你可以将RabbitTemplatebean设为单例然后设置回调,在controller、service都设置为 protype
// 设置 bena 为多例
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
  • 注册 noSingleRabbitTemplate bean
/**
     * 注册多例 bean
     *
     * @param connectionFactory connectionFactory
     * @return RabbitTemplate
     */
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public RabbitTemplate noSingleRabbitTemplate(ConnectionFactory connectionFactory) 
    RabbitTemplate template = new RabbitTemplate(connectionFactory);
    template.setMandatory(true);
    return template;

  • test.java
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@Service
public class RabbitMqServiceImpl implements RabbitMqService 

    @Resource(name = "noSingleRabbitTemplate")
    private RabbitTemplate rabbitTemplate;

    @Override
    public String testOne() 
        RabbitMqProduceConfirm confirm = new RabbitMqProduceConfirm();
        RabbitMqProduceReturnBack callback = new RabbitMqProduceReturnBack();
        rabbitTemplate.convertAndSend(RabbitMqConst.DEQUE_ONE, "hello world");
        // 生产者拿到没有到达消费者的消息
        rabbitTemplate.setReturnsCallback(callback);
        // 生产者得到消费者拿到消息的确认
        rabbitTemplate.setConfirmCallback(confirm);
        return null;
    
    
  • RabbitMqProduceConfirm.java
public class RabbitMqProduceConfirm implements RabbitTemplate.ConfirmCallback 

    private static final Logger logger = LoggerFactory.getLogger(RabbitMqProduceConfirm.class);

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) 
        if (ack) 
            logger.info("ACK : ", JSON.toJSONString(correlationData));
         else 
            logger.info("ACK error");
        
    

将不可达目的地消息返回给生产者 ReturnsCallback

  • yml
# 消息失败确认
publisher-returns: true
  • RabbitMqProduceReturnBack.java
public class RabbitMqProduceReturnBack implements RabbitTemplate.ReturnsCallback

    private static final Logger logger = LoggerFactory.getLogger(RabbitMqProduceReturnBack.class);

    @Override
    public void returnedMessage(ReturnedMessage returned) 
        logger.info("Confirmed RabbitMQ");
    

消费者确认

自动确认

  • RabbitMQ消费者默认为自动确认,不会管消费者是否成功消费/处理了消息

根据情况确认

  • 如果消息成功被消费(成功的意思是在消费的过程中没有抛出异常),则自动确认
  • 当抛出 AmqpRejectAndDontRequeueException 异常的时候,则消息会被拒绝,且 requeue = false(不重新入队列)
  • 当抛出 ImmediateAcknowledgeAmqpException 异常,则消费者会被确认
  • 其他的异常,则消息会被拒绝,且 requeue = true(如果此时只有一个消费者监听该队列,则有发生死循环的风险,多消费端也会造成资源的极大浪费,这个在开发过程中一定要避免的)。可以通过 setDefaultRequeueRejected(默认是true)去设置

手动确认

  • 消费者收到消息后,手动对消息进行处理,完成消费
  • Basic.Ack :用于确认当前消息
  • Basic.Nack :用于否定当前消息
  • Basic.Reject :用于拒绝当前消息

RabbitMQ broker

  • 如果要使用RabbitMQ,必须先要安装一个RabbitMQ服务。这个服务就是Broker,中文叫做代理,因为MQ服务器帮我们对消息做了存储和转发。一般情况下为了保证服务的高可用,需要多个Broker
  • 客户端和服务端之间消息传递中介

RabbitMQ 如何确保消息接收方消费了消息

  • 发送方进行Confirm确认和消息回退设置
  • 消费方进行消息接收确认

RabbitMQ 消息基于什么传输

  • RabbitMQ是基于信道Channel的方式来传输数据,排除了使用TCP链接来进行数据的传输,因为TCP链接创建和销毁对于系统性能的开销比较大,且并发能力受系统资源的限制,这样很容易造成rabbitMQ的性能瓶颈。
  • 消费者链接RabbitMQ其实就是一个TCP链接,一旦链接创建成功之后,就会基于链接创建Channel,每个线程把持一个Channel,Channel复用TCP链接,减少了系统创建和销毁链接的消耗,提高了性能

以上是关于# 技术栈知识点巩固——消息队列的主要内容,如果未能解决你的问题,请参考以下文章

消息队列高手课——开篇词 | 优秀的程序员,你的技术栈中不能只有“增删改查”

消息队列高手课——开篇词 | 优秀的程序员,你的技术栈中不能只有“增删改查”

全栈系统化的学习路线,一篇文章帮你解答

顺序栈

顺序栈

队列