从 pyspark 读取 csv 指定模式错误类型

Posted

技术标签:

【中文标题】从 pyspark 读取 csv 指定模式错误类型【英文标题】:reading csv from pyspark specifying schema wrong types 【发布时间】:2018-12-01 02:11:09 【问题描述】:

我正在尝试从 pyspark df 输出 csv 然后重新输入它,但是当我指定架构时,对于作为数组的列,它说某些行是 False

这是我的df

   avg(rating)  belongs_to_collection    budget  \
0     2.909946                  False   5000000   
1     3.291962                  False  18000000   
2     3.239811                  False   8000000   
3     3.573318                  False   1500000   
4     3.516590                  False  40000000   

                                      genres original_language  
0                       ['Drama', 'Romance']                en  
1                                 ['Comedy']                en  
2                        ['Drama', 'Family']                en  
3  ['Crime', 'Drama', 'Mystery', 'Thriller']                en  
4             ['Crime', 'Drama', 'Thriller']                en  

我先输出到csv:df.drop('id').toPandas().to_csv('mergedDf.csv',index=False)

我尝试使用df = spark.read.csv('mergedDf.csv',schema=schema) 阅读,但出现此错误:'CSV data source does not support array<string> data type.;'

所以,我尝试从 pandas 读取数据,然后转换为 spark df,但它告诉我包含列表的列具有布尔值。

df = pd.read_csv('mergedDf.csv')
df = spark.createDataFrame(df,schema=schema)
TypeError: field genres: ArrayType(StringType,true) can not accept object False in type <class 'bool'>

但是,当我检查某些行是否 == 为 False 时,我发现它们都不是。

我检查了: df[df['genres']=="False"]df[df['genres']==False]

【问题讨论】:

错误“TypeError: fieldgenres: ArrayType(StringType,true) can not accept object False in type ”清楚地表明您提供了错误的类型数据....so你是如何检查 False 条件的?使用 df.filter() ?? 【参考方案1】:

不幸的是,spark read csv 函数还不支持像“数组”这样的复杂数据类型。您将处理将字符串列转换为数组列的逻辑

使用 pandas 将 spark 数据帧编写为带有标题的 csv。

df.drop('id').toPandas().to_csv('mergedDf.csv',index=False,header=True)
df1 = spark.read.option('header','true').option("inferSchema","true").csv('mergedDf.csv')
df1.printSchema()
df1.show(10,False)

当你用 spark 读回 csv 时,数组列将被转换为字符串类型

root
 |-- avg(rating): double (nullable = true)
 |-- belongs_to_collection: boolean (nullable = true)
 |-- budget: integer (nullable = true)
 |-- genres: string (nullable = true)
 |-- original_language: string (nullable = true)

+-----------+---------------------+--------+-----------------------------------------+-----------------+
|avg(rating)|belongs_to_collection|budget  |genres                                   |original_language|
+-----------+---------------------+--------+-----------------------------------------+-----------------+
|2.909946   |false                |5000000 |['Drama', 'Romance']                     |en               |
|3.291962   |false                |18000000|['Comedy']                               |en               |
|3.239811   |false                |8000000 |['Drama', 'Family']                      |en               |
|3.573318   |false                |1500000 |['Crime', 'Drama', 'Mystery', 'Thriller']|en               |
|3.51659    |false                |40000000|['Crime', 'Drama', 'Thriller']           |en               |
+-----------+---------------------+--------+-----------------------------------------+-----------------+

拆分字符串列以创建一个数组以恢复原始格式。

df2 = df1.withColumn('genres',split(regexp_replace(col('genres'), '\[|\]',''),',').cast('array<string>'))
df2.printSchema()

.

root
 |-- avg(rating): double (nullable = true)
 |-- belongs_to_collection: boolean (nullable = true)
 |-- budget: integer (nullable = true)
 |-- genres: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- original_language: string (nullable = true)

【讨论】:

以上是关于从 pyspark 读取 csv 指定模式错误类型的主要内容,如果未能解决你的问题,请参考以下文章

使用 PySpark 读取 CSV 时是不是可以仅覆盖一种列类型?

pyspark用正则表达式读取csv文件

Pyspark 解释了使用和不使用自定义模式来读取 csv 的区别

pyspark 将模式应用于 csv - 仅返回空值

在 Pyspark 中读取 CSV 文件引发错误 FileNotFound 错误

在 Pyspark 中使用正确的数据类型读取 CSV