迭代多个 CSV 并加入 Spark SQL
Posted
技术标签:
【中文标题】迭代多个 CSV 并加入 Spark SQL【英文标题】:Iterating over multiple CSVs and joining with Spark SQL 【发布时间】:2019-09-19 16:12:23 【问题描述】:我有几个具有相同标题和相同 ID 的 csv 文件。我正在尝试迭代以将所有文件合并到一个索引“31”。在我的 while 循环中,我试图初始化合并的数据集,以便它可以用于循环的其余部分。在最后一行中,有人告诉我“合并的局部变量可能尚未初始化”。我应该怎么做呢?
SparkSession spark = SparkSession.builder().appName("testSql")
.master("local[*]")
.config("spark.sql.warehouse.dir", "file:///c:tmp")
.getOrCreate();
Dataset<Row> first = spark.read().option("header", true).csv("mypath/01.csv");
Dataset<Row> second = spark.read().option("header", true).csv("mypath/02.csv");
IntStream.range(3, 31)
.forEach(i ->
while(i==3)
Dataset<Row> merged = first.join(second, first.col("customer_id").equalTo(second.col("customer_id")));
Dataset<Row> next = spark.read().option("header", true).csv("mypath/"+i+".csv");
Dataset<Row> merged = merged.join(next, merged.col("customer_id").equalTo(next.col("customer_id")));
【问题讨论】:
1.从编译器的角度来看,Dataset<Row> merged = merged...
在初始化之前可能碰巧使用了merged
。 2. 你的while
看起来像一个经典的无限循环。
我应该怎么做呢?
【参考方案1】:
已编辑基于 cmets 中的反馈。
按照你的模式,这样的事情会起作用:
Dataset<Row> ds1 = spark.read().option("header", true).csv("mypath/01.csv");
Dataset<?>[] result = ds1;
IntStream.range(2, 31)
.forEach(i ->
Dataset<Row> next = spark.read().option("header", true).csv("mypath/"+i+".csv");
result[0] = result[0].join(next, "customer_id");
);
我们将 Dataset
包装到一个数组中,以解决 lambda 表达式中对变量捕获的限制。
对于这种特殊情况,更直接的方法是简单地使用 for 循环而不是 stream.forEach
:
Dataset<Row> result = spark.read().option("header", true).csv("mypath/01.csv");
for( int i = 2 ; i < 31 ; i++ )
Dataset<Row> next = spark.read().option("header", true).csv("mypath/"+i+".csv");
result[0] = result[0].join(next, "customer_id");
;
【讨论】:
这看起来不错,但在最后一行,我收到错误:'在封闭范围中定义的局部变量结果必须是最终或有效最终' 已选择使用常规 for 循环而不是 lambda 表达式,它工作正常^_^ @user3058703 啊! lambda中的变量捕获!完全忘记了,感谢您的反馈!以上是关于迭代多个 CSV 并加入 Spark SQL的主要内容,如果未能解决你的问题,请参考以下文章
解压多个 *.gz 文件并在 spark scala 中制作一个 csv 文件