RockeMQ通过代码监控消费者状态
Posted quchunhui
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RockeMQ通过代码监控消费者状态相关的知识,希望对你有一定的参考价值。
==背景==
物联网场景,在设备端写了一个小的API服务程序,这个程序包括:
1、向平台上报设备数据
2、创建消费者客户端,用来监听平台的下行命令
==问题==
平台层需要知道设备的状态:在线 or 离线。我能想到的解决办法
1、设备上报心跳数据,平台通过心跳来判断设备是否在线。
2、rocketmq应该有可以监控消费者状态的命令,是否可以通过这个命令实现。
方案1肯定是没有问题的,不过缺点就是需要在平台上写状态管理的代码,麻烦不说,可能还有延迟。
于是想尝试方法2是否可行。
==践行过程==
首先,我观察了rocketmq-console(RocketMQ的Web界面,需要独立部署),发现可以通过Web界面查看消费者状态,结果如图:
通过浏览器的控制台日志,可以看到调用的是consumerConnection.query接口。
很好,我是否可以借鉴一下这个思路,去监听消费者状态呢。
按照这个思路走,去github上找了源码:https://github.com/apache/rocketmq-externals
通过查看他们的源码,才知道RocketMQ已经提供了供查看消费者链接信息的API。
==API示例==
需要引入新的pom文件rocketmq-tools、rocketmq-common,增加只有,所有的pom为
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-store</artifactId> <version>4.5.0</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.5.0</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-acl</artifactId> <version>4.5.0</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-tools</artifactId> <version>4.5.0</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-common</artifactId> <version>4.5.0</version> </dependency>
Java代码示例
package admin; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.protocol.body.ConsumerConnection; import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; public class AdminExtSample { public static void main(String[] args) throws MQClientException, InterruptedException, MQBrokerException, RemotingException { DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(); defaultMQAdminExt.setNamesrvAddr("101.132.242.90:9876;47.116.50.192:9876"); defaultMQAdminExt.start(); ConsumerConnection cc = defaultMQAdminExt.examineConsumerConnectionInfo("device_cg_notice_down"); System.out.println(cc.toString()); defaultMQAdminExt.shutdown(); } }
这样就可以获取上面web页面中的所有信息了。
--END--
以上是关于RockeMQ通过代码监控消费者状态的主要内容,如果未能解决你的问题,请参考以下文章
阿里二面:RockeMQ 是如何通过 mmap 大幅提升单机吞吐量的?
阿里二面:RockeMQ 是如何通过 mmap 大幅提升单机吞吐量的?