Apache RocketMQ:理解官方的OrderExample的例子
Posted 你是小KS
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Apache RocketMQ:理解官方的OrderExample的例子相关的知识,希望对你有一定的参考价值。
1. 声明
当前内容主要为学习和分析官方的OrderExample的例子,并理解其含义,内容参考官方文档
2. 官方demo
序号消息生产者
public class OrderedProducer {
public static void main(String[] args) throws Exception {
// Instantiate with a producer group name.
DefaultMQProducer producer = new DefaultMQProducer("example_group_name");
producer.setNamesrvAddr("192.168.1.102:9876");
// Launch the instance.
producer.start();
// String[] tags = new String[] { "TagA" };
for (int i = 0; i < 10; i++) {
int orderId = i;
// Create a message instance, specifying topic, tag and message body.
System.out.println("orderId==>" + orderId + ",i=" + i);
Message msg = new Message("TopicTestjjj", "TagA", "KEY" + i,
("Hello RocketMQ i=" + i + ",orderId=" + orderId).getBytes(RemotingHelper.DEFAULT_CHARSET));
// 所谓的发送有序的消息其实就是在发送消息的时候传递序号即可。选择发送的消息队列
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
// 消息队列的个数就是4
System.out.println("mqs的大小==》" + mqs.size());
// 只向一个消息队列中发送消息
return mqs.get(index);
}
// 这里的传递的arg对象就是序号
}, orderId);
System.out.printf("%s%n", sendResult);
}
// server shutdown
producer.shutdown();
}
}
序号消息消费者
public class OrderedConsumer {
public static void main(String[] args) throws Exception {
// 推的消费模式
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test2");
// 拉的消费者模式
// DefaultMQPullConsumer consumer =new DefaultMQPullConsumer(consumerGroup)
consumer.setNamesrvAddr("192.168.1.102:9876");
// 设置消费者消费消息从哪里开始
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 表示订阅TopicTestjjj中的消息且具有特定标签的
consumer.subscribe("TopicTestjjj", "TagA");
MessageListenerOrderly listenerOrderly = new MyMessageListenerOrderly();
consumer.registerMessageListener(listenerOrderly);
consumer.start();
System.out.printf("Consumer Started.%n");
}
static class MyMessageListenerOrderly implements MessageListenerOrderly {
AtomicLong consumeTimes = new AtomicLong(0);
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
// 关闭自动提交
context.setAutoCommit(false);
for (MessageExt msgExt : msgs) {
try {
String msg = new String(msgExt.getBody(), RemotingHelper.DEFAULT_CHARSET).intern();
System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msg + "%n");
} catch (UnsupportedEncodingException e) { // TODO Auto-generated catch block
e.printStackTrace();
}
}
long incrementAndGet = this.consumeTimes.incrementAndGet();
System.out.println("当前的消费次数:" + incrementAndGet);
return ConsumeOrderlyStatus.SUCCESS;
}
}
}
一般消费者
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
// Instantiate with specified consumer group name.
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test");
/*
* consumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getMQAdminImpl()
*/
// Specify name server addresses.
consumer.setNamesrvAddr("192.168.1.102:9876");
// Subscribe one more more topics to consume.
consumer.subscribe("TopicTestjjj", "*");
// Register callback to execute on arrival of messages fetched from brokers.
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
//System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
for (MessageExt messageExt : msgs) {
try {
String msg = new String(messageExt.getBody(),RemotingHelper.DEFAULT_CHARSET).intern();
System.out.println("Receive New Messages: "+msg);
} catch (UnsupportedEncodingException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//Launch the consumer instance.
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
上面主要是通过MessageListenerConcurrently
和MessageListenerOrderly
来区分消费是否为order方式
采取的消费手段主要为:服务器向消费者推送消息方式
(push)
其中order消费者是手动将消息发送到4个MessageQueue中,orderId是有序的
实际消息应该是这样分布的(消息为FIFO)
3. 测试
1.首先启动普通的消费者,然后启动order消费者,最后启动order生产者,得到结果为:
一般消费者:
Receive New Messages: Hello RocketMQ i=9,orderId=9
Receive New Messages: Hello RocketMQ i=5,orderId=5
Receive New Messages: Hello RocketMQ i=1,orderId=1
Receive New Messages: Hello RocketMQ i=0,orderId=0
Receive New Messages: Hello RocketMQ i=4,orderId=4
Receive New Messages: Hello RocketMQ i=8,orderId=8
Receive New Messages: Hello RocketMQ i=3,orderId=3
Receive New Messages: Hello RocketMQ i=7,orderId=7
Receive New Messages: Hello RocketMQ i=2,orderId=2
Receive New Messages: Hello RocketMQ i=6,orderId=6
order消费者为:
ConsumeMessageThread_2 Receive New Messages: Hello RocketMQ i=0,orderId=0
ConsumeMessageThread_1 Receive New Messages: Hello RocketMQ i=1,orderId=1
当前的消费次数:1
当前的消费次数:2
ConsumeMessageThread_2 Receive New Messages: Hello RocketMQ i=4,orderId=4
ConsumeMessageThread_1 Receive New Messages: Hello RocketMQ i=5,orderId=5
当前的消费次数:3
当前的消费次数:4
ConsumeMessageThread_2 Receive New Messages: Hello RocketMQ i=8,orderId=8
ConsumeMessageThread_1 Receive New Messages: Hello RocketMQ i=9,orderId=9
当前的消费次数:5
当前的消费次数:6
ConsumeMessageThread_3 Receive New Messages: Hello RocketMQ i=3,orderId=3
当前的消费次数:7
ConsumeMessageThread_4 Receive New Messages: Hello RocketMQ i=2,orderId=2
ConsumeMessageThread_3 Receive New Messages: Hello RocketMQ i=7,orderId=7
当前的消费次数:8
当前的消费次数:9
ConsumeMessageThread_4 Receive New Messages: Hello RocketMQ i=6,orderId=6
当前的消费次数:10
看起来好像当前的order消费者消费的更多(几乎每次都消费两个)
2.再次启动order生产者:
普通消费者:
Receive New Messages: Hello RocketMQ i=0,orderId=0
Receive New Messages: Hello RocketMQ i=1,orderId=1
Receive New Messages: Hello RocketMQ i=2,orderId=2
Receive New Messages: Hello RocketMQ i=3,orderId=3
Receive New Messages: Hello RocketMQ i=4,orderId=4
Receive New Messages: Hello RocketMQ i=5,orderId=5
Receive New Messages: Hello RocketMQ i=6,orderId=6
Receive New Messages: Hello RocketMQ i=7,orderId=7
Receive New Messages: Hello RocketMQ i=8,orderId=8
Receive New Messages: Hello RocketMQ i=9,orderId=9
order消费者:
ConsumeMessageThread_5 Receive New Messages: Hello RocketMQ i=0,orderId=0
当前的消费次数:11
ConsumeMessageThread_6 Receive New Messages: Hello RocketMQ i=1,orderId=1
当前的消费次数:12
ConsumeMessageThread_7 Receive New Messages: Hello RocketMQ i=2,orderId=2
当前的消费次数:13
ConsumeMessageThread_8 Receive New Messages: Hello RocketMQ i=3,orderId=3
当前的消费次数:14
ConsumeMessageThread_9 Receive New Messages: Hello RocketMQ i=4,orderId=4
当前的消费次数:15
ConsumeMessageThread_10 Receive New Messages: Hello RocketMQ i=5,orderId=5
当前的消费次数:16
ConsumeMessageThread_11 Receive New Messages: Hello RocketMQ i=6,orderId=6
当前的消费次数:17
ConsumeMessageThread_12 Receive New Messages: Hello RocketMQ i=7,orderId=7
当前的消费次数:18
ConsumeMessageThread_13 Receive New Messages: Hello RocketMQ i=8,orderId=8
当前的消费次数:19
ConsumeMessageThread_14 Receive New Messages: Hello RocketMQ i=9,orderId=9
当前的消费次数:20
随后无论多次启动order生产者,两个消费者消费顺序保持一致
说明:当消息发送时检测有消费者,那么发送的消息会直接发送给订阅的消费者
3.删除该topic,然后关闭两个消费者,然后启动order生产者后,过几秒中后启动两个消费者
order生产者:
orderId==>0,i=0
mqs的大小==》4
SendResult [sendStatus=SEND_OK, msgId=7F00000107DC73D16E939C8E34070000, offsetMsgId=C0A8016600002A9F0000000000018B36, messageQueue=MessageQueue [topic=TopicTestjjj, brokerName=localhost, queueId=0], queueOffset=0]
orderId==>1,i=1
mqs的大小==》4
SendResult [sendStatus=SEND_OK, msgId=7F00000107DC73D16E939C8E343D0001, offsetMsgId=C0A8016600002A9F0000000000018C18, messageQueue=MessageQueue [topic=TopicTestjjj, brokerName=localhost, queueId=1], queueOffset=0]
orderId==>2,i=2
mqs的大小==》4
SendResult [sendStatus=SEND_OK, msgId=7F00000107DC73D16E939C8E343F0002, offsetMsgId=C0A8016600002A9F0000000000018CFA, messageQueue=MessageQueue [topic=TopicTestjjj, brokerName=localhost, queueId=2], queueOffset=0]
orderId==>3,i=3
mqs的大小==》4
SendResult [sendStatus=SEND_OK, msgId=7F00000107DC73D16E939C8E34610003, offsetMsgId=C0A8016600002A9F0000000000018DDC, messageQueue=MessageQueue [topic=TopicTestjjj, brokerName=localhost, queueId=3], queueOffset=0]
orderId==>4,i=4
mqs的大小==》4
SendResult [sendStatus=SEND_OK, msgId=7F00000107DC73D16E939C8E34710004, offsetMsgId=C0A8016600002A9F0000000000018EBE, messageQueue=MessageQueue [topic=TopicTestjjj, brokerName=localhost, queueId=0], queueOffset=1]
orderId==>5,i=5
mqs的大小==》4
SendResult [sendStatus=SEND_OK, msgId=7F00000107DC73D16E939C8E34800005, offsetMsgId=C0A8016600002A9F0000000000018FA0, messageQueue=MessageQueue [topic=TopicTestjjj, brokerName=localhost, queueId=1], queueOffset=1]
orderId==>6,i=6
mqs的大小==》4
SendResult [sendStatus=SEND_OK, msgId=7F00000107DC73D16E939C8E349F0006, offsetMsgId=C0A8016600002A9F0000000000019082, messageQueue=MessageQueue [topic=TopicTestjjj, brokerName=localhost, queueId=2], queueOffset=1]
orderId==>7,i=7
mqs的大小==》4
SendResult [sendStatus=SEND_OK, msgId=7F00000107DC73D16E939C8E349F0007, offsetMsgId=C0A8016600002A9F0000000000019164, messageQueue=MessageQueue [topic=TopicTestjjj, brokerName=localhost, queueId=3], queueOffset=1]
orderId==>8,i=8
mqs的大小==》4
SendResult [sendStatus=SEND_OK, msgId=7F00000107DC73D16E939C8E34AF0008, offsetMsgId=C0A8016600002A9F0000000000019246, messageQueue=MessageQueue [topic=TopicTestjjj, brokerName=localhost, queueId=0], queueOffset=2]
orderId==>9,i=9
mqs的大小==》4
SendResult [sendStatus=SEND_OK, msgId=7F00000107DC73D16E939C8E34C00009, offsetMsgId=C0A8016600002A9F0000000000019328, messageQueue=MessageQueue [topic=TopicTestjjj, brokerName=localhost, queueId=1], queueOffset=2]
普通消费者
Receive New Messages: Hello RocketMQ i=7,orderId=7
Receive New Messages: Hello RocketMQ i=1,orderId=1
Receive New Messages: Hello RocketMQ i=2,orderId=2
Receive New Messages: Hello RocketMQ i=5,orderId=5
Receive New Messages: Hello RocketMQ i=6,orderId=6
Receive New Messages: Hello RocketMQ i=3,orderId=3
Receive New Messages: Hello RocketMQ i=9,orderId=9
Receive New Messages: Hello RocketMQ i=0,orderId=0
Receive New Messages: Hello RocketMQ i=4,orderId=4
Receive New Messages: Hello RocketMQ i=8,orderId=8
基本上完全是乱的没有规律
order消费者
ConsumeMessageThread_1 Receive New Messages: Hello RocketMQ i=1,orderId=1
当前的消费次数:1
ConsumeMessageThread_1 Receive New Messages: Hello RocketMQ i=5,orderId=5
当前的消费次数:2
ConsumeMessageThread_1 Receive New Messages: Hello RocketMQ i=9,orderId=9
当前的消费次数:3
ConsumeMessageThread_2 Receive New Messages: Hello RocketMQ i=0,orderId=0
当前的消费次数:4
ConsumeMessageThread_2 Receive New Messages: Hello RocketMQ i=4,orderId=4
当前的消费次数:5
ConsumeMessageThread_2 Receive New Messages: Hello RocketMQ i=8,orderId=8
当前的消费次数:6
ConsumeMessageThread_3 Receive New Messages: Hello RocketMQ i=3,orderId=3
当前的消费次数:7
ConsumeMessageThread_3 Receive New Messages: Hello RocketMQ i=7,orderId=7
当前的消费次数:8
ConsumeMessageThread_4 Receive New Messages: Hello RocketMQ i=2,orderId=2
当前的消费次数:9
ConsumeMessageThread_4 Receive New Messages: Hello RocketMQ i=6,orderId=6
当前的消费次数:10
观察发现:先是消费队列1中的,然后消费队列0中的,队列3,队列2,实现局部队列中的顺序消费(消息时一条一条的推送的…)
4. 总结
- 如果消费者固定且一直存在,那么消息只要发送到服务器立刻会被推送给消费者
- order消费者是按照messageQueue的消息顺序进行消费(前提必须已经全部写入到queue中)
- MessageQueueSelector就是一种选择发布指定的消费者队列的一个队列选择器,可以将消息发送发送到指定的队列中,如果只要一个队列,那么返回固定的即可
context.setAutoCommit(false);表示关闭自动提交
,相当于开启事务,可以执行回滚操作ConsumeOrderlyStatus.SUCCESS;表示消费成功且提交事务
,ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT表示让当前给我发送消息的队列过段指定的时间再给我发送消息
,ConsumeOrderlyStatus.COMMIT表示手动提交消息消费
,ConsumeOrderlyStatus.ROLLBACK表示事务回滚,重新再给我发消息
以上是关于Apache RocketMQ:理解官方的OrderExample的例子的主要内容,如果未能解决你的问题,请参考以下文章
Apache RocketMQ:理解官方的Broadcasting的例子
Apache RocketMQ:理解官方的BatchFilter设置用户属性的例子
Apache RocketMQ:理解官方的BatchFilter设置用户属性的例子