说说 MQ 之 RocketMQ ( 三 )

Posted ImportNew

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了说说 MQ 之 RocketMQ ( 三 )相关的知识,希望对你有一定的参考价值。


来源:Valleylord ,

valleylord.github.io/post/201607-mq-rocketmq/


RocketMQ 的主备模式


按之前所说,只有 RocketMQ 的多主多从异步复制是可以生产使用的,因此只在这个场景下测试。另外,消息采用 Push 顺序模式消费。


假设集群采用2主2备的模式,需要启动4个 Broker,配置文件如下,


brokerName=broker-a

brokerId=0

listenPort=10911

storePathRootDir=/home/arnes/alibaba-rocketmq/data/store-a-async

storePathCommitLog=/home/arnes/alibaba-rocketmq/data/store-a-async/commitlog

brokerRole=ASYNC_MASTER

brokerName=broker-a

brokerId=1

listenPort=10921

storePathRootDir=/home/arnes/alibaba-rocketmq/data/store-a-async-slave

storePathCommitLog=/home/arnes/alibaba-rocketmq/data/store-a-async-slave/commitlog

brokerRole=SLAVE

brokerName=broker-b

brokerId=0

listenPort=20911

storePathRootDir=/home/arnes/alibaba-rocketmq/data/store-b-async

storePathCommitLog=/home/arnes/alibaba-rocketmq/data/store-b-async/commitlog

brokerRole=ASYNC_MASTER

brokerRole=ASYNC_MASTER

brokerName=broker-b

brokerId=1

listenPort=20921

storePathRootDir=/home/arnes/alibaba-rocketmq/data/store-b-async-slave

storePathCommitLog=/home/arnes/alibaba-rocketmq/data/store-b-async-slave/commitlog

brokerRole=SLAVE


另外,每个机构共通的配置项如下,


brokerClusterName=DefaultCluster

brokerIP1=192.168.232.23

namesrvAddr=192.168.232.23:9876

deleteWhen=04

fileReservedTime=120

flushDiskType=ASYNC_FLUSH


其他设置均采用默认。启动 NameServer 和所有 Broker,并试运行一下 Producer,然后看一下 TestTopic1 当前的情况,


$ sh mqadmin topicRoute -n 192.168.232.23:9876 -t TopicTest1

{

        "brokerDatas":[

                {

                        "brokerAddrs":{0:"192.168.232.23:20911",1:"192.168.232.23:20921"

                        },

                        "brokerName":"broker-b"

                },

                {

                        "brokerAddrs":{0:"192.168.232.23:10911",1:"192.168.232.23:10921"

                        },

                        "brokerName":"broker-a"

                }

        ],

        "filterServerTable":{},

        "queueDatas":[

                {

                        "brokerName":"broker-a",

                        "perm":6,

                        "readQueueNums":4,

                        "topicSynFlag":0,

                        "writeQueueNums":4

                },

                {

                        "brokerName":"broker-b",

                        "perm":6,

                        "readQueueNums":4,

                        "topicSynFlag":0,

                        "writeQueueNums":4

                }

        ]

}


可见,TestTopic1 在2个 Broker 上,且每个 Broker 备机也在运行。下面开始主备切换的实验,分别启动 Consumer 和 Producer 进程,消息采用 Pull 顺序模式消费。在消息发送接收过程中,使用 kill -9 停掉 broker-a 的主进程,模拟突然宕机。此时,TestTopic1 的状态如下,


$ sh mqadmin topicRoute -n 192.168.232.23:9876 -t TopicTest1

{

        "brokerDatas":[

                {

                        "brokerAddrs":{0:"192.168.232.23:20911",1:"192.168.232.23:20921"

                        },

                        "brokerName":"broker-b"

                },

                {

                        "brokerAddrs":{1:"192.168.232.23:10921"

                        },

                        "brokerName":"broker-a"

                }

        ],

        "filterServerTable":{},

        "queueDatas":[

                {

                        "brokerName":"broker-a",

                        "perm":6,

                        "readQueueNums":4,

                        "topicSynFlag":0,

                        "writeQueueNums":4

                },

                {

                        "brokerName":"broker-b",

                        "perm":6,

                        "readQueueNums":4,

                        "topicSynFlag":0,

                        "writeQueueNums":4

                }

        ]

}


