带有架构的 pyspark.sql SparkSession load() :架构中的非字符串类型字段使所有值都为空
Posted
技术标签:
【中文标题】带有架构的 pyspark.sql SparkSession load() :架构中的非字符串类型字段使所有值都为空【英文标题】:pyspark.sql SparkSession load() with schema : Non-StringType fields in schema make all values null 【发布时间】:2019-09-11 02:46:30 【问题描述】:嗨, 我在加载 csv 文件以创建数据框时使用非 StringType 作为架构的一部分时遇到问题。
我希望在加载时使用给定的模式将每条记录的每个字段即时转换为相应的数据类型。 相反,我得到的只是空值。
这是重现我的问题的简化方法。在此示例中,有一个包含四列的小 csv 文件,我希望将其分别处理为 str、date、int 和 bool:
python
Python 3.6.5 (default, Jun 17 2018, 12:13:06)
[GCC 4.2.1 Compatible Apple LLVM 9.1.0 (clang-902.0.39.2)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import pyspark
>>> from pyspark import SparkContext
>>> from pyspark.sql import SparkSession
>>> from pyspark.sql.types import *
>>>
>>> data_flnm = 'four_cols.csv'
>>> lines = [ln.rstrip() for ln in open(data_flnm).readlines()[:3]]
>>> lines
['zzzc7c09:66d7:47d6:9415:87e5010fe282|2019-04-08|0|f', 'zzz304fa:6fc0:4337:91d0:05ef4657a6db|2019-07-08|1|f', 'yy251cf0:aa11:44e9:88f4:f6f9c1899cee|2019-05-13|0|t']
>>> parts = [ln.split("|") for ln in lines]
>>> parts
[['zzzc7c09:66d7:47d6:9415:87e5010fe282', '2019-04-08', '0', 'f'], ['zzz304fa:6fc0:4337:91d0:05ef4657a6db', '2019-07-08', '1', 'f'], ['yy251cf0:aa11:44e9:88f4:f6f9c1899cee', '2019-05-13', '0', 't']]
>>> cols1 = [StructField('u_id', StringType(), True), StructField('week', StringType(), True), StructField('flag_0_1', StringType(), True), StructField('flag_t_f', StringType(), True)]
>>> cols2 = [StructField('u_id', StringType(), True), StructField('week', DateType(), True), StructField('flag_0_1', IntegerType(), True), StructField('flag_t_f', BooleanType(), True)]
>>> sch1 = StructType(cols1)
>>> sch2 = StructType(cols2)
>>> sch1
StructType(List(StructField(u_id,StringType,true),StructField(week,StringType,true),StructField(flag_0_1,StringType,true),StructField(flag_t_f,StringType,true)))
>>> sch2
StructType(List(StructField(u_id,StringType,true),StructField(week,DateType,true),StructField(flag_0_1,IntegerType,true),StructField(flag_t_f,BooleanType,true)))
>>> spark_sess = SparkSession.builder.appName("xyz").getOrCreate()
19/09/10 19:32:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
>>> df1 = spark_sess.read.format('csv').option("nullValue", "null").load([data_flnm], sep='|', schema = sch1)
>>> df2 = spark_sess.read.format('csv').option("nullValue", "null").load([data_flnm], sep='|', schema = sch2)
>>> df1.show(5)
+--------------------+----------+--------+--------+
| u_id| week|flag_0_1|flag_t_f|
+--------------------+----------+--------+--------+
|zzzc7c09:66d7:47d...|2019-04-08| 0| f|
|zzz304fa:6fc0:433...|2019-07-08| 1| f|
|yy251cf0:aa11:44e...|2019-05-13| 0| t|
|yy1d2f8e:d8f0:4db...|2019-07-08| 1| f|
|zzz5ccad:2cf6:44e...|2019-05-20| 1| f|
+--------------------+----------+--------+--------+
only showing top 5 rows
>>> df2.show(5)
+----+----+--------+--------+
|u_id|week|flag_0_1|flag_t_f|
+----+----+--------+--------+
|null|null| null| null|
|null|null| null| null|
|null|null| null| null|
|null|null| null| null|
|null|null| null| null|
+----+----+--------+--------+
only showing top 5 rows
>>>
我尝试了几个不同版本的 .read(...)....load(...) 代码。 没有产生预期的结果。 请指教。谢谢!
PS:无法添加标签“structfield”和“structtype”:声誉不足(__.
【问题讨论】:
【参考方案1】:解析时,需要将 flag_t_f 列读取为字符串。以下架构将起作用:
StructType(List(StructField(u_id,StringType,true),StructField(week,DateType,true),StructField(flag_0_1,IntegerType,true),StructField(flag_t_f,StringType,true)))
之后,如果需要,您可以在数据框中添加一个布尔列:
import pyspark.sql.functions as f
df = df.withColumn("flag_t_f",
f.when(f.col("flag_t_f") == 'f', 'False')
.when(f.col("flag_t_f") == 't', 'True')
)
如果您有多个布尔列,其值为“f”和“t”,则可以通过遍历所有列来转换所有这些列
cols = df.columns
for col in cols:
df = df.withColumn(col,
f.when(f.col(col) == 'f', 'False')
.when(f.col(col) == 't','True')
.otherwise(f.col(col))
)
【讨论】:
谢谢!我还没有尝试过你的解决方案,但听起来很棒! :) 我已经在做类似但更丑陋的事情:创建了所有列都是“StringType”的模式,然后一一更改除了那些确实是 StringType 的模式(并且有数百个)。非常感谢! :) 但为什么?这是在哪里记录的? 另外:我尝试使用 LazySimpleSerde 以完全不同的方式解决这个问题,但也找不到解决方案:也许你想看看? ——[链接]***.com/questions/57811034/…[/链接] 您能再帮忙吗?如果我有多个布尔字段,正确的语法是什么?我应该为每一列重复上面的代码( df=df.withColumn(col1.... df=df=df.withColumn(col2... - 或者我可以在一个语句中结合所有内容?我的问题实际上是关于性能: 哪个解决方案更好?谢谢!!! 如果您需要数据框中的特定数据类型,我建议您先读取字符串,然后再转换为数据框。读入字符串总是好的,特别是在数字的情况下,如果他们想出空格,那么就很难理解为什么它会以空值出现。 我已添加代码以遍历所有列。如果您知道要转换的列名,可以将 df.columns 替换为已知的列列表。以上是关于带有架构的 pyspark.sql SparkSession load() :架构中的非字符串类型字段使所有值都为空的主要内容,如果未能解决你的问题,请参考以下文章
发生异常:pyspark.sql.utils.AnalysisException '必须使用 writeStream.start();;\nkafka' 执行带有流式源的查询
从 Row 创建 DataFrame 会导致“推断架构问题”
pyspark.sql.functions.col 和 pyspark.sql.functions.lit 之间的 PySpark 区别