rocketmq
Posted carl_ysz
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了rocketmq相关的知识,希望对你有一定的参考价值。
参考:
开源社区:https://github.com/alibaba/RocketMQ
rocketmq入门: http://www.cnblogs.com/LifeOnCode/p/4805953.html
考拉哥的博客: http://lifestack.cn/archives/tag/rocketmq
[简书]rocketmq原理和实践:http://www.jianshu.com/p/453c6e7ff81c
一、ROCKETMQ简介
11 月 28 日,阿里巴巴宣布将开源分布式消息中间件 RocketMQ 捐赠给 Apache,成为 Apache 孵化项目,孵化成功后 RocketMQ 有望成为国内首个互联网中间件在 Apache上 的顶级项目,成为全球继 ActiveMQ,Kafka 之后,分布式消息引擎家族中的新成员。此次捐赠,意味着以 MQ(消息队列)为代表的互联网中间件在新兴物联网、大数据领域会发挥着越来越大的作用,将有更多的开发者因此受益。
和rabbitmq一样,Rocketmq是一个分布式的、队列模型的消息中间件,它参考KAFKA,并结合阿里巴巴双11业务需要需要实现的分布式框架。
RocketMQ参考了JMS规范和CORBA Notification规范,但是内部设计没有遵循任何的规范,内部采用Netty NIO进行数据通信。
3.X版本之前使用zookeeper进行路由,之后采用自研发的NameServer进行网络路由,更加的轻量。
Rocketmq的功能非常强大:
- 天然支持集群,消费者负载均衡,水平扩展。
- 支持顺序消息,可以保证消息按照既定的顺序被处理
- 采用零拷贝原理,超大的消息的堆积能力
- 提供丰富的消息机制,如顺序消息、事务消息等等(闭源版本才支持)
在 RocketMQ 项目基础上衍生的项目如下:
- com.taobao.metaq v3.0 = RocketMQ + 淘宝个性化需求 为淘宝应用提供消息服务
- com.alipay.zpullmsg v1.0 = RocketMQ + 支付宝个性化需求 为支付宝应用提供消息服务
- com.alibaba.commonmq v1.0 = Notify + RocketMQ + B2B 个性化需求 为 B2B 应用提供消息服务
二、术语
-
Producer
消息生产者,负责产生消息,一般是业务系统
-
Consumer
消息消费者,负责消费消息,一般是后台系统
-
Push Consumer
消费者的一种,应用程序通过向 Consumer对象注册一个Listener接口,一旦收到消息,Cosumer对象立刻回调Listener中的方法
-
Pull Consumer
消费者的一种,应用程序主动调用Consumer的拉取方法去拉取消息,此时主动权在消费者手中
-
Producer Group
一类Producer的集合名称,这类Producer通常发送一类消息,并且发送逻辑一致
-
Consumer Group
一类Consumer的集合名称,这类Consumer通常消费一致,逻辑一致
-
Broker
服务器
-
广播消费
类似于rabbitmq中的sanhout,即一条消息会被一个组(Consumer Group)中全部的Consumer都消费。
-
集群消费
一个Consumer Group中的Consumer实例会平均分摊消息,即天生的负载均衡,例如某个Topic有9条消息,其中一个Consumer Group有3个实例(可能是3个进程,或者3台机器),那么每个实例只消费其中的3条消息。
-
顺序消息
消费消息的顺序同发送消息的顺序一致,在Rocketmq中主要是指局部顺序,即一类消息为满足顺序性,必须Producer单线程顺序发送,且发送同一个队列,这样Consumer就可以按照Producer发送的顺序去消费消息。
-
普通顺序消息
顺序消息的一种,正常情况下可以保证完全的顺序消息,但是一旦发生通信异常后,Broker重启,由于队列总数发生变化,哈希取模后定位的队列会变化,产生短暂的消息不一致。
如果消息能够容忍异常,则普通顺序比较合适
-
严格顺序消息
顺序消息的一种,无论正常异常情况都能保证顺序,但是牺牲了Failover特性,即Broker集群中只要有一台机器不可用,则整个集群都不可用,服务可用性大大降低。
如果服务器部署为同步双写模式,则此缺陷可以通过备用机器自动切换避免。
目前已经的应用只有数据库binlog同步强依赖严格的顺序消息,其他应用绝大部分可以容忍短暂乱序。
-
Message Queue
在RocketMQ中,所有消息队列都是持久化的,长度无限的数据结构,所谓长度无限是指队列中每个存储单元都是定长,访问其中的存储单元用Offset来访问,offset为java long类型,64位,理论上在100年内不会溢出,另外队列中只保存最近几天的数据,之前的数据会按照过期时间来删除。
可以认为Message Queue就是一个长度无限的数组,offset就是下标。
三、安装
首先看一下 rocketmq的物理结构图
- Name Server是一个几乎无状态节点,可集群部署,节点之间无任何信息同步,用于集群服务的协调管理,之前有zk实现,3.x之后由自身实现,更加轻量稳定
- Broker 部署相对复杂,Broker分为Maser和Slave,一个Master可以对应多个Slave,但是一个Slave只能有一个Master,Master和Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义,BrokerId为0表示是Master,非0表示Slave。Master也可以部署多个。每个Broker和Name Server集群中所有的节点建立长连接,定时注册Topic信息到所有的NameServer。
- Producer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时发送心跳。Producer完全无状态,可以集群部署。
- Consumer和NameServer集群中的其中一个节点(随机选择)建立长连接,定期从Name Server 取Topic路由信息,并且提供Topic服务的Master,Slave建立长连接,且定时向Master、Slave发送心跳。Consumer即可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定
rocketmq的部署方式也很多,一句话总结其特征就是:不支持主从自动切换、slave只能读不能写,所以故障后必须人工干预恢复负载。
集群方式
|
运维特点
|
消息可靠性(master宕机情况)
|
服务可用性(master宕机情况)
|
其他特点
|
备注
|
---|---|---|---|---|---|
一组主主 | 结构简单,扩容方便,机器要求低 | 同步刷盘消息一条都不会丢 |
整体可用 未被消费的消息无法取得,影响实时性 |
性能最高 | 适合消息可靠性最高、实时性低的需求。 |
一组主从 |
异步有毫秒级丢失; 同步双写不丢失; |
差评,主备不能自动切换,且备机只能读不能写,会造成服务整体不可写。 |
不考虑,除非自己提供主从切换的方案。 | ||
多组主从(异步复制) | 结构复杂,扩容方便 | 故障时会丢失消息; |
整体可用,实时性影响毫秒级别 该组服务只能读不能写 |
性能很高 | 适合消息可靠性中等,实时性中等的要求。 |
多组主从(同步双写) | 结构复杂,扩容方便 | 不丢消息 |
整体可用,不影响实时性 该组服务只能读不能写。 不能自动切换? |
性能比异步低10%,所以实时性也并不比异步方式太高。 |
适合消息可靠性略高,实时性中等、性能要求不高的需 |
后面会详细演示如何安装集群。
同步双写和异步复制:
非常容易理解,在主从同步数据时的常见做法,是对Master写了一条数据,先给客户端响应还是先在Slave上同步数据的问题,因为Rocketmq中的架构设计也非常简单,只有一个Slave,所以即使同步双写,也不会有太大的性能损失,当然不用同步性能肯定更好,像mysql那样的多个Slave架构还有半同步复制的概念,只要有一个Slave同步成功就给客户端响应,都差不多。
搭建双Master(一组主主):
1. 环境
CentOS7
jdk1.8
ysz211(192.168.1.211)
ysz212(192.168.1.212)
搭建集群一般环境:时间同步,无密码ssh通信等
Hosts搭建信息便于理解:
192.168.1.211 rocketmq-namesrv1
192.168.1.211 rocketmq-m1
192.168.1.212 rocketmq-namesrv2
192.168.1.212 rocketmq-m2
2. 上传文件(2台机器)
tar -zxvf alibaba-rocketmq-3.2.6.tar.gz -C /usr/local cd /usr/local/ mv alibaba-rocketmq alibaba-rocketmq-3.2.6 ln -sv alibaba-rocketmq-3.2.6 rocketmq mkdir /usr/local/rocketmq/store mkdir /usr/local/rocketmq/store/commitlog mkdir /usr/local/rocketmq/store/consumequeue mkdir -p /usr/local/rocketmq/logs
3. 修改配置文件
我们搭建的是2master,因此,进入到目录/usr/local/rocketmq/conf/2m-noslave/broker-a.properties中进行修改,目录名非常贴切..
内容大致如下,注意brokerName、namesrvAddr、brokerId(这里不用)要修改即可
#所属集群名字 brokerClusterName=rocketmq-cluster #broker名字,注意此处不同的配置文件填写的不一样 brokerName=broker-a|broker-b #0 表示 Master,>0 表示 Slave brokerId=0 #nameServer地址,分号分割 namesrvAddr=rocketmq-namesrv1:9876;rocketmq-namesrv2:9876 # 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数 defaultTopicQueueNums=4 # 是否允许 Broker 自动创建Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true # 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=true # Broker 对外服务的监听端口 listenPort=10911 # 删除文件时间点,默认凌晨 4点 deleteWhen=04 #文件保留时间,默认 48 小时 fileReservedTime=120 #commitLog每个文件的大小默认1G mapedFileSizeCommitLog=1073741824 #ConsumeQueue每个文件默认存30W条,根据业务情况调整 mapedFileSizeConsumeQueue=300000 #destroyMapedFileIntervalForcibly=120000 #redeleteHangedFileInterval=120000 #检测物理文件磁盘空间 diskMaxUsedSpaceRatio=88 #存储路径 storePathRootDir=/usr/local/rocketmq/store #commitLog 存储路径 storePathCommitLog=/usr/local/rocketmq/store/commitlog #消费队列存储路径存储路径 storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue #消息索引存储路径 storePathIndex=/usr/local/rocketmq/store/index #checkpoint 文件存储路径 storeCheckpoint=/usr/local/rocketmq/store/checkpoint #abort 文件存储路径 abortFile=/usr/local/rocketmq/store/abort #限制的消息大小 maxMessageSize=65536 #flushCommitLogLeastPages=4 #flushConsumeQueueLeastPages=2 #flushCommitLogThoroughInterval=10000 #flushConsumeQueueThoroughInterval=60000 #Broker 的角色 #- ASYNC_MASTER 异步复制Master #- SYNC_MASTER 同步双写Master #- SLAVE brokerRole=ASYNC_MASTER #刷盘方式 #- ASYNC_FLUSH 异步刷盘 #- SYNC_FLUSH 同步刷盘 flushDiskType=ASYNC_FLUSH #checkTransactionMessageEnable=false #发消息线程池数量 #sendMessageThreadPoolNums=128 #拉消息线程池数量 #pullMessageThreadPoolNums=128
cd /usr/local/rocketmq/conf && sed -i \'s#${user.home}#/usr/local/rocketmq#g\' *.xml
命令说明:在conf目录下对所有以.xml结尾的文本使用sed命令,-i表示修改原文件,将所有${user.home}这样的内容直接修改为/usr/local/rocketmq即可
4. 修改启动脚本参数
由于是虚拟机,没有默认那么大的内存,直接修改/usr/local/rocketmq/bin/runbroker.sh和/usr/local/rocketmq/bin/runserver.sh
修改为-server -Xms1g -Xmx1g -Xmn512m
防止jvm内存分配不足无法启动
5. 启动namesrv(2台机器)
cd /usr/local/rocketmq/bin nohup sh mqnamesrv &
6. 启动srv
启动a:
cd /usr/local/rocketmq/bin nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-noslave/broker-a.properties >/dev/null 2>&1 & netstat -ntlp jps tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/broker.log tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/namesrv.log
启动b:
cd /usr/local/rocketmq/bin nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-noslave/broker-b.properties >/dev/null 2>&1 & netstat -ntlp jps tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/broker.log tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/namesrv.log
7. tomcat中部署rocketmq-console.war
注意修改war包中的config.properties中nameSRV地址为192.168.1.211:9876;192.168.1.212:9876
如何进行数据清理?
cd /usr/local/rocketmq/bin sh mqshutdown broker sh mqshutdown namesrv # --等待停止 rm -rf /usr/local/rocketmq/store mkdir /usr/local/rocketmq/store mkdir /usr/local/rocketmq/store/commitlog mkdir /usr/local/rocketmq/store/consumequeue mkdir /usr/local/rocketmq/store/index # --按照上面步骤重启NameServer与BrokerServer
四、Hello World
Producer
import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message; /** * Created by carl.yu on 2016/11/30. */ public class Porducer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("quickstart_producer"); /** * produder: * 同一个组名则表示是同一个应用 * createTopicKey TBW102 在发送消息时,自动创建服务器不存在的topic 需要key * defaultTopicQueueName 4 默认创建队列数 * sendMsgTimeout 10000 默认超时时间,单位毫秒 * compressMsgBodyOverHowmuch 4096 消息Body超过4096字节开始压缩 * retryTimesWhenSendFailed 重试次数 * retryAnotherBrokerWhenNotStoreOK False * maxMessageSize 131072 客户端限制大小,超过报错,默认128K * transactionCheckListener 事务消息回查监听器 * checkThreadPoolMinSize 1 Broker回查事务状态线程池大小 * checkThreadPoolMaxSize 1 Broker回查事务状态时线程池设置 * checkRequestHoldMax 2000 回查时Producer本地缓冲请求队列大小 */ producer.setNamesrvAddr("192.168.1.211:9876;192.168.1.212:9876"); producer.start(); for (int i = 0; i < 10; i++) { try { Message msg = new Message("TopicTest",// topic "TagA",// tag ("Hello RocketMQ " + i).getBytes()// body ); SendResult sendResult = producer.send(msg); System.out.println(sendResult); Thread.sleep(3000); } catch (Exception e) { e.printStackTrace(); Thread.sleep(3000); } } } }
Consumer:
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; import com.alibaba.rocketmq.common.message.MessageExt; import java.util.List; /** * Created by carl.yu on 2016/12/1. */ public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4"); consumer.setNamesrvAddr("192.168.1.211:9876;192.168.1.212:9876"); /** * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br> * 如果非第一次启动,那么按照上次消费的位置继续消费 */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); consumer.subscribe("TopicTest", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs); System.out.println(" Receive Message Size: " + msgs.size()); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer Started."); } }
先启动consumer,再启动producer。
可以观察到:
ConsumeMessageThread_5 Receive New Messages: [MessageExt [queueId=0, storeSize=136, queueOffset=1, sysFlag=0, bornTimestamp=1480557302269, bornHost=/192.168.1.103:57275, storeTimestamp=1480557301214, storeHost=/192.168.1.211:10911, msgId=C0A801D300002A9F0000000000000E0A, commitLogOffset=3594, bodyCRC=710410109, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=2, WAIT=true, TAGS=TagA}, body=16]]] Receive Message Size: 1 ConsumeMessageThread_2 Receive New Messages: [MessageExt [queueId=0, storeSize=136, queueOffset=0, sysFlag=0, bornTimestamp=1480557278159, bornHost=/192.168.1.103:57275, storeTimestamp=1480557277160, storeHost=/192.168.1.211:10911, msgId=C0A801D300002A9F0000000000000666, commitLogOffset=1638, bodyCRC=613185359, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=2, WAIT=true, TAGS=TagA}, body=16]]] Receive Message Size: 1 ConsumeMessageThread_3 Receive New Messages: [MessageExt [queueId=2, storeSize=136, queueOffset=0, sysFlag=0, bornTimestamp=1480557284185, bornHost=/192.168.1.103:57275, storeTimestamp=1480557283169, storeHost=/192.168.1.211:10911, msgId=C0A801D300002A9F0000000000000776, commitLogOffset=1910, bodyCRC=1250039395, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, WAIT=true, TAGS=TagA}, body=16]]] Receive Message Size: 1 ConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=3, storeSize=136, queueOffset=0, sysFlag=0, bornTimestamp=1480557287190, bornHost=/192.168.1.103:57275, storeTimestamp=1480557286170, storeHost=/192.168.1.211:10911, msgId=C0A801D300002A9F00000000000007FE, commitLogOffset=2046, bodyCRC=1032136437, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, WAIT=true, TAGS=TagA}, body=16]]] Receive Message Size: 1 ConsumeMessageThread_6 Receive New Messages: [MessageExt [queueId=1, storeSize=136, queueOffset=1, sysFlag=0, bornTimestamp=1480557305275, bornHost=/192.168.1.103:57275, storeTimestamp=1480557304213, storeHost=/192.168.1.211:10911, msgId=C0A801D300002A9F0000000000000E92, commitLogOffset=3730, bodyCRC=1565577195, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=2, WAIT=true, TAGS=TagA}, body=16]]] Receive Message Size: 1 ConsumeMessageThread_4 Receive New Messages: [MessageExt [queueId=1, storeSize=136, queueOffset=0, sysFlag=0, bornTimestamp=1480557281170, bornHost=/192.168.1.103:57275, storeTimestamp=1480557280172, storeHost=/192.168.1.211:10911, msgId=C0A801D300002A9F00000000000006EE, commitLogOffset=1774, bodyCRC=1401636825, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=2, WAIT=true, TAGS=TagA}, body=16]]] Receive Message Size: 1 ConsumeMessageThread_7 Receive New Messages: [MessageExt [queueId=3, storeSize=136, queueOffset=0, sysFlag=0, bornTimestamp=1480557299262, bornHost=/192.168.1.103:57276, storeTimestamp=1480557296407, storeHost=/192.168.1.212:10911, msgId=C0A801D400002A9F0000000000000800, commitLogOffset=2048, bodyCRC=988340972, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, WAIT=true, TAGS=TagA}, body=16]]] Receive Message Size: 1 ConsumeMessageThread_8 Receive New Messages: [MessageExt [queueId=0, storeSize=136, queueOffset=0, sysFlag=0, bornTimestamp=1480557290214, bornHost=/192.168.1.103:57276, storeTimestamp=1480557287386, storeHost=/192.168.1.212:10911, msgId=C0A801D400002A9F0000000000000668, commitLogOffset=1640, bodyCRC=601994070, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, WAIT=true, TAGS=TagA}, body=16]]] Receive Message Size: 1 ConsumeMessageThread_9 Receive New Messages: [MessageExt [queueId=2, storeSize=136, queueOffset=0, sysFlag=0, bornTimestamp=1480557296256, bornHost=/192.168.1.103:57276, storeTimestamp=1480557293409, storeHost=/192.168.1.212:10911, msgId=C0A801D400002A9F0000000000000778, commitLogOffset=1912, bodyCRC=1307562618, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, WAIT=true, TAGS=TagA}, body=16]]] Receive Message Size: 1
说明:
1. 支持tag进行简单匹配
2. 通过web页面观察,消息会被分配到不同的broker上
3. 消费者通过异步注册进行消费,处理线程池可以设置
4. 消息发送到broker上会分配一个msgId 可以用来查询
五、rocketmq架构简介
rocketmq包含9个子模块:
- rocketmq-common:通用的常量枚举、基类方法或者数据结构,按描述的目标来分包通俗易懂。包名有:admin,consumer,filter,hook,message等。
- rocketmq-remoting:用Netty4写的客户端和服务端,fastjson做的序列化,自定义二进制协议。
- rocketmq-srvutil:只有一个ServerUtil类,类注解是,只提供Server程序依赖,目的为了拆解客户端依赖,尽可能减少客户端的依赖。
- rocketmq-store:存储服务,消息存储,索引存储,commitLog存储。
- rocketmq-client:客户端,包含producer端和consumer端,发送消息和接收消息的过程。
- rocketmq-filtersrv:消息过滤器server,现在rocketmq的wiki上有示例代码及说明,https://github.com/alibaba/RocketMQ/wiki/filter_server_guide,以后会专门对每个模块做分析,到时出个完整的demo以及流程图。
- rocketmq-broker:对consumer和producer来说是服务端,接收producer发来的消息并存储,同时consumer来这里拉取消息。
- rocketmq-tools:命令行工具。
- rocketmq-namesrv:NameServer,类似SOA服务的注册中心,这里保存着消息的TopicName,队列等运行时的meta信息。一般系统分dataNode和nameNode,这里是nameNode。
六、顺序消费
消息有序指的是一类消息消费时,能按照发送的顺序来消费。例如:一个订单产生了 3 条消息,分别是订单创建、订单付款、订单完成。消费时,要按照这个顺序消费才有意义。但同时订单之间又是可以并行消费的。
假如生产者产生了2条消息:M1、M2,要保证这两条消息的顺序,应该怎样做?你脑中想到的可能是这样:
这样可以保证M1先于M2到达MQServer(客户端等待M1成功后再发送M2),根据先达到先被消费的原则,M1会先于M2被消费,这样就保证了消息的顺序。
这个模型,理论上可以保证消息的顺序,但在实际运用中你应该会遇到下面的问题:
如果将一组顺序消息发往2台不同的消费者,依旧需要全局时钟下C1和C2的消费顺序保证,解决方式是保证发往同一个Consumer。
示例代码如下:
OrderProducer
import java.io.IOException; import java.text.SimpleDateFormat; import java.util.Date; import java.util.List; import com.alibaba.rocketmq.client.exception.MQBrokerException; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.MessageQueueSelector; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message; import com.alibaba.rocketmq.common.message.MessageQueue; import com.alibaba.rocketmq.remoting.exception.RemotingException; /** * Created by carl.yu on 2016/12/1. */ public class OrderProducer { public static void main(String[] args) throws IOException { try { DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); producer.setNamesrvAddr("192.168.1.211:9876;192.168.1.212:9876"); producer.start(); // String[] tags = new String[] { "TagA", "TagC", "TagD" }; String[] tags = new String[]{"TagA"}; Date date = new Date(); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String dateStr = sdf.format(date); for (int i = 0; i < 30; i++) { // 加个时间后缀 String body = dateStr + " Hello RocketMQ " + i; Message msg = new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i, body.getBytes()); SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer) arg; return mqs.get(id); } }, 0);//0是队列的下标 System.out.println(sendResult + ", body:" + body); } producer.shutdown(); } catch (MQClientException e) { e.printStackTrace(); } catch (RemotingException e) { e.printStackTrace(); } catch (MQBrokerException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } System.in.read(); } }
OrderConsumer
import java.util.List; import java.util.Random; import java.util.concurrent.TimeUnit; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; import com.alibaba.rocketmq.common.message.MessageExt; /** * 顺序消息消费,带事务方式(应用可控制Offset什么时候提交) */ public class OrderConsumer { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3"); consumer.setNamesrvAddr("192.168.1.211:9876;192.168.1.212:9876"); /** * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br> * 如果非第一次启动,那么按照上次消费的位置继续消费 */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicTestjjj", "TagA || TagC || TagD"); consumer.registerMessageListener(new MessageListenerOrderly() { Random random = new Random(); @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { System.out.print(Thread.currentThread().getName() + " Receive New Messages: "); for (MessageExt msg : msgs) { System.out.println(msg + ", content:" + new String(msg.getBody())); } try { //模拟业务逻辑处理中... TimeUnit.SECONDS.sleep(random.nextInt(10)); } catch (Exception e) { e.printStackTrace(); } return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); System.out.println("OrderConsumer Started."); } }
上面代码和一般的普通消息只有2个区别,非常简单:
1. 发送时指定了Queue
2. 消费时使用Order监听器进行监听
通过Web Console和测试发现,只有一个节点上有消息,且在多个Consumer下只有一个Consumer接收到了消息,和上面分析一致。
七、Push和Pull
在rocketmq中的消费者获取消息机制,本质上只有一种,就是:pull ! push在这里可以理解为就是一种封装好了的pull.
- push方式里,consumer把轮询过程封装了,并注册MessageListener监听器,取到消息后,唤醒MessageListener的consumeMessage()来消费,对用户而言,感觉消息是被推送过来的。
- pull方式里,取消息的过程需要用户自己写,首先通过打算消费的Topic拿到MessageQueue的集合,遍历MessageQueue集合,然后针对每个MessageQueue批量取消息,一次取完后,记录该队列下一次要取的开始offset,直到取完了,再换另一个MessageQueue。
暂时想不到非要用pull实现消息拉取机制的理由... 感觉push已经够用了,这里列出代码:
import com.alibaba.rocketmq.client.consumer.MQPullConsumer; import com.alibaba.rocketmq.client.consumer.MQPullConsumerScheduleService; import com.alibaba.rocketmq.client.consumer.PullResult; import com.alibaba.rocketmq.client.consumer.PullTaskCallback; import com.alibaba.rocketmq.client.consumer.PullTaskContext; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.message.MessageQueue; import com.alibaba.rocketmq.common.protocol.heartbeat.Me以上是关于rocketmq的主要内容,如果未能解决你的问题,请参考以下文章