如何在默认情况下从 Kafka Spring Cloud Stream 消费并消费由 Confluent API 生成的 Kafka 消息?

Posted

技术标签:

【中文标题】如何在默认情况下从 Kafka Spring Cloud Stream 消费并消费由 Confluent API 生成的 Kafka 消息?【英文标题】:How to consume from Kafka Spring Cloud Stream by default and also consume a Kafka message generated by the confluent API? 【发布时间】:2019-01-15 18:48:06 【问题描述】:

我正在构建一个微服务组件,默认情况下它将使用由其他 (SCS) 组件生成的 Spring Cloud Stream (SCS) Kafka 消息。

但我还需要使用来自使用 Confluent API 的其他组件的 Kafka 消息。

我有一个示例存储库,它显示了我正在尝试做的事情。

https://github.com/donalthurley/KafkaConsumeScsAndConfluent

这是下面带有 SCS 输入绑定和融合输入绑定的应用程序配置。

spring:
  application:
    name: kafka
  kafka:
    consumer:
      properties.schema.registry.url: http://192.168.99.100:8081
  cloud:
    stream:
      kafka:
        binder:
          brokers: PLAINTEXT://192.168.99.100:9092
#          configuration:
#            specific:
#              avro:
#                reader: true
#            key:
#              deserializer: org.apache.kafka.common.serialization.StringDeserializer
#            value:
#              deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer

      bindings:
        inputConfluent:
          contentType: application/*+avro
          destination: confluent-destination
          group: input-confluent-group
        inputScs:
          contentType: application/*+avro
          destination: scs-destination
          group: input-scs-group

通过上述配置,我使用 SCS 默认配置创建了两个消费者 例如 org.apache.kafka.common.serialization.ByteArrayDeserializer 类是两个输入绑定的值反序列化器。

如果我删除上述配置中的 cmets,我会得到两个消费者,配置是从我的 Confluent 客户端发送的 例如 io.confluent.kafka.serializers.KafkaAvroDeserializer 类是两个输入绑定的值反序列化器。

我了解,因为配置在 Kafka 活页夹上,它将适用于使用该活页夹定义的所有消费者。

我有什么方法可以定义这些特定属性,以便它们仅适用于汇合特定消费者绑定,而所有其他输入绑定都可以使用默认的 SCS 配置?

【问题讨论】:

【参考方案1】:

您可以通过configuration 属性设置特定于绑定的消费者和生产者属性。

见the reference manual。

spring.cloud.stream.kafka.bindings.<channelName>.consumer.configuration.foo.bar=baz

使用非标准序列化器/反序列化器时,您必须分别为生产者和消费者设置useNativeEncodinguseNativeDecoding。再次,请参阅参考手册。

【讨论】:

我按照spring.cloud.stream.kafka.bindings.inputConfluent.consumer.configuration 下的配置和设置useNativeDecoding 的建议进行了上述更改,这很有效。见github.com/donalthurley/KafkaConsumeScsAndConfluent/commit/… 我想早先尝试此操作时,我将consumer properties和Kafka consumer properties这两个概念混为一谈 确实有通用的生产者/消费者属性(所有绑定器通用)和特定于绑定器的生产者/消费者属性。

以上是关于如何在默认情况下从 Kafka Spring Cloud Stream 消费并消费由 Confluent API 生成的 Kafka 消息?的主要内容,如果未能解决你的问题,请参考以下文章

如何在android studio 1.4的默认情况下从右侧制作导航菜单

Spring,如何在不使用附加转换器的情况下从字符串返回XML响应?

kafka的数据保留清除策略(绝对值得一看)

在使用 Oauth、SAML 和 spring-security 的多租户的情况下从 spring-security.xml 中获取错误

如何在没有默认 YUV2RGB->RGB2GRAY 转换的情况下从 H264 4:2:0 视频中读取 Y 分量

如何在没有kafka服务器的情况下运行spring boot