Alpakka kafka 消费者抵消

Posted

技术标签:

【中文标题】Alpakka kafka 消费者抵消【英文标题】:Alpakka kafka consumer offset 【发布时间】:2020-07-15 02:35:44 【问题描述】:

我在 scala 中使用 Alpakka-kafka 来使用 Kafka 主题。这是我的代码:

    val kafkaConsumerSettings: ConsumerSettings[String, String] =
      ConsumerSettings(actorSystem, new StringDeserializer, new StringDeserializer)
        .withBootstrapServers(kafkaConfig.server)
        .withGroupId(kafkaConfig.group)
        .withProperties(
          ConsumerConfig.MAX_POLL_RECORDS_CONFIG       -> "100",
          ConsumerConfig.AUTO_OFFSET_RESET_CONFIG      -> "earliest",
          CommonClientConfigs.SECURITY_PROTOCOL_CONFIG -> "SSL"
        )

    Consumer
        .plainSource(kafkaConsumerSettings, Subscriptions.topics(kafkaConfig.topic))
        .runWith(Sink.foreach(println))

但是,消费者只从主题中第一条未提交的消息开始轮询。无论提交的消息如何,我都希望始终从偏移量 0 开始。 使用 Alpakka 消费者,如何手动指定偏移量?

【问题讨论】:

【参考方案1】:

我想你想添加几个配置条目:

    ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> False 所以你的工作永远不会保存任何偏移量

    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest" 所以你的工作从头开始。

如果您的作业过去已经提交了偏移量,您可能需要将其偏移量重置为最早。

【讨论】:

以上是关于Alpakka kafka 消费者抵消的主要内容,如果未能解决你的问题,请参考以下文章

kafka——消费者原理解析

Kafka快速入门(Kafka消费者)

如何查看kafka消费者信息

kafka的消费者组该怎么删除

Kafka 系列—— Kafka 消费者详解

如何查看kafka消费者信息