Spark:spark-csv 花费的时间太长
Posted
技术标签:
【中文标题】Spark:spark-csv 花费的时间太长【英文标题】:Spark: spark-csv takes too long 【发布时间】:2015-08-28 07:33:49 【问题描述】:我正在尝试使用 Databricks spark-csv 包和 flights dataset 从 EMR Spark 集群上 S3 上的 CSV 源创建 DataFrame
:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true').load('s3n://h2o-airlines-unpacked/allyears.csv')
df.first()
这不会在 4 个m3.xlarge
s 的集群上终止。我正在寻找从 PySpark 中 S3 上的 CSV 文件创建 DataFrame
的建议。或者,我尝试将文件放在 HDFS 上并从 HFDS 读取,但这也不会终止。该文件不是太大(12 GB)。
【问题讨论】:
如果 spark-csv lib 是 1.2.0+ 版本,您可以尝试将parserLib
选项设置为 univocity
吗?
@rchukh 这不是默认的吗?今天从大师那里建造了罐子。编辑:不,不是。会试试的。
【参考方案1】:
要读取只有 12GB 的行为良好的 csv 文件,您可以将其复制到所有工作人员和驱动程序机器上,然后在“,”上手动拆分。这可能无法解析任何 RFC4180 csv,但它解析了我所拥有的。
在申请集群时,为每个工作线程至少增加 12GB 额外的工作磁盘空间。 使用至少具有 12GB RAM 的机器类型,例如c3.2xlarge
。如果您不打算让集群闲置并负担得起更大的费用,请扩大规模。更大的机器意味着更少的磁盘文件复制开始。我经常在现货市场上看到 c3.8xlarge 低于 0.50 美元/小时。
将文件复制到您的每个工作人员,在每个工作人员的同一目录中。这应该是物理连接的驱动器,即每台机器上的不同物理驱动器。
确保您在驱动程序机器上也有相同的文件和目录。
raw = sc.textFile("/data.csv")
print "Counted %d lines in /data.csv" % raw.count()
raw_fields = raw.first()
# this regular expression is for quoted fields. i.e. "23","38","blue",...
matchre = r'^"(.*)"$'
pmatchre = re.compile(matchre)
def uncsv_line(line):
return [pmatchre.match(s).group(1) for s in line.split(',')]
fields = uncsv_line(raw_fields)
def raw_to_dict(raw_line):
return dict(zip(fields, uncsv_line(raw_line)))
parsedData = (raw
.map(raw_to_dict)
.cache()
)
print "Counted %d parsed lines" % parsedData.count()
parsedData 将是 dicts 的 RDD,其中 dicts 的键是第一行的 CSV 字段名称,值是当前行的 CSV 值。如果您在 CSV 数据中没有标题行,这可能不适合您,但应该清楚的是,您可以覆盖此处读取第一行的代码并手动设置字段。
请注意,这对于创建数据帧或注册 Spark SQL 表并不会立即有用。但是其他的都可以,如果需要dump到spark SQL中,可以进一步提取转换成更好的格式。
我在一个 7GB 的文件上使用它没有任何问题,除了我删除了一些过滤器逻辑来检测有效数据,这些数据具有从解析数据中删除标题的副作用。您可能需要重新实现一些过滤。
【讨论】:
Paul,感谢您的评论,感谢您尝试回答我的问题。但是你会建议一种不同的方法吗?就像先从 HDFS 将 CSV 读入 Hive 并从 Hive 表中创建 DataFrame?鉴于基础设施,将 12 GB 文件作为 DataFrame 读取的最佳方式是什么? 抱歉,我们暂时不使用 HDFS/Hive 基础架构,所以没有意见。 Paul,您能否提供一些有关如何获取 RDD 字典并从中生成DataFrame
的具体信息?
使用 map 和 lambda 函数将每个 dict 转换为所需值的值元组。手动编写架构。如果您想要每一行的所有值,将解析映射更改为不使用 zip 可能会更容易。我没有运行任何这些,但对我来说似乎是合理的。
如果您只需要快速 SQL,您还可以查看 Google 专有的托管 BigQuery。每 TB 扫描费用为 5 美元,另加每月每 GB 2 美分的存储费用。以上是关于Spark:spark-csv 花费的时间太长的主要内容,如果未能解决你的问题,请参考以下文章