Kafka错误反序列化分区的键/值

Posted

技术标签:

【中文标题】Kafka错误反序列化分区的键/值【英文标题】:Kafka error deserializing key/value for partition 【发布时间】:2018-02-17 05:30:41 【问题描述】:

当我发送到没有密钥的 Kafka 主题时,我有一个通过集成测试。但是,当我添加一个键时,我开始出现序列化错误。

org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition topic-1 at offset 0

Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4

这是我的发送者类:

public class Sender 
    private static final Logger LOG = LoggerFactory.getLogger(Sender.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void send() 

        String topic = "topic";
        String data = "data";
        String key = "key";

        LOG.info("sending to topic: '', key: '', data: ''", topic, key, data);

        // does not work
        kafkaTemplate.send(topic, key, data);

        // works
        // kafkaTemplate.send(topic, data);

    

这是我的配置,我为键指定一个 StringSerializer

@Configuration
@EnableKafka
public class Config 

    @Bean
    public Sender sender() 
        return new Sender();
    

    @Bean
    public Properties properties() 
        return new Properties();
    

    @Bean
    public Map<String, Object> producerConfigs(Properties properties) 
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    

    @Bean
    public ProducerFactory<String, String> producerFactory(Properties properties) 
        return new DefaultKafkaProducerFactory<>(producerConfigs(properties));
    

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(Properties properties) 
        return new KafkaTemplate<>(producerFactory(properties));
    


这个问题可能与我测试中的消息监听器有关,但这也是全面使用字符串

@RunWith(SpringRunner.class)
@SpringBootTest()
@DirtiesContext
public class SenderIT 


    public static final Logger LOG = LoggerFactory.getLogger(SenderIT.class);

    private static String SENDER_TOPIC = "topic";

    @Autowired
    private Sender sender;

    private KafkaMessageListenerContainer<String, String> container;

    private BlockingQueue<ConsumerRecord<String, String>> records;

    @ClassRule
    public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, SENDER_TOPIC);

    @Before
    public void setUp() throws Exception 

        // set up the Kafka consumer properties
        Map<String, Object> consumerProperties =
            KafkaTestUtils.consumerProps("sender", "false", embeddedKafka);

        consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // create a Kafka consumer factory
        DefaultKafkaConsumerFactory<String, String> consumerFactory =
            new DefaultKafkaConsumerFactory<String, String>(consumerProperties);

        // set the topic that needs to be consumed
        ContainerProperties containerProperties = new ContainerProperties(SENDER_TOPIC);

        // create a Kafka MessageListenerContainer
        container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);

        // create a thread safe queue to store the received message
        records = new LinkedBlockingQueue<>();

        // setup a Kafka message listener
        container.setupMessageListener(new MessageListener<String, String>() 
        @Override
        public void onMessage(ConsumerRecord<String, String> record) 
            LOG.debug("test-listener received message=''", record.toString());
            records.add(record);
        
        );

        // start the container and underlying message listener
        container.start();

        // wait until the container has the required number of assigned partitions
        ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
    

    @After
    public void tearDown() 
        // stop the container
        container.stop();
    

    @Test
    public void test() throws InterruptedException 

        sender.send();

        // check that the message was received in Kafka
        ConsumerRecord<String, String> kafkaTopicMsg = records.poll(10, TimeUnit.SECONDS);

        LOG.debug("kafka recieved = ", kafkaTopicMsg);

        assertThat(kafkaTopicMsg).isNotNull();

    


一如既往,我们将不胜感激。

所有要复制的代码都可以在https://github.com/LewisWatson/kafka-embedded-test/tree/8322621ad4e302d982e5ecd28af9fd314696d850获得

完整的堆栈跟踪可在https://travis-ci.org/LewisWatson/kafka-embedded-test/builds/273227986获得

【问题讨论】:

貌似是测试消息监听器 【参考方案1】:

在进一步检查日志后,我能够将问题缩小到测试消息侦听器

2017-09-08 09:30:06.845 ERROR 2550 --- [           -C-1] essageListenerContainer$ListenerConsumer : Container exception

org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition topic-1 at offset 0

Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4

https://travis-ci.org/LewisWatson/kafka-embedded-test/builds/273227986#L2961

出于某种原因,它看起来希望密钥是整数。

为消费者工厂显式设置字符串反序列化器解决了这个问题。

// create a Kafka consumer factory
DefaultKafkaConsumerFactory<String, String> consumerFactory =
    new DefaultKafkaConsumerFactory<String, String>(consumerProperties,
        new StringDeserializer(), new StringDeserializer());

【讨论】:

我遇到了同样的问题!非常感谢你,刘易斯! :-) 对我也有帮助.. :) 这对我也有帮助。 @Bean public ConsumerFactory consumerFactory() return new DefaultKafkaConsumerFactory(consumerConfigs()); 当我通过命令行进行测试时,它也可以正常工作,但是当生产者 API 发送一些数据时,我遇到了上述报告的问题。我做了这个改变。 DefaultKafkaConsumerFactory consumerFactory() return new DefaultKafkaConsumerFactory(consumerConfigs(), new StringDeserializer(), new StringDeserializer()); 突然发生这种情况的原因是什么?

以上是关于Kafka错误反序列化分区的键/值的主要内容,如果未能解决你的问题,请参考以下文章

objectmapper.readValue() 失败并出现错误“没有从字符串值反序列化的字符串参数构造函数/工厂方法”

没有像默认构造一样的创建者):不能从对象值反序列化(没有基于委托或属性的创建者

如何将 xml 元素值反序列化为 C# 类属性

从Kafka主题消费消息时反序列化的问题

Jackson 使用枚举键、POJO 值反序列化为 Map

没有从字符串值反序列化的字符串参数构造函数/工厂方法('2018-12-14')