使用 Spring Cloud Stream Kafka Binder 批量使用带有密钥的 Kafka 消息
Posted
技术标签:
【中文标题】使用 Spring Cloud Stream Kafka Binder 批量使用带有密钥的 Kafka 消息【英文标题】:Consuming Kafka messages with its key in batches using Spring Cloud Stream Kafka Binder 【发布时间】:2022-01-22 04:26:14 【问题描述】:consuming them as batches时是否有可能获取kafka消息的key?
当使用Message<String>
作为我的消费者函数的输入时,我设法访问了消息键,但这仅适用于非批处理模式:
@SpringBootApplication
class KafkaSink
private val log = logger()
@Bean
fun sink() : Consumer<Message<String>>
return Consumer
log.info("key: $it.headers[KafkaHeaders.RECEIVED_MESSAGE_KEY] value: $it.payload")
设置属性spring.cloud.stream.binding.sink.consumer.batch-mode=true
时,我只能使用List<String>
作为消费者的参数,而不是List<Message<String>>
【问题讨论】:
【参考方案1】:使用Message<List<String>>
;然后,KafkaHeaders.RECEIVED_MESSAGE_KEY
是 List<?>
,其顺序与有效负载相同 - 所有其他标头都相同。
编辑
在下面重新评论;默认内容类型为application/json
。添加
bindings:
sink-in-0:
content-type: text/plain
修正你的测试。
【讨论】:
不幸的是,我无法完成这项工作。在批处理模式下,只有List<String>
有效,当使用 Message<List<String>>
时,会发生一些奇怪的 JSON 转换,这会导致列表中出现空值,正如您在运行 test 时在我的 example 中看到的那样。
在 Spring Cloud Function 中看起来有些不一致;默认内容类型为application/json
;我不确定为什么在将内容类型设置为text/plain
时它可以与List<String>
一起正常工作。查看我的编辑 - 我会 ping SCF 人员。
github.com/spring-cloud/spring-cloud-stream/issues/2258以上是关于使用 Spring Cloud Stream Kafka Binder 批量使用带有密钥的 Kafka 消息的主要内容,如果未能解决你的问题,请参考以下文章
spring-cloud-starter-eureka-server 和 spring-cloud-starter-netflix-eureka-server的区别
随手记录关于spring-cloud-starter-eureka-server 和 spring-cloud-starter-netflix-eureka-server
`spring-cloud-starter-eureka-server`和`spring-cloud-starter-netflix-eureka-server`之间的区别
使用 spring-boot:1.5.1 和 spring-cloud-stream 时无法启动 bean 'inputBindingLifecycle'