40 RocketMQ的生产实践总结

Posted 鮀城小帅

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了40 RocketMQ的生产实践总结相关的知识,希望对你有一定的参考价值。

1.灵活的运用tags来过滤数据

MQ中基于tags来过滤数据的功能,在真正的项目中,要合理的规划Topic和里面的tags,一个Topic代表了一类业务消息数据,然后对于这类业务消息数据,如果你希望能划分一些类别的话,可以在发送消息的时候设置 tags。

案例:如外卖平台有美团外卖、饿了么外卖等外卖,如果系统要发送外卖订单数据到MQ里去,就可以针对性的设置 tags ,比如不同的外卖数据都到一个 “WaimaiOrderTopic” 里去。

但是不同类型的外卖可以有不同的 tags : “meituan_waimai” 、“eleme_waimai”、"other_waimai" 等等。然后在消费“WaimaiOrderTopic” 的系统,可以根据 tags 来筛选。

2.基于消息 key 来定位消息是否丢失

前面讲过如何解决消息是否丢失的问题,如果消息真的丢失了,那么该怎么去排查该消息是否丢失?

MQ中可以基于消息key来实现查看消息是否丢失,比如通过设置一个消息的 key 为订单id: message.setKeys(orderId),这样这个消息就具备一个key了。

接着这个消息放到 broker 上,会基于 key 构建 hash 索引,这个 hash 索引就存放在 IndexFile 索引文件里。

后面就可以通过MQ提供的命令去根据 key 查询这个消息,类似这样:

mqadmin queryMsgByKey -n 127.0.0.1:9876 -t SCANRECORD -k orderId。

3. 消息零丢失方案的补充

在消息零丢失方案中,如果MQ集群整体故障了,那么该方案就不可用了。这时候,就需要有更高级别的高可用保障机制。

假设MQ集群彻底崩溃了,生产者应该把消息写入本地磁盘文件里去进行持久化,或者是写入数据库去暂存起来,等待MQ恢复后,再把持久化的消息继续投递到MQ里去。

4.提高消费者的吞吐量

如果生产的消息太多,或者消费的时候消费比较慢,就应该提高消费者的并行度,常见的就是部署更多的consumer机器。

要注意的是,Topic的MessageQueue数量不能少于consumer 数量,否则会有 consumer获取不到消息。

然后就是增加consumer的线程数量,可以设置 consumer端的参数: consumeThreadMin、consumeThreadMax,这样一台consumer机器上的消费线程越多,消费的速度就越快。

还可以开启消费者的批量消费功能,就是设置 consumeMessageBatchMaxSize参数,默认值是1,可以设置的多一些,那么一次就会交给消费者的回调函数一批消息来进行处理了,此时可以通过SQL语句一次性批量处理一些数据,比如: update xxx set xxx where id in (xx,xx,xx)。

5.要不要消费历史消息

consumer是支持设置从哪里开始消费消息的,常见的有两种:一个是从Topic的第一条数据开始消费,一个是从最后一次消费国的消息之后开始消费。对应的是:

CONSUME_FROM_LAST_OFFSET,

CONSUME_FROM_FIRST_OFFSET

一般来说,会选择 CONSUME_FROM_FIRST_OFFSET,这样刚开始就从Topic的第一条消息开始消费,但是以后每次重启,都是从上一次消费到的位置继续往后进行消费的。

以上是关于40 RocketMQ的生产实践总结的主要内容,如果未能解决你的问题,请参考以下文章

19 条 Node.js 生产环境中的最佳实践

RocketMQ消息队列的最佳实践

RocketMQ吐血总结

RocketMQ 千锤百炼哈啰在分布式消息治理和微服务治理中的实践

RocketMq简单的消费者和生产者(示例代码)

JAVA代码之RocketMQ生产和消费数据