在 Spark Scala 中将一行从一个数据集添加到另一个数据集

Posted

技术标签:

【中文标题】在 Spark Scala 中将一行从一个数据集添加到另一个数据集【英文标题】:Add one row from one Data set to Another Data set in Spark Scala 【发布时间】:2018-05-10 01:54:51 【问题描述】:

有两个DataFrame集,一个是“Training set”,另一个是“Test set”。我想要做的是使用“训练集加上一行测试集”迭代一些算法(让我们调用 AAA,它需要 RDD 输入格式),按照以下步骤。

    合并整个训练集 + 测试集的“第一行”。 使用 1 的数据运行一些算法并得到一些结果。 合并整个训练集 + 测试集的“第二行”。 使用 2 的数据运行一些算法并得到一些结果。 合并整个训练集 + 测试集的“第三行”。 …迭代直到测试集的最后一行。

其实在spark手册中我查过spark中的RDD和DataFrame是不可变的,所以无法使用

Testset.map( x => AAA(Trainset.union(x)) )

另外,我也试过用

Testset.map( x => AAA(Trainset.union(Array(x.get(0).toString.toDouble, x.get(1).toString.toDouble, ... x.get(19).toString.toDouble))

但是,它不起作用:(。是否有任何解决方案可以使上述步骤成为可能?如果您对此问题有一个好主意,请帮助我。

//修改和添加条件

由于耗时问题,我需要使用并行计算。因此,我不能使用“for 循环”。谢谢。

【问题讨论】:

我相信你想要的操作是fold而不是map。然而,TestsetTrainset 都是 DataFrames 的事实意味着这不起作用,afaik,因为你会遇到序列化问题。 您可以将两个数据帧都转换为 rdd 并在测试数据中使用 for 循环并进行计算。简单的。但请记住,您的逻辑不适合分布式/并行计算 【参考方案1】:

不确定这是一个多么好的主意,但如何:

1) 在训练数据帧上创建一个名为 helper 的新列,值为 -1

2) 在测试数据框上创建一个名为 helper 的新列,如下所示:

test.withColumn("helper", monotonically_increasing_id())

3) 将 2) 的输出写入磁盘以确保 ids 永远不会改变

4) Union 1) with 3) read back in,然后缓存/持久化/写入磁盘并读回

5) 编写一个循环过滤联合数据帧并执行逻辑:

val data = unioned.filter($"helper" === lit(-1) || $"helper" === lit(n))
val result = logic(data)

其中 n 是您要循环的值,从 0 开始测试的第一行

【讨论】:

以上是关于在 Spark Scala 中将一行从一个数据集添加到另一个数据集的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Spark 2 Scala 中将 Row 转换为 json

如何在scala spark中将数据框的特定列与另一个列连接[重复]

在 Spark-Scala 中将单个字符串列拆分为多列

如何在 Scala 中将数据帧转换为 Apache Spark 中的数据集?

在scala中将Spark Dataframe转换为RDD

如何读取 CSV 文件,然后在 Spark Scala 中将其保存为 JSON?