spring cloud stream kafka - 获取批量并错过心跳

Posted

技术标签:

【中文标题】spring cloud stream kafka - 获取批量并错过心跳【英文标题】:spring cloud stream kafka - fetches bulks and misses heartbeats 【发布时间】:2019-07-22 18:25:18 【问题描述】:

我正在查看一个 spring boot 服务,它从 apache kafka 读取消息,通过 http 从另一个服务请求消息指示的记录,处理它们,将一些数据保存到数据库中并将结果发布到另一个主题。

这是通过

完成的
@StreamListener(Some.INPUT)
@SendTo(Some.OUTPUT)

这是在几个服务中完成的,通常工作得很好。唯一的属性集是

spring.cloud.stream.binder.consumer.concurrency=20

主题本身有 20 个分区,应该适合。

在监控来自 kafka 的读取时,我们发现吞吐量非常低且行为异常:

该应用一次最多可读取 500 条消息,然后 1-2 分钟无内容。在此期间,消费者反复记录“丢失心跳,因为分区已重新平衡”,“重新分配分区”,有时甚至会抛出异常说“提交失败,因为轮询间隔已过”

我们得出结论,这意味着消费者获取 500 条消息,处理所有消息需要很长时间,错过了时间窗口,因此无法将 500 条消息中的任何一条提交给代理 - 代理重新分配分区并重新发送重复相同的消息。

查看线程和文档后,我找到了“max.poll.records”属性,但在哪里设置此属性的建议相互冲突。

有人说要设置在下面

spring.cloud.stream.bindings.consumer.<input>.configuration

有人说

spring.cloud.stream.kafka.binders.consumer-properties

我尝试将两者都设置为 1,但服务行为没有改变。

我该如何正确处理消费者无法在默认设置下跟上所需轮询间隔的情况?

通用yaml:

spring.cloud.stream.default.group=$spring.application.name

服务-yaml

spring:
  clould:
    stream:
      default:
        consumer.headerMode: embeddedHeaders
        producer.headerMode: embeddedHeaders
      bindings:
       someOutput:
         destination: outTopic
       someInput:
         destination: inTopic
           consumer:
             concurrency: 30
      kafka:
        bindings:
          consumer:
            someInput:
              configuarion:
                max.poll.records: 20 # ConsumerConfig ignores this
              consumer:
                enableDlq: true
                configuarion:
                  max.poll.records: 30 # ConsumerConfig ignores this
          someInput:
            configuarion:
              max.poll.records: 20 # ConsumerConfig ignores this
            consumer:
              enableDlq: true
              configuarion:
                max.poll.records: 30 # ConsumerConfig ignores this
        binder:
          consumer-properties:
            max.poll.records: 10 # this gets used first
          configuration:
            max.poll.records: 40 # this get used when the first one is not present

“忽略这个”总是意味着,如果没有设置其他属性,ConsumerConfiguration 保持默认为 500 以获取最大轮询记录

编辑:我们已经接近了:

问题与设置了exponentialBackoffStrategy 的spring 重试有关 - 以及一系列错误有效地停止了应用程序。

我没有得到的是,我们通过向相关主题发布格式错误的消息来强制出现 200 个错误,这导致应用程序读取 200 个,花费很长时间(使用旧的重试配置),然后一次提交所有 200 个错误。

如果我们有这有什么意义

max.poll.records: 1
concurrency: 1
ackEachRecod = true
enableDlq: true # (which implicitly makes autoCommitOffsets = true according to the docs)

【问题讨论】:

您好,我也遇到了同样的问题。您能找到解决方法吗? 是的,答案是正确的 - 最大轮询记录数量过多会导致处理时间过长,从而使消费者超时,因此请将最大轮询设置得更低和/或增加轮询间隔 【参考方案1】:

这是

spring.cloud.stream.kafka.bindings.consumer.<input>.consumer.configuration.max.poll.records
.

见the documentation...

Kafka 消费者属性

以下属性仅适用于 Kafka 消费者,并且必须以 spring.cloud.stream.kafka.bindings.&lt;channelName&gt;.consumer. 为前缀

...

配置

使用包含通用 Kafka 消费者属性的键/值对映射。

默认值:空地图。

...

你也可以增加max.poll.interval.ms

编辑

我刚刚使用 2.1.0.RELEASE 进行了测试 - 它的工作原理与我描述的一样:

无设置

