Kafka Streams:使用相同的 `application.id` 从多个主题消费

Posted

技术标签:

【中文标题】Kafka Streams:使用相同的 `application.id` 从多个主题消费【英文标题】:Kafka Streams: use the same `application.id` to consume from multiple topics 【发布时间】:2018-06-08 09:17:24 【问题描述】:

我有一个需要监听多个不同主题的应用程序;每个主题对于如何处理消息都有单独的逻辑。我曾想过为每个 KafkaStreams 实例使用相同的 kafka 属性,但我收到如下错误。

错误

java.lang.IllegalArgumentException: Assigned partition my-topic-1 for non-subscribed topic regex pattern; subscription pattern is my-other-topic

代码 (kotlin)

class KafkaSetup() 
    companion object 
        private val LOG = LoggerFactory.getLogger(this::class.java)
    

    fun getProperties(): Properties 
        val properties = Properties()
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app")
        return properties
    

    private fun listenOnMyTopic() 
        val kStreamBuilder = KStreamBuilder()
        val kStream: KStream<String, String> = kStreamBuilder.stream("my-topic")

        kStream.foreach  key, value -> LOG.info("do stuff") 

        val kafkaStreams = KafkaStreams(kStreamBuilder, getProperties())
        kafkaStreams.start()
    

    private fun listenOnMyOtherTopic() 
        val kStreamBuilder = KStreamBuilder()
        val kStream: KStream<String, String> = kStreamBuilder.stream("my-other-topic")

        kStream.foreach  key, value -> LOG.info("do other stuff") 

        val kafkaStreams = KafkaStreams(kStreamBuilder, getProperties())
        kafkaStreams.start()
    

我发现这个reference 表明您不能将application.id 用于多个主题,但是我发现很难找到支持它的参考文档。 documentation for application.id 声明:

流处理应用程序的标识符。在 Kafka 集群中必须是唯一的。它用作 1) 默认客户端 ID 前缀,2) 用于成员管理的组 ID,3) 变更日志主题前缀。

问题

    此错误是什么意思,是什么原因造成的。 鉴于您可以让应用程序的多个实例以相同的 id 运行以从多个主题分区消费,“在 Kafka 集群中必须是唯一的”是什么意思? 您能否使用相同的 Kafka 流 application.id 来启动两个列出不同主题的 KafkaStreams?如果有,怎么做?

详情: kafka 0.11.0.2

【问题讨论】:

【参考方案1】:

Kafka Streams 通过分区而不是主题进行扩展。因此,如果您使用相同的application.id 启动多个应用程序,则它们订阅的输入主题及其处理逻辑必须相同。应用程序使用application.id 作为group.id 形成一个消费者组,因此输入主题的不同分区被分配给不同的实例。

如果您有不同的主题具有相同的逻辑,您可以一次订阅所有主题(在您开始的每个实例中)。缩放仍然基于分区。 (它基本上是您输入主题的“合并”。)

如果您想通过主题进行扩展和/或具有不同的处理逻辑,您必须为不同的 Kafka Streams 应用程序使用不同的 application.id

【讨论】:

谢谢。是否有任何文档说明“具有相同应用程序的多个应用程序。id 它们在输入主题方面必须相同” 我不确定。请注意,适用于 v1.0 的 Streams 的 AK 文档在 atm 上进行了大量修改——欢迎通过邮件列表提出 cmets/建议。 出于好奇,假设您有两个服务实例从一个主题中使用,其中两个分区使用相同的应用程序 ID。如果其中一台服务器死机,那台仍在运行的服务器是否可以从两个分区获取消息,或者在已分配停机服务器的分区上执行消息,直到该消费者备份为止 如果一个实例发生故障,将触发重新平衡并将故障实例的分区重新分配给正在运行的实例,重新平衡后将同时处理这两个实例。当失败的实例重新上线时,另一个重新平衡将确保每个实例将再次处理一个分区(以确保负载平衡)。

以上是关于Kafka Streams:使用相同的 `application.id` 从多个主题消费的主要内容,如果未能解决你的问题,请参考以下文章

Kafka Streams 与时间窗口聚合的问题

Apache Kafka 1.0.0 Streams API Multiple Multilevel groupby

如何限制kafka-streams中的rocksdb内存使用量

Akka Stream Kafka vs Kafka Streams

Kafka Streams应用程序在kafka服务器上打开了太多文件

是否可以使用 Kafka Streams 访问消息头?