Alpakka kafka 消费者抵消
Posted
技术标签:
【中文标题】Alpakka kafka 消费者抵消【英文标题】:Alpakka kafka consumer offset 【发布时间】:2020-07-15 02:35:44 【问题描述】:我在 scala 中使用 Alpakka-kafka 来使用 Kafka 主题。这是我的代码:
val kafkaConsumerSettings: ConsumerSettings[String, String] =
ConsumerSettings(actorSystem, new StringDeserializer, new StringDeserializer)
.withBootstrapServers(kafkaConfig.server)
.withGroupId(kafkaConfig.group)
.withProperties(
ConsumerConfig.MAX_POLL_RECORDS_CONFIG -> "100",
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest",
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG -> "SSL"
)
Consumer
.plainSource(kafkaConsumerSettings, Subscriptions.topics(kafkaConfig.topic))
.runWith(Sink.foreach(println))
但是,消费者只从主题中第一条未提交的消息开始轮询。无论提交的消息如何,我都希望始终从偏移量 0 开始。 使用 Alpakka 消费者,如何手动指定偏移量?
【问题讨论】:
【参考方案1】:我想你想添加几个配置条目:
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> False
所以你的工作永远不会保存任何偏移量
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"
所以你的工作从头开始。
如果您的作业过去已经提交了偏移量,您可能需要将其偏移量重置为最早。
【讨论】:
以上是关于Alpakka kafka 消费者抵消的主要内容,如果未能解决你的问题,请参考以下文章