Deserializing different JSON payloads from the same Kafka topic with Spring Kafka

Posted: 2019-06-17 20:48:27

Question:

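I am trying to deserialize different JSON payloads from the same Kafka topic. Other questions asked here guided my first attempt, but I could not get it to work.

As Gary mentioned (here), there are some hints (JsonSerializer.ADD_TYPE_INFO_HEADERS), but when I send and receive both messages I get an exception.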

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.foo.message.ConsumerImpl.consumeSelf(java.lang.String,java.lang.String,java.lang.String,java.lang.String,java.util.Map<java.lang.String, java.lang.Object>,com.foo.message.KafkaMessage,org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.Object>)]
Bean [com.foo.message.ConsumerImpl@6df2a206]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [com.foo.message.KafkaMessageWithAdditionalField] to [com.foo.message.KafkaMessage] for GenericMessage [payload=com.foo.message.KafkaMessageWithAdditionalField@4e3168f7, headers=kafka_offset=22, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@c0e2fcf, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=fromBar, kafka_receivedTimestamp=1548310583481], failedMessage=GenericMessage [payload=com.foo.message.KafkaMessageWithAdditionalField@4e3168f7, headers=kafka_offset=22, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@c0e2fcf, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=fromBar, kafka_receivedTimestamp=1548310583481]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:292) ~[spring-kafka-2.2.2.RELEASE.jar:2.2.2.RELEASE]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:79) ~[spring-kafka-2.2.2.RELEASE.jar:2.2.2.RELEASE]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50) ~[spring-kafka-2.2.2.RELEASE.jar:2.2.2.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1207) [spring-kafka-2.2.2.RELEASE.jar:2.2.2.RELEASE]

...

The LoggingErrorHandler already reports a (correct) value in the ConsumerRecord.

2019-01-24 07:16:27.630 ERROR 27204 --- [ntainer#2-0-C-1] o.s.kafka.listener.LoggingErrorHandler   : Error while processing: ConsumerRecord(topic = fromBar, partition = 0, offset = 22, CreateTime = 1548310583481, serialized key size = -1, serialized value size = 196, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = com.foo.bar.message.KafkaMessageWithAdditionalField@4e3168f7)

First, my configuration:

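import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    // Consumer factory whose value deserializer targets KafkaMessage.
    @Bean
    public ConsumerFactory<String, KafkaMessage> consumerFactoryMessage()
    {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        // The deserializer instances passed here take precedence over the class entries in props.
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
                new JsonDeserializer<>(KafkaMessage.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, KafkaMessage> kafkaListenerMessageContainerFactory()
    {
        ConcurrentKafkaListenerContainerFactory<String, KafkaMessage> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactoryMessage());
        return factory;
    }

    // Consumer factory whose value deserializer targets KafkaMessageWithAdditionalField.
    @Bean
    public ConsumerFactory<String, KafkaMessageWithAdditionalField> consumerFactoryMessageWithAdditionalField()
    {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
                new JsonDeserializer<>(KafkaMessageWithAdditionalField.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, KafkaMessageWithAdditionalField> kafkaListenerMessageWithAdditionalFieldContainerFactory()
    {
        ConcurrentKafkaListenerContainerFactory<String, KafkaMessageWithAdditionalField> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactoryMessageWithAdditionalField());
        return factory;
    }
}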

Here are the listeners:

    // Both listeners are assigned partition 0 of the same topic, each through its own container factory.
    @KafkaListener(topicPartitions = @TopicPartition(partitions = "0", topic = "${foo.kafka.topic-springBoot}"), containerFactory = "kafkaListenerMessageContainerFactory")
    public void consumeSelf(@Headers Map<String, Object> map, KafkaMessage message, ConsumerRecord<String, Object> cr)
    {
        log.info("message received %s", message);
    }

    @KafkaListener(topicPartitions = @TopicPartition(partitions = "0", topic = "${foo.kafka.topic-springBoot}"), containerFactory = "kafkaListenerMessageWithAdditionalFieldContainerFactory")
    public void consumeSelfAdd(@Headers Map<String, Object> map, KafkaMessageWithAdditionalField message, ConsumerRecord<String, Object> cr)
    {
        log.info("messageKafkaMessageWithAdditionalField received %s", message);
    }


Answer 1:

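You can't do that; you have 2 different listener containers whose listeners expect different objects.

For multiple listener methods that receive different types, you need to use @KafkaListener at the class level and @KafkaHandler at the method level.

See "@KafkaListener on a Class" in the Spring for Apache Kafka reference documentation.

When you use @KafkaListener at the class level, you specify @KafkaHandler at the method level. When messages are delivered, the converted message payload type is used to determine which method to call.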

@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {

    @KafkaHandler
    public void listen(String foo) {
        ...
    }

    @KafkaHandler
    public void listen(Integer bar) {
        ...
    }

    @KafkaHandler(isDefault = true)
    public void listenDefault(Object object) {
        ...
    }

}

The default method is optional and is used for unknown payload types.
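To make the selection rule concrete, here is a minimal plain-Java model of "pick the handler by converted payload type, else fall back to the default". It is only an illustration and not how Spring Kafka implements @KafkaHandler. The class PayloadDispatchSketch, its methods, and the two empty payload classes are hypothetical stand-ins for the types in the question.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/** Plain-Java model of handler selection by payload type; not Spring Kafka's implementation. */
class PayloadDispatchSketch {

    // Hypothetical stand-ins for the two payload classes from the question.
    static class KafkaMessage { }
    static class KafkaMessageWithAdditionalField { }

    // Registration order is preserved so the first matching handler wins.
    private final Map<Class<?>, Function<Object, String>> handlers = new LinkedHashMap<>();
    private final Function<Object, String> defaultHandler;

    PayloadDispatchSketch(Function<Object, String> defaultHandler) {
        this.defaultHandler = defaultHandler;
    }

    void register(Class<?> payloadType, Function<Object, String> handler) {
        handlers.put(payloadType, handler);
    }

    /** Routes the payload to the first handler whose type matches, else to the default. */
    String dispatch(Object payload) {
        for (Map.Entry<Class<?>, Function<Object, String>> entry : handlers.entrySet()) {
            if (entry.getKey().isInstance(payload)) {
                return entry.getValue().apply(payload);
            }
        }
        return defaultHandler.apply(payload);
    }

    /** Builds a dispatcher with one handler per payload type plus a default. */
    static PayloadDispatchSketch example() {
        PayloadDispatchSketch dispatcher = new PayloadDispatchSketch(p -> "default");
        dispatcher.register(KafkaMessage.class, p -> "consumeSelf");
        dispatcher.register(KafkaMessageWithAdditionalField.class, p -> "consumeSelfAdd");
        return dispatcher;
    }

    public static void main(String[] args) {
        PayloadDispatchSketch dispatcher = example();
        System.out.println(dispatcher.dispatch(new KafkaMessage()));
        System.out.println(dispatcher.dispatch(new KafkaMessageWithAdditionalField()));
        System.out.println(dispatcher.dispatch("unexpected"));
    }
}
```

Applied to the question, this corresponds to one class-level @KafkaListener with one @KafkaHandler method per payload class, instead of two separate listener containers.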

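However, the @KafkaHandler approach only works with a smart deserializer (one that knows how to convert to the different payload types).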
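One way a deserializer can be "smart" is to read a type hint that the producer attached to each record, which is what the JsonSerializer.ADD_TYPE_INFO_HEADERS hint in the question refers to. The sketch below models only that type-selection step in plain Java and leaves the JSON parsing out. The class TypeHeaderResolverSketch, the allow-list, the fallback type, and the helper headerFor are assumptions made for illustration; the header name "__TypeId__" is the default used by Spring Kafka's JSON type mapping, as far as I know.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model of choosing a target class from a type header; JSON parsing is omitted. */
class TypeHeaderResolverSketch {

    // Header name used for type information (assumed to match Spring Kafka's default).
    static final String TYPE_ID_HEADER = "__TypeId__";

    // Hypothetical stand-ins for the two payload classes from the question.
    static class KafkaMessage { }
    static class KafkaMessageWithAdditionalField { }

    // Only type ids registered here may be resolved, similar in spirit to trusted packages.
    private final Map<String, Class<?>> allowedTypes = new HashMap<>();
    private final Class<?> fallbackType;

    TypeHeaderResolverSketch(Class<?> fallbackType) {
        this.fallbackType = fallbackType;
    }

    void allow(String typeId, Class<?> type) {
        allowedTypes.put(typeId, type);
    }

    /** Returns the class named by the type header, or the fallback when no header is present. */
    Class<?> resolve(Map<String, byte[]> headers) {
        byte[] raw = headers.get(TYPE_ID_HEADER);
        if (raw == null) {
            return fallbackType;
        }
        String typeId = new String(raw, StandardCharsets.UTF_8);
        Class<?> type = allowedTypes.get(typeId);
        if (type == null) {
            throw new IllegalArgumentException("Unknown or untrusted type id: " + typeId);
        }
        return type;
    }

    /** Builds a resolver that knows both payload classes and falls back to KafkaMessage. */
    static TypeHeaderResolverSketch example() {
        TypeHeaderResolverSketch resolver = new TypeHeaderResolverSketch(KafkaMessage.class);
        resolver.allow("com.foo.message.KafkaMessage", KafkaMessage.class);
        resolver.allow("com.foo.message.KafkaMessageWithAdditionalField",
                KafkaMessageWithAdditionalField.class);
        return resolver;
    }

    /** Builds a header map carrying the given type id, as a producer-side serializer might. */
    static Map<String, byte[]> headerFor(String typeId) {
        Map<String, byte[]> headers = new HashMap<>();
        headers.put(TYPE_ID_HEADER, typeId.getBytes(StandardCharsets.UTF_8));
        return headers;
    }

    public static void main(String[] args) {
        TypeHeaderResolverSketch resolver = example();
        System.out.println(resolver.resolve(
                headerFor("com.foo.message.KafkaMessageWithAdditionalField")).getSimpleName());
        System.out.println(resolver.resolve(new HashMap<>()).getSimpleName());
    }
}
```

This also explains the exception in the question: the record was deserialized to KafkaMessageWithAdditionalField even in the container configured for KafkaMessage, which is consistent with a type hint on the record taking priority over the configured target class.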

Alternatively, you can add a RecordFilterStrategy to each listener container factory so that each listener skips the records meant for the other.
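The filtering idea can be sketched in plain Java as follows. Note the inverted sense: in Spring Kafka, a RecordFilterStrategy returns true for records that should be discarded, and the model below mirrors that. Everything here (RecordFilterSketch, discardUnless, deliver, and the empty payload classes) is a hypothetical stand-in, not Spring API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

/** Plain-Java model of per-listener record filtering; not Spring Kafka's implementation. */
class RecordFilterSketch {

    // Hypothetical stand-ins for the two payload classes from the question.
    static class KafkaMessage { }
    static class KafkaMessageWithAdditionalField { }

    /** Returns a predicate that answers true for values the listener should NOT receive. */
    static Predicate<Object> discardUnless(Class<?> wanted) {
        return value -> !wanted.isInstance(value);
    }

    /** Returns the values that survive the filter, in their original order. */
    static List<Object> deliver(List<Object> values, Predicate<Object> discard) {
        List<Object> delivered = new ArrayList<>();
        for (Object value : values) {
            if (!discard.test(value)) {
                delivered.add(value);
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        List<Object> records = Arrays.asList(
                new KafkaMessage(), new KafkaMessageWithAdditionalField(), new KafkaMessage());
        // The listener for KafkaMessage only sees the two KafkaMessage values.
        System.out.println(deliver(records, discardUnless(KafkaMessage.class)).size());
    }
}
```

The filter sees values that have already been deserialized, so this option still depends on the deserializer producing the right class for each record.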

Comments:

I see. Thanks.
