迭代多个 CSV 并加入 Spark SQL

Posted

技术标签:

【中文标题】迭代多个 CSV 并加入 Spark SQL【英文标题】:Iterating over multiple CSVs and joining with Spark SQL 【发布时间】:2019-09-19 16:12:23 【问题描述】:

我有几个具有相同标题和相同 ID 的 csv 文件。我正在尝试迭代以将所有文件合并到一个索引“31”。在我的 while 循环中,我试图初始化合并的数据集,以便它可以用于循环的其余部分。在最后一行中,有人告诉我“合并的局部变量可能尚未初始化”。我应该怎么做呢?

SparkSession spark = SparkSession.builder().appName("testSql")
            .master("local[*]")
            .config("spark.sql.warehouse.dir", "file:///c:tmp")
            .getOrCreate();

Dataset<Row> first = spark.read().option("header", true).csv("mypath/01.csv");
Dataset<Row> second = spark.read().option("header", true).csv("mypath/02.csv");
    
IntStream.range(3, 31)
    .forEach(i -> 
        while(i==3) 
            Dataset<Row> merged = first.join(second, first.col("customer_id").equalTo(second.col("customer_id")));
            
        Dataset<Row> next = spark.read().option("header", true).csv("mypath/"+i+".csv");
        Dataset<Row> merged  = merged.join(next, merged.col("customer_id").equalTo(next.col("customer_id")));

【问题讨论】:

1.从编译器的角度来看,Dataset&lt;Row&gt; merged = merged... 在初始化之前可能碰巧使用了merged。 2. 你的while 看起来像一个经典的无限循环。 我应该怎么做呢? 【参考方案1】:

已编辑基于 cmets 中的反馈。

按照你的模式,这样的事情会起作用:

Dataset<Row> ds1 = spark.read().option("header", true).csv("mypath/01.csv");
Dataset<?>[] result = ds1;
IntStream.range(2, 31)
    .forEach(i -> 
        Dataset<Row> next = spark.read().option("header", true).csv("mypath/"+i+".csv");
        result[0] = result[0].join(next, "customer_id");
    );

我们将 Dataset 包装到一个数组中,以解决 lambda 表达式中对变量捕获的限制。

对于这种特殊情况,更直接的方法是简单地使用 for 循环而不是 stream.forEach

Dataset<Row> result = spark.read().option("header", true).csv("mypath/01.csv");
for( int i = 2 ; i < 31 ; i++ ) 
  Dataset<Row> next = spark.read().option("header", true).csv("mypath/"+i+".csv");
  result[0] = result[0].join(next, "customer_id");
;

【讨论】:

这看起来不错,但在最后一行,我收到错误:'在封闭范围中定义的局部变量结果必须是最终或有效最终' 已选择使用常规 for 循环而不是 lambda 表达式,它工作正常^_^ @user3058703 啊! lambda中的变量捕获!完全忘记了,感谢您的反馈!

以上是关于迭代多个 CSV 并加入 Spark SQL的主要内容,如果未能解决你的问题,请参考以下文章

解压多个 *.gz 文件并在 spark scala 中制作一个 csv 文件

如何在 Scala 中使用 Spark SQL 返回多个 JSON 对象

提高 Spark.SQL 中的数据整理性能

如何处理 Spark 中的多个 csv.gz 文件?

如何使用随机列加载多个 csv 文件?

在Spark中使用不同标头在DataFrame中导入多个csv