RockeMQ通过代码监控消费者状态

Posted quchunhui

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RockeMQ通过代码监控消费者状态相关的知识,希望对你有一定的参考价值。

==背景==

 物联网场景,在设备端写了一个小的API服务程序,这个程序包括:

1、向平台上报设备数据

2、创建消费者客户端,用来监听平台的下行命令

 

==问题==

平台层需要知道设备的状态:在线  or  离线。我能想到的解决办法

1、设备上报心跳数据,平台通过心跳来判断设备是否在线。

2、rocketmq应该有可以监控消费者状态的命令,是否可以通过这个命令实现。

方案1肯定是没有问题的,不过缺点就是需要在平台上写状态管理的代码,麻烦不说,可能还有延迟。

于是想尝试方法2是否可行。

 

==践行过程==

首先,我观察了rocketmq-console(RocketMQ的Web界面,需要独立部署),发现可以通过Web界面查看消费者状态,结果如图:

技术图片

 

通过浏览器的控制台日志,可以看到调用的是consumerConnection.query接口。

很好,我是否可以借鉴一下这个思路,去监听消费者状态呢。技术图片

 

按照这个思路走,去github上找了源码:https://github.com/apache/rocketmq-externals

通过查看他们的源码,才知道RocketMQ已经提供了供查看消费者链接信息的API。

 

==API示例==

需要引入新的pom文件rocketmq-tools、rocketmq-common,增加只有,所有的pom为

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-store</artifactId>
    <version>4.5.0</version>
</dependency>
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.5.0</version>
</dependency>
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-acl</artifactId>
    <version>4.5.0</version>
</dependency>
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-tools</artifactId>
    <version>4.5.0</version>
</dependency>
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-common</artifactId>
    <version>4.5.0</version>
</dependency>

 

 

Java代码示例

package admin;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;

public class AdminExtSample {
    public static void main(String[] args)
        throws MQClientException, InterruptedException, MQBrokerException, RemotingException {
        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt();
        defaultMQAdminExt.setNamesrvAddr("101.132.242.90:9876;47.116.50.192:9876");
        defaultMQAdminExt.start();

        ConsumerConnection cc = defaultMQAdminExt.examineConsumerConnectionInfo("device_cg_notice_down");
        System.out.println(cc.toString());

        defaultMQAdminExt.shutdown();

    }
}

 

这样就可以获取上面web页面中的所有信息了。

 

--END--

 

以上是关于RockeMQ通过代码监控消费者状态的主要内容,如果未能解决你的问题,请参考以下文章

阿里二面:RockeMQ 是如何通过 mmap 大幅提升单机吞吐量的?

阿里二面:RockeMQ 是如何通过 mmap 大幅提升单机吞吐量的?

kafka消费者状态检查—消费的offset是不是滞后/堆积

VSCode自定义代码片段13——Vue的状态大管家

VSCode自定义代码片段13——Vue的状态大管家

VSCode自定义代码片段13——Vue的状态大管家