Spark:spark-csv 花费的时间太长

Posted

技术标签:

【中文标题】Spark:spark-csv 花费的时间太长【英文标题】:Spark: spark-csv takes too long 【发布时间】:2015-08-28 07:33:49 【问题描述】:

我正在尝试使用 Databricks spark-csv 包和 flights dataset 从 EMR Spark 集群上 S3 上的 CSV 源创建 DataFrame

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

df = sqlContext.read.format('com.databricks.spark.csv').options(header='true').load('s3n://h2o-airlines-unpacked/allyears.csv')

df.first()

这不会在 4 个m3.xlarges 的集群上终止。我正在寻找从 PySpark 中 S3 上的 CSV 文件创建 DataFrame 的建议。或者,我尝试将文件放在 HDFS 上并从 HFDS 读取,但这也不会终止。该文件不是太大(12 GB)。

【问题讨论】:

如果 spark-csv lib 是 1.2.0+ 版本,您可以尝试将 parserLib 选项设置为 univocity 吗? @rchukh 这不是默认的吗?今天从大师那里建造了罐子。编辑:不,不是。会试试的。 【参考方案1】:

要读取只有 12GB 的行为良好的 csv 文件,您可以将其复制到所有工作人员和驱动程序机器上,然后在“,”上手动拆分。这可能无法解析任何 RFC4180 csv,但它解析了我所拥有的。

在申请集群时,为每个工作线程至少增加 12GB 额外的工作磁盘空间。 使用至少具有 12GB RAM 的机器类型,例如 c3.2xlarge。如果您不打算让集群闲置并负担得起更大的费用,请扩大规模。更大的机器意味着更少的磁盘文件复制开始。我经常在现货市场上看到 c3.8xlarge 低于 0.50 美元/小时。

将文件复制到您的每个工作人员,在每个工作人员的同一目录中。这应该是物理连接的驱动器,即每台机器上的不同物理驱动器。

确保您在驱动程序机器上也有相同的文件和目录。

raw = sc.textFile("/data.csv")

print "Counted %d lines in /data.csv" % raw.count()

raw_fields  = raw.first()
# this regular expression is for quoted fields. i.e. "23","38","blue",...
matchre = r'^"(.*)"$'
pmatchre = re.compile(matchre)

def uncsv_line(line):
    return [pmatchre.match(s).group(1) for s in line.split(',')]

fields = uncsv_line(raw_fields)

def raw_to_dict(raw_line):
    return dict(zip(fields, uncsv_line(raw_line)))

parsedData = (raw
        .map(raw_to_dict)
        .cache()
        )

print "Counted %d parsed lines" % parsedData.count()

parsedData 将是 dicts 的 RDD,其中 dicts 的键是第一行的 CSV 字段名称,值是当前行的 CSV 值。如果您在 CSV 数据中没有标题行,这可能不适合您,但应该清楚的是,您可以覆盖此处读取第一行的代码并手动设置字段。

请注意,这对于创建数据帧或注册 Spark SQL 表并不会立即有用。但是其他的都可以,如果需要dump到spark SQL中,可以进一步提取转换成更好的格式。

我在一个 7GB 的文件上使用它没有任何问题,除了我删除了一些过滤器逻辑来检测有效数据,这些数据具有从解析数据中删除标题的副作用。您可能需要重新实现一些过滤。

【讨论】:

Paul,感谢您的评论,感谢您尝试回答我的问题。但是你会建议一种不同的方法吗?就像先从 HDFS 将 CSV 读入 Hive 并从 Hive 表中创建 DataFrame?鉴于基础设施,将 12 GB 文件作为 DataFrame 读取的最佳方式是什么? 抱歉,我们暂时不使用 HDFS/Hive 基础架构,所以没有意见。 Paul,您能否提供一些有关如何获取 RDD 字典并从中生成 DataFrame 的具体信息? 使用 map 和 lambda 函数将每个 dict 转换为所需值的值元组。手动编写架构。如果您想要每一行的所有值,将解析映射更改为不使用 zip 可能会更容易。我没有运行任何这些,但对我来说似乎是合理的。 如果您只需要快速 SQL,您还可以查看 Google 专有的托管 BigQuery。每 TB 扫描费用为 5 美元,另加每月每 GB 2 美分的存储费用。

以上是关于Spark:spark-csv 花费的时间太长的主要内容,如果未能解决你的问题,请参考以下文章

无法访问此站点花费了太长时间 - Laravel

正则表达式验证花费太长时间 c#

花费太长时间来完成操作并使用大量物理内存

OWB 映射挂起或花费太长时间 - 可能锁定?

sqlite3_prepare_v2 不会返回花费太长时间(慢)和使用内存

python中.mp4到.wav的转换花费了太长时间