如何向普罗米修斯报告 Kafka Producer 的指标(使用 Spring Boot)

Posted

技术标签:

【中文标题】如何向普罗米修斯报告 Kafka Producer 的指标(使用 Spring Boot)【英文标题】:How can I Report Kafka Producer's metrics to prometheus(using spring boot) 【发布时间】:2019-09-18 20:46:03 【问题描述】:

我正在使用 spring-integration 来处理从 UDP 端点到 kafka 的数据流。我已经使用消费者和生产者配置在@Configuration 中将replyingKafkaTemplate 初始化为@Bean。当我的服务器启动并发送一些 udp 请求后,我可以看到消费者的指标。但是,即使在生产者配置中设置了 jmx 报告器,我也看不到生产者的指标。

我尝试不设置生产者指标报告器,假设它会自动显示为消费者指标所做的(没有额外的配置)

生产者配置

Map<String, Object> configProps = new HashMap<>();
        configProps.put(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapAddress);
        configProps.put(
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
        configProps.put(
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                KafkaAvroSerializer.class);
        configProps.put("schema.registry.url", "http://schema-regisry-server:8081");
        configProps.put(
                ProducerConfig.RETRIES_CONFIG,
                3);
        configProps.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, 500);
        configProps.put(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, 5000);
        configProps.put(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, "org.apache.kafka.common.metrics.JmxReporter");
        configProps.put(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG, "INFO");

        printConfigProps(configProps);
        return new DefaultKafkaProducerFactory<>(configProps);

消费者配置

Map<String, Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        properties.put("schema.registry.url", "http://schema-regisry-server:8081");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "spring-integration");
        // automatically reset the offset to the earliest offset
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        return properties;

kafka 模板创建

@Bean
    public ReplyingKafkaTemplate<String, DataModel, DataModel> replyKafkaTemplate(ProducerFactory<String, DataModel> pf, KafkaMessageListenerContainer<String, DataModel> container) 
        ReplyingKafkaTemplate<String, DataModel, DataModel> template = new ReplyingKafkaTemplate<>(pf, container);
        template.start();
        return template;
    

监听器容器创建:

@Bean
    public KafkaMessageListenerContainer<String, DataModel> replyContainer(ConsumerFactory<String, DataModel> cf) 
        ContainerProperties containerProperties = new ContainerProperties(destinationTopic);
        containerProperties.setGroupId("test");
        return new KafkaMessageListenerContainer<>(cf, containerProperties);
    

ConsumerFactory 创建

@Bean
    public ConsumerFactory<?, ?> consumerFactory() 
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    

【问题讨论】:

对,我已经在同一个@configuration类中的bean中定义了模板和生产者。 我与 Micrometer 维护人员交谈,他说在 spring boot 2.1.x 中没有实现 kafka 生产者统计信息。所以,看来你要么必须等待它被实现,要么自己实现它(并希望回馈它)。 【参考方案1】:

默认情况下,2.3.0 之前的 Spring Boot 2 版本仅公开消费者指标。 Spring Boot 2.3.0(几周前发布)依赖于 Micrometer 1.4,默认情况下同时公开消费者和生产者指标。 如果您不能使用最新版本的 Spring Boot,则必须自己实现。

【讨论】:

您能指出某个文档或链接吗? Spring Boot 仍然只提到 Kafka 消费者指标。 docs.spring.io/spring-boot/docs/current/reference/htmlsingle/… github.com/spring-projects/spring-boot/wiki/… 谢谢!由于我使用的是自定义生产者工厂,因此我必须添加 factory.addListener(new MicrometerProducerListener&lt;&gt;(meterRegistry)); 才能显示指标。官方文档还需要更新。 是的,在您发表评论后,我检查了文档并发现它丢失了,打开了一个问题并修复了 (github.com/spring-projects/spring-boot/issues/21910)。将在下一次构建的文档中显示:)

以上是关于如何向普罗米修斯报告 Kafka Producer 的指标(使用 Spring Boot)的主要内容,如果未能解决你的问题,请参考以下文章

Producer机制

Kafka生产者producer简要总结

Shell 直接向kafka 的Topic发送消息

Shell 直接向kafka 的Topic发送消息

如何使用 JMX 导出器将 JMX 指标从 Kafka 消费者推送到普罗米修斯

MQkafka——kafka消费者如何消费的?如何防止重复消费?如何顺序消费?