MongoTypeConversionException:即使显式架构不包含 NullTypes,也无法使用 Mongo Spark 连接器将 STRING 转换为 NullType
Posted
技术标签:
【中文标题】MongoTypeConversionException:即使显式架构不包含 NullTypes,也无法使用 Mongo Spark 连接器将 STRING 转换为 NullType【英文标题】:MongoTypeConversionException: Cannot cast STRING into a NullType with Mongo Spark Connector even when explicit schema does not contain NullTypes 【发布时间】:2021-08-12 10:55:14 【问题描述】:我正在将一个集合从 MongodB 导入到 Spark。
val partitionDF = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("database", "db").option("collection", collectionName).load()
对于生成的DataFrame
中的data
列,我得到这种类型:
StructType(StructField(configurationName,NullType,true), ...
所以至少某些列中的某些类型是NullType
。
根据 Writing null values to Parquet in Spark when the NullType is inside a StructType ,我尝试通过将所有 NullType
s 替换为 StringType
s 来修复架构:
def denullifyStruct(struct: StructType): StructType =
val items = struct.map field => StructField(field.name, denullify(field.dataType), field.nullable, field.metadata)
StructType(items)
def denullify(dt: DataType): DataType =
if (dt.isInstanceOf[StructType])
val struct = dt.asInstanceOf[StructType]
return denullifyStruct(struct)
else if (dt.isInstanceOf[ArrayType])
val array = dt.asInstanceOf[ArrayType]
return ArrayType(denullify(array.elementType), array.containsNull)
else if (dt.isInstanceOf[NullType])
return StringType
return dt
val fixedDF = spark.createDataFrame(partitionDF.rdd, denullifyStruct(partitionDF.schema))
发出fixedDF.printSchema
我可以看到fixedDF
的架构中不再存在NullType
。但是当我尝试将其保存到 Parquet 时
fixedDF.write.mode("overwrite").parquet(partitionName + ".parquet")
我收到以下错误:
Caused by: com.mongodb.spark.exceptions.MongoTypeConversionException: Cannot cast STRING into a NullType (value: BsonStringvalue='117679.8')
at com.mongodb.spark.sql.MapFunctions$.convertToDataType(MapFunctions.scala:214)
at com.mongodb.spark.sql.MapFunctions$.$anonfun$documentToRow$1(MapFunctions.scala:37)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
又是NullType
!
当我只计算行数时会出现同样的问题:fixedDF.count()
。
在写入 Parquet(或计数)时,Spark 是否会再次推断架构?是否可以关闭这种推理(或以其他方式克服)?
【问题讨论】:
您找到解决方案了吗? @jonas 我找到了一个适合我的解决方案,请看我的回答 【参考方案1】:问题不是由 parquet 写入方法引起的。由于某些类型转换问题,在将数据作为数据帧读取时发生错误。这个jira page 表示我们需要在从mondoDB 读取数据时添加samplePoolSize
选项和other options。
【讨论】:
【参考方案2】:问题在于,即使您提供具有显式架构的 DataFrame
,对于某些操作(如 count()
或保存到磁盘),Mongo 派生的 DataFrame
仍会推断架构。
为了推断模式,它使用采样,这意味着它在推断时看不到某些数据。如果它只看到某个字段具有null
值,它会为它推断NullType
。再后来,当它遇到这个字段带有一些字符串时,这样的字符串将无法转换为NullType
。
所以这里的基本问题是采样。如果您的架构稳定且“密集”(每个或几乎每个文档都填充了所有字段),那么采样将运行良好。但是,如果某些字段是“稀疏”的(很有可能为 null),则采样可能会失败。
粗略的解决方案是完全避免采样。也就是说,使用一般人群而不是样本来推断模式。如果没有太多数据(或者您可以等待),它可以工作。
这里是一个实验分支:https://github.com/rpuch/mongo-spark/tree/read-full-collection-instead-of-sampling
如果这样配置,想法是从采样切换到使用整个集合。引入一个新的配置选项有点太麻烦了,所以如果'sampleSize'配置选项设置为1,我就禁用采样,如下所示:
.option("sampleSize", 1) // MAGIC! This effectively turns sampling off, instead the schema is inferred based on general population
在这种情况下,完全避免采样。使用等于集合大小的 N 进行采样的明显解决方案使 MongoDB 对内存中的大量数据进行排序,这似乎有问题。因此我完全禁用了采样。
【讨论】:
以上是关于MongoTypeConversionException:即使显式架构不包含 NullTypes,也无法使用 Mongo Spark 连接器将 STRING 转换为 NullType的主要内容,如果未能解决你的问题,请参考以下文章