在 Spark 中从具有不同标头的 CSV 文件形成 DataFrame
Posted
技术标签:
【中文标题】在 Spark 中从具有不同标头的 CSV 文件形成 DataFrame【英文标题】:Forming DataFrames from CSV files with different headers in Spark 【发布时间】:2018-02-15 17:17:12 【问题描述】:我正在尝试读取包含变量列表的 Gzipped CSV(无扩展名)文件夹。例如:
CSV file 1: TIMESTAMP | VAR1 | VAR2 | VAR3
CSV file 2: TIMESTAMP | VAR1 | VAR3
每个文件代表一天。列的顺序可以不同(或者一个文件中可能缺少列)。
使用spark.read
一次性读取整个文件夹的第一个选项被丢弃,因为文件之间的连接考虑了列顺序而不是列名。
我的下一个选项是按文件读取:
for (String key : pathArray)
Dataset<Row> rawData = spark.read().option("header", true).csv(key);
allDatasets.add(rawData);
然后对列名进行完全外连接:
Dataset<Row> data = allDatasets.get(0);
for (int i = 1; i < allDatasets.size(); i++)
ArrayList<String> columns = new
ArrayList(Arrays.asList(data.columns()));
columns.retainAll(new
ArrayList(Arrays.asList(allDatasets.get(i).columns())));
data = data.join(allDatasets.get(i),
JavaConversions.asScalaBuffer(columns), "outer");
但这个过程非常慢,因为它一次加载一个文件。
下一种方法是使用sc.binaryFiles
和sc.readFiles
一样,无法解决添加自定义Hadoop 编解码器的问题(以便能够在没有gz
扩展名的情况下读取Gzipped 文件)。
使用最新的方法并将this code 转换为Java 我有以下内容:
一个JavaPairRDD<String, Iterable<Tuple2<String, String>>>
包含变量的名称 (VAR1
) 和一个可迭代的元组 TIMESTAMP,VALUE
用于该 VAR
。
我想用这个形成一个代表所有文件的 DataFrame,但是我完全不知道如何将这个最终的 PairRDD 转换为一个 Dataframe。 DataFrame 应该代表所有文件的内容。我想要的最终 DataFrame 示例如下:
TIMESTAMP | VAR1 | VAR2 | VAR3
01 32 12 32 ==> Start of contents of file 1
02 10 5 7 ==> End of contents of file 1
03 1 5 ==> Start of contents of file 2
04 4 8 ==> End of contents of file 2
有什么建议或想法吗?
【问题讨论】:
【参考方案1】:最后我得到了非常好的性能:
-
在“后台”中按月读取(使用 Java
Executor
与 CSV 并行读取其他文件夹),通过这种方法,Driver
在扫描每个文件夹时花费的时间减少了,因为它是并行完成的。
接下来,该过程一方面提取标头,另一方面提取其内容(具有 varname、timestamp、value 的元组)。
最后,使用RDD
API 合并内容,并制作带有标题的Dataframe。
【讨论】:
以上是关于在 Spark 中从具有不同标头的 CSV 文件形成 DataFrame的主要内容,如果未能解决你的问题,请参考以下文章
在Spark中使用不同标头在DataFrame中导入多个csv
Python Spark-如何将空 DataFrame 输出到 csv 文件(仅输出标头)?