broker-a 的节点已经减少为只有1个从节点。然后启动broker-a 的主节点,模拟恢复,再看一下 TestTopic1 的状态,


$ sh mqadmin topicRoute -n 192.168.232.23:9876 -t TopicTest1

{

        "brokerDatas":[

                {

                        "brokerAddrs":{0:"192.168.232.23:20911",1:"192.168.232.23:20921"

                        },

                        "brokerName":"broker-b"

                },

                {

                        "brokerAddrs":{0:"192.168.232.23:10911",1:"192.168.232.23:10921"

                        },

                        "brokerName":"broker-a"

                }

        ],

        "filterServerTable":{},

        "queueDatas":[

                {

                        "brokerName":"broker-a",

                        "perm":6,

                        "readQueueNums":4,

                        "topicSynFlag":0,

                        "writeQueueNums":4

                },

                {

                        "brokerName":"broker-b",

                        "perm":6,

                        "readQueueNums":4,

                        "topicSynFlag":0,

                        "writeQueueNums":4

                }

        ]

}


此时,RocketMQ 已经恢复。


再来看看 Producer 和 Consumer 的日志,先看 Producer 的,如下,


......

00578SendResult [sendStatus=SEND_OK, msgId=C0A8E81700002A9F0000000000126F08, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=2], queueOffset=141]

00579SendResult [sendStatus=SEND_OK, msgId=C0A8E81700002A9F0000000000126F9F, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=3], queueOffset=141]

00580SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000078D47, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=0], queueOffset=700]

00581SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000078DDE, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=1], queueOffset=700]

00582SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000078E75, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=2], queueOffset=699]

00583SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000078F0C, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=3], queueOffset=699]

com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failed

    at com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)

    at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)

    at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)

    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)

    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)

    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)

    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)

    at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)

    at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)

com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failed

    at com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)

    at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)

    at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)

    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)

    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)

    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)

    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)

    at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)

    at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)

com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failed

    at com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)

    at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)

    at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)

    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)

    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)

    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)

    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)

    at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)

    at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)

com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failed

    at com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)

    at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)

    at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)

    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)

    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)

    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)

    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)

    at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)

    at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)

00588SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000078FA3, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=0], queueOffset=701]

00589SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF000000000007903A, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=1], queueOffset=701]

00590SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF00000000000790D1, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=2], queueOffset=700]

00591SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000079168, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=3], queueOffset=700]

com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failed

    at com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)

    at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)

    at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)

    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)

    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)

    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)

    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)

    at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)

    at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)

com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failed

    at com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)

    at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)

    at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)

    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)

    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)

    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)

    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)

    at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)

    at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)

com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failed

    at com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)

    at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)

    at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)

    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)

    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)

    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)

    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)

    at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)

    at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)

com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.232.23:10911> failed

    at com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)

    at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)

    at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)

    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)

    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)

    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)

    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)

    at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)

    at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)

00596SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF00000000000791FF, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=0], queueOffset=702]

00597SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000079296, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=1], queueOffset=702]

00598SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF000000000007932D, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=2], queueOffset=701]

00599SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF00000000000793C4, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=3], queueOffset=701]

00600SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF000000000007945B, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=0], queueOffset=703]

00601SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF00000000000794F2, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=1], queueOffset=703]

00602SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000079589, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=2], queueOffset=702]

00603SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000079620, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=3], queueOffset=702]

......

01389SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF00000000000965BE, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=1], queueOffset=900]

01390SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000096655, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=2], queueOffset=899]

01391SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF00000000000966EC, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=3], queueOffset=899]

