RocketMQ同一个项目中,同一个topic,可以通过不同的tag来订阅消息吗?
Posted 756623607-zhang
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ同一个项目中,同一个topic,可以通过不同的tag来订阅消息吗?相关的知识,希望对你有一定的参考价值。
一、问题答案
是不可以的
而且后注册的会替换前注册的,MqConsumer2会替换MqConsumer,并且只结束tag-2的消息
/** * @date 2019/05/28 */ @Component @Slf4j public class MqConsumer implements MessageConsumer @Override @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRED) public void onMessage(String msg) log.info("接收到的库存MQ消息:", msg); log.info("接收到的库存MQ消息:", msg); log.info("接收到的库存MQ消息:", msg); @Override public String getTopic() return "topic-1"; @Override public String getTag() return "tag-1";
@Component @Slf4j public class MqConsumer2 implements MessageConsumer @Override @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRED) public void onMessage(String msg) log.info("接收到的库存MQ消息:", msg); log.info("接收到的库存MQ消息:", msg); log.info("接收到的库存MQ消息:", msg); @Override public String getTopic() return "topic-1"; @Override public String getTag() return "tag-2";
二、为什么呢?
我们从源码的角度来分析下
1.订阅消息的方法 public void subscribe(String topic, String subExpression, MessageListener listener) ,其中subExpression即为tag
package com.aliyun.openservices.ons.api.impl.rocketmq; .... @Generated("ons-client") public class ConsumerImpl extends ONSConsumerAbstract implements Consumer private final ConcurrentHashMap<String, MessageListener> subscribeTable = new ConcurrentHashMap<String, MessageListener>(); public ConsumerImpl(final Properties properties) super(properties); boolean postSubscriptionWhenPull = Boolean.parseBoolean(properties.getProperty(PropertyKeyConst.PostSubscriptionWhenPull, "false")); this.defaultMQPushConsumer.setPostSubscriptionWhenPull(postSubscriptionWhenPull); String messageModel = properties.getProperty(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING); this.defaultMQPushConsumer.setMessageModel(MessageModel.valueOf(messageModel)); @Override public void start() this.defaultMQPushConsumer.registerMessageListener(new MessageListenerImpl()); super.start(); @Override public void subscribe(String topic, String subExpression, MessageListener listener) if (null == topic) throw new ONSClientException("topic is null"); if (null == listener) throw new ONSClientException("listener is null"); this.subscribeTable.put(topic, listener); super.subscribe(topic, subExpression); .....
从上面的类中我们可以从this.subscribeTable.put(topic, listener);看到subscribeTable这样的一个Map,该Map与tag无关
2.我们再看super.subscribe(topic, subExpression)方法,属于ONSConsumerAbstract类中
protected void subscribe(String topic, String subExpression) try this.defaultMQPushConsumer.subscribe(topic, subExpression); catch (MQClientException e) throw new ONSClientException("defaultMQPushConsumer subscribe exception", e);
DefaultMQPushConsumer中:
@Override public void subscribe(String topic, String subExpression) throws MQClientException this.defaultMQPushConsumerImpl.subscribe(withNamespace(topic), subExpression);
DefaultMQPushConsumerImpl中:
public void subscribe(String topic, String subExpression) throws MQClientException try
//此处用来构建订阅数据,并且指定了tag SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), topic, subExpression);
//此处将topic和该topic的订阅数据存放到subscriptionInner这个Map中
// protected final ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner = new ConcurrentHashMap<String, SubscriptionData>(); this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); if (this.mQClientFactory != null) this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); catch (Exception e) throw new MQClientException("subscription exception", e);
三、总结
从上面简单的源码可以看到,有用到两个Map,
subscribeTable 和 subscriptionInner ,并且Map的key都为topic。所以我们可以笃定,RocketMQ在同一个项目中,只支持注册一个topic消费者,那么也就只能指定一个tag
以上是关于RocketMQ同一个项目中,同一个topic,可以通过不同的tag来订阅消息吗?的主要内容,如果未能解决你的问题,请参考以下文章