卡夫卡消费者不返回任何事件

Posted

技术标签:

【中文标题】卡夫卡消费者不返回任何事件【英文标题】:Kafka consumer not returning any events 【发布时间】:2019-05-20 22:07:40 【问题描述】:

下面的 Scala kafka 消费者没有从 poll 调用返回任何事件。

但是,主题是正确的,我可以看到使用控制台消费者发送到主题的事件:

/opt/kafka_2.11-0.10.1.0/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic my_topic --from-beginning

当我使用调试器单步执行并调用 kafkaConsumer.listTopics() 时,我还在下面的 Scala 代码示例中看到了该主题

另外,这是从单个单元测试中调用的,所以我只创建了这个特征和消费者的一个实例(即另一个消费者实例不能消费消息)。我也在使用随机 group_id。

下面的代码/配置有什么问题吗?

import java.util.Properties

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.ByteArrayDeserializer, StringDeserializer

import scala.util.Random

trait KafkaTest 

  val kafkaConsumerProperties = new Properties()

  kafkaConsumerProperties.put("bootstrap.servers", "kafka:9092")

  kafkaConsumerProperties.put("group.id", Random.alphanumeric.take(10).mkString)

  kafkaConsumerProperties.put("key.deserializer", classOf[ByteArrayDeserializer])

  kafkaConsumerProperties.put("value.deserializer", classOf[StringDeserializer])

  val kafkaConsumer = new KafkaConsumer[String, String](kafkaConsumerProperties)

kafkaConsumer.subscribe(java.util.Collections.singletonList("my_topic"))

  def checkKafkaHasReceivedEvent(): Assertion = 

    val kafkaEvents = kafkaConsumer.poll(2000) // Always returns 0 events?
    ...
  

增加轮询超时也无济于事。

【问题讨论】:

【参考方案1】:

要从头开始读取 AUTO_OFFSET_RESET_CONFIG 属性必须设置为最早,默认为“最新”

kafkaConsumerProperties.put(
    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
    OffsetResetStrategy.EARLIEST.toString().toLowerCase())

【讨论】:

谢谢,我现在看到返回的事件 - 但是,为什么我上面的代码返回的事件 (298) 少于控制台使用者 (400+)?上面的代码没有返回我的测试插入的新事件(虽然我在控制台消费者上看到了它)。也许我的 Scala 代码只从一个分区获取事件?有没有办法从所有分区获取事件? 我不得不在投票之前添加这一行,现在看来一切正常:kafkaConsumer.seekToBeginning(kafkaConsumer.assignment())

以上是关于卡夫卡消费者不返回任何事件的主要内容,如果未能解决你的问题,请参考以下文章

卡夫卡消费者配置

卡夫卡消费者不是从最新消息开始

卡夫卡消费者不阅读消息

如何暂停卡夫卡消费者?

卡夫卡动物园管理员的目的

消费者。如何指定要读取的分区? [卡夫卡]