RocketMQ 源码阅读 ---- Tag 过滤

Posted wenniuwuren

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ 源码阅读 ---- Tag 过滤相关的知识,希望对你有一定的参考价值。

零、简介

RocketMQ 消息过滤分成 TAG过滤和 SQL Filter 过滤,SQL Filter是在服务端处理,会影响 MQ 的性能一般不建议使用,语法比较灵活,实现方式也相对复杂一些。Tags 过滤实现比较简单,在客户端实现。这样就有一个问题,如果某个 TOPIC 消息非常多,主要消费这个 TOPIC 应用A要是 500 台,另一个只想消费部分 Tag 消息的应用B只有 2 台,500台能承接的海量消息会也会到只有2台机器的应用B,因为机器少,可能应用B就有消息堆积的报警。 (tips:一个 24C-125G内存-60G 机器部署MQ服务端大概能支撑5W左右的 TPS)

一、源码解析

DefaultMQPushConsumerImpl 类为消息消费类

public void pullMessage(final PullRequest pullRequest) 
        .......

            if (this.isPause()) 

                    final SubscriptionData subscriptionData = (SubscriptionData)this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
                    if (null == subscriptionData) 
                        this.executePullRequestLater(pullRequest, 3000L);
                        this.log.warn("find the consumer's subscription failed, ", pullRequest);
                     else 
                        final long beginTimestamp = System.currentTimeMillis();
                        PullCallback pullCallback = new PullCallback() 
                            public void onSuccess(PullResult pullResult) 
                                if (pullResult != null) 
                                    pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult, subscriptionData);
                                    switch(pullResult.getPullStatus()) 
                                    case FOUND:
                                        

      ......                                      

processPullResult 就是拉取处理消息的真正位置

public PullResult processPullResult(MessageQueue mq, PullResult pullResult, SubscriptionData subscriptionData) 
        PullResultExt pullResultExt = (PullResultExt)pullResult;
        this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
        if (PullStatus.FOUND == pullResult.getPullStatus()) 
            ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
            List<MessageExt> msgList = MessageDecoder.decodesBatch(byteBuffer, this.mQClientFactory.getClientConfig().isDecodeReadBody(), this.mQClientFactory.getClientConfig().isDecodeDecompressBody(), true);
            List<MessageExt> msgListFilterAgain = msgList;
            Iterator i$;
            MessageExt msg;
            if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) 
                msgListFilterAgain = new ArrayList(msgList.size());
                i$ = msgList.iterator();

                while(i$.hasNext()) 
                    msg = (MessageExt)i$.next();
                    if (msg.getTags() != null && subscriptionData.getTagsSet().contains(msg.getTags()))  // tags 过滤
                        ((List)msgListFilterAgain).add(msg);
                    
                
            

            if (this.hasHook()) 
                FilterMessageContext filterMessageContext = new FilterMessageContext();
                filterMessageContext.setUnitMode(this.unitMode);
                filterMessageContext.setMsgList((List)msgListFilterAgain);
                this.executeHook(filterMessageContext);
            

            i$ = ((List)msgListFilterAgain).iterator();

            while(i$.hasNext()) 
                msg = (MessageExt)i$.next();
                MessageAccessor.putProperty(msg, "MIN_OFFSET", Long.toString(pullResult.getMinOffset()));
                MessageAccessor.putProperty(msg, "MAX_OFFSET", Long.toString(pullResult.getMaxOffset()));
            

            pullResultExt.setMsgFoundList((List)msgListFilterAgain);
        

        pullResultExt.setMessageBinary((byte[])null);
        return pullResult;
    

可以看到上述代码,是判断 MetaQ 消息体中是否 contains Tags,如果有就放进结果集。

 

 

以上是关于RocketMQ 源码阅读 ---- Tag 过滤的主要内容,如果未能解决你的问题,请参考以下文章

RocketMQ 消息过滤

RocketMQ 消息过滤

RocketMQ 消息过滤

从源码告诉你,RocketMQ的tag有什么坑。

使用RocketMQ的小细节(上)

rocketmq的消息过滤