Java API获取topic所占磁盘空间(Kafka 1.0.0)

Posted 大数据Kafka技术分享

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java API获取topic所占磁盘空间(Kafka 1.0.0)相关的知识,希望对你有一定的参考价值。

很多用户都有这样的需求:实时监控某个topic各分区在broker上所占的磁盘空间大小总和。Kafka并没有提供直接的脚本工具用于统计这些数据。

如果依然要实现这个需求,一种方法是通过监控JMX指标得到分区当前总的日志大小,然后手动相加所有分区的值得出;另一种方法就是使用1.0.0引入的DescribeLogDirsRequest请求。本文即介绍如何通过Java API获取某broker上某topic总的空间大小,代码如下:

package huxihx;


import org.apache.kafka.clients.admin.AdminClient;

import org.apache.kafka.clients.admin.AdminClientConfig;

import org.apache.kafka.clients.admin.DescribeLogDirsResult;

import org.apache.kafka.common.TopicPartition;

import org.apache.kafka.common.requests.DescribeLogDirsResponse;


import java.util.Collections;

import java.util.Map;

import java.util.Properties;

import java.util.concurrent.ExecutionException;


public class TopicDiskSizeSummary {


    private static AdminClient admin;


    public static void main(String[] args) throws ExecutionException, InterruptedException {

        String brokers = "localhost:9092";

        initialize(brokers);

        try {

            long topic1InBroker1 = getTopicDiskSizeForSomeBroker("t2", 1);

            long topic2InBroker0 = getTopicDiskSizeForSomeBroker("t1", 0);

            System.out.println(topic1InBroker1);

            System.out.println(topic2InBroker0);

        } finally {

            shutdown();

        }

    }


    public static long getTopicDiskSizeForSomeBroker(String topic, int brokerID)

            throws ExecutionException, InterruptedException {

        long sum = 0;

        DescribeLogDirsResult ret = admin.describeLogDirs(Collections.singletonList(brokerID));

        Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>> tmp = ret.all().get();

        for (Map.Entry<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>> entry : tmp.entrySet()) {

            Map<String, DescribeLogDirsResponse.LogDirInfo> tmp1 = entry.getValue();

            for (Map.Entry<String, DescribeLogDirsResponse.LogDirInfo> entry1 : tmp1.entrySet()) {

                DescribeLogDirsResponse.LogDirInfo info = entry1.getValue();

                Map<TopicPartition, DescribeLogDirsResponse.ReplicaInfo> replicaInfoMap = info.replicaInfos;

                for (Map.Entry<TopicPartition, DescribeLogDirsResponse.ReplicaInfo> replicas : replicaInfoMap.entrySet()) {

                    if (topic.equals(replicas.getKey().topic())) {

                        sum += replicas.getValue().size;

                    }

                }

            }

        }

        return sum;

    }


    private static void initialize(String bootstrapServers) {

        Properties props = new Properties();

        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

        admin = AdminClient.create(props);

    }


    private static void shutdown() {

        if (admin != null) {

            admin.close();

        }

    }

}


其中主要的方法是AdminClient.describeLogDirs(),它返回DescribeLogDirsResult实例,里面封装了给定broker上所有log.dirs路径下对应的分区的日志大小,将它们加到一起即可实现统计topic磁盘空间占用的功能。

以上是关于Java API获取topic所占磁盘空间(Kafka 1.0.0)的主要内容,如果未能解决你的问题,请参考以下文章

sharepoint 查看网站集所占磁盘空间

查询Mysql数据库所有数据库所占磁盘空间大小

java怎么获取一个对象所占的内存?类似于c的sizeof

mysql查看每个数据库所占磁盘大小

虚拟机磁盘瘦身(虚拟机内删除文件后,所占物理空间不变小)

Java API获取非compacted topic总消息数