pySpark:java.lang.UnsupportedOperationException:未实现类型:StringType
Posted
技术标签:
【中文标题】pySpark:java.lang.UnsupportedOperationException:未实现类型:StringType【英文标题】:pySpark: java.lang.UnsupportedOperationException: Unimplemented type: StringType 【发布时间】:2017-09-15 15:27:23 【问题描述】:在读取不一致的模式编写的镶木地板文件组时,我们遇到了模式合并问题。 在切换到手动指定架构时,我收到以下错误。任何指针都会有所帮助。
java.lang.UnsupportedOperationException:未实现类型:StringType 在 org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readDoubleBatch(VectorizedColumnReader.java:389) 在 org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:195) 在 org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:230) 在 org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:137)
source_location = "///dt=//*__.parquet".format(source_initial,
bucket,
source_prefix,
date,
source_file_pattern,
date,
source_file_pattern)
schema = StructType([
StructField("Unnamed", StringType(), True),StructField("nanos", LongType(), True),StructField("book", LongType(), True),
StructField("X_o", LongType(), True),StructField("Y_o", LongType(), True),StructField("Z_o", LongType(), True),
StructField("Total", DoubleType(), True),StructField("P_v", DoubleType(), True),StructField("R_v", DoubleType(), True),
StructField("S_v", DoubleType(), True),StructField("message_type", StringType(), True),StructField("symbol", StringType(), True),
StructField("date", StringType(), True),StructField("__index_level_0__", StringType(), True)])
print("Querying data from source location ".format(source_location))
df_raw = spark.read.format('parquet').load(source_location, schema = schema, inferSchema = False,mergeSchema="true")
df_raw = df_raw.filter(df_raw.nanos.between(open_nano,close_nano))
df_raw = df_raw.withColumn("timeInWindow_nano",(fun.ceil(df_raw.nanos/(window_nano))).cast("int"))
df_core = df_raw.groupBy("date","symbol","timeInWindow_nano").agg(fun.sum("Total").alias("Total"),
fun.sum("P_v").alias("P_v"),
fun.sum("R_v").alias("R_v"),
fun.sum("S_v").alias("S_v"))
df_core = df_core.withColumn("P_v",fun.when(df_core.Total < 0,0).otherwise(df_core.P_v))
df_core = df_core.withColumn("R_v",fun.when(df_core.Total < 0,0).otherwise(df_core.R_v))
df_core = df_core.withColumn("S_v",fun.when(df_core.Total < 0,0).otherwise(df_core.S_v))
df_core = df_core.withColumn("P_pct",df_core.P_v*df_core.Total)
df_core = df_core.withColumn("R_pct",df_core.R_v*df_core.Total)
df_core = df_core.withColumn("S_pct",df_core.S_v*df_core.Total)
【问题讨论】:
如果您不手动指定架构(但保留mergeSchema
选项不变)会发生什么?
@Mariusz :由于有些文件没有任何内容作为没有事件的情况,所以自动推断会导致 org.apache.spark.SparkException: Failed merging schema of file。
【参考方案1】:
如果架构不兼容,您将无法在一个 load
中读取镶木地板文件。我的建议是将其分离为两个负载,然后在它们兼容时合并数据帧。见示例代码:
schema1_df = spark.read.parquet('path/to/files/with/string/field.parquet')
schema2_df = spark.read.parquet('path/to/files/with/double/field.parquet')
df = schema2_df.unionAll(schema1.df.withColumn('invalid_col', schema2_df.invalid_col.cast('double')))
【讨论】:
以上是关于pySpark:java.lang.UnsupportedOperationException:未实现类型:StringType的主要内容,如果未能解决你的问题,请参考以下文章
PYSPARK:如何将带有多个 case 语句的 SQL 查询转换为 Pyspark/Pyspark-SQL?
pyspark.sql.functions.col 和 pyspark.sql.functions.lit 之间的 PySpark 区别