2019-03-01 08:47:59.560  INFO 44698 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    ...
    max.poll.records = 500
    ...

默认启动

spring.kafka.consumer.properties.max.poll.records=42

2019-03-01 08:49:49.197  INFO 45044 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    ...
    max.poll.records = 42
    ...

Binder 默认 #1

spring.kafka.consumer.properties.max.poll.records=42
spring.cloud.stream.kafka.binder.consumer-properties.max.poll.records=43

2019-03-01 08:52:11.469  INFO 45842 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    ...
    max.poll.records = 43
    ...

Binder 默认 #2

spring.kafka.consumer.properties.max.poll.records=42
spring.cloud.stream.kafka.binder.configuration.max.poll.records=43

2019-03-01 08:54:06.211  INFO 46252 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    ...
    max.poll.records = 43
    ...

绑定默认

spring.kafka.consumer.properties.max.poll.records=42
spring.cloud.stream.kafka.binder.configuration.max.poll.records=43
spring.cloud.stream.kafka.default.consumer.configuration.max.poll.records=44

2019-03-01 09:02:26.004  INFO 47833 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    ...
    max.poll.records = 44
    ...

绑定特定

spring.kafka.consumer.properties.max.poll.records=42
spring.cloud.stream.kafka.binder.configuration.max.poll.records=43
spring.cloud.stream.kafka.default.consumer.configuration.max.poll.records=44
spring.cloud.stream.kafka.bindings.input.consumer.configuration.max.poll.records=45

2019-03-01 09:05:01.452  INFO 48330 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    ...
    max.poll.records = 45
    ...

EDIT2

这是完整的测试应用。我只是在http://start.spring.io 创建了一个新应用程序并选择了“Kafka”和“Cloud Stream”。

@SpringBootApplication
@EnableBinding(Sink.class)
public class So54932453Application 

    public static void main(String[] args) 
        SpringApplication.run(So54932453Application.class, args).close();
    

    @StreamListener(Sink.INPUT)
    public void listen(String in) 

    


spring.cloud.stream.bindings.input.group=so54932453

spring.kafka.consumer.properties.max.poll.records=42
spring.cloud.stream.kafka.binder.configuration.max.poll.records=43
spring.cloud.stream.kafka.default.consumer.configuration.max.poll.records=44
spring.cloud.stream.kafka.bindings.input.consumer.configuration.max.poll.records=45

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.3.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>net.gprussell</groupId>
    <artifactId>so54932453</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>so54932453</name>
    <description>Demo</description>

    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>Greenwich.RELEASE</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-support</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>$spring-cloud.version</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

    <repositories>
        <repository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
        </repository>
    </repositories>

</project>

【讨论】:

我也读到过,让我不确定的是,将“binder.consumer-properties”设置为 20,而您提到的设置为 10 - 实际出现在 org.apache 中的那个.kafka.clients.consumer.ConsumerConfig 是 20。而我们没有找到取值为 10 的配置。这两个属性是不相关的,只是碰巧被称为相同的吗? 有层次结构。引导 (spring.kafka.consumer.properties...) 然后是绑定默认值,然后是绑定默认值,然后是特定绑定,所以绑定应该获胜。您使用的是什么版本的活页夹?您可以编辑问题以显示您的.properties/.yml吗? 我更新了问题并添加了我现在尝试过的所有属性 - 它们都没有影响我的行为我总是得到这些“阅读很多,2分钟内什么都不读”循环 你没有回答我关于版本的问题;我刚刚对其进行了测试,它的工作原理与我描述的一样;查看我的答案的编辑。 抱歉,wenn 正在使用 boot 2.1.0 和 Cloud greenwich。有趣的是,我不明白这些 o.a.k.完全记录日志,我通过向 consumerconfig 构造函数添加断点来验证配置。我们只是缺少一个注释来激活一些自动配置吗?我们只是设置了 EnableBindings

以上是关于spring cloud stream kafka - 获取批量并错过心跳的主要内容,如果未能解决你的问题,请参考以下文章

Dispatcher 没有频道订阅者 - spring-cloud-stream-kafka

spring-cloud-stream kafka 消费者并发

Spring cloud kafka stream pitfalls

Spring cloud kafka stream pitfalls

spring-cloud-stream-kafka 在应用程序启动后仅使用最新消息

带有Spring Cloud Stream的Kafka Streams进程中的Serd错误