基于RocketMQ的推送猜想

Posted 无趣的码农

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了基于RocketMQ的推送猜想相关的知识,希望对你有一定的参考价值。

       RocketMQ是基于Topic的分布消息中间件,一个Topic对应多个ConsumerQueue,消费者消费指定的ConsumerQueue队列,并且消息是通过Pull方式拉取。为每个用户定义一个Topic这种方式不现实,即使为每个用户定义一个queueId也是不切实际的做法。ConsumerQueue本来是处理分布式及负载均衡,但我们可以换种思路,一个用户关联一个queueId,一个queueId可以被多个用户关联。手机客户端不直接做为消费者,而是添加一个中间代理做为消费者,由某种路由机制来控制具有相同queueId的手机客户端连接相同的消费者代理,这样消费者就只需拉取少数的队列消息而不用拉取所有的队列。

       一般情况下服务器集群是不太可能直接暴露在外网的,所以增加一个代理层,这个代理解决推送协议、保持长连接以及拉取/推送消息。代理可以使用Netty来接收手机客户端的长连接,现在基于Netty实现的TCP长连接C10K、C100K已不是什么大问题,C1M都有很多,只要机器配置跟的上,Linux放开相关限制资源即可。推送协议可以使用开放的MQTT,有很多相关资料,并且也有很多参考实现。

       单独为推送功能定义一个Topic(比如就叫PUSH_TOPIC),根据用户量定义若干数量的ConsumerQueue(不够可以扩张),Queue可以分布在多台机器上组成一个集群。在用户注册的时候根据某种路由算法给用户分配一个queueId,并记录下该queue最大的消息位置。往后所有发往该用户的消息都写入这个队列中。推送连接建立的时候,客户端先根据queueId+offset拉取属于该用户或无过滤条件的消息列表,消费完了之后,就保持正常长连接,实时的推送消息。对单个用户推送的消息要记录下用户id,对所有用户或标记的用户推送的消息要记录下过滤的字段,对于所有用户推送的消息则向所有queue推送一条消息。因为代理层拉取消息的时候是不区分用户的,pull到一批消息后再根据用户id或标记等过滤条件找到对应的Socket,把消息返回客户端。对于客户端不在线的情况,RocketMQ不做特殊处理,由客户端建立连接的时候以搜索的方式拉取消息。

       对于厌倦了CRUD的,造这么个轮子也是有点意思,这种方式就是把RocketMQ做一个分布式的消息存储来使用,不光可以应用到推送,也可以应用到简单的客服聊天上。

以上是关于基于RocketMQ的推送猜想的主要内容,如果未能解决你的问题,请参考以下文章

rocketmq消息重复推送的问题

RocketMQ系列4-长轮询模式实现推送消息

27 发送消息零丢失方案:RocketMQ事务消息的实现流程分析

Spring Cloud(11)——基于RocketMQ的Stream实现

RocketMQ的消费模式

基于 RocketMQ Prometheus Exporter 打造定制化 DevOps 平台