在 Spark Scala 中将一行从一个数据集添加到另一个数据集
Posted
技术标签:
【中文标题】在 Spark Scala 中将一行从一个数据集添加到另一个数据集【英文标题】:Add one row from one Data set to Another Data set in Spark Scala 【发布时间】:2018-05-10 01:54:51 【问题描述】:有两个DataFrame集,一个是“Training set”,另一个是“Test set”。我想要做的是使用“训练集加上一行测试集”迭代一些算法(让我们调用 AAA,它需要 RDD 输入格式),按照以下步骤。
-
合并整个训练集 + 测试集的“第一行”。
使用 1 的数据运行一些算法并得到一些结果。
合并整个训练集 + 测试集的“第二行”。
使用 2 的数据运行一些算法并得到一些结果。
合并整个训练集 + 测试集的“第三行”。
…迭代直到测试集的最后一行。
其实在spark手册中我查过spark中的RDD和DataFrame是不可变的,所以无法使用
Testset.map( x => AAA(Trainset.union(x)) )
另外,我也试过用
Testset.map( x => AAA(Trainset.union(Array(x.get(0).toString.toDouble, x.get(1).toString.toDouble, ... x.get(19).toString.toDouble))
但是,它不起作用:(。是否有任何解决方案可以使上述步骤成为可能?如果您对此问题有一个好主意,请帮助我。
//修改和添加条件
由于耗时问题,我需要使用并行计算。因此,我不能使用“for 循环”。谢谢。
【问题讨论】:
我相信你想要的操作是fold
而不是map
。然而,Testset
和 Trainset
都是 DataFrames 的事实意味着这不起作用,afaik,因为你会遇到序列化问题。
您可以将两个数据帧都转换为 rdd 并在测试数据中使用 for 循环并进行计算。简单的。但请记住,您的逻辑不适合分布式/并行计算
【参考方案1】:
不确定这是一个多么好的主意,但如何:
1) 在训练数据帧上创建一个名为 helper 的新列,值为 -1
2) 在测试数据框上创建一个名为 helper 的新列,如下所示:
test.withColumn("helper", monotonically_increasing_id())
3) 将 2) 的输出写入磁盘以确保 ids 永远不会改变
4) Union 1) with 3) read back in,然后缓存/持久化/写入磁盘并读回
5) 编写一个循环过滤联合数据帧并执行逻辑:
val data = unioned.filter($"helper" === lit(-1) || $"helper" === lit(n))
val result = logic(data)
其中 n 是您要循环的值,从 0 开始测试的第一行
【讨论】:
以上是关于在 Spark Scala 中将一行从一个数据集添加到另一个数据集的主要内容,如果未能解决你的问题,请参考以下文章
如何在 Spark 2 Scala 中将 Row 转换为 json
如何在scala spark中将数据框的特定列与另一个列连接[重复]