PySpark 脚本对 1GB 文件成功,对 20GB 文件失败 - java.lang.NullPointerException

Posted

技术标签:

【中文标题】PySpark 脚本对 1GB 文件成功,对 20GB 文件失败 - java.lang.NullPointerException【英文标题】:PySpark script succeeds for 1GB file, fails for 20GB file - java.lang.NullPointerException 【发布时间】:2017-01-20 09:48:54 【问题描述】:

我正在使用 PySpark 和 spark-submit 来读取和操作带有标题的 CSV 文件。

第一个操作与截断某些列、转换为整数类型等有关。

主要操作是使用groupBy,以便根据另一列值计算一列的统计度量。

当我在 1GB 文件上运行我的脚本时,它运行良好! 问题是,在 20GB 文件 上运行它时,它会失败,据我所知,因为 groupBy 中的错误。

两个文件具有相同的格式和完全相同的列,例如:

TRANSACTION_URL    START_TIME        END_TIME           SIZE    FLAG  COL6 COL7 ...
www.google.com     20170113093210    20170113093210     150      1    ...  ...
www.cnet.com       20170113114510    20170113093210     150      2    ...  ... 

只有第一个文件包含 X 个交易,第二个包含更多(20GB 记录)。

错误日志:(错误从第 32 行开始)

pastebin link for error log

我的脚本:

import datetime
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import mean, stddev, regexp_replace, col

sc = SparkContext('local[*]')
sc.setLogLevel("ERROR")
sqlContext = SQLContext(sc)

print ('** Script Started: %s **' % str(datetime.datetime.now()))  # Analysis Start Time

print "Loading file... ",
log_df = sqlContext.read.format('csv').\
    options(header='true', inferschema='true', delimiter='\t', dateFormat='yyyyMMddHHmmss').\
    load("hdfs:/user/BGU/logs/01_transactions.log")  # Load data file
print "Done!\nAdjusting data to fit our needs... ",

'''
Manipulate columns to fit our needs:
'''
size_col = 'DOWNSTREAM_SIZE'
flag_col = 'CONGESTION_FLAG'
url_col = 'TRANSACTION_URL'

log_df = log_df.filter(~log_df[url_col].rlike("(SNI.*)")).\
    withColumn(flag_col, regexp_replace(col(flag_col), "(;.*)", "").
               cast(IntegerType()))
log_df = log_df.withColumn(size_col, log_df[size_col].cast(IntegerType()))

print "done!\n\n** %s Statistical Measures **\n" % size_col

'''
Calculations:
    DOWNSTREAM_SIZE statistics:
    In accordance to CONGESTION_FLAG value
'''
log_df.cache().groupBy(flag_col).agg(mean(size_col).alias("Mean"), stddev(size_col).alias("Stddev")).\
    withColumn("Variance", pow(col("Stddev"), 2)).show(3, False)

print ('** Script Ended: %s **' % str(datetime.datetime.now()))  # Analysis End Time

如果需要更多信息,请告诉我,我会提供。

谢谢

【问题讨论】:

尝试在选项dateFormat='yyyyMMddHHmmss'link中设置日期格式。顺便说一句,您可以在使用 2.0+ 时使用 spark 内置的 CSV 解析器。 setting date format in options 是什么意思?如何准确使用它?火花对我来说很新。什么是 spark 内置 CSV 解析器?这不是我目前使用的“com.databricks.spark.csv”吗?我的 cmd 是 spark-submit --packages com.databricks:spark-csv_2.11:1.5.0 ./script.py 用法options(header='true', inferschema='true', delimiter='\t', dateFormat='yyyyMMddHHmmss'). 这不是我当前使用的 'com.databricks.spark.csv' 吗? 是相同的代码。但它从 2.0 开始转向激发 sql。最后代码应该是format('csv').options(header='true', inferschema='true', delimiter='\t', dateFormat='yyyyMMddHHmmss')不需要在命令中添加--packages com.databricks:spark-csv_2.11:1.5.0,可以是spark-submit ./script.py 很高兴知道,我会试一试。关于dateFormat,为什么重要?我目前没有在我的脚本中使用时间列 【参考方案1】:

我猜错误的原因是一些“坏”记录。

通过在 CSV 解析选项中添加 mode='DROPMALFORMED', 问题已解决,脚本完成且没有错误。

【讨论】:

以上是关于PySpark 脚本对 1GB 文件成功,对 20GB 文件失败 - java.lang.NullPointerException的主要内容,如果未能解决你的问题,请参考以下文章

使用php上传1GB文件

Pyspark:如何在 Yarn 集群上运行作业时对多个文件使用 --files 标签

spark 提交 pyspark 脚本上的纱线投掷超过最大递归深度

使用python对pyspark代码进行单元测试

如何在 PySpark 中提取对长度敏感的特征而不使用 .toPandas() hack?

PySpark 单元测试方法