无法将 Catalyst 类型 IntegerType 转换为 Avro 类型 ["null","int"]

Posted

技术标签:

【中文标题】无法将 Catalyst 类型 IntegerType 转换为 Avro 类型 ["null","int"]【英文标题】:Cannot convert Catalyst type IntegerType to Avro type ["null","int"] 【发布时间】:2020-11-18 09:22:06 【问题描述】:

我使用 Pyspark 构建了 Spark Structured Streaming 流程,它从 kafka 主题中读取 avro 消息,进行一些转换并将数据作为 avro 加载到目标主题中。

我使用 ABRIS 包 (https://github.com/AbsaOSS/ABRiS) 序列化/反序列化来自 Confluent 的 Avro,并与 Schema Registry 集成。

架构包含如下整数列:


  "name": "total_images",
  "type": [
    "null",
    "int"
  ],
  "default": null
,

  "name": "total_videos",
  "type": [
    "null",
    "int"
  ],
  "default": null
,

该过程引发以下错误:Cannot convert Catalyst type IntegerType to Avro type ["null","int"].

我尝试将列转换为可为空,但错误仍然存​​在。

如果有人有建议,我将不胜感激

【问题讨论】:

【参考方案1】:

我在这个上花了几个小时

其实和 Abris 依赖无关(行为与原生 spark-avro apis 相同)

可能有几个根本原因,但就我而言……使用 Spark 3.0.1、Scala 和 Dataset:它与编码器和案例类处理数据中的错误类型有关。

简而言之,用 "type": ["null","int"] 定义的 avro 字段不能映射到 scala Int,它需要 Option[Int]

使用以下代码:

test("Avro Nullable field") 
val schema: String =
  """
    |
    | "namespace": "com.mberchon.monitor.dto.avro",
    | "type": "record",
    | "name": "TestAvro",
    | "fields": [
    |  "name": "strVal", "type": ["null", "string"],
    |  "name": "longVal",  "type": ["null", "long"]
    |  ]
    |
  """.stripMargin
val topicName = "TestNullableAvro"
val testInstance = TestAvro("foo",Some(Random.nextInt()))

import sparkSession.implicits._

val dsWrite:Dataset[TestAvro] = Seq(testInstance).toDS
val allColumns = struct(dsWrite.columns.head, dsWrite.columns.tail: _*)

dsWrite
  .select(to_avro(allColumns,schema) as 'value)
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrap)
  .option("topic", topicName)
  .save()

val dsRead:Dataset[TestAvro] = sparkSession.read
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrap)
  .option("subscribe", topicName)
  .option("startingOffsets", "earliest")
  .load()
  .select(from_avro(col("value"), schema) as 'Metric)
  .select("Metric.*")
  .as[TestAvro]

assert(dsRead.collect().contains(testInstance))

如果case类定义如下:

case class TestAvro(strVal:String,longVal:Long)

无法将 Catalyst 类型 LongType 转换为 Avro 类型 ["null","long"]。 org.apache.spark.sql.avro.IncompatibleSchemaException:无法将 Catalyst 类型 LongType 转换为 Avro 类型 ["null","long"]。 在 org.apache.spark.sql.avro.AvroSerializer.newConverter(AvroSerializer.scala:219) 在 org.apache.spark.sql.avro.AvroSerializer.$anonfun$newStructConverter$1(AvroSerializer.scala:239)

它适用于:

case class TestAvro(strVal:String,longVal:Option[Long])

顺便说一句,在 Spark 编码器中支持 SpecificRecord 会非常好(您可以使用 Kryo,但效率较低) 因为,为了在我的 avro 数据中使用有效类型的数据集……我需要创建额外的案例类(与我的 SpecificRecords 重复)。

【讨论】:

以上是关于无法将 Catalyst 类型 IntegerType 转换为 Avro 类型 ["null","int"]的主要内容,如果未能解决你的问题,请参考以下文章

苹果iOS 14代码暗示:macOS Catalyst版Messages应用将至!

无法使用 Xcode 成功签署 Mac Catalyst 应用程序

Mac Catalyst 调整屏幕截图的窗口大小

UIMarkupTextPrintFormatter 和 Mac Catalyst

Mac Catalyst 无法使用 pressesBegan 覆盖捕获 .command 键修饰符

如何有条件地为 Catalyst 编译代码?