在Spring boot kafka中的ProducerConfigs中将ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG设置为IntegerSerializer时

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了在Spring boot kafka中的ProducerConfigs中将ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG设置为IntegerSerializer时相关的知识,希望对你有一定的参考价值。

我正在使用此Spring boot with kafka设置我的项目。但是当我运行它时,它会显示org.apache.kafka.common.errors.SerializationException: Can't convert key of class java.lang.Integer to class org.apache.kafka.common.serialization.StringSerializer specified in key.serializer Caused by: java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String使用kafka模板发送请求时出现此异常。

@EmbeddedKafka(partitions = 1)
@SpringBootTest
class KafkaApplicationTests 

    @Autowired
    private MyKafkaListener listener;

    @Autowired
    private KafkaTemplate<Integer, String> template;

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    @Test
    public void testSimple() throws Exception 
        template.send("annotated1", 0, "foo");
        template.flush();
        assertTrue(this.listener.latch1.await(10, TimeUnit.SECONDS));
    

    @Configuration
    @EnableKafka
    public class Config 

        @Bean
        ConcurrentKafkaListenerContainerFactory<Integer, String>
        kafkaListenerContainerFactory() 
            ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            return factory;
        

        @Bean
        public ConsumerFactory<Integer, String> consumerFactory() 
            return new DefaultKafkaConsumerFactory<>(consumerConfigs());
        

        @Bean
        public Map<String, Object> consumerConfigs() 
            Map<String, Object> props = new HashMap<>();
//          props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            return props;
        

        @Bean
        public ProducerFactory<Integer, String> producerFactory() 
            return new DefaultKafkaProducerFactory<>(producerConfigs());
        

        @Bean
        public Map<String, Object> producerConfigs() 
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
//          props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            return props;
        

        @Bean
        public KafkaTemplate<Integer, String> kafkaTemplate() 
            return new KafkaTemplate<Integer, String>(producerFactory());
        

    

但是当我更改template.send("annotated1", "key-foo", "foo");时它将起作用。我用过

@Bean
public Map<String, Object> producerConfigs() 
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    return props;

同样是此配置。但这仍然给了我原因:java.lang.ClassCastException:java.lang.Integer无法转换为java.lang.String

[如果有人在那里请帮助我。谢谢

答案

您需要将主应用程序类和Config类添加到@SpringBootTest

@SpringBootTest(classes =  So61985794Application.class, So61985794ApplicationTests.Config.class )

以覆盖Boot的正常配置。

以上是关于在Spring boot kafka中的ProducerConfigs中将ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG设置为IntegerSerializer时的主要内容,如果未能解决你的问题,请参考以下文章

如何通过 JMX 将 Spring Boot 应用程序中的 Kafka 指标公开给 Prometheus?

java——spring boot集成kafka——kafka集群中controller的作用

在Spring boot kafka中的ProducerConfigs中将ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG设置为IntegerSerializer时

java——spring boot集成kafka——broker主题分区副本——概念理解

Spring Boot App 拒绝连接到 Kafka 代理

解决 spring boot 访问 docker kafka 失败