Kafka error deserializing key/value for partition
Posted: 2018-02-17 05:30:41
Question: I have an integration test that passes when I send to a Kafka topic without a key. However, as soon as I add a key, I start getting serialization errors.
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition topic-1 at offset 0
Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
Here is my sender class:
public class Sender {

    private static final Logger LOG = LoggerFactory.getLogger(Sender.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void send() {
        String topic = "topic";
        String data = "data";
        String key = "key";

        LOG.info("sending to topic: '{}', key: '{}', data: '{}'", topic, key, data);

        // does not work
        kafkaTemplate.send(topic, key, data);

        // works
        // kafkaTemplate.send(topic, data);
    }
}
Here is my configuration, where I specify a StringSerializer for the key:
@Configuration
@EnableKafka
public class Config {

    @Bean
    public Sender sender() {
        return new Sender();
    }

    // Properties is the project's own configuration class
    // (it exposes getBootstrapServers()), not java.util.Properties
    @Bean
    public Properties properties() {
        return new Properties();
    }

    @Bean
    public Map<String, Object> producerConfigs(Properties properties) {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory(Properties properties) {
        return new DefaultKafkaProducerFactory<>(producerConfigs(properties));
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(Properties properties) {
        return new KafkaTemplate<>(producerFactory(properties));
    }
}
The problem may be related to the message listener in my test, but that also uses strings throughout:
@RunWith(SpringRunner.class)
@SpringBootTest()
@DirtiesContext
public class SenderIT {

    public static final Logger LOG = LoggerFactory.getLogger(SenderIT.class);

    private static String SENDER_TOPIC = "topic";

    @Autowired
    private Sender sender;

    private KafkaMessageListenerContainer<String, String> container;

    private BlockingQueue<ConsumerRecord<String, String>> records;

    @ClassRule
    public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, SENDER_TOPIC);

    @Before
    public void setUp() throws Exception {
        // set up the Kafka consumer properties
        Map<String, Object> consumerProperties =
            KafkaTestUtils.consumerProps("sender", "false", embeddedKafka);
        consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // create a Kafka consumer factory
        DefaultKafkaConsumerFactory<String, String> consumerFactory =
            new DefaultKafkaConsumerFactory<String, String>(consumerProperties);

        // set the topic that needs to be consumed
        ContainerProperties containerProperties = new ContainerProperties(SENDER_TOPIC);

        // create a Kafka MessageListenerContainer
        container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);

        // create a thread safe queue to store the received message
        records = new LinkedBlockingQueue<>();

        // setup a Kafka message listener
        container.setupMessageListener(new MessageListener<String, String>() {
            @Override
            public void onMessage(ConsumerRecord<String, String> record) {
                LOG.debug("test-listener received message='{}'", record.toString());
                records.add(record);
            }
        });

        // start the container and underlying message listener
        container.start();

        // wait until the container has the required number of assigned partitions
        ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
    }

    @After
    public void tearDown() {
        // stop the container
        container.stop();
    }

    @Test
    public void test() throws InterruptedException {
        sender.send();

        // check that the message was received in Kafka
        ConsumerRecord<String, String> kafkaTopicMsg = records.poll(10, TimeUnit.SECONDS);
        LOG.debug("kafka received = {}", kafkaTopicMsg);
        assertThat(kafkaTopicMsg).isNotNull();
    }
}
As always, any help would be much appreciated.
All the code needed to reproduce this is available at https://github.com/LewisWatson/kafka-embedded-test/tree/8322621ad4e302d982e5ecd28af9fd314696d850
The full stack trace is available at https://travis-ci.org/LewisWatson/kafka-embedded-test/builds/273227986
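Comments:
It looks like it is the test message listener.
Answer 1:
After inspecting the logs further, I was able to narrow the problem down to the test message listener: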
2017-09-08 09:30:06.845 ERROR 2550 --- [ -C-1] essageListenerContainer$ListenerConsumer : Container exception
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition topic-1 at offset 0
Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
https://travis-ci.org/LewisWatson/kafka-embedded-test/builds/273227986#L2961
For some reason, the consumer appears to expect the key to be an integer.
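To see why a string key trips this check while a missing key does not, here is a minimal sketch that mimics the length check behind the error message. It is a stand-in written with only the JDK, not the Kafka class itself; the class name `IntegerKeyCheckSketch` and the method `deserializeInt` are hypothetical, and the sketch throws `IllegalArgumentException` where Kafka throws `SerializationException`. It assumes the producer's StringSerializer encodes the key as UTF-8, which makes the key "key" three bytes long.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Illustrative stand-in for an integer deserializer's length check (not the Kafka class). */
final class IntegerKeyCheckSketch {

    /** Null passes through untouched; anything else must be exactly 4 bytes. */
    static Integer deserializeInt(byte[] data) {
        if (data == null) {
            return null; // a record sent without a key carries a null key
        }
        if (data.length != 4) {
            throw new IllegalArgumentException(
                    "Size of data received by IntegerDeserializer is not 4");
        }
        return ByteBuffer.wrap(data).getInt(); // big-endian 32-bit integer
    }

    public static void main(String[] args) {
        // No key: there is nothing to decode, so the check never fires.
        System.out.println(deserializeInt(null));

        // The key "key" serialized as a UTF-8 string is 3 bytes, not 4.
        byte[] stringKey = "key".getBytes(StandardCharsets.UTF_8);
        try {
            deserializeInt(stringKey);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

This matches the behaviour in the question: `kafkaTemplate.send(topic, data)` produces a null key that any key deserializer accepts, while `kafkaTemplate.send(topic, key, data)` produces string bytes that an integer deserializer rejects.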
Explicitly setting string deserializers on the consumer factory solved the problem:
// create a Kafka consumer factory
DefaultKafkaConsumerFactory<String, String> consumerFactory =
    new DefaultKafkaConsumerFactory<String, String>(consumerProperties,
        new StringDeserializer(), new StringDeserializer());
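An alternative that should have the same effect is to override the deserializer entries in the consumer properties map before building the factory. The sketch below uses only the JDK and plain string keys, so it runs without Kafka on the classpath; `"key.deserializer"` and `"value.deserializer"` are the standard consumer config names (the values of `ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG` and `ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG`). The helper name `withStringDeserializers` is hypothetical, and the integer entry in `main` is an assumption about what the test's property map contained, inferred from the log rather than stated in the question.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch: force string deserializers in a consumer properties map. */
final class StringDeserializerPropsSketch {

    // Kafka's string deserializer, named as a string so no Kafka import is needed.
    static final String STRING_DESERIALIZER =
            "org.apache.kafka.common.serialization.StringDeserializer";

    /** Returns a copy of the given properties with both deserializers set to strings. */
    static Map<String, Object> withStringDeserializers(Map<String, Object> consumerProperties) {
        Map<String, Object> props = new HashMap<>(consumerProperties);
        props.put("key.deserializer", STRING_DESERIALIZER);
        props.put("value.deserializer", STRING_DESERIALIZER);
        return props;
    }

    public static void main(String[] args) {
        // Stand-in for the properties used in the test (assumed, not taken from the source).
        Map<String, Object> consumerProperties = new HashMap<>();
        consumerProperties.put("key.deserializer",
                "org.apache.kafka.common.serialization.IntegerDeserializer");
        consumerProperties.put("auto.offset.reset", "earliest");

        System.out.println(withStringDeserializers(consumerProperties));
    }
}
```

In the test above, the returned map would be passed to `new DefaultKafkaConsumerFactory<String, String>(...)` in place of `consumerProperties`. Passing deserializer instances to the constructor, as in the answer, is more direct; the map approach is useful when the same properties are shared with other consumers.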
Comments:
I had the same problem! Thank you very much, Lewis! :-)
That helped me too.. :)
This helped me as well. @Bean public ConsumerFactory