Spark RDD 上的列操作
Posted
技术标签:
【中文标题】Spark RDD 上的列操作【英文标题】:Column operation on Spark RDDs 【发布时间】:2015-06-05 02:47:31 【问题描述】:我有一个包含许多列(例如数百个)的 RDD,并且我的大部分操作都在列上,例如我需要从不同的列创建许多中间变量。
最有效的方法是什么?
例如,如果我的dataRDD[Array[String]]
如下所示:
123, 523, 534, ..., 893
536, 98, 1623, ..., 98472
537, 89, 83640, ..., 9265
7297, 98364, 9, ..., 735
......
29, 94, 956, ..., 758
我需要创建一个新列或一个变量作为newCol1 = 2ndCol+19thCol
,并基于newCol1
和现有列创建另一个新列:newCol2 = function(newCol1, 34thCol)
。
最好的方法是什么?
我一直在考虑使用索引作为中间变量和dataRDD
,然后将它们连接到索引上进行计算:
var dataRDD = sc.textFile("/test.csv").map(_.split(","))
val dt = dataRDD.zipWithIndex.map(_.swap)
val newCol1 = dataRDD.map(x => x(1)+x(18)).zipWithIndex.map(_.swap)
val newCol2 = newCol1.join(dt).map(x=> function(.........))
有更好的方法吗?
【问题讨论】:
【参考方案1】:为什么不把这一切都做一个:
var dataRDD = sc.textFile("/test.csv").map(_.split(","))
dataRDD.map(x=>
val newCol = x(1) + x(18)
val newCol2 = function(newCol, x(33))
//anything else you need to do
newCol +: newCol2 +: x //This will return the original array with the new columns prepended
//x +: newCol +: newCol2 //Alternatively, this will return the original array with the new columns appended
)
【讨论】:
感谢贾斯汀的回复。我可能误解了你的意思。但是当我尝试像“dataRDD.map(x => val a=x(1)).collect”这样的东西时,我得到了像 Array[Unit] = Array((), (), (), ( ), (), (), (), (), (), ())。我错过了什么吗? 对不起,我猜你对 Scala 不太熟悉。函数中的最后一条语句是返回值。在您列出的情况下,变量赋值的结果是 Unit 或 ()。我已经修改了我的代码,以便更清楚地了解如何完全进行更改 谢谢贾斯汀!是的,我是 Scala 的新手,所以不熟悉一些概念。对不起。 最后一行有错字。应该是 x +: newCol +: newCol2以上是关于Spark RDD 上的列操作的主要内容,如果未能解决你的问题,请参考以下文章