消息中间件PMQ揭秘 - 消息消费

Posted 拍码场

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了消息中间件PMQ揭秘 - 消息消费相关的知识,希望对你有一定的参考价值。

在PMQ系列文章的前面几篇幅中已经对其“ ”和“ ”两部分进行了详细的阐述,本文主要从PMQ客户端的消费过程进行详细的阐述。

消息消费介绍

目前市面上消息消费有二种模式,一种是消费时动态创建消费者组和topic的订阅关系,这种方式好处是代码非常的灵活和自由,可以随意订阅topic。另一种是先在管理后台创建好相关的消费者组,topic以及它们之间的订阅关系,当消费方启动消费的时候,首先会与服务端进行元数据进行比对,如果消费者组名称不一致会启动报错,我们称这种消息消费方式称为元数据前置。这样做的好处是可以将这些元数据信息关联到人并进行权限控制。目前我们的PMQ系统使用的是第二种方式,通过我们实践发现这种方式对后续的消息治理提供了非常多的便利,比如当某个队列发生消息堆积的时候很容易发送通知给相关负责人。正由于元数据前置,我们可以在运行过程中动态修改元数据配置信息,比如修改线程数,拉取条数等消息的高级治理能力。


介绍完上面这些,我们来看下在PMQ中是如何消费消息的。在PMQ中,有二种方式,一种是基于xml配置式,另一种是基于代码订阅式。下面来简单介绍这二种方式。 以下消息消费配置示例如下,testClusterSub 表示一个消费者组,testTopic 表示一个topic,com.ppdai.infrastructure.demo.TestClusterSub 表示topic对应的消息处理类。
<messageQueue> <consumer groupName="testClusterSub" > <topics>        <topic name="testTopic" receiverType="com.ppdai.infrastructure.demo.TestClusterSub" /> </consumer></messageQueue>
消息处理类 com.ppdai.infrastructure.demo.TestClusterSub 代码如下:
public class TestClusterSub implements ISubscriber { @Override public List<Long> onMessageReceived(List<MessageDto> messages) { //do somthing return null;   }}
在PMQ中消费消息,我们只需要关注消息消费的订阅关系和消息处理类。在消息处理类中,用户只需要实现一个消费类接口即可,在实现消费处理方法中,用户编写消息处理业务逻辑,并返回消息处理的结果(主要分消费成功或失败,消费失败会返回具体失败的消息ID)。消息消费完成后,偏移量提交和失败重试等所有的事情,全部有消息系统框架层处理,用户不需要进行额外的配置与处理。通过这种方式,大大降低消息系统用户使用的复杂度。

还有一种消费方式是代码订阅方式,代码示例如下所示:
ConsumerGroupVo consumerGroup = new ConsumerGroupVo("testClusterSub");ConsumerGroupTopicVo topicVo = new ConsumerGroupTopicVo();topicVo.setName("testTopic");topicVo.setSubscriber(new ISubscriber() { @Override public List<Long> onMessageReceived(List<MessageDto> messages) {        //do somthing return 失败的id列表;    }});consumerGroup.addTopic(topicVo);MqClient.registerConsumerGroup(consumerGroup);

代码订阅的用处是,在一些场景下,我们需要根据配置动态地订阅Topic。这时代码订阅就提供了很多的灵活性。实际上在PMQ内部最终都是代码订阅,在基于xml订阅消费的情况下,项目启动后,客户端会将xml代码翻译成了代码订阅的方式。


在PMQ中,对于SpringBoot类型的项目,PMQ内置了消息消费的初始化操作。在应用启动时,客户端根据配置信息,自动进行了初始化等操作,方便用户最简单地使用消息中间件。 介绍完了上面的消息消费过程后,下面简单介绍一下PMQ消息消费端的架构设计。

架构设计

客户端消费核心架构图如下:


下面来详细介绍一下消息处理的详细过程:


  1. 当客户端启动完成后,重平衡监听线程会通过长链接的方式向broker查询当前实例的队列订阅情况。此过程即是重平衡的过程,在上一章节有介绍,不在此细述。

  2. 当重平衡完成后,当前消费实例,最终可能会被分配到一些队列(如上图的queue),客户端会根据这些队列信息进行消费。

  3. queue的处理是PMQ消息系统客户端消费的关键。下面来详细介绍单个queue的处理过程,多个queue跟单个queue处理过程一致。

  4. 当重平衡完成后,broker会将分配的队列的元数据信息返回给客户端,比如queue的id,queue偏移量等信息。客户端会为每个queue开启一个线程,此线程会根据这些元数据信息定时向broker拉取消息。注意是每个queue都有对应的独立拉取线程。

  5. 当线程拉取到消息后,会将拉取的消息缓存到当前queue对应的缓冲队列中。如果此时缓冲队列满了,则暂停拉取直到缓冲队列不再满载为止。缓存完毕后,会开启新的一轮拉取,如果出现拉取的消息为空,则拉取线程会sleep 50ms时间,再开启新的拉取。如果再次拉取还是没有消息,则加大等待拉取时间。直到拉取等待最大值。一旦拉取到新的消息,则重新开始新的循环。

  6. 重平衡完成后,在每个队列中,也会启动一个消费调度线程,它会定时循环获取缓冲队列的消息,然后根据缓冲消息数量和批量消费的条数,计算出执行线程的个数。在PMQ中,批量消费条数和线程数可以通过后台控制页面进行动态调整,实时生效。

  7. 计算出待执行的消费条数后,会启动相对应的线程进行消费。注意此线程是一个线程池。消费线程根据批量消费条数从缓冲队列中获取待消费的消息,然后调用消费者实例的本地方法。

  8. 当消息消费完成后,会更新内存中当前队列的偏移量。偏移量提交线程,会定时提交相关的偏移量。

  9. 如果出现消费失败,会将此消费失败的消息,发送到对应的失败topic中,然后进行重新消费。失败topic的消费逻辑与正常topic的消费逻辑一致。


在消息消费的过程中,有两个很重要的地方,一个是消费模式,另一个是偏移量提交,下面来分别介绍一下。

1. 消息消费模式
目前市面上主流的消息系统有2种消息消费模式,一种是推模式,一种是拉模式。在推模式下,消息是由broker服务端将消息推送到消费者实例中的,这种模式有些很大的缺点,第一当消息出现堆积,会产生严重的性能问题;第二在推模式下,服务端与客户端之间变得有状态(比如服务端需要记住实例需要推送给哪些客户端),不利于服务端水平扩展。第二种是拉模式。在拉模式下,消息是由客户端定时去向服务端拉取消息,然后触发本地消费。这种模式的缺点是由于采用定时拉取,时效性会差点,好处是服务端会变得无状态,有利于水平扩展,同时支持海量消息堆积。在PMQ中,客户端主要采取的是拉模式,对于一些消息量很少,但是对消息消费速度比较敏感的,采取的是推拉结合的模式。

2. 偏移量提交
由于在PMQ中,消息消费默认是采用多线程进行消费,所以偏移量的提交是一个很复杂的问题。试想一下,在单个队列里面会出现多个线程消费,当个某个线程中消息id比较大的消息消费完了,如果此时提交此大id的偏移量,就可能会出现消息id小的消息还没有消费完的情况。如果此时出现系统宕机,这个时候就会产生部分消息丢失的情况。为了保证消息不丢失,必须要最小化偏移量提交的方式,这样子可以保证偏移量之前的消息都被客户端消费到。那在PMQ中是如何做到最小化提交呢?一种方式是当调度线程启动多线程消费时,会等待多线程全部执行完毕后,再提交偏移量,这样可以可以保证偏移量之前的消息都被消费到,不管成功还是失败。但是这种方式会有一个问题,就是如果出现部分线程消费很慢,部分线程消费很快,如果这时采用等待全部消费完毕再提交,然后再进行下一次的消费的方式,会导致线程池资源浪费。所以在PMQ中,在上面这种方式的基础上引入了线程批次的概念。

线程批次的意思是,当线程调度器每次检查线程池中有空闲的线程和缓冲队列里面有数据就会产生新的批次消费线程组,并将此线程批次保存到一个列表中。每个批次线程组都有一个自增编号,当批次线程组中的线程都执行完成时,就表示当前批次线程组执行完毕,并记录好当前批次线程组的偏移量。然后再在批次线程组列表中,检查是否有连续执行完毕的批次线程组,直到遇到未执行完毕的批次线程组为止。找到此批次线程组后,就可以提交此队列的偏移量。这种方式的好处是,不用等待线程池所有的线程都执行完毕。只要线程池中还有空闲的资源就可以进行新的消费,提高了消费速度。

消费组的配置解析

到目前为止,我们已经介绍完了客户端消费的原理。PMQ的用户文档列出了所有与消费者相关的配置说明。大部分参数都有合理的默认值,用户可以在后台管理页面中修改相关的配置,并且实时生效。接下来介绍一些重要的配置,它们与消费者的性能和可用性有很大关系。

1. 重试次数

消息消费失败时,消息消费最大重试次数。当正常消费失败的时候,消息会存储到失败topic中,另外一个线程会去消费失败Topic中的消息,此时会进行失败消息的重试,重新消费成功则pass,否则继续重试。


2. 告警阈值

当消息堆积达到该值时,会发送告警。


3. 消息Tag

消息的标识,设置tag之后,消费者组只能消费这个Topic下带有该Tag的消息。


4. 延迟时间(单位:秒)

以秒为单位,主要为了满足一些需要延迟去消费的场景。


5. 拉取等待时间单位:

客户端拉取消息的最大等待时间。以50ms递增,初始为0。


6. 线程数

单个队列的消费线程数,影响消费速度。


7. 批量拉取条数

是指每次去向broker拉取多少条消息到客户端,批量拉取条数最大为500。


8. 批量消费条数

消息单次消费的最大条数。批量拉取条数,批量消费条数,线程数量 存在以下关系,批量拉取条数>=批量消费条数*线程数量。

代理消费

在公司中,经常会遇到多语言使用消息系统的情况,在实际使用中,我们会对sdk客户端会做一些功能增强。大多数时候我们只会以某个语言的客户端作为主sdk版本,比如在信也科技公司内部,主语言是java,消息维护的人也主要是java背景,这个时候就会出现一个情况,当java版本的sdk有新功能的时候,其他语言的sdk就会面临功能同步的问题。如果我们为每种语言都开发对应的sdk,会面临工作量很大的情况。所以迫切需要一种轻量级的,能支持多语言的通用的消息消费方法。


基于以上痛点,我们开发了一种通用的基于代理消息的多语言消息消费方法。架构设计图如下:



原理说明如下:


  1. 正常的java客户端消息消费过程是java应用向broker拉取消息,然后触发本地的消费方法。
  2. 代理消息是通过部署一个proxy 集群来代理消费消息。
  3. 当代理集群消费到消息时,将消息通过http转发到python或者php语言的http接口,这样其他语言就只需要提供一个简单的rest接口,就可以消费消息了。


通过这种方式,我们就可以只用维护一个版本的消息消费客户端,不必未每种语言编写单独的客户端,极大的减轻了多语言的开发与维护成本。至此我们通过proxy代理的方式就可以彻底解决多语言的问题。


目前PMQ也已经在github上开源(https://github.com/ppdaicorp/pmq),欢迎大家一起交流学习。

作者介绍

lorgine, 信也科技资深架构师

terry,科技高级工程师

以上是关于消息中间件PMQ揭秘 - 消息消费的主要内容,如果未能解决你的问题,请参考以下文章

Spring Boot 揭秘与实战 消息队列篇 - RabbitMQ

Redis专题3:发布订阅模式事务Lua脚本揭秘

消息中间件之Kafka

干货消息中间件—RocketMQ消息消费

Kafka如何保证消息顺序消费

消息队列怎么避免重复消费