Spark Confluent 模式注册表客户端 - 无法识别的字段“schemaType”

Posted

技术标签:

【中文标题】Spark Confluent 模式注册表客户端 - 无法识别的字段“schemaType”【英文标题】:Spark Confluent schema registry client - Unrecognized field "schemaType" 【发布时间】:2021-11-17 16:55:33 【问题描述】:

我在我的 spark 应用程序中使用 confluent-kafka-schemaregistry 客户端。我的用例是我想从注册表中读取 JSON 模式并在我的应用程序中处理它。

当我注册架构时,我使用参数schemaTypeJSON 作为值以及schema 参数。

但是,在我的 spark 应用程序中读取架构时,我遇到了以下异常。

[spark-listener-group-appStatus] ERROR test.JobListener - Batch failed:com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "schemaType" (class io.confluent.kafka.schemaregistry.client.rest.entities.Schema), not marked as ignorable (4 known properties: "version", "schema", "id", "subject"])
 at [Source: sun.net.www.protocol.http.HttpURLConnection$HttpInputStream@6a228f09; line: 1, column: 209] (through reference chain: io.confluent.kafka.schemaregistry.client.rest.entities.Schema["schemaType"])
    at com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException.from(UnrecognizedPropertyException.java:62)
    at com.fasterxml.jackson.databind.DeserializationContext.reportUnknownProperty(DeserializationContext.java:855)
    at com.fasterxml.jackson.databind.deser.std.StdDeserializer.handleUnknownProperty(StdDeserializer.java:1083)
    at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperty(BeanDeserializerBase.java:1389)
    at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperties(BeanDeserializerBase.java:1343)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:401)
    at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1123)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:298)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:133)
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3789)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2856)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:221)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:265)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.getLatestVersion(RestService.java:524)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.getLatestVersion(RestService.java:516)

我的代码是:

val restService = new RestService("http://localhost:8081")
val valueRestResponseSchema = restService.getLatestVersion("jsontesting1")
val jsonSchema = valueRestResponseSchema.getSchema

有人可以帮忙看看如何从这个有效载荷中提取schema

【问题讨论】:

【参考方案1】:

成功了!我使用了restService.getLatestVersionSchemaOnly("jsontesting1"),它只拾取了模式! 谢谢!

【讨论】:

以上是关于Spark Confluent 模式注册表客户端 - 无法识别的字段“schemaType”的主要内容,如果未能解决你的问题,请参考以下文章

Apache Camel Kafka 支持 Confluent 模式注册表

Flink 与 Confluent Kafka 模式注册表

企业版Spark Databricks + 企业版Kafka Confluent 联合高效挖掘数据价值

使用 Spring-Kafka 和 Confluent 模式注册表将带有 JSON 模式的记录发送到 Kafka

具有逻辑类型的 Avro 模式不能与最新的 confluent-kafka 一起使用

每个模式的 JDBC Confluent kafka 连接器和主题