RocketMQ订阅关系一致性
Posted 爱是与世界平行
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ订阅关系一致性相关的知识,希望对你有一定的参考价值。
订阅关系的一致性指的是,同一个消费者组(Group ID相同)下所有Consumer实例所订阅的Topic与Tag及对消息的处理逻辑必须完全一致。否则,消息消费的逻辑就会混乱,甚至导致消息丢失。
1 正确订阅关系
多个消费者组订阅了多个Topic,并且每个消费者组里的多个消费者实例的订阅关系保持了一致。
2 错误订阅关系
一个消费者组订阅了多个Topic,但是该消费者组里的多个Consumer实例的订阅关系并没有保持一致。
2.1 订阅了不同Topic
该例中的错误在于,同一个消费者组中的两个Consumer实例订阅了不同的Topic。
Consumer实例1-1:(订阅了topic为jodie_test_A,tag为所有的消息)
Properties properties = new Properties();
properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_1");
Consumer consumer = ONSFactory.createConsumer(properties);
consumer.subscribe("jodie_test_A", "*", new MessageListener()
public Action consume(Message message, ConsumeContext context)
System.out.println(message.getMsgID());
return Action.CommitMessage;
);
Consumer实例1-2:(订阅了topic为jodie_test_B,tag为所有的消息)
Properties properties = new Properties();
properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_1");
Consumer consumer = ONSFactory.createConsumer(properties);
consumer.subscribe("jodie_test_B", "*", new MessageListener()
public Action consume(Message message, ConsumeContext context)
System.out.println(message.getMsgID());
return Action.CommitMessage;
);
2.2、订阅了不同Tag
该例中的错误在于,同一个消费者组中的两个Consumer订阅了相同Topic的不同Tag。
Consumer实例2-1:(订阅了topic为jodie_test_A,tag为TagA的消息)
Properties properties = new Properties();
properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_2");
Consumer consumer = ONSFactory.createConsumer(properties);
consumer.subscribe("jodie_test_A", "TagA", new MessageListener()
public Action consume(Message message, ConsumeContext context)
System.out.println(message.getMsgID());
return Action.CommitMessage;
);
Consumer实例2-2:(订阅了topic为jodie_test_A,tag为所有的消息)
Properties properties = new Properties();
properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_2");
Consumer consumer = ONSFactory.createConsumer(properties);
consumer.subscribe("jodie_test_A", "*", new MessageListener()
public Action consume(Message message, ConsumeContext context)
System.out.println(message.getMsgID());
return Action.CommitMessage;
);
2.3、订阅了不同数量的Topic
该例中的错误在于,同一个消费者组中的两个Consumer订阅了不同数量的Topic。
Consumer实例3-1:(该Consumer订阅了两个Topic)
Properties properties = new Properties();
properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_3");
Consumer consumer = ONSFactory.createConsumer(properties);
consumer.subscribe("jodie_test_A", "TagA", new MessageListener()
public Action consume(Message message, ConsumeContext context)
System.out.println(message.getMsgID());
return Action.CommitMessage;
);
consumer.subscribe("jodie_test_B", "TagB", new MessageListener()
public Action consume(Message message, ConsumeContext context)
System.out.println(message.getMsgID());
return Action.CommitMessage;
);
Consumer实例3-2:(该Consumer订阅了一个Topic)
Properties properties = new Properties();
properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_3");
Consumer consumer = ONSFactory.createConsumer(properties);
consumer.subscribe("jodie_test_A", "TagB", new MessageListener()
public Action consume(Message message, ConsumeContext context)
System.out.println(message.getMsgID());
return Action.CommitMessage;
);
以上是关于RocketMQ订阅关系一致性的主要内容,如果未能解决你的问题,请参考以下文章