RocketMQ同一个项目中,同一个topic,可以通过不同的tag来订阅消息吗?

Posted 756623607-zhang

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ同一个项目中,同一个topic,可以通过不同的tag来订阅消息吗?相关的知识,希望对你有一定的参考价值。

一、问题答案

  是不可以的

而且后注册的会替换前注册的,MqConsumer2会替换MqConsumer,并且只结束tag-2的消息

技术图片

/**
 * @date 2019/05/28
 */
@Component
@Slf4j
public class MqConsumer implements MessageConsumer 


    @Override
    @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRED)
    public void onMessage(String msg) 
        log.info("接收到的库存MQ消息:", msg);
        log.info("接收到的库存MQ消息:", msg);
        log.info("接收到的库存MQ消息:", msg);
    

    @Override
    public String getTopic() 
        return "topic-1";
    

    @Override
    public String getTag() 
        return "tag-1";
    
@Component
@Slf4j
public class MqConsumer2 implements MessageConsumer 


    @Override
    @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRED)
    public void onMessage(String msg) 
        log.info("接收到的库存MQ消息:", msg);
        log.info("接收到的库存MQ消息:", msg);
        log.info("接收到的库存MQ消息:", msg);
    

    @Override
    public String getTopic() 
        return "topic-1";
    

    @Override
    public String getTag() 
        return "tag-2";
    

 

二、为什么呢?

我们从源码的角度来分析下

1.订阅消息的方法 public void subscribe(String topic, String subExpression, MessageListener listener) ,其中subExpression即为tag

package com.aliyun.openservices.ons.api.impl.rocketmq;
....
@Generated("ons-client")
public class ConsumerImpl extends ONSConsumerAbstract implements Consumer 
    private final ConcurrentHashMap<String, MessageListener> subscribeTable = new ConcurrentHashMap<String, MessageListener>();

    public ConsumerImpl(final Properties properties) 
        super(properties);
        boolean postSubscriptionWhenPull = Boolean.parseBoolean(properties.getProperty(PropertyKeyConst.PostSubscriptionWhenPull, "false"));
        this.defaultMQPushConsumer.setPostSubscriptionWhenPull(postSubscriptionWhenPull);

        String messageModel = properties.getProperty(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
        this.defaultMQPushConsumer.setMessageModel(MessageModel.valueOf(messageModel));
    

    @Override
    public void start() 
        this.defaultMQPushConsumer.registerMessageListener(new MessageListenerImpl());
        super.start();
    


    @Override
    public void subscribe(String topic, String subExpression, MessageListener listener) 
        if (null == topic) 
            throw new ONSClientException("topic is null");
        

        if (null == listener) 
            throw new ONSClientException("listener is null");
        
        this.subscribeTable.put(topic, listener);
        super.subscribe(topic, subExpression);
    

.....

 

从上面的类中我们可以从this.subscribeTable.put(topic, listener);看到subscribeTable这样的一个Map,该Map与tag无关

2.我们再看super.subscribe(topic, subExpression)方法,属于ONSConsumerAbstract类中

protected void subscribe(String topic, String subExpression) 
        try 
            this.defaultMQPushConsumer.subscribe(topic, subExpression);
         catch (MQClientException e) 
            throw new ONSClientException("defaultMQPushConsumer subscribe exception", e);
        
    

DefaultMQPushConsumer中:

@Override
    public void subscribe(String topic, String subExpression) throws MQClientException 
        this.defaultMQPushConsumerImpl.subscribe(withNamespace(topic), subExpression);
    

DefaultMQPushConsumerImpl中:

public void subscribe(String topic, String subExpression) throws MQClientException 
        try 
      //此处用来构建订阅数据,并且指定了tag SubscriptionData subscriptionData
= FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), topic, subExpression);
      //此处将topic和该topic的订阅数据存放到subscriptionInner这个Map中
   // protected final ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner = new ConcurrentHashMap<String, SubscriptionData>();
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); if (this.mQClientFactory != null) this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); catch (Exception e) throw new MQClientException("subscription exception", e);

 

三、总结

从上面简单的源码可以看到,有用到两个Map,

subscribeTable 和 subscriptionInner ,并且Map的key都为topic。所以我们可以笃定,RocketMQ在同一个项目中,只支持注册一个topic消费者,那么也就只能指定一个tag

 

以上是关于RocketMQ同一个项目中,同一个topic,可以通过不同的tag来订阅消息吗?的主要内容,如果未能解决你的问题,请参考以下文章

rocketmq特性(features)

rocketmq批量消息投递

RocketMQ(二)——基本概念

rocketMq-Topic创建过程

RocketMQ原理解析-NameServer

RocketMQ Topic/Group/Tags介绍