Spring-Kafka:反序列化 kafka 消息时出现问题 - 类不在“受信任的包”中?

Posted

技术标签:

【中文标题】Spring-Kafka:反序列化 kafka 消息时出现问题 - 类不在“受信任的包”中?【英文标题】:Spring-Kafka : Issue while deserialising kafka message - class not in a "trusted package"? 【发布时间】:2019-05-26 15:11:51 【问题描述】:

我得到以下异常,因为我从一个项目生产,而消费者从另一个项目消费。我怎样才能解决这个问题。显然,包装不一样。那么如何确保有正确的 json 序列化。

The class 'com.lte.assessment.assessments.AssessmentAttemptRequest' is not in the trusted packages: [java.util, java.lang, com.lte.assessmentanalytics.model

消费者配置

@EnableKafka
@Configuration
public class KafkaConfig 
    static Map<String, Object> config = new HashMap();

    static 
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    


      @Bean
public ConsumerFactory<String, AssessmentAttemptRequest> assessmentAttemptDetailsEntityConsumerFactory() 
    JsonDeserializer<AssessmentAttemptRequest> deserializer = new JsonDeserializer<>();
    deserializer.addTrustedPackages("com.lte.assessment.assessments");
    return new DefaultKafkaConsumerFactory(config, new StringDeserializer(), deserializer);



生产者配置

@Configuration
public class KafkaConfiguration 

    @Bean
    public ProducerFactory producerConfig() 
        Map<String, Object> config = new HashMap();

        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        return new DefaultKafkaProducerFactory(config);
    

    @Bean
    public KafkaTemplate kafkaTemplate() 
        return new KafkaTemplate(producerConfig());
    

 @Bean
    public ConcurrentKafkaListenerContainerFactory aaKafkaListenerFactory() 
        ConcurrentKafkaListenerContainerFactory<String, AssessmentAttemptDetailsEntity> factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(assessmentAttemptDetailsEntityConsumerFactory());
        return factory;
    

【问题讨论】:

【参考方案1】:

您可以通过更改assessmentAttemptDetailsEntityConsumerFactory() 将您的包裹列入白名单,如下所示:

    @Bean
    public ConsumerFactory<String, AssessmentAttemptDetailsEntity> assessmentAttemptDetailsEntityConsumerFactory() 
            JsonDeserializer<AssessmentAttemptDetailsEntity> 
            deserializer = new JsonDeserializer<>();
            deserializer.addTrustedPackages("com.lte.assessment.assessments");//your package
        return new DefaultKafkaConsumerFactory(config,deserializer);
    

【讨论】:

我仍然遇到同样的问题。用代码编辑问题 看起来我添加的那个在其他地方被覆盖了。放在什么地方最好。 你检查过你的 application.property 吗? 在调试器中运行并在反序列化器中设置断点。 我在配置中有多个工厂导致了这个问题。现在修复它

以上是关于Spring-Kafka:反序列化 kafka 消息时出现问题 - 类不在“受信任的包”中?的主要内容,如果未能解决你的问题,请参考以下文章

Spring Boot、Spring-Kafka 和 Spring-Cloud 兼容性

如何使用 testcontainers 和 spring-kafka 准备测试

spring-kafka整合:KafkaTemplate-kafka模板类介绍

spring-kafka 固定事件迁移实现

使用 Spring-kafka 在 GC/消费者重新平衡时清理 Kafka Metric 计量器

如何使用 spring-kafka 暂停和恢复 @KafkaListener