使用 Spring Cloud Stream Kafka Binder 批量使用带有密钥的 Kafka 消息

Posted

技术标签:

【中文标题】使用 Spring Cloud Stream Kafka Binder 批量使用带有密钥的 Kafka 消息【英文标题】:Consuming Kafka messages with its key in batches using Spring Cloud Stream Kafka Binder 【发布时间】:2022-01-22 04:26:14 【问题描述】:

consuming them as batches时是否有可能获取kafka消息的key?

当使用Message<String> 作为我的消费者函数的输入时,我设法访问了消息键,但这仅适用于非批处理模式:

@SpringBootApplication
class KafkaSink 

    private val log = logger()

    @Bean
    fun sink() : Consumer<Message<String>> 
        return Consumer 
            log.info("key: $it.headers[KafkaHeaders.RECEIVED_MESSAGE_KEY] value: $it.payload")      
        
    


设置属性spring.cloud.stream.binding.sink.consumer.batch-mode=true时,我只能使用List&lt;String&gt;作为消费者的参数,而不是List&lt;Message&lt;String&gt;&gt;

【问题讨论】:

【参考方案1】:

使用Message&lt;List&lt;String&gt;&gt;;然后,KafkaHeaders.RECEIVED_MESSAGE_KEYList&lt;?&gt;,其顺序与有效负载相同 - 所有其他标头都相同。

编辑

在下面重新评论;默认内容类型为application/json。添加

      bindings:
        sink-in-0:
          content-type: text/plain

修正你的测试。

【讨论】:

不幸的是,我无法完成这项工作。在批处理模式下,只有 List&lt;String&gt; 有效,当使用 Message&lt;List&lt;String&gt;&gt; 时,会发生一些奇怪的 JSON 转换,这会导致列表中出现空值,正如您在运行 test 时在我的 example 中看到的那样。 在 Spring Cloud Function 中看起来有些不一致;默认内容类型为application/json;我不确定为什么在将内容类型设置为text/plain 时它可以与List&lt;String&gt; 一起正常工作。查看我的编辑 - 我会 ping SCF 人员。 github.com/spring-cloud/spring-cloud-stream/issues/2258

以上是关于使用 Spring Cloud Stream Kafka Binder 批量使用带有密钥的 Kafka 消息的主要内容,如果未能解决你的问题,请参考以下文章

spring-cloud-starter-eureka-server 和 spring-cloud-starter-netflix-eureka-server的区别

随手记录关于spring-cloud-starter-eureka-server 和 spring-cloud-starter-netflix-eureka-server

`spring-cloud-starter-eureka-server`和`spring-cloud-starter-netflix-eureka-server`之间的区别

spring-cloud-stream 请求-回复消息模式

使用 spring-boot:1.5.1 和 spring-cloud-stream 时无法启动 bean 'inputBindingLifecycle'

spring-cloud-stream kafka 消费者并发