无法将 Catalyst 类型 IntegerType 转换为 Avro 类型 ["null","int"]
Posted
技术标签:
【中文标题】无法将 Catalyst 类型 IntegerType 转换为 Avro 类型 ["null","int"]【英文标题】:Cannot convert Catalyst type IntegerType to Avro type ["null","int"] 【发布时间】:2020-11-18 09:22:06 【问题描述】:我使用 Pyspark 构建了 Spark Structured Streaming 流程,它从 kafka 主题中读取 avro 消息,进行一些转换并将数据作为 avro 加载到目标主题中。
我使用 ABRIS 包 (https://github.com/AbsaOSS/ABRiS) 序列化/反序列化来自 Confluent 的 Avro,并与 Schema Registry 集成。
架构包含如下整数列:
"name": "total_images",
"type": [
"null",
"int"
],
"default": null
,
"name": "total_videos",
"type": [
"null",
"int"
],
"default": null
,
该过程引发以下错误:Cannot convert Catalyst type IntegerType to Avro type ["null","int"].
我尝试将列转换为可为空,但错误仍然存在。
如果有人有建议,我将不胜感激
【问题讨论】:
【参考方案1】:我在这个上花了几个小时
其实和 Abris 依赖无关(行为与原生 spark-avro apis 相同)
可能有几个根本原因,但就我而言……使用 Spark 3.0.1、Scala 和 Dataset:它与编码器和案例类处理数据中的错误类型有关。
简而言之,用 "type": ["null","int"] 定义的 avro 字段不能映射到 scala Int,它需要 Option[Int]
使用以下代码:
test("Avro Nullable field")
val schema: String =
"""
|
| "namespace": "com.mberchon.monitor.dto.avro",
| "type": "record",
| "name": "TestAvro",
| "fields": [
| "name": "strVal", "type": ["null", "string"],
| "name": "longVal", "type": ["null", "long"]
| ]
|
""".stripMargin
val topicName = "TestNullableAvro"
val testInstance = TestAvro("foo",Some(Random.nextInt()))
import sparkSession.implicits._
val dsWrite:Dataset[TestAvro] = Seq(testInstance).toDS
val allColumns = struct(dsWrite.columns.head, dsWrite.columns.tail: _*)
dsWrite
.select(to_avro(allColumns,schema) as 'value)
.write
.format("kafka")
.option("kafka.bootstrap.servers", bootstrap)
.option("topic", topicName)
.save()
val dsRead:Dataset[TestAvro] = sparkSession.read
.format("kafka")
.option("kafka.bootstrap.servers", bootstrap)
.option("subscribe", topicName)
.option("startingOffsets", "earliest")
.load()
.select(from_avro(col("value"), schema) as 'Metric)
.select("Metric.*")
.as[TestAvro]
assert(dsRead.collect().contains(testInstance))
如果case类定义如下:
case class TestAvro(strVal:String,longVal:Long)
无法将 Catalyst 类型 LongType 转换为 Avro 类型 ["null","long"]。 org.apache.spark.sql.avro.IncompatibleSchemaException:无法将 Catalyst 类型 LongType 转换为 Avro 类型 ["null","long"]。 在 org.apache.spark.sql.avro.AvroSerializer.newConverter(AvroSerializer.scala:219) 在 org.apache.spark.sql.avro.AvroSerializer.$anonfun$newStructConverter$1(AvroSerializer.scala:239)
它适用于:
case class TestAvro(strVal:String,longVal:Option[Long])
顺便说一句,在 Spark 编码器中支持 SpecificRecord 会非常好(您可以使用 Kryo,但效率较低) 因为,为了在我的 avro 数据中使用有效类型的数据集……我需要创建额外的案例类(与我的 SpecificRecords 重复)。
【讨论】:
以上是关于无法将 Catalyst 类型 IntegerType 转换为 Avro 类型 ["null","int"]的主要内容,如果未能解决你的问题,请参考以下文章
苹果iOS 14代码暗示:macOS Catalyst版Messages应用将至!
无法使用 Xcode 成功签署 Mac Catalyst 应用程序
UIMarkupTextPrintFormatter 和 Mac Catalyst