01392SendResult [sendStatus=SEND_OK, msgId=C0A8E81700002A9F0000000000127036, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=0], queueOffset=143]

01393SendResult [sendStatus=SEND_OK, msgId=C0A8E81700002A9F00000000001270CD, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=1], queueOffset=141]

01394SendResult [sendStatus=SEND_OK, msgId=C0A8E81700002A9F0000000000127164, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=2], queueOffset=142]

01395SendResult [sendStatus=SEND_OK, msgId=C0A8E81700002A9F00000000001271FB, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=3], queueOffset=142]

01396SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF0000000000096783, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=0], queueOffset=901]

01397SendResult [sendStatus=SEND_OK, msgId=C0A8E817000051AF000000000009681A, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-b, queueId=1], queueOffset=901]


日志中显示,在发送完00583条消息之后,开始发生异常 connect to <192.168.232.23:10911> failed,原因应该是 broker-a 的主节点被 kill 掉。之后,从00596条消息开始,RocketMQ 又恢复正常,原因是 broker-b 已经开始提供服务,承担了所有的工作。然后,又重新启动了 broker-a 主节点,由于该节点的加入,从01392条消息开始,broker-a 又开始恢复工作。实验中可以验证,RocketMQ 所谓的多主多备模式,实际上,备机被弱化到无以复加,在主节点宕机的时候,备机无法接替主机的工作,而只是将尚未发送的数据发送出去,由剩下的主节点接替工作。也就是说,N 主 N 备的 RocketMQ 集群中,总共有 2N 台机器,实际工作的只有 N 台,如果有一台挂了,就只有 N-1 台工作了,机器的利用率太低了。


再来看一下 Consumer 的日志,如下,


RocketMQ 00551PullResult [pullStatus=FOUND, nextBeginOffset=696, minOffset=0, maxOffset=696, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=695, sysFlag=0, bornTimestamp=1469175032446, bornHost=/192.168.234.98:51987, storeTimestamp=1469175020973, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF000000000007859C, commitLogOffset=492956, bodyCRC=943070764, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=696, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]

RocketMQ 00559PullResult [pullStatus=FOUND, nextBeginOffset=697, minOffset=0, maxOffset=697, msgFoundList=1]MessageExt [queueId=3, storeSize=151, queueOffset=696, sysFlag=0, bornTimestamp=1469175032720, bornHost=/192.168.234.98:51987, storeTimestamp=1469175021247, storeHost=/192.168.232.23:20911, msgId=C0A8E817000051AF00000000000787F8, commitLogOffset=493560, bodyCRC=921540126, reconsumeTimes=0, 


...


RocketMQ 01408PullResult [pullStatus=FOUND, nextBeginOffset=211, minOffset=0, maxOffset=211, msgFoundList=1]MessageExt [queueId=0, storeSize=151, queueOffset=145, sysFlag=0, bornTimestamp=1469175072078, bornHost=/192.168.234.98:52034, storeTimestamp=1469175060614, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F00000000001274EE, commitLogOffset=1209582, bodyCRC=98787231, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=211, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]

RocketMQ 01416PullResult [pullStatus=FOUND, nextBeginOffset=214, minOffset=0, maxOffset=214, msgFoundList=3]MessageExt [queueId=0, storeSize=151, queueOffset=146, sysFlag=0, bornTimestamp=1469175072405, bornHost=/192.168.234.98:52034, storeTimestamp=1469175060934, storeHost=/192.168.232.23:10911, msgId=C0A8E81700002A9F000000000012774A, commitLogOffset=1210186, bodyCRC=2067809241, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=214, KEYS=OrderID188, WAIT=true, TAGS=TagB}, body=14]]


详细日志请查看原文。


可以看到,Consumer 在 broker-a 宕机时间的附近,也出现了异常,connect to <192.168.232.23:10911> failed。虽然还能保持分区上的顺序性,但是已经某种程度上出现了一些紊乱,例如,将我在实验之前的数据给取了出来(Hello MetaQ的消息)。可是,我在实验前,明明做过删除这个 Topic 的动作,看来 RocketMQ 所谓的删除,并未删除 Topic 的数据。之后,broker-a 主机重启之后,又恢复正常。


RocketMQ Pull模式消费需要手动管理 offset 和指定分区,这个在调用的时候不觉得,实际运行的时候才会发现每次总是消费一个分区,消费完之后,才开始消费下一个分区,而下一个分区可能已经堆积了很多消息了,手动做消息分配又比较费事。或许,Push 顺序模式消费才是更好的选择。


另外还有几个比较异常的情况,实验中有几次出现了 CODE: 17 DESC: topic[TopicTest1] not exist, apply first please! 这样的错误,实际上,这时候我只是关掉了 Producer;还有,sh mqadmin updateTopic –n 192.168.232.23:9876 –c DefaultCluster –t TopicTest1 明明文档中说可以用来新增 Topic,而实际上不行。


补充一下:之后,我又使用 Push 顺序模式消费重做了上述实验,结论差不多。只是因为有多线程的原因,日志看起来偶尔有错位,这个问题不大,可以解决。而且,在关闭重启 Broker 的附近,往往伴随着多次的消息重发,不过,RocketMQ 也不保证消息只收到一次就是了。消息重复的问题,Kafka 要比 RocketMQ 显得不那么严重一些。Push 顺序模式消费不需要指定 offset,不需要指定分区,第二次启动可以自动从前一次的 offset 后开始消费。功能上这个与 Kafka 的 Consumer 更类似,虽然 RocketMQ 采用的是异步模式。


RocketMQ 最佳实践


实际上,RocketMQ 自己就有一份《RocketMQ 最佳实践》的文档,里面提到了一些系统设计的问题,例如消费者要幂等,一个应用对应一个 Topic,如此等等。这些经验不仅仅是对 RocketMQ 有用,对 Kafka 也颇有借鉴意义。


后记


这里谈谈我对选择 RocketMQ 还是 Kafka 的个人建议。以上已经做了多处 RocketMQ 和 Kafka 的对比,我个人觉得,Kafka 是一个不断发展中的系统,开源社区比 RocketMQ 要大,也要更活跃一些;另外,Kafka 最新版本已经有了同步复制,消息可靠性更有保障;还有,Kafka 的分区机制,几乎实现了自动负载均衡,这绝对是个杀手级特性;RocketMQ 虽然提供了很多易用的功能,远超出 Kafka,但这些功能并不一定都能用得上,而且多数可以绕过。相比之下,Kafka 的基本功能更加吸引我,再处理故障恢复的时候,细节上要胜过 RocketMQ。当然,如果是 A 公司内部,或者所在公司使用了 A 公司的云产品,那么 RocketMQ 的企业级特性更多一些,或许我会选择 RocketMQ。


系列



【关于投稿】


如果大家有原创好文投稿,请直接给公号发送留言。


① 留言格式:
【投稿】+《 文章标题》+ 文章链接

② 示例:
【投稿】《不要自称是程序员,我十多年的 IT 职场总结》:http://blog.jobbole.com/94148/

③ 最后请附上您的个人简介哈~



看完本文有收获?请转发分享给更多人

关注「ImportNew」,提升Java技能

以上是关于说说 MQ 之 RocketMQ ( 三 )的主要内容,如果未能解决你的问题,请参考以下文章

说说 MQ 之 RocketMQ ( 一 )

阿里面试:说说你项目里使用的 MQ ,分布式系统中 MQ 作用?

微服务异步架构---MQ之RocketMQ

消息队列之RabbitMQ

#yyds干货盘点#Alibaba中间件技术系列「RocketMQ技术专题」让我们一同来看看RocketMQ和Kafka索引设计

MQ之主流MQ:kafaka+RocketMQ+RabbitMQ对比