监控Kafka主题的消费者数量

Posted

技术标签:

【中文标题】监控Kafka主题的消费者数量【英文标题】:Monitoring number of consumer for the Kafka topic 【发布时间】:2018-12-28 22:16:42 【问题描述】:

我们正在使用 Prometheus 和 Grafana 来监控我们的 Kafka 集群。

在我们的应用程序中,我们使用 Kafka 流,Kafka 流有可能由于异常而停止。我们正在记录事件setUnCaughtExceptionHandler,但是,当流停止时,我们还需要某种警报。

我们目前拥有的是,jmx_exporter 作为代理运行并通过端点公开 Kafka 指标,prometheus 从端点获取指标。

我们没有看到任何衡量每个主题的活跃消费者数量的指标。我们错过了什么吗?有关如何获取活跃消费者数量并在消费者停止时发送警报的任何建议。

【问题讨论】:

您的消费者是否作为操作系统服务运行? 没有。其带有 kafka 流的 Java 应用程序 您可以将 Java 应用程序作为 systemd 服务运行,可以轻松监控甚至在失败时重新启动。 我的服务不仅有流,还处理其他东西。如果出现异常,只有流停止,其他继续运行 也许添加自定义指标会有所帮助? docs.confluent.io/current/streams/… 【参考方案1】:

我们有类似的需求,并将每个分区的 Kafka Consumer Lag 添加到 Grafana 中,如果延迟超过指定阈值,还会添加警报(每个主题的阈值应该不同,具体取决于负载,例如,对于某些主题,它可能是 10,对于高负载 - 100000)。所以如果你有更多,例如1000 条未处理的消息,您将收到警报。

您可以为每个 kafka 流添加状态侦听器,如果流处于错误状态,记录错误或发送电子邮件:

kafkaStream.setStateListener((newState, oldState) -> 
    log.info("Kafka stream state changed [] >>>>> []", oldState, newState);
    if (newState == KafkaStreams.State.ERROR || newState == KafkaStreams.State.PENDING_SHUTDOWN) 
        log.error("Kafka Stream is in [] state. Application should be restarted", newState);
    
);

您还可以添加运行状况检查指示器(例如,通过 REST 端点或通过 spring-boot HealthIndicator),以提供流是否正在运行的信息:

KafkaStreams.State streamState = kafkaStream.state(); state.isRunning();

我还没有找到任何 kafka 流指标来提供有关活动消费者或可用连接分区的信息,但对我而言,如果 kafka 流提供此类数据会很好(并希望它在未来的版本中可用)。

【讨论】:

感谢 Vasiliy 的回答。我认为健康检查指标将是我的最佳选择。但仍然想知道,为什么这不是开箱即用的解决方案。 我猜它不是开箱即用的,因为 kafka 流提供当前状态信息和状态侦听器,由我们决定我们想要做什么,或者添加健康检查指示器 (它可以通过 spring-boot 轻松完成),或者在流未运行的情况下发送电子邮件。我同意它可以在未来开箱即用的spring-kafka 中实现:) 再次感谢瓦西里。希望spring-kafka也一样。 如果你使用 spring cloud kafka 流,那么它会在健康状态页面上显示所有监听器 KStream、KTable 和 GlobalKTable 的健康状态。对于某些 kafka 流出现故障,它显示状态为 Down。

以上是关于监控Kafka主题的消费者数量的主要内容,如果未能解决你的问题,请参考以下文章

KafKa消费者组重平衡能避免吗

具有动态数量的并行消费者的 Kafka 工作队列

使用javaApi监控 kafka 集群的环境下消费组的积压信息

kafka专栏消费者组数据积压的查看与处理方法

Python 基于Python结合pykafka实现kafka生产及消费速率&主题分区偏移实时监控

Kafka集群监控安全机制与最佳实践