Spring kstreams 无法让处理器工作 - 类“[B”不在受信任的包中

Posted

技术标签:

【中文标题】Spring kstreams 无法让处理器工作 - 类“[B”不在受信任的包中【英文标题】:Spring kstreams cannot get processor to work - The class '[B' is not in the trusted packages 【发布时间】:2021-08-23 02:52:12 【问题描述】:

完整代码:https://github.com/BenedictWHD/kstreams-example

所以我有一个生产者 (data-ingest)、处理器 (external-message-processor) 和消费者 (internal-message-processor (一旦我开始工作,这将成为一个处理器,所以很抱歉目前的命名,但是它是消费者))。

data-ingest 的工作原理据我所知,因为它向主题 external_messages 发送消息。

external-message-processor 尝试读取该主题,但失败并显示以下内容:

Caused by: java.lang.IllegalArgumentException: The class '[B' is not in the trusted packages: [java.util, java.lang, com.yetti.common.externalmessage, com.yetti.common.externalmessage.*]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).

关于该主题的消息示例:

Headers: __TypeId__: [B, contentType: application/json, spring_json_header_types: "contentType":"java.lang.String""eyJpZCI6IjE4ZGQ2ODc4LWYwNWQtNDJiOC1iYTdlLTU2MDhmMTkzOWU3YyIsImV4dGVybmFsTWVzc2FnZVNvdXJjZSI6IlNNUyIsIm1lc3NhZ2VUeXBlIjoiVFJBTlNBQ1RJT04iLCJudW1iZXJGcm9tIjoiMSIsIm51bWJlclRvIjoiMiIsImNjeSI6Ik5UVEwiLCJxdWFudGl0eSI6IjIuNSJ9"

如您所见,TypeId 出于某种原因是“[B”。

我已为所有 3 个应用程序指定使用以下序列化器和反序列化器:

          serializer: org.springframework.kafka.support.serializer.JsonSerializer
          deserializer: org.springframework.kafka.support.serializer.JsonDeserializer

当应用程序在生产者和消费者配置中启动时,只有data-ingest 一个似乎实际上使用了正确的序列化程序,我们有这个:

value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

从我在 application.yml 文件中看到的一切都应该如此,所以我不知道为什么它没有使用我指定的序列化器/解串器以及为什么处理器无法读取消息主题?

非常感谢任何帮助,因为这几天我一直在摸索。

一开始的 repo 包含所有配置文件、poms 和 docker-composer 文件来运行它。

编辑 - 处理器配置:


spring.cloud.stream:
  function:
    definition: processExternalMessage
  bindings:
    processExternalMessage-in-0:
      destination: external_messages
    processExternalMessage-out-0:
      destination: internal_messages
  kafka:
      bindings:
        processExternalMessage-out-0:
          producer:
            configuration:
              value:
                serializer: org.springframework.kafka.support.serializer.JsonSerializer
        processExternalMessage-in-0:
          consumer:
            configuration:
              value:
                deserializer: org.springframework.kafka.support.serializer.JsonDeserializer

生产者配置:


spring.cloud.stream:
  function:
    definition: externalMessageProducer
  bindings:
    externalMessageProducer-out-0:
      destination: external_messages
  kafka:
      bindings:
        externalMessageProducer-out-0:
          producer:
            configuration:
              value:
                serializer: org.springframework.kafka.support.serializer.JsonSerializer

【问题讨论】:

【参考方案1】:

value.serializer 是一个平面配置属性名称。 value 不是带有 serializer 字段的嵌套对象,在 YAML 术语中

这就是为什么另一个似乎可以工作的原因

还值得指出的是Kstreams uses serde properties, not serializers directly

【讨论】:

谢谢,你有配置应该是什么样子的例子吗?我会将当前的内容添加到问题中 就像我说的,configuration:(换行符),value.serializer: com.spring...。你也不应该同时使用 yaml 和属性文件,因为我相信只有一个被读取 啊,知道了,应该是:configuration: value.serializer: org.springframework.kafka.support.serializer.JsonSerializer 而不是configuration: value: serializer: org.springframework.kafka.support.serializer.JsonSerializer 所以我现在需要做的就是实现 Serde 方法? 事实上,我想你不知道 kstreams 的一个很好的例子,它不仅仅使用盒装原语来处理在主题之间发送和处理的数据?

以上是关于Spring kstreams 无法让处理器工作 - 类“[B”不在受信任的包中的主要内容,如果未能解决你的问题,请参考以下文章

带有 Spring Boot 的 Kafka 流

KStream-KStream 内连接抛出 java.lang.ClassCastException

Kafka Streams API:KStream 到 KTable

Thymeleaf (Java Spring):无法让 mvc.uri 工作

将Kafka Streams代码迁移到Spring Cloud Stream吗?

如何处理 kafka KStream 并直接写入数据库而不是发送另一个主题