Apache RocketMQ:理解官方的OrderExample的例子

Posted 你是小KS

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Apache RocketMQ:理解官方的OrderExample的例子相关的知识,希望对你有一定的参考价值。

1. 声明

当前内容主要为学习和分析官方的OrderExample的例子,并理解其含义,内容参考官方文档

2. 官方demo

序号消息生产者

public class OrderedProducer {
	public static void main(String[] args) throws Exception {
		// Instantiate with a producer group name.
		DefaultMQProducer producer = new DefaultMQProducer("example_group_name");
		producer.setNamesrvAddr("192.168.1.102:9876");
		// Launch the instance.
		producer.start();
		// String[] tags = new String[] { "TagA" };
		for (int i = 0; i < 10; i++) {
			int orderId = i;
			// Create a message instance, specifying topic, tag and message body.
			System.out.println("orderId==>" + orderId + ",i=" + i);
			Message msg = new Message("TopicTestjjj", "TagA", "KEY" + i,
					("Hello RocketMQ i=" + i + ",orderId=" + orderId).getBytes(RemotingHelper.DEFAULT_CHARSET));
			// 所谓的发送有序的消息其实就是在发送消息的时候传递序号即可。选择发送的消息队列
			SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
				@Override
				public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {

					Integer id = (Integer) arg;
					int index = id % mqs.size();

					// 消息队列的个数就是4
					System.out.println("mqs的大小==》" + mqs.size());
					// 只向一个消息队列中发送消息
					return mqs.get(index);
				}
				// 这里的传递的arg对象就是序号
			}, orderId);

			System.out.printf("%s%n", sendResult);
		}
		// server shutdown
		producer.shutdown();
	}
}

序号消息消费者

public class OrderedConsumer {
	public static void main(String[] args) throws Exception {
		// 推的消费模式
		DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test2");
		// 拉的消费者模式
		// DefaultMQPullConsumer consumer =new DefaultMQPullConsumer(consumerGroup)
		consumer.setNamesrvAddr("192.168.1.102:9876");

		// 设置消费者消费消息从哪里开始
		consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

		// 表示订阅TopicTestjjj中的消息且具有特定标签的
		consumer.subscribe("TopicTestjjj", "TagA");

		MessageListenerOrderly listenerOrderly = new MyMessageListenerOrderly();
		consumer.registerMessageListener(listenerOrderly);

		consumer.start();

		System.out.printf("Consumer Started.%n");
	}

	static class MyMessageListenerOrderly implements MessageListenerOrderly {
		AtomicLong consumeTimes = new AtomicLong(0);

		@Override
		public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
			// 关闭自动提交
			context.setAutoCommit(false);

			for (MessageExt msgExt : msgs) {
				try {
					String msg = new String(msgExt.getBody(), RemotingHelper.DEFAULT_CHARSET).intern();
					System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msg + "%n");
				} catch (UnsupportedEncodingException e) { // TODO Auto-generated catch block
					e.printStackTrace();
				}
			}

			long incrementAndGet = this.consumeTimes.incrementAndGet();
			System.out.println("当前的消费次数:" + incrementAndGet);
			return ConsumeOrderlyStatus.SUCCESS;

		}

	}
}

一般消费者

