Spark Confluent schema registry client - Unrecognized field "schemaType"
【Posted】: 2021-11-17 16:55:33

【Question】: I'm using the confluent-kafka-schemaregistry client in my Spark application. My use case is to read a JSON schema from the registry and process it in the application.
When registering the schema, I passed the schemaType parameter with the value JSON, along with the schema parameter. However, when reading the schema back in my Spark application, I hit the following exception:
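The registration step described above can be sketched against the Schema Registry REST API (a hedged example: it assumes a registry running at http://localhost:8081, the subject name jsontesting1 used later in the question, and a trivial placeholder JSON Schema body):

```shell
# Register a JSON-typed schema under subject "jsontesting1".
# "schemaType": "JSON" tells the registry this is a JSON Schema
# rather than the default AVRO. The schema body here is a minimal
# placeholder, not the poster's actual schema.
curl -X POST \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schemaType": "JSON", "schema": "{\"type\": \"object\", \"properties\": {\"id\": {\"type\": \"string\"}}}"}' \
  http://localhost:8081/subjects/jsontesting1/versions
```

On success the registry responds with the assigned schema id, e.g. `{"id": 1}`.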
[spark-listener-group-appStatus] ERROR test.JobListener - Batch failed:com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "schemaType" (class io.confluent.kafka.schemaregistry.client.rest.entities.Schema), not marked as ignorable (4 known properties: "version", "schema", "id", "subject"])
at [Source: sun.net.www.protocol.http.HttpURLConnection$HttpInputStream@6a228f09; line: 1, column: 209] (through reference chain: io.confluent.kafka.schemaregistry.client.rest.entities.Schema["schemaType"])
at com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException.from(UnrecognizedPropertyException.java:62)
at com.fasterxml.jackson.databind.DeserializationContext.reportUnknownProperty(DeserializationContext.java:855)
at com.fasterxml.jackson.databind.deser.std.StdDeserializer.handleUnknownProperty(StdDeserializer.java:1083)
at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperty(BeanDeserializerBase.java:1389)
at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperties(BeanDeserializerBase.java:1343)
at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:401)
at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1123)
at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:298)
at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:133)
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3789)
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2856)
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:221)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:265)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getLatestVersion(RestService.java:524)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getLatestVersion(RestService.java:516)
My code is:
val restService = new RestService("http://localhost:8081")
val valueRestResponseSchema = restService.getLatestVersion("jsontesting1")
val jsonSchema = valueRestResponseSchema.getSchema
Can someone help with how to extract the schema from this payload?
【Comments】:
【Answer 1】: Got it working! I used restService.getLatestVersionSchemaOnly("jsontesting1"), which returns only the schema string. Thanks!
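Putting the accepted fix together, a minimal sketch (assuming the same registry URL and subject as in the question, and a client version that provides getLatestVersionSchemaOnly):

```scala
import io.confluent.kafka.schemaregistry.client.rest.RestService

object SchemaFetch {
  def main(args: Array[String]): Unit = {
    val restService = new RestService("http://localhost:8081")

    // getLatestVersion deserializes the full response into the Schema
    // entity class, which fails on older clients because that class has
    // no "schemaType" field. getLatestVersionSchemaOnly returns just the
    // raw schema string and sidesteps that deserialization entirely.
    val jsonSchema: String = restService.getLatestVersionSchemaOnly("jsontesting1")
    println(jsonSchema)
  }
}
```

Alternatively, upgrading the client library to a version whose Schema entity knows about schemaType should make the original getLatestVersion call work as well.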
【Comments】: