RocketMQ订阅关系一致性

Posted 爱是与世界平行

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ订阅关系一致性相关的知识,希望对你有一定的参考价值。

订阅关系的一致性指的是,同一个消费者组(Group ID相同)下所有Consumer实例所订阅的Topic与Tag及对消息的处理逻辑必须完全一致。否则,消息消费的逻辑就会混乱,甚至导致消息丢失。

1 正确订阅关系

多个消费者组订阅了多个Topic,并且每个消费者组里的多个消费者实例的订阅关系保持了一致。

2 错误订阅关系

一个消费者组订阅了多个Topic,但是该消费者组里的多个Consumer实例的订阅关系并没有保持一致。

2.1 订阅了不同Topic

该例中的错误在于,同一个消费者组中的两个Consumer实例订阅了不同的Topic。

Consumer实例1-1:(订阅了topic为jodie_test_A,tag为所有的消息)

Properties properties = new Properties();
properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_1");
Consumer consumer = ONSFactory.createConsumer(properties);
consumer.subscribe("jodie_test_A", "*", new MessageListener() 
    public Action consume(Message message, ConsumeContext context) 
        System.out.println(message.getMsgID());
        return Action.CommitMessage;
    
);

Consumer实例1-2:(订阅了topic为jodie_test_B,tag为所有的消息)

Properties properties = new Properties();
properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_1");
Consumer consumer = ONSFactory.createConsumer(properties);
consumer.subscribe("jodie_test_B", "*", new MessageListener() 
    public Action consume(Message message, ConsumeContext context) 
        System.out.println(message.getMsgID());
        return Action.CommitMessage;
    
);

2.2、订阅了不同Tag

该例中的错误在于,同一个消费者组中的两个Consumer订阅了相同Topic的不同Tag。

Consumer实例2-1:(订阅了topic为jodie_test_A,tag为TagA的消息)

Properties properties = new Properties();
properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_2");
Consumer consumer = ONSFactory.createConsumer(properties);
consumer.subscribe("jodie_test_A", "TagA", new MessageListener() 
    public Action consume(Message message, ConsumeContext context) 
        System.out.println(message.getMsgID());
        return Action.CommitMessage;
    
);

Consumer实例2-2:(订阅了topic为jodie_test_A,tag为所有的消息)

Properties properties = new Properties();
properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_2");
Consumer consumer = ONSFactory.createConsumer(properties);
consumer.subscribe("jodie_test_A", "*", new MessageListener() 
    public Action consume(Message message, ConsumeContext context) 
        System.out.println(message.getMsgID());
        return Action.CommitMessage;
    
);

2.3、订阅了不同数量的Topic

该例中的错误在于,同一个消费者组中的两个Consumer订阅了不同数量的Topic。

Consumer实例3-1:(该Consumer订阅了两个Topic)

Properties properties = new Properties();
properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_3");
Consumer consumer = ONSFactory.createConsumer(properties);
consumer.subscribe("jodie_test_A", "TagA", new MessageListener() 
    public Action consume(Message message, ConsumeContext context) 
        System.out.println(message.getMsgID());
        return Action.CommitMessage;
    
);
consumer.subscribe("jodie_test_B", "TagB", new MessageListener() 
    public Action consume(Message message, ConsumeContext context) 
        System.out.println(message.getMsgID());
        return Action.CommitMessage;
    
);

Consumer实例3-2:(该Consumer订阅了一个Topic)

Properties properties = new Properties();
properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_3");
Consumer consumer = ONSFactory.createConsumer(properties);
consumer.subscribe("jodie_test_A", "TagB", new MessageListener() 
    public Action consume(Message message, ConsumeContext context) 
        System.out.println(message.getMsgID());
        return Action.CommitMessage;
    
);

以上是关于RocketMQ订阅关系一致性的主要内容,如果未能解决你的问题,请参考以下文章

RocketMQ为什么要保证订阅关系的一致性?

【rocketmq客户端】订阅关系一致

阿里云RocketMQ的性能测试(本地测试)

RocketMQ

填坑笔记:RocketMQ消息订阅失败问题?

从源码告诉你,RocketMQ的tag有什么坑。