public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {

        // Instantiate with specified consumer group name.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test");
		/*
		 * consumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getMQAdminImpl()
		 */
        // Specify name server addresses.
        consumer.setNamesrvAddr("192.168.1.102:9876");

        // Subscribe one more more topics to consume.
        consumer.subscribe("TopicTestjjj", "*");
        // Register callback to execute on arrival of messages fetched from brokers.
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                //System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                for (MessageExt messageExt : msgs) {
                	try {
						String msg = new String(messageExt.getBody(),RemotingHelper.DEFAULT_CHARSET).intern();
						System.out.println("Receive New Messages: "+msg);
					} catch (UnsupportedEncodingException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
				}
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        //Launch the consumer instance.
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

上面主要是通过MessageListenerConcurrentlyMessageListenerOrderly来区分消费是否为order方式

采取的消费手段主要为:服务器向消费者推送消息方式(push)

其中order消费者是手动将消息发送到4个MessageQueue中,orderId是有序的

实际消息应该是这样分布的(消息为FIFO)

3. 测试

1.首先启动普通的消费者,然后启动order消费者,最后启动order生产者,得到结果为:

一般消费者:

Receive New Messages: Hello RocketMQ i=9,orderId=9
Receive New Messages: Hello RocketMQ i=5,orderId=5
Receive New Messages: Hello RocketMQ i=1,orderId=1
Receive New Messages: Hello RocketMQ i=0,orderId=0
Receive New Messages: Hello RocketMQ i=4,orderId=4
Receive New Messages: Hello RocketMQ i=8,orderId=8
Receive New Messages: Hello RocketMQ i=3,orderId=3
Receive New Messages: Hello RocketMQ i=7,orderId=7
Receive New Messages: Hello RocketMQ i=2,orderId=2
Receive New Messages: Hello RocketMQ i=6,orderId=6

order消费者为:

ConsumeMessageThread_2 Receive New Messages: Hello RocketMQ i=0,orderId=0
ConsumeMessageThread_1 Receive New Messages: Hello RocketMQ i=1,orderId=1
当前的消费次数:1
当前的消费次数:2
ConsumeMessageThread_2 Receive New Messages: Hello RocketMQ i=4,orderId=4
ConsumeMessageThread_1 Receive New Messages: Hello RocketMQ i=5,orderId=5
当前的消费次数:3
当前的消费次数:4
ConsumeMessageThread_2 Receive New Messages: Hello RocketMQ i=8,orderId=8
ConsumeMessageThread_1 Receive New Messages: Hello RocketMQ i=9,orderId=9
当前的消费次数:5
当前的消费次数:6
ConsumeMessageThread_3 Receive New Messages: Hello RocketMQ i=3,orderId=3
当前的消费次数:7
ConsumeMessageThread_4 Receive New Messages: Hello RocketMQ i=2,orderId=2
ConsumeMessageThread_3 Receive New Messages: Hello RocketMQ i=7,orderId=7
当前的消费次数:8
当前的消费次数:9
ConsumeMessageThread_4 Receive New Messages: Hello RocketMQ i=6,orderId=6
当前的消费次数:10

看起来好像当前的order消费者消费的更多(几乎每次都消费两个)

2.再次启动order生产者:
普通消费者:

Receive New Messages: Hello RocketMQ i=0,orderId=0
Receive New Messages: Hello RocketMQ i=1,orderId=1
Receive New Messages: Hello RocketMQ i=2,orderId=2
Receive New Messages: Hello RocketMQ i=3,orderId=3
Receive New Messages: Hello RocketMQ i=4,orderId=4
Receive New Messages: Hello RocketMQ i=5,orderId=5
Receive New Messages: Hello RocketMQ i=6,orderId=6
Receive New Messages: Hello RocketMQ i=7,orderId=7
Receive New Messages: Hello RocketMQ i=8,orderId=8
Receive New Messages: Hello RocketMQ i=9,orderId=9

order消费者:

ConsumeMessageThread_5 Receive New Messages: Hello RocketMQ i=0,orderId=0
当前的消费次数:11
ConsumeMessageThread_6 Receive New Messages: Hello RocketMQ i=1,orderId=1
当前的消费次数:12
ConsumeMessageThread_7 Receive New Messages: Hello RocketMQ i=2,orderId=2
当前的消费次数:13
ConsumeMessageThread_8 Receive New Messages: Hello RocketMQ i=3,orderId=3
当前的消费次数:14
ConsumeMessageThread_9 Receive New Messages: Hello RocketMQ i=4,orderId=4
当前的消费次数:15
ConsumeMessageThread_10 Receive New Messages: Hello RocketMQ i=5,orderId=5
当前的消费次数:16
ConsumeMessageThread_11 Receive New Messages: Hello RocketMQ i=6,orderId=6
当前的消费次数:17
ConsumeMessageThread_12 Receive New Messages: Hello RocketMQ i=7,orderId=7
当前的消费次数:18
ConsumeMessageThread_13 Receive New Messages: Hello RocketMQ i=8,orderId=8
当前的消费次数:19
ConsumeMessageThread_14 Receive New Messages: Hello RocketMQ i=9,orderId=9
当前的消费次数:20

随后无论多次启动order生产者,两个消费者消费顺序保持一致

说明:当消息发送时检测有消费者,那么发送的消息会直接发送给订阅的消费者

3.删除该topic,然后关闭两个消费者,然后启动order生产者后,过几秒中后启动两个消费者
order生产者:

orderId==>0,i=0
mqs的大小==》4
SendResult [sendStatus=SEND_OK, msgId=7F00000107DC73D16E939C8E34070000, offsetMsgId=C0A8016600002A9F0000000000018B36, messageQueue=MessageQueue [topic=TopicTestjjj, brokerName=localhost, queueId=0], queueOffset=0]
orderId==>1,i=1
mqs的大小==》4
SendResult [sendStatus=SEND_OK, msgId=7F00000107DC73D16E939C8E343D0001, offsetMsgId=C0A8016600002A9F0000000000018C18, messageQueue=MessageQueue [topic=TopicTestjjj, brokerName=localhost, queueId=1], queueOffset=0]
orderId==>2,i=2
mqs的大小==》4
SendResult [sendStatus=SEND_OK, msgId=7F00000107DC73D16E939C8E343F0002, offsetMsgId=C0A8016600002A9F0000000000018CFA, messageQueue=MessageQueue [topic=TopicTestjjj, brokerName=localhost, queueId=2], queueOffset=0]
orderId==>3,i=3
mqs的大小==》4
SendResult [sendStatus=SEND_OK, msgId=7F00000107DC73D16E939C8E34610003, offsetMsgId=C0A8016600002A9F0000000000018DDC, messageQueue=MessageQueue [topic=TopicTestjjj, brokerName=localhost, queueId=3], queueOffset=0]
orderId==>4,i=4
mqs的大小==》4
SendResult [sendStatus=SEND_OK, msgId=7F00000107DC73D16E939C8E34710004, offsetMsgId=C0A8016600002A9F0000000000018EBE, messageQueue=MessageQueue [topic=TopicTestjjj, brokerName=localhost, queueId=0], queueOffset=1]
orderId==>5,i=5
mqs的大小==》4
SendResult [sendStatus=SEND_OK, msgId=7F00000107DC73D16E939C8E34800005, offsetMsgId=C0A8016600002A9F0000000000018FA0, messageQueue=MessageQueue [topic=TopicTestjjj, brokerName=localhost, queueId=1], queueOffset=1]
orderId==>6,i=6
mqs的大小==》4
SendResult [sendStatus=SEND_OK, msgId=7F00000107DC73D16E939C8E349F0006, offsetMsgId=C0A8016600002A9F0000000000019082, messageQueue=MessageQueue [topic=TopicTestjjj, brokerName=localhost, queueId=2], queueOffset=1]
orderId==>7,i=7
mqs的大小==》4
SendResult [sendStatus=SEND_OK, msgId=7F00000107DC73D16E939C8E349F0007, offsetMsgId=C0A8016600002A9F0000000000019164, messageQueue=MessageQueue [topic=TopicTestjjj, brokerName=localhost, queueId=3], queueOffset=1]
orderId==>8,i=8
mqs的大小==》4
SendResult [sendStatus=SEND_OK, msgId=7F00000107DC73D16E939C8E34AF0008, offsetMsgId=C0A8016600002A9F0000000000019246, messageQueue=MessageQueue [topic=TopicTestjjj, brokerName=localhost, queueId=0], queueOffset=2]
orderId==>9,i=9
mqs的大小==》4
SendResult [sendStatus=SEND_OK, msgId=7F00000107DC73D16E939C8E34C00009, offsetMsgId=C0A8016600002A9F0000000000019328, messageQueue=MessageQueue [topic=TopicTestjjj, brokerName=localhost, queueId=1], queueOffset=2]

普通消费者

Receive New Messages: Hello RocketMQ i=7,orderId=7
Receive New Messages: Hello RocketMQ i=1,orderId=1
Receive New Messages: Hello RocketMQ i=2,orderId=2
Receive New Messages: Hello RocketMQ i=5,orderId=5
Receive New Messages: Hello RocketMQ i=6,orderId=6
Receive New Messages: Hello RocketMQ i=3,orderId=3
Receive New Messages: Hello RocketMQ i=9,orderId=9
Receive New Messages: Hello RocketMQ i=0,orderId=0
Receive New Messages: Hello RocketMQ i=4,orderId=4
Receive New Messages: Hello RocketMQ i=8,orderId=8

基本上完全是乱的没有规律

order消费者

ConsumeMessageThread_1 Receive New Messages: Hello RocketMQ i=1,orderId=1
当前的消费次数:1
ConsumeMessageThread_1 Receive New Messages: Hello RocketMQ i=5,orderId=5
当前的消费次数:2
ConsumeMessageThread_1 Receive New Messages: Hello RocketMQ i=9,orderId=9
当前的消费次数:3
ConsumeMessageThread_2 Receive New Messages: Hello RocketMQ i=0,orderId=0
当前的消费次数:4
ConsumeMessageThread_2 Receive New Messages: Hello RocketMQ i=4,orderId=4
当前的消费次数:5
ConsumeMessageThread_2 Receive New Messages: Hello RocketMQ i=8,orderId=8
当前的消费次数:6
ConsumeMessageThread_3 Receive New Messages: Hello RocketMQ i=3,orderId=3
当前的消费次数:7
ConsumeMessageThread_3 Receive New Messages: Hello RocketMQ i=7,orderId=7
当前的消费次数:8
ConsumeMessageThread_4 Receive New Messages: Hello RocketMQ i=2,orderId=2
当前的消费次数:9
ConsumeMessageThread_4 Receive New Messages: Hello RocketMQ i=6,orderId=6
当前的消费次数:10

观察发现:先是消费队列1中的,然后消费队列0中的,队列3,队列2,实现局部队列中的顺序消费(消息时一条一条的推送的…)

4. 总结

  1. 如果消费者固定且一直存在,那么消息只要发送到服务器立刻会被推送给消费者
  2. order消费者是按照messageQueue的消息顺序进行消费(前提必须已经全部写入到queue中)
  3. MessageQueueSelector就是一种选择发布指定的消费者队列的一个队列选择器,可以将消息发送发送到指定的队列中,如果只要一个队列,那么返回固定的即可
  4. context.setAutoCommit(false);表示关闭自动提交,相当于开启事务,可以执行回滚操作
  5. ConsumeOrderlyStatus.SUCCESS;表示消费成功且提交事务ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT表示让当前给我发送消息的队列过段指定的时间再给我发送消息ConsumeOrderlyStatus.COMMIT表示手动提交消息消费ConsumeOrderlyStatus.ROLLBACK表示事务回滚,重新再给我发消息

以上是关于Apache RocketMQ:理解官方的OrderExample的例子的主要内容,如果未能解决你的问题,请参考以下文章

Apache RocketMQ:理解官方的Broadcasting的例子

Apache RocketMQ:理解官方的BatchFilter设置用户属性的例子

Apache RocketMQ:理解官方的BatchFilter设置用户属性的例子

Apache RocketMQ:使用官方demo测试rocketmq

RocketMQ部署以及调优

RocketMQ架构原理及名词概念