在 Spark 中从具有不同标头的 CSV 文件形成 DataFrame

Posted

技术标签:

【中文标题】在 Spark 中从具有不同标头的 CSV 文件形成 DataFrame【英文标题】:Forming DataFrames from CSV files with different headers in Spark 【发布时间】:2018-02-15 17:17:12 【问题描述】:

我正在尝试读取包含变量列表的 Gzipped CSV(无扩展名)文件夹。例如:

CSV file 1: TIMESTAMP | VAR1 | VAR2 | VAR3

CSV file 2: TIMESTAMP | VAR1 | VAR3

每个文件代表一天。列的顺序可以不同(或者一个文件中可能缺少列)。

使用spark.read 一次性读取整个文件夹的第一个选项被丢弃,因为文件之间的连接考虑了列顺序而不是列名。 我的下一个选项是按文件读取:

 for (String key : pathArray) 
       Dataset<Row> rawData = spark.read().option("header", true).csv(key);
       allDatasets.add(rawData);
    

然后对列名进行完全外连接:

Dataset<Row> data = allDatasets.get(0);
     for (int i = 1; i < allDatasets.size(); i++) 
        ArrayList<String> columns = new 
        ArrayList(Arrays.asList(data.columns()));
        columns.retainAll(new  
        ArrayList(Arrays.asList(allDatasets.get(i).columns())));
        data = data.join(allDatasets.get(i), 
        JavaConversions.asScalaBuffer(columns), "outer");
      

但这个过程非常慢,因为它一次加载一个文件。

下一种方法是使用sc.binaryFilessc.readFiles 一样,无法解决添加自定义Hadoop 编解码器的问题(以便能够在没有gz 扩展名的情况下读取Gzipped 文件)。

使用最新的方法并将this code 转换为Java 我有以下内容:

一个 JavaPairRDD&lt;String, Iterable&lt;Tuple2&lt;String, String&gt;&gt;&gt; 包含变量的名称 (VAR1) 和一个可迭代的元组 TIMESTAMP,VALUE 用于该 VAR

我想用这个形成一个代表所有文件的 DataFrame,但是我完全不知道如何将这个最终的 PairRDD 转换为一个 Dataframe。 DataFrame 应该代表所有文件的内容。我想要的最终 DataFrame 示例如下:

  TIMESTAMP | VAR1 | VAR2 | VAR3 
   01           32      12    32  ==> Start of contents of file 1
   02           10       5     7  ==> End of contents of file 1
   03                    1     5  ==> Start of contents of file 2
   04                    4     8  ==> End of contents of file 2

有什么建议或想法吗?

【问题讨论】:

【参考方案1】:

最后我得到了非常好的性能:

    在“后台”中按月读取(使用 Java Executor 与 CSV 并行读取其他文件夹),通过这种方法,Driver 在扫描每个文件夹时花费的时间减少了,因为它是并行完成的。 接下来,该过程一方面提取标头,另一方面提取其内容(具有 varname、timestamp、value 的元组)。 最后,使用RDD API 合并内容,并制作带有标题的Dataframe。

【讨论】:

以上是关于在 Spark 中从具有不同标头的 CSV 文件形成 DataFrame的主要内容,如果未能解决你的问题,请参考以下文章

在Spark中使用不同标头在DataFrame中导入多个csv

选择具有不同标头python的csv / df中的特定列

使用 spark 读取 csv.file 时如何省略标头?

Python Spark-如何将空 DataFrame 输出到 csv 文件(仅输出标头)?

在 java spark 中从 REST API 读取 csv

读取具有不同列顺序的文件