Apache Spark (PySpark) 在读取 CSV 时处理空值

Posted

技术标签:

【中文标题】Apache Spark (PySpark) 在读取 CSV 时处理空值【英文标题】:Apache Spark (PySpark) handling null values when reading in CSV 【发布时间】:2017-07-09 03:06:48 【问题描述】:

我正在尝试从交通部读取航班数据。它存储在 CSV 中,并不断收到java.lang.NumberFormatException: null

我尝试将nanValue 设置为空字符串,因为它的默认值是NaN,但这并没有奏效。

我当前的代码是:

spark = SparkSession.builder \
    .master('local') \
    .appName('Flight Delay') \
    .getOrCreate()

schema = StructType([
    StructField('Year', IntegerType(), nullable=True),
    StructField('Month', IntegerType(), nullable=True),
    StructField('Day', IntegerType(), nullable=True),
    StructField('Dow', IntegerType(), nullable=True),
    StructField('CarrierId', StringType(), nullable=True),
    StructField('Carrier', StringType(), nullable=True),
    StructField('TailNum', StringType(), nullable=True),
    StructField('Origin', StringType(), nullable=True),
    StructField('Dest', StringType(), nullable=True),
    StructField('CRSDepTime', IntegerType(), nullable=True),
    StructField('DepTime', IntegerType(), nullable=True),
    StructField('DepDelay', DoubleType(), nullable=True),
    StructField('TaxiOut', DoubleType(), nullable=True),
    StructField('TaxiIn', DoubleType(), nullable=True),
    StructField('CRSArrTime', IntegerType(), nullable=True),
    StructField('ArrTime', IntegerType(), nullable=True),
    StructField('ArrDelay', DoubleType(), nullable=True),
    StructField('Cancelled', DoubleType(), nullable=True),
    StructField('CancellationCode', StringType(), nullable=True),
    StructField('Diverted', DoubleType(), nullable=True),
    StructField('CRSElapsedTime', DoubleType(), nullable=True),
    StructField('ActualElapsedTime', DoubleType(), nullable=True),
    StructField('AirTime', DoubleType(), nullable=True),
    StructField('Distance', DoubleType(), nullable=True),
    StructField('CarrierDelay', DoubleType(), nullable=True),
    StructField('WeatherDelay', DoubleType(), nullable=True),
    StructField('NASDelay', DoubleType(), nullable=True),
    StructField('SecurityDelay', DoubleType(), nullable=True),
    StructField('LateAircraftDelay', DoubleType(), nullable=True)
])

flts = spark.read \
    .format('com.databricks.spark.csv') \
    .csv('/home/william/Projects/flight-delay/data/201601.csv',
         schema=schema, nanValue='', header='true')

这是我正在使用的 CSV:http://pastebin.com/waahrgqB

最后一行是它中断并引发java.lang.NumberFormatException: null的地方

似乎有些数字列是空字符串,而另一些只是空的。有人可以帮我解决这个问题吗?

【问题讨论】:

你确定它只在最后一个数据行中断吗?其他数据行没问题? --- 也许你也可以让你的脚本完整,所以我们可以复制粘贴它。我认为现在缺少一些 import 语句。 【参考方案1】:

感谢 KiranM 的建议,我找到了解决方案。我让 Spark 推断架构(所有内容都设置为字符串),然后手动将我想要的列设置为数字。

代码如下:

from pyspark.sql import (SQLContext,
                     SparkSession)

from pyspark.sql.types import (StructType,
                           StructField,
                           DoubleType,
                           IntegerType,
                           StringType)

spark = SparkSession.builder \
    .master('local') \
    .appName('Flight Delay') \
    .getOrCreate()


flts = spark.read \
    .format('com.databricks.spark.csv') \
    .csv('/home/william/Projects/flight-delay/data/merged/2016.csv',
         inferSchema='true', nanValue="", header='true', mode='PERMISSIVE')


flts = flts \
    .withColumn('Year', flts['Year'].cast('int')) \
    .withColumn('Month', flts['Month'].cast('int')) \
    .withColumn('Day', flts['Day'].cast('int')) \
    .withColumn('Dow', flts['Dow'].cast('int')) \
    .withColumn('CRSDepTime', flts['CRSDepTime'].cast('int')) \
    .withColumn('DepTime', flts['DepTime'].cast('int')) \
    .withColumn('DepDelay', flts['DepDelay'].cast('int')) \
    .withColumn('TaxiOut', flts['TaxiOut'].cast('int')) \
    .withColumn('TaxiIn', flts['TaxiIn'].cast('int')) \
    .withColumn('CRSArrTime', flts['CRSArrTime'].cast('int')) \
    .withColumn('ArrTime', flts['ArrTime'].cast('int')) \
    .withColumn('ArrDelay', flts['ArrDelay'].cast('int')) \
    .withColumn('Cancelled', flts['Cancelled'].cast('int')) \
    .withColumn('Diverted', flts['Diverted'].cast('int')) \
    .withColumn('CRSElapsedTime', flts['CRSElapsedTime'].cast('int')) \
    .withColumn('ActualElapsedTime', flts['ActualElapsedTime'].cast('int')) \
    .withColumn('AirTime', flts['AirTime'].cast('int')) \
    .withColumn('Distance', flts['Distance'].cast('int')) \
    .withColumn('CarrierDelay', flts['CarrierDelay'].cast('int')) \
    .withColumn('WeatherDelay', flts['WeatherDelay'].cast('int')) \
    .withColumn('NASDelay', flts['NASDelay'].cast('int')) \
    .withColumn('SecurityDelay', flts['SecurityDelay'].cast('int')) \
    .withColumn('LateAircraftDelay ', flts['LateAircraftDelay '].cast('int'))

也许我可以把它放到一个循环中,但我现在要运行它。

【讨论】:

【参考方案2】:

问题在于具有空字符串的数字类型列(使用“”而不是空白数据)。

然后一种选择是将数据读取为 StringType 列,然后将该列类型转换为您的相关类型(例如:int)。以免影响其他列数据。

StructField('CRSDepTime', StringType(), nullable=True),


flts.withColumn('CRSDepTime', flts['CRSDepTime'].cast("int")) \
    .printSchema()

这应该可以解决您的问题。

【讨论】:

以上是关于Apache Spark (PySpark) 在读取 CSV 时处理空值的主要内容,如果未能解决你的问题,请参考以下文章

Apache Spark JDBC SQL 注入 (pyspark)

Apache Spark:启动 PySpark 时出错

Apache Spark:启动 PySpark 的问题

NoSuchMethodException:Pyspark 模型加载中的 org.apache.spark.ml.classification.GBTClassificationModel

PySpark 中的 org.apache.spark.ml.feature.Tokenizer NPE

真香!PySpark整合Apache Hudi实战