获取主题和分区偏移量

Posted

技术标签:

【中文标题】获取主题和分区偏移量【英文标题】:get the topic and partition offset 【发布时间】:2017-01-12 08:47:34 【问题描述】:

线程“main”中的异常 java.nio.channels.ClosedChannelException 在 kafka.network.BlockingChannel.send(BlockingChannel.scala:100) 在 kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78) 在 kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68) 在 kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:91) 在 kafka.javaapi.consumer.SimpleConsumer.send(SimpleConsumer.scala:68) 在 cmb.SparkStream.kafka.kafkaOffsetTool.getTopicOffsets(kafkaOffsetTool.java:47) 在 cmb.SparkStream.LogClassify.main(LogClassify.java:95) 在 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java.lang.reflect.Method.invoke(Method.java:606) 在 org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:729) 在 org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185) 在 org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210) 在 org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124) 在 org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

我的代码是:

 public static Map<TopicAndPartition, Long> getTopicOffsets(String zkServers, String topic) 

  Map<TopicAndPartition, Long> retVals = new HashMap<TopicAndPartition, Long>();
  for (String zkserver : zkServers.split(",")) 
   SimpleConsumer simpleConsumer = new SimpleConsumer(zkserver.split(":")[0],
     Integer.valueOf(zkserver.split(":")[1]), Consts.getKafkaConfigBean().getDuration(), 1024,
     "consumser");
   TopicMetadataRequest topicMetadataRequest = new TopicMetadataRequest(Arrays.asList(topic));

   TopicMetadataResponse topicMetadataResponse = simpleConsumer.send(topicMetadataRequest);

   for (TopicMetadata metadata : topicMetadataResponse.topicsMetadata()) 
    for (PartitionMetadata part : metadata.partitionsMetadata()) 
     Broker leader = part.leader();
     if (leader != null) 
      TopicAndPartition topicAndPartition = new TopicAndPartition(topic, part.partitionId());

      PartitionOffsetRequestInfo partitionOffsetRequestInfo = new PartitionOffsetRequestInfo(
        kafka.api.OffsetRequest.LatestTime(), 10000);
      OffsetRequest offsetRequest = new OffsetRequest(
        ImmutableMap.of(topicAndPartition, partitionOffsetRequestInfo),
        kafka.api.OffsetRequest.CurrentVersion(), simpleConsumer.clientId());
      OffsetResponse offsetResponse = simpleConsumer.getOffsetsBefore(offsetRequest);
      if (!offsetResponse.hasError()) 
       long[] offsets = offsetResponse.offsets(topic, part.partitionId());
       retVals.put(topicAndPartition, offsets[0]);
      
     

    

   
   simpleConsumer.close();
  
  return retVals;
 

【问题讨论】:

【参考方案1】:

我认为你可能把事情复杂化了。使用 org.apache.kafka.clients.consumer.KafkaConsumer (consumer here) 并做类似的事情

    val partitions = consumer.partitionsFor(topic).map[new TopicPartition(topic,it.partition)]
    consumer.assign(partitions)
    consumer.seekToEnd(partitions)
    val offsets = partitions.map[ it -> consumer.position(it)]
    println(offsets)

你会得到类似的结果

[topicname-8->1917258, topicname-2->1876810, topicname-5->1857012, topicname-4->3844, topicname-7->4043972, topicname-1->1811078, topicname-9- >12217819, topicname-3->3844, topicname-6->1430021, topicname-0->2808969]

【讨论】:

这是什么语言?斯卡拉?你能标记一下吗?它是什么' ?如“it.partition”。代码不完整 这是 xtend (eclipse.org/xtend)。 it 是闭包中的默认参数,与 groovy 中相同。您也可以将其视为伪代码,这里的想法是提供解决方案,而不是复制粘贴 sn-p

以上是关于获取主题和分区偏移量的主要内容,如果未能解决你的问题,请参考以下文章

如何获取 kafka 主题分区的最新偏移量?

什么命令显示 Kafka 中分区的所有主题和偏移量?

Kafka consumerGroup 丢失了所有分区中提交的偏移量信息,并从头开始消费偏移量

Kafka消费者偏移量

kafka-python KafkaConsumer 多分区提交偏移量

如何重置 Kafka 偏移量以匹配尾部位置?