spring Kafka 模型不在受信任的包中

Posted

技术标签:

【中文标题】spring Kafka 模型不在受信任的包中【英文标题】:spring Kafka model is not in the trusted packages 【发布时间】:2019-07-05 10:28:42 【问题描述】:

我正在使用 spring-Kafka-2.1.5spring-boot-2.0.5 开发微服务

第一个服务会向 kafka 生成一些消息,第二个服务会消费它们,而消费我遇到了问题

Caused by: java.lang.IllegalArgumentException: The class 'com.service1.model.TopicMessage' is not in the trusted packages: [java.util, java.lang, com.service2.model.ConsumeMessage]. 
If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).

所以从错误消息来看,这是com.service1.model.TopicMessage service1 的序列化模型。但我正在尝试将消息反序列化为模型com.service2.model.ConsumeMessage,该模型在 service2 中并遇到此问题

我发现了同样的问题here,并尝试了以下格式以及文档docs

下面是我的配置

  @Bean(name = "kafkaConsumerConfig")
   public Map<String, Object> kafkaConsumerConfig() 

    Map<String, Object> props = new HashMap<>();

    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetconfig);
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);

     

kafkaConsumerFactory

@Bean(name = "kafkaConsumerFactory")
public ConsumerFactory<String, ConsumeMessage> kafkaConsumerFactory()        

    JsonDeserializer<ConsumeMessage> 
    deserializer = new JsonDeserializer<>();
    deserializer.addTrustedPackages("com.service2.model");
return new DefaultKafkaConsumerFactory<String, ConsumeMessage>(kafkaConsumerConfig(),new StringDeserializer(),deserializer);


kafkaListenerContainerFactory

 @Bean(name = "kafkaListenerContainerFactory")
 public ConcurrentKafkaListenerContainerFactory<String, ConsumeMessage > kafkaListenerContainerFactory() 

    ConcurrentKafkaListenerContainerFactory<String, ConsumeMessage > factory = new ConcurrentKafkaListenerContainerFactory<>();

    factory.setConcurrency(Integer.parseInt(threads));
    factory.setBatchListener(true);
    factory.setConsumerFactory(kafkaConsumerFactory());
    factory.getContainerProperties().setPollTimeout(Long.parseLong(pollTimeout));
    factory.getContainerProperties().setAckMode(AckMode.BATCH);

    return factory;

【问题讨论】:

【参考方案1】:

您需要在反序列化器中禁用header precedence

/**
 * Construct an instance with the provided target type, and
 * useHeadersIfPresent with a default @link ObjectMapper.
 * @param targetType the target type.
 * @param useHeadersIfPresent true to use headers if present and fall back to target
 * type if not.
 * @since 2.2
 */
public JsonDeserializer(Class<? super T> targetType, boolean useHeadersIfPresent) 

useHeadersIfPresent 参数必须配置为false。这样一来,inferred 类型将被使用,而标头值将被忽略。

如果你不使用spring-kafka-2.2,你应该考虑用类似的逻辑实现你自己的JsonDeserializer:https://github.com/spring-projects/spring-kafka/blob/master/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/JsonDeserializer.java

【讨论】:

这与这个问题无关。虽然不清楚您对此事的关注点是什么:您只需将 AckMode 配置迁移到该新样式即可。 是的,先生,这是一个愚蠢的问题,但我可以知道为什么这个 addTrustedPackages("com.service2.model"); 对我不起作用 因为您在 com.service1.model 上失败了,这是完全不同的包,并且可能在服务 #2 中不存在。 您也可以使用StringDeserializer(或BytesDeserialier) in conjunction with a StringJsonMessageConverter` (或BytesJsonMessageConverter),框架将根据@KafkaListener方法签名自动确定目标类型。只需添加转换器作为@Bean 并且启动会将其连接。 没错。 JSON字符串的转换将由MessageConverter代替。

以上是关于spring Kafka 模型不在受信任的包中的主要内容,如果未能解决你的问题,请参考以下文章

信任库中信任哪些证书?

Spring kstreams 无法让处理器工作 - 类“[B”不在受信任的包中

中信任文件 I/O 权限

kafkak配置仅允许受信任的JNDI连接如何配置?

Azure 托管服务总线:“X.509 证书 CN=servicebus.windows.net 不在受信任的人员存储中。”

Spring SAML 握手失败 - 无法针对受信任的密钥验证不受信任的凭据