Spark 2.1.1 上的 Pyspark,StructType 中的 StructFields 始终可以为空
Posted
技术标签:
【中文标题】Spark 2.1.1 上的 Pyspark,StructType 中的 StructFields 始终可以为空【英文标题】:Pyspark on Spark 2.1.1, StructFields in StructType are always nullable 【发布时间】:2017-06-15 16:01:49 【问题描述】:我正在使用多个 StructField 创建一个 StructType - 名称和数据类型似乎工作正常,但无论在每个 StructField 中将 nullable 设置为 False,生成的架构报告 nullable 对于每个 StructField 都是 True。
谁能解释为什么?谢谢!
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, FloatType, TimestampType
sparkSession = SparkSession.builder \
.master("local") \
.appName("SparkSession") \
.getOrCreate()
dfStruct = StructType().add("date", TimestampType(), False)
dfStruct.add("open", FloatType(), False)
dfStruct.add("high", FloatType(), False)
dfStruct.add("low", FloatType(), False)
dfStruct.add("close", FloatType(), False)
dfStruct.add("ticker", StringType(), False)
#print elements of StructType -- reports nullable is false
for d in dfStruct: print d
#data looks like this:
#date,open,high,low,close,ticker
# 2014-10-14 23:20:32,7.14,9.07,0.0,7.11,ARAY
# 2014-10-14 23:20:36,9.74,10.72,6.38,9.25,ARC
# 2014-10-14 23:20:38,31.38,37.0,28.0,30.94,ARCB
# 2014-10-14 23:20:44,15.39,17.37,15.35,15.3,ARCC
# 2014-10-14 23:20:49,5.59,6.5,5.31,5.48,ARCO
#read csv file and apply dfStruct as the schema
df = sparkSession.read.csv(path = "/<path tofile>/stock_data.csv", \
schema = dfStruct, \
sep = ",", \
ignoreLeadingWhiteSpace = True, \
ignoreTrailingWhiteSpace = True \
)
#reports nullable as True!
df.printSchema()
【问题讨论】:
【参考方案1】:这是 Spark 中的 known issue。目前 Spark 中有一个 open pull request 旨在解决此问题。如果您确实需要您的字段不可为空,请尝试:
#read csv file and apply dfStruct as the schema
df = sparkSession.read.csv(path = "/<path tofile>/stock_data.csv", \
schema = dfStruct, \
sep = ",", \
ignoreLeadingWhiteSpace = True, \
ignoreTrailingWhiteSpace = True \
).rdd.toDF(dfStruct)
【讨论】:
我不确定这样的转换速度,所以我不会将它用于 TB 级数据,但如果您只是读取 csv 文件,它应该可以很好地工作。以上是关于Spark 2.1.1 上的 Pyspark,StructType 中的 StructFields 始终可以为空的主要内容,如果未能解决你的问题,请参考以下文章
具有非默认 spark.executor.memory 设置的 EMR 上的 pyspark 代码未生效?
无法使用 Jupyter 笔记本上的 pyspark 从 Apache Spark 连接到 MS SQL
在 python (pyspark) 中使用 combinebykey spark rdd 计算组上的聚合
Dataproc 上的 PySpark 因 SocketTimeoutException 而停止
在 Spark 2.4 上的 pyspark.sql.functions.max().over(window) 上使用 .where() 会引发 Java 异常