如何在 KafkaStreams api 中添加主题的健康检查

Posted

技术标签:

【中文标题】如何在 KafkaStreams api 中添加主题的健康检查【英文标题】:How to add health check for topics in KafkaStreams api 【发布时间】:2020-04-29 08:10:57 【问题描述】:

我有一个关键的 Kafka 应用程序需要一直启动和运行。源主题由 debezium kafka connect for mysql binlog 创建。不幸的是,此设置可能会出现许多问题。很多时候 debezium 连接器失败并需要重新启动,然后我的应用程序也是如此(因为没有抛出任何异常它只是挂断并停止消耗)。我手动测试和发现故障的方法是检查 kibana 日志,然后通过终端消费可疑主题。我可以在代码中模仿这一点,但显然不是最佳实践。我想知道KafkaStream api中是否有能力让我做这样的健康检查,并检查kafka集群的其他部分? 困扰我的另一点是,当连接器再次启动时,我是否可以保持流活跃并重新加入主题。

【问题讨论】:

你能在 Elasticsearch/Kibana 中设置警报吗?如果您想自动化运行状况检查,请专注于设置这些以重新启动服务 @cricket_007 这是我的后备计划,但现在似乎是我唯一的方法。感谢您的建议。 【参考方案1】:

您可以检查Kafka Streams State 以查看它是否正在重新平衡/运行,这表明运行状况良好。虽然,如果没有数据进入拓扑,我会假设不会发生错误,因此您需要查找上游依赖项的健康状况。

总体而言,听起来您可能需要花一些时间来使用 Consul 或 Sensu 等监控工具,这些工具可以运行本地服务健康检查并在服务出现故障时发出警报。或者至少Elasticseach alerting

就 Kafka 健康检查而言,您可以通过多种方式进行检查

    broker 和 zookeeper 进程是否正在运行? (SSH 到节点,检查进程) broker 和 zookeeper 端口是否打开? (使用 Socket 连接) 是否有可以跟踪的重要 JMX 指标? (Metricbeat) 你能找到一个活跃的控制器代理吗(使用AdminClient#describeCluster) 作为控制器元数据的一部分,您希望响应的代理的最低数量是否有要求(可以从AdminClient 获得) 您使用的主题配置是否正确? (保留、最小isr、复制因子、分区计数等)? (再次,使用AdminClient

【讨论】:

以上是关于如何在 KafkaStreams api 中添加主题的健康检查的主要内容,如果未能解决你的问题,请参考以下文章

Kafka Streams API:避免在 KTable.mapValues 中添加额外的 stateStore

如何在单个 Kafka Streams 应用程序中连接到多个集群?

如何有效地链接来自 Kafka Streams 中平面 api 数据的 groupby 查询?

[翻译和注解]Kafka Streams简介: 让流处理变得更简单

好文推荐kafkastreams加时间窗口的count

如何使用kafka流处理块/批处理数据?