spring.cloud.stream.kafka.bindings.<channelName>.producer.configuration 未应用

Posted

技术标签:

【中文标题】spring.cloud.stream.kafka.bindings.<channelName>.producer.configuration 未应用【英文标题】:spring.cloud.stream.kafka.bindings.<channelName>.producer.configuration is not applied 【发布时间】:2021-09-19 01:44:29 【问题描述】:

我有一个用于流数据处理的小型应用程序。我从一个主题接收数据,处理并写入另一个主题。我将 spring cloud 和 apache kafka 用于我的 java 应用程序。 我想为生产者设置一些属性,比如 bufferSize。这是我的 application.yaml 的示例

spring:
  cloud:
    stream:
      bindings:
        point-output-channel:
          destination: DST4-topic
        point-input-channel:
          destination: SRC4-topic
      kafka:
        streams:
          bindings:
            point-output-channel:
              producer:
                bufferSize: 14000
          binder:
            brokers:  localhost:9092
            configuration:
              commit.interval.ms: 10000
              state.dir: state-store
              default:
                key:
                  serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                value:
                  serde: org.apache.kafka.common.serialization.Serdes$StringSerde

但是bufferSize的值并没有应用到生产者,可能是什么问题?谢谢。

【问题讨论】:

【参考方案1】:

从documentation 看来,您的配置有点错误。 Kafka 绑定属性应该是 spring.cloud.stream.kafka.bindings.&lt;channelName&gt;.producer 而不是 spring.cloud.stream.kafka.streams.bindings.&lt;channelName&gt;.producer

试试这个:

spring:
  cloud:
    stream:
      bindings:
        point-output-channel:
          destination: DST4-topic
        point-input-channel:
          destination: SRC4-topic
      kafka:
        bindings:
          point-output-channel:
            producer:
              bufferSize: 14000
        binder:
          brokers: localhost:9092
          configuration:
            commit.interval.ms: 10000
            state.dir: state-store
            default:
              key:
                serde: org.apache.kafka.common.serialization.Serdes$StringSerde
              value:
                serde: org.apache.kafka.common.serialization.Serdes$StringSerde

【讨论】:

以上是关于spring.cloud.stream.kafka.bindings.<channelName>.producer.configuration 未应用的主要内容,如果未能解决你的问题,请参考以下文章

Spring Cloud Stream kafka绑定配置最大请求

了解 Spring Cloud Stream Kafka 和 Spring Retry

使用 Spring Cloud Stream Kafka Binder 批量使用带有密钥的 Kafka 消息

在 Spring Cloud Stream Kafka 中正确管理 DLQ

Spring Cloud Stream Kafka 是不是支持嵌入式标头?

Spring Cloud Stream Kafka 消费者模式