说说 MQ 之 RocketMQ ( 三 )
Posted ImportNew
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了说说 MQ 之 RocketMQ ( 三 )相关的知识,希望对你有一定的参考价值。
来源:Valleylord ,
valleylord.github.io/post/201607-mq-rocketmq/
RocketMQ 的主备模式
按之前所说,只有 RocketMQ 的多主多从异步复制是可以生产使用的,因此只在这个场景下测试。另外,消息采用 Push 顺序模式消费。
假设集群采用2主2备的模式,需要启动4个 Broker,配置文件如下,
brokerName=broker-a
brokerId=0
listenPort=10911
storePathRootDir=/home/arnes/alibaba-rocketmq/data/store-a-async
storePathCommitLog=/home/arnes/alibaba-rocketmq/data/store-a-async/commitlog
brokerRole=ASYNC_MASTER
brokerName=broker-a
brokerId=1
listenPort=10921
storePathRootDir=/home/arnes/alibaba-rocketmq/data/store-a-async-slave
storePathCommitLog=/home/arnes/alibaba-rocketmq/data/store-a-async-slave/commitlog
brokerRole=SLAVE
brokerName=broker-b
brokerId=0
listenPort=20911
storePathRootDir=/home/arnes/alibaba-rocketmq/data/store-b-async
storePathCommitLog=/home/arnes/alibaba-rocketmq/data/store-b-async/commitlog
brokerRole=ASYNC_MASTER
brokerRole=ASYNC_MASTER
brokerName=broker-b
brokerId=1
listenPort=20921
storePathRootDir=/home/arnes/alibaba-rocketmq/data/store-b-async-slave
storePathCommitLog=/home/arnes/alibaba-rocketmq/data/store-b-async-slave/commitlog
brokerRole=SLAVE
另外,每个机构共通的配置项如下,
brokerClusterName=DefaultCluster
brokerIP1=192.168.232.23
namesrvAddr=192.168.232.23:9876
deleteWhen=04
fileReservedTime=120
flushDiskType=ASYNC_FLUSH
其他设置均采用默认。启动 NameServer 和所有 Broker,并试运行一下 Producer,然后看一下 TestTopic1 当前的情况,
$ sh mqadmin topicRoute -n 192.168.232.23:9876 -t TopicTest1
{
"brokerDatas":[
{
"brokerAddrs":{0:"192.168.232.23:20911",1:"192.168.232.23:20921"
},
"brokerName":"broker-b"
},
{
"brokerAddrs":{0:"192.168.232.23:10911",1:"192.168.232.23:10921"
},
"brokerName":"broker-a"
}
],
"filterServerTable":{},
"queueDatas":[
{
"brokerName":"broker-a",
"perm":6,
"readQueueNums":4,
"topicSynFlag":0,
"writeQueueNums":4
},
{
"brokerName":"broker-b",
"perm":6,
"readQueueNums":4,
"topicSynFlag":0,
"writeQueueNums":4
}
]
}
可见,TestTopic1 在2个 Broker 上,且每个 Broker 备机也在运行。下面开始主备切换的实验,分别启动 Consumer 和 Producer 进程,消息采用 Pull 顺序模式消费。在消息发送接收过程中,使用 kill -9 停掉 broker-a 的主进程,模拟突然宕机。此时,TestTopic1 的状态如下,
$ sh mqadmin topicRoute -n 192.168.232.23:9876 -t TopicTest1
{
"brokerDatas":[
{
"brokerAddrs":{0:"192.168.232.23:20911",1:"192.168.232.23:20921"
},
"brokerName":"broker-b"
},
{
"brokerAddrs":{1:"192.168.232.23:10921"
},
"brokerName":"broker-a"
}
],
"filterServerTable":{},
"queueDatas":[
{
"brokerName":"broker-a",
"perm":6,
"readQueueNums":4,
"topicSynFlag":0,
"writeQueueNums":4
},
{
"brokerName":"broker-b",
"perm":6,
"readQueueNums":4,
"topicSynFlag":0,
"writeQueueNums":4
}
]
}
broker-a 的节点已经减少为只有1个从节点。然后启动broker-a 的主节点,模拟恢复,再看一下 TestTopic1 的状态,
$ sh mqadmin topicRoute -n 192.168.232.23:9876 -t TopicTest1
{
"brokerDatas":[
{
"brokerAddrs":{0:"192.168.232.23:20911",1:"192.168.232.23:20921"
},
"brokerName":"broker-b"
},
{
"brokerAddrs":{0:"192.168.232.23:10911",1:"192.168.232.23:10921"
},
"brokerName":"broker-a"
}
],
"filterServerTable":{},
"queueDatas":[
{
"brokerName":"broker-a",
"perm":6,
"readQueueNums":4,
"topicSynFlag":0,
"writeQueueNums":4
},
{
"brokerName":"broker-b",
"perm":6,
"readQueueNums":4,
"topicSynFlag":0,
"writeQueueNums":4
}
]
}
此时,RocketMQ 已经恢复。
再来看看 Producer 和 Consumer 的日志,先看 Producer 的,如下,
......
00578SendResult [sendStatus=SEND_OK, msgId=C0A8E81700002A9F0000000000126F08, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=2], queueOffset=141]
00579SendResult [sendStatus=SEND_OK, msgId=C0A8E81700002A9F0000000000126F9F, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=3], queueOffset=141]
00580SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000078D47, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=0], queueOffset=700]
00581SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000078DDE, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=1], queueOffset=700]
00582SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000078E75, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=2], queueOffset=699]
00583SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000078F0C, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=3], queueOffset=699]
com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failed
at com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)
at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)
at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)
com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failed
at com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)
at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)
at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)
com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failed
at com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)
at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)
at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)
com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failed
at com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)
at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)
at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)
00588SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000078FA3, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=0], queueOffset=701]
00589SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF000000000007903A, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=1], queueOffset=701]
00590SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF00000000000790D1, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=2], queueOffset=700]
00591SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000079168, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=3], queueOffset=700]
com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failed
at com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)
at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)
at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)
com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failed
at com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)
at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)
at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)
com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failed
at com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)
at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)
at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)
com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failed
at com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)
at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)
at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)
00596SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF00000000000791FF, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=0], queueOffset=702]
00597SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000079296, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=1], queueOffset=702]
00598SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF000000000007932D, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=2], queueOffset=701]
00599SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF00000000000793C4, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=3], queueOffset=701]
00600SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF000000000007945B, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=0], queueOffset=703]
00601SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF00000000000794F2, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=1], queueOffset=703]
00602SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000079589, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=2], queueOffset=702]
00603SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000079620, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=3], queueOffset=702]
......
01389SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF00000000000965BE, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=1], queueOffset=900]
01390SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000096655, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=2], queueOffset=899]
01391SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF00000000000966EC, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=3], queueOffset=899]
01392SendResult [sendStatus=SEND_OK, msgId=C0A8E81700002A9F0000000000127036, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=0], queueOffset=143]
01393SendResult [sendStatus=SEND_OK, msgId=C0A8E81700002A9F00000000001270CD, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=1], queueOffset=141]
01394SendResult [sendStatus=SEND_OK, msgId=C0A8E81700002A9F0000000000127164, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=2], queueOffset=142]
01395SendResult [sendStatus=SEND_OK, msgId=C0A8E81700002A9F00000000001271FB, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=3], queueOffset=142]
01396SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000096783, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=0], queueOffset=901]
01397SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF000000000009681A, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=1], queueOffset=901]
日志中显示,在发送完00583条消息之后,开始发生异常 connect to <192.168.232.23:10911> failed,原因应该是 broker-a 的主节点被 kill 掉。之后,从00596条消息开始,RocketMQ 又恢复正常,原因是 broker-b 已经开始提供服务,承担了所有的工作。然后,又重新启动了 broker-a 主节点,由于该节点的加入,从01392条消息开始,broker-a 又开始恢复工作。实验中可以验证,RocketMQ 所谓的多主多备模式,实际上,备机被弱化到无以复加,在主节点宕机的时候,备机无法接替主机的工作,而只是将尚未发送的数据发送出去,由剩下的主节点接替工作。也就是说,N 主 N 备的 RocketMQ 集群中,总共有 2N 台机器,实际工作的只有 N 台,如果有一台挂了,就只有 N-1 台工作了,机器的利用率太低了。
再来看一下 Consumer 的日志,如下,
RocketMQ 00551PullResult [pullStatus=FOUND, nextBeginOffset=696, minOffset=0, maxOffset=696, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=695, sysFlag=0, bornTimestamp=1469175032446, bornHost=/192.168.234.98:51987, storeTimestamp=1469175020973, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF000000000007859C, commitLogOffset=492956, bodyCRC=943070764, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=696, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
RocketMQ 00559PullResult [pullStatus=FOUND, nextBeginOffset=697, minOffset=0, maxOffset=697, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=696, sysFlag=0, bornTimestamp=1469175032720, bornHost=/192.168.234.98:51987, storeTimestamp=1469175021247, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF00000000000787F8, commitLogOffset=493560, bodyCRC=921540126, reconsumeTimes=0,
...
RocketMQ 01408PullResult [pullStatus=FOUND, nextBeginOffset=211, minOffset=0, maxOffset=211, msgFoundList=1]MessageExt [queueId=0, storeSize=151, queueOffset=145, sysFlag=0, bornTimestamp=1469175072078, bornHost=/192.168.234.98:52034, storeTimestamp=1469175060614, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F00000000001274EE, commitLogOffset=1209582, bodyCRC=98787231, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=211, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
RocketMQ 01416PullResult [pullStatus=FOUND, nextBeginOffset=214, minOffset=0, maxOffset=214, msgFoundList=3]MessageExt [queueId=0, storeSize=151, queueOffset=146, sysFlag=0, bornTimestamp=1469175072405, bornHost=/192.168.234.98:52034, storeTimestamp=1469175060934, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F000000000012774A, commitLogOffset=1210186, bodyCRC=2067809241, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=214, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]
详细日志请查看原文。
可以看到,Consumer 在 broker-a 宕机时间的附近,也出现了异常,connect to <192.168.232.23:10911> failed。虽然还能保持分区上的顺序性,但是已经某种程度上出现了一些紊乱,例如,将我在实验之前的数据给取了出来(Hello MetaQ的消息)。可是,我在实验前,明明做过删除这个 Topic 的动作,看来 RocketMQ 所谓的删除,并未删除 Topic 的数据。之后,broker-a 主机重启之后,又恢复正常。
RocketMQ Pull模式消费需要手动管理 offset 和指定分区,这个在调用的时候不觉得,实际运行的时候才会发现每次总是消费一个分区,消费完之后,才开始消费下一个分区,而下一个分区可能已经堆积了很多消息了,手动做消息分配又比较费事。或许,Push 顺序模式消费才是更好的选择。
另外还有几个比较异常的情况,实验中有几次出现了 CODE: 17 DESC: topic[TopicTest1] not exist, apply first please! 这样的错误,实际上,这时候我只是关掉了 Producer;还有,sh mqadmin updateTopic –n 192.168.232.23:9876 –c DefaultCluster –t TopicTest1 明明文档中说可以用来新增 Topic,而实际上不行。
补充一下:之后,我又使用 Push 顺序模式消费重做了上述实验,结论差不多。只是因为有多线程的原因,日志看起来偶尔有错位,这个问题不大,可以解决。而且,在关闭重启 Broker 的附近,往往伴随着多次的消息重发,不过,RocketMQ 也不保证消息只收到一次就是了。消息重复的问题,Kafka 要比 RocketMQ 显得不那么严重一些。Push 顺序模式消费不需要指定 offset,不需要指定分区,第二次启动可以自动从前一次的 offset 后开始消费。功能上这个与 Kafka 的 Consumer 更类似,虽然 RocketMQ 采用的是异步模式。
RocketMQ 最佳实践
实际上,RocketMQ 自己就有一份《RocketMQ 最佳实践》的文档,里面提到了一些系统设计的问题,例如消费者要幂等,一个应用对应一个 Topic,如此等等。这些经验不仅仅是对 RocketMQ 有用,对 Kafka 也颇有借鉴意义。
后记
这里谈谈我对选择 RocketMQ 还是 Kafka 的个人建议。以上已经做了多处 RocketMQ 和 Kafka 的对比,我个人觉得,Kafka 是一个不断发展中的系统,开源社区比 RocketMQ 要大,也要更活跃一些;另外,Kafka 最新版本已经有了同步复制,消息可靠性更有保障;还有,Kafka 的分区机制,几乎实现了自动负载均衡,这绝对是个杀手级特性;RocketMQ 虽然提供了很多易用的功能,远超出 Kafka,但这些功能并不一定都能用得上,而且多数可以绕过。相比之下,Kafka 的基本功能更加吸引我,再处理故障恢复的时候,细节上要胜过 RocketMQ。当然,如果是 A 公司内部,或者所在公司使用了 A 公司的云产品,那么 RocketMQ 的企业级特性更多一些,或许我会选择 RocketMQ。
系列
【关于投稿】
如果大家有原创好文投稿,请直接给公号发送留言。
① 留言格式:
【投稿】+《 文章标题》+ 文章链接
② 示例:
【投稿】《不要自称是程序员,我十多年的 IT 职场总结》:http://blog.jobbole.com/94148/
③ 最后请附上您的个人简介哈~
看完本文有收获?请转发分享给更多人
关注「ImportNew」,提升Java技能
以上是关于说说 MQ 之 RocketMQ ( 三 )的主要内容,如果未能解决你的问题,请参考以下文章
阿里面试:说说你项目里使用的 MQ ,分布式系统中 MQ 作用?
#yyds干货盘点#Alibaba中间件技术系列「RocketMQ技术专题」让我们一同来看看RocketMQ和Kafka索引设计