在 Pyspark 中使用正确的数据类型读取 CSV

Posted

技术标签:

【中文标题】在 Pyspark 中使用正确的数据类型读取 CSV【英文标题】:Read in CSV in Pyspark with correct Datatypes 【发布时间】:2018-10-26 16:47:59 【问题描述】:

当我尝试使用 spark 导入本地 CSV 时,默认情况下,每一列都作为字符串读入。但是,我的列仅包含整数和时间戳类型。更具体地说,CSV 如下所示:

"Customer","TransDate","Quantity","PurchAmount","Cost","TransID","TransKey"
149332,"15.11.2005",1,199.95,107,127998739,100000

我找到了应该在this question 中工作的代码,但是当我执行它时,所有条目都返回为NULL

我使用以下内容创建自定义架构:

from pyspark.sql.types import LongType, StringType, StructField, StructType, BooleanType, ArrayType, IntegerType, TimestampType

customSchema = StructType(Array(
        StructField("Customer", IntegerType, true),
        StructField("TransDate", TimestampType, true),
        StructField("Quantity", IntegerType, true),
        StructField("Cost", IntegerType, true),
        StructField("TransKey", IntegerType, true)))

然后在 CSV 中读取:

myData = spark.read.load('myData.csv', format="csv", header="true", sep=',', schema=customSchema)

返回:

+--------+---------+--------+----+--------+
|Customer|TransDate|Quantity|Cost|Transkey|
+--------+---------+--------+----+--------+
|    null|     null|    null|null|    null|
+--------+---------+--------+----+--------+

我错过了一个关键步骤吗?我怀疑 Date 列是问题的根源。注意:我在 GoogleCollab 中运行它。

【问题讨论】:

我很惊讶整数被错误地读取。这些日期肯定不起作用,因为它们不是预期的YYYY-MM-DD 格式。我建议使用inferSchema = True(例如“myData = spark.read.csv("myData.csv", header=True, inferSchema=True))然后manually converting the Timestamp fields from string to date阅读csv。 哦,现在我明白了问题所在:您传入的是 header="true" 而不是 header=True。您需要将其作为布尔值传递,但由于格式不正确,您仍然会得到时间戳的空值。 出了什么问题? header = "true" @Prazy 虽然documentation 不清楚,但我很确定header 应该是(False, True, None) 之一(布尔值/无与字符串)。 @pault header = "true" 总是对我有用。 【参考方案1】:

给你!

"Customer","TransDate","Quantity","PurchAmount","Cost","TransID","TransKey"
149332,"15.11.2005",1,199.95,107,127998739,100000
PATH_TO_FILE="file:///u/vikrant/LocalTestDateFile"
Loading above file to dataframe:
df = spark.read.format("com.databricks.spark.csv") \
  .option("mode", "DROPMALFORMED") \
  .option("header", "true") \
  .option("inferschema", "true") \
  .option("delimiter", ",").load(PATH_TO_FILE)

您的日期将作为字符串列类型加载,但当您将其更改为日期类型时,它将将此日期格式视为 NULL。

df = (df.withColumn('TransDate',col('TransDate').cast('date'))

+--------+---------+--------+-----------+----+---------+--------+
|Customer|TransDate|Quantity|PurchAmount|Cost|  TransID|TransKey|
+--------+---------+--------+-----------+----+---------+--------+
|  149332|     null|       1|     199.95| 107|127998739|  100000|
+--------+---------+--------+-----------+----+---------+--------+

所以我们需要将日期格式从 dd.mm.yy 更改为 yy-mm-dd。

from datetime import datetime
from pyspark.sql.functions import col, udf
from pyspark.sql.types import DateType
from pyspark.sql.functions import col

改变日期格式的Python函数:

  change_dateformat_func =  udf (lambda x: datetime.strptime(x, '%d.%m.%Y').strftime('%Y-%m-%d'))

现在为您的数据框列调用此函数:

newdf = df.withColumn('TransDate', change_dateformat_func(col('TransDate')).cast(DateType()))

+--------+----------+--------+-----------+----+---------+--------+
|Customer| TransDate|Quantity|PurchAmount|Cost|  TransID|TransKey|
+--------+----------+--------+-----------+----+---------+--------+
|  149332|2005-11-15|       1|     199.95| 107|127998739|  100000|
+--------+----------+--------+-----------+----+---------+--------+

以下是架构:

 |-- Customer: integer (nullable = true)
 |-- TransDate: date (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- PurchAmount: double (nullable = true)
 |-- Cost: integer (nullable = true)
 |-- TransID: integer (nullable = true)
 |-- TransKey: integer (nullable = true)

让我知道它是否适合你。

【讨论】:

【参考方案2】:

您可以为DataFrameReader 指定一个选项('dateFormat','d.M.y') 以解析特定格式的日期。

df = spark.read.format("csv").option("header","true").option("dateFormat","M.d.y").schema(my_schema).load("path_to_csv")

参考

https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html

【讨论】:

以上是关于在 Pyspark 中使用正确的数据类型读取 CSV的主要内容,如果未能解决你的问题,请参考以下文章

使用 PySpark 读取 CSV 时是不是可以仅覆盖一种列类型?

在 Pyspark 代码中读取嵌套的 Json 文件。 pyspark.sql.utils.AnalysisException:

PySpark:读取 pyspark 框架中的 csv 数据。为啥它在框架中显示特殊字符?除了使用熊猫之外,以表格形式显示的任何方式[重复]

Pandas UDF (PySpark) - 不正确的类型错误

如何在 pyspark 中读取 s3 上的表格数据?

如何在 pyspark 数据框中读取 csv 文件时读取选定的列?