pySpark: java.lang.UnsupportedOperationException: Unimplemented type: StringType


Posted: 2017-09-15 15:27:23

Question:

We are running into schema-merge problems when reading a set of parquet files that were written with inconsistent schemas. After switching to manually specifying the schema, I get the error below. Any pointers would be appreciated.

java.lang.UnsupportedOperationException: Unimplemented type: StringType
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readDoubleBatch(VectorizedColumnReader.java:389)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:195)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:230)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:137)

from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType
import pyspark.sql.functions as fun

# Note: the {} placeholders of the original format string were lost when the post
# was rendered; the path template below is shown as it appears in the post.
source_location = "///dt=//*__.parquet".format(source_initial,
                                               bucket,
                                               source_prefix,
                                               date,
                                               source_file_pattern,
                                               date,
                                               source_file_pattern)

schema = StructType([
        StructField("Unnamed", StringType(), True),
        StructField("nanos", LongType(), True),
        StructField("book", LongType(), True),
        StructField("X_o", LongType(), True),
        StructField("Y_o", LongType(), True),
        StructField("Z_o", LongType(), True),
        StructField("Total", DoubleType(), True),
        StructField("P_v", DoubleType(), True),
        StructField("R_v", DoubleType(), True),
        StructField("S_v", DoubleType(), True),
        StructField("message_type", StringType(), True),
        StructField("symbol", StringType(), True),
        StructField("date", StringType(), True),
        StructField("__index_level_0__", StringType(), True)])

print("Querying data from source location ".format(source_location))
df_raw = spark.read.format('parquet').load(source_location, schema = schema, inferSchema = False,mergeSchema="true")
df_raw = df_raw.filter(df_raw.nanos.between(open_nano,close_nano))
df_raw = df_raw.withColumn("timeInWindow_nano",(fun.ceil(df_raw.nanos/(window_nano))).cast("int"))
df_core = df_raw.groupBy("date","symbol","timeInWindow_nano").agg(fun.sum("Total").alias("Total"),
                                                     fun.sum("P_v").alias("P_v"),
                                                     fun.sum("R_v").alias("R_v"),
                                                     fun.sum("S_v").alias("S_v"))

df_core = df_core.withColumn("P_v",fun.when(df_core.Total < 0,0).otherwise(df_core.P_v))
df_core = df_core.withColumn("R_v",fun.when(df_core.Total < 0,0).otherwise(df_core.R_v))
df_core = df_core.withColumn("S_v",fun.when(df_core.Total < 0,0).otherwise(df_core.S_v))
df_core = df_core.withColumn("P_pct",df_core.P_v*df_core.Total)
df_core = df_core.withColumn("R_pct",df_core.R_v*df_core.Total)
df_core = df_core.withColumn("S_pct",df_core.S_v*df_core.Total)

Comments:

What happens if you don't manually specify the schema (but leave the mergeSchema option in place)?

@Mariusz: Some of the files are empty because there were no events on those days, so automatic inference fails with org.apache.spark.SparkException: Failed merging schema of file.

Answer 1:

If the schemas are not compatible, you cannot read the parquet files in a single load. My suggestion is to split it into two loads and then union the dataframes once their schemas are compatible. See the example code:

schema1_df = spark.read.parquet('path/to/files/with/string/field.parquet')
schema2_df = spark.read.parquet('path/to/files/with/double/field.parquet')
# Cast the mismatched string column to double so both schemas line up, then union.
df = schema2_df.unionAll(schema1_df.withColumn('invalid_col', schema1_df.invalid_col.cast('double')))
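
A side note on the design choice (added here, not from the original answer): unionAll and union match columns by position, so the cast column has to line up in both dataframes; on Spark 2.3 and later, unionByName matches columns by name instead, which is safer when the two loads do not return columns in the same order. A hedged sketch of the same merge using it:

df = schema2_df.unionByName(
    schema1_df.withColumn('invalid_col', schema1_df.invalid_col.cast('double')))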

Comments:
