如何解决 com.mongodb.spark.exceptions.MongoTypeConversionException:无法转换... Java Spark
Posted
技术标签:
【中文标题】如何解决 com.mongodb.spark.exceptions.MongoTypeConversionException:无法转换... Java Spark【英文标题】:How to resolve com.mongodb.spark.exceptions.MongoTypeConversionException: Cannot cast... Java Spark 【发布时间】:2020-04-01 07:14:43 【问题描述】:您好,我是 Java Spark 的新手,这几天一直在寻找解决方案。
我正在将 MongoDB 数据加载到 hive 表中,但是,我在 saveAsTable 时发现了一些错误,出现此错误
com.mongodb.spark.exceptions.MongoTypeConversionException: Cannot cast STRING into a StructType(StructField(oid,StringType,true)) (value: BsonStringvalue='54d3e8aeda556106feba7fa2')
我尝试增加 sampleSize,不同的 mongo-spark-connector 版本,...但没有工作解决方案。
我无法弄清楚根本原因是什么,以及需要做的差距是什么?
最令人困惑的部分是我有类似的数据集,使用相同的流程没有问题。
mongodb 数据模式就像嵌套的结构和数组
root
|-- sample: struct (nullable = true)
| |-- parent: struct (nullable = true)
| | |-- expanded: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- distance: integer (nullable = true)
| | | | |-- id: struct (nullable = true)
| | | | | |-- oid: string (nullable = true)
| | | | |-- keys: array (nullable = true)
| | | | | |-- element: string (containsNull = true)
| | | | |-- name: string (nullable = true)
| | | | |-- parent_id: array (nullable = true)
| | | | | |-- element: struct (containsNull = true)
| | | | | | |-- oid: string (nullable = true)
| | | | |-- type: string (nullable = true)
| | |-- id: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- oid: string (nullable = true)
样本数据
"sample":
"expanded": [
"distance": 0,
"type": "domain",
"id": "54d3e17b5cf737074d4065b0",
"parent_id": [
"54d3e1775cf737074d406599"
],
"name": "level2"
,
"distance": 1,
"type": "domain",
"id": "54d3e1775cf737074d406599",
"name": "level1"
],
"id": [
"54d3e17b5cf737074d4065b0"
]
示例代码
public static void main(final String[] args) throws InterruptedException
// spark session read mongodb
SparkSession mongo_spark = SparkSession.builder()
.master("local")
.appName("MongoSparkConnectorIntro")
.config("mongo_spark.master", "local")
.config("spark.mongodb.input.uri", "mongodb://localhost:27017/test_db.test_collection")
.enableHiveSupport()
.getOrCreate();
// Create a JavaSparkContext using the SparkSession's SparkContext object
JavaSparkContext jsc = new JavaSparkContext(mongo_spark.sparkContext());
// Load data and infer schema, disregard toDF() name as it returns Dataset
Dataset<Row> implicitDS = MongoSpark.load(jsc).toDF();
implicitDS.printSchema();
implicitDS.show();
// createOrReplaceTempView to see if the data being read
// implicitDS.createOrReplaceTempView("my_table");
// implicitDS.printSchema();
// implicitDS.show();
// saveAsTable
implicitDS.write().saveAsTable("my_table");
mongo_spark.sql("SELECT * FROM my_table limit 1").show();
mongo_spark.stop();
如果有人有一些想法,我将非常感激。 谢谢
【问题讨论】:
你能发布示例数据吗? 这看起来对象 id 没有从字符串正确转换 我更新了一个示例数据,架构几乎可以告诉我们。如果 objectid 没有从字符串正确转换,我该怎么办?非常感谢 【参考方案1】:随着我适当增加样本量,这个问题不再存在了。
How to config Java Spark sparksession samplesize
【讨论】:
有同样的问题,增加 sampleSize 对我没有帮助 即使对我来说,增加样本量对我也没有帮助。我添加了与问题相同的关注点。【参考方案2】:我遇到了同样的问题,sampleSize 部分解决了这个问题,但如果您有大量数据,则无法解决。
这是解决此问题的方法。将此方法与增加的 sampleSize(在我的情况下为 100000)一起使用:
def fix_schema(schema: StructType) -> StructType:
"""Fix spark schema due to inconsistent MongoDB schema collection.
It fixes such issues like:
Cannot cast STRING into a NullType
Cannot cast STRING into a StructType
:param schema: a source schema taken from a Spark DataFrame to be fixed
"""
if isinstance(schema, StructType):
return StructType([fix_schema(field) for field in schema.fields])
if isinstance(schema, ArrayType):
return ArrayType(fix_schema(schema.elementType))
if isinstance(schema, StructField) and is_struct_oid_obj(schema):
return StructField(name=schema.name, dataType=StringType(), nullable=schema.nullable)
elif isinstance(schema, StructField):
return StructField(schema.name, fix_schema(schema.dataType), schema.nullable)
if isinstance(schema, NullType):
return StringType()
return schema
def is_struct_oid_obj(struct_field: StructField) -> bool:
"""
Checks that our schema has StructType field with single oid name inside
:param struct_field: a StructField from Spark schema
:return bool
"""
return (isinstance(struct_field.dataType, StructType)
and len(struct_field.dataType.fields) == 1
and struct_field.dataType.fields[0].name == "oid")
【讨论】:
以上是关于如何解决 com.mongodb.spark.exceptions.MongoTypeConversionException:无法转换... Java Spark的主要内容,如果未能解决你的问题,请参考以下文章