MongoTypeConversionException:即使显式架构不包含 NullTypes,也无法使用 Mongo Spark 连接器将 STRING 转换为 NullType

Posted

技术标签:

【中文标题】MongoTypeConversionException:即使显式架构不包含 NullTypes,也无法使用 Mongo Spark 连接器将 STRING 转换为 NullType【英文标题】:MongoTypeConversionException: Cannot cast STRING into a NullType with Mongo Spark Connector even when explicit schema does not contain NullTypes 【发布时间】:2021-08-12 10:55:14 【问题描述】:

我正在将一个集合从 MongodB 导入到 Spark。

val partitionDF = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("database", "db").option("collection", collectionName).load()

对于生成的DataFrame 中的data 列,我得到这种类型:

StructType(StructField(configurationName,NullType,true), ...

所以至少某些列中的某些类型是NullType

根据 Writing null values to Parquet in Spark when the NullType is inside a StructType ,我尝试通过将所有 NullTypes 替换为 StringTypes 来修复架构:

def denullifyStruct(struct: StructType): StructType = 
  val items = struct.map field => StructField(field.name, denullify(field.dataType), field.nullable, field.metadata) 
  StructType(items)


def denullify(dt: DataType): DataType = 
  if (dt.isInstanceOf[StructType]) 
    val struct = dt.asInstanceOf[StructType]
    return denullifyStruct(struct)
   else if (dt.isInstanceOf[ArrayType]) 
    val array = dt.asInstanceOf[ArrayType]
    return ArrayType(denullify(array.elementType), array.containsNull)
   else if (dt.isInstanceOf[NullType]) 
    return StringType
  
  return dt


val fixedDF = spark.createDataFrame(partitionDF.rdd, denullifyStruct(partitionDF.schema))

发出fixedDF.printSchema 我可以看到fixedDF 的架构中不再存在NullType。但是当我尝试将其保存到 Parquet 时

fixedDF.write.mode("overwrite").parquet(partitionName + ".parquet")

我收到以下错误:

Caused by: com.mongodb.spark.exceptions.MongoTypeConversionException: Cannot cast STRING into a NullType (value: BsonStringvalue='117679.8')
    at com.mongodb.spark.sql.MapFunctions$.convertToDataType(MapFunctions.scala:214)
    at com.mongodb.spark.sql.MapFunctions$.$anonfun$documentToRow$1(MapFunctions.scala:37)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
    at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)

又是NullType

当我只计算行数时会出现同样的问题:fixedDF.count()

在写入 Parquet(或计数)时,Spark 是否会再次推断架构?是否可以关闭这种推理(或以其他方式克服)?

【问题讨论】:

您找到解决方案了吗? @jonas 我找到了一个适合我的解决方案,请看我的回答 【参考方案1】:

问题不是由 parquet 写入方法引起的。由于某些类型转换问题,在将数据作为数据帧读取时发生错误。这个jira page 表示我们需要在从mondoDB 读取数据时添加samplePoolSize 选项和other options。

【讨论】:

【参考方案2】:

问题在于,即使您提供具有显式架构的 DataFrame,对于某些操作(如 count() 或保存到磁盘),Mongo 派生的 DataFrame 仍会推断架构。

为了推断模式,它使用采样,这意味着它在推断时看不到某些数据。如果它只看到某个字段具有null 值,它会为它推断NullType。再后来,当它遇到这个字段带有一些字符串时,这样的字符串将无法转换为NullType

所以这里的基本问题是采样。如果您的架构稳定且“密集”(每个或几乎每个文档都填充了所有字段),那么采样将运行良好。但是,如果某些字段是“稀疏”的(很有可能为 null),则采样可能会失败。

粗略的解决方案是完全避免采样。也就是说,使用一般人群而不是样本来推断模式。如果没有太多数据(或者您可以等待),它可以工作。

这里是一个实验分支:https://github.com/rpuch/mongo-spark/tree/read-full-collection-instead-of-sampling

如果这样配置,想法是从采样切换到使用整个集合。引入一个新的配置选项有点太麻烦了,所以如果'sampleSize'配置选项设置为1,我就禁用采样,如下所示:

.option("sampleSize", 1) // MAGIC! This effectively turns sampling off, instead the schema is inferred based on general population

在这种情况下,完全避免采样。使用等于集合大小的 N 进行采样的明显解决方案使 MongoDB 对内存中的大量数据进行排序,这似乎有问题。因此我完全禁用了采样。

【讨论】:

以上是关于MongoTypeConversionException:即使显式架构不包含 NullTypes,也无法使用 Mongo Spark 连接器将 STRING 转换为 NullType的主要内容,如果未能解决你的问题,请参考以下文章