Spark RDD 上的列操作

Posted

技术标签:

【中文标题】Spark RDD 上的列操作【英文标题】:Column operation on Spark RDDs 【发布时间】:2015-06-05 02:47:31 【问题描述】:

我有一个包含许多列(例如数百个)的 RDD,并且我的大部分操作都在列上,例如我需要从不同的列创建许多中间变量。

最有效的方法是什么?

例如,如果我的dataRDD[Array[String]] 如下所示:

123, 523, 534, ..., 893 
536, 98, 1623, ..., 98472 
537, 89, 83640, ..., 9265 
7297, 98364, 9, ..., 735 
...... 
29, 94, 956, ..., 758 

我需要创建一个新列或一个变量作为newCol1 = 2ndCol+19thCol,并基于newCol1 和现有列创建另一个新列:newCol2 = function(newCol1, 34thCol)

最好的方法是什么?

我一直在考虑使用索引作为中间变量和dataRDD,然后将它们连接到索引上进行计算:

var dataRDD = sc.textFile("/test.csv").map(_.split(","))
val dt = dataRDD.zipWithIndex.map(_.swap)
val newCol1 = dataRDD.map(x => x(1)+x(18)).zipWithIndex.map(_.swap)
val newCol2 = newCol1.join(dt).map(x=> function(.........))

有更好的方法吗?

【问题讨论】:

【参考方案1】:

为什么不把这一切都做一个:

var dataRDD = sc.textFile("/test.csv").map(_.split(","))
dataRDD.map(x=>
  val newCol = x(1) + x(18)
  val newCol2 = function(newCol, x(33))
  //anything else you need to do
  newCol +: newCol2 +: x //This will return the original array with the new columns prepended
  //x +: newCol +: newCol2 //Alternatively, this will return the original array with the new columns appended
)

【讨论】:

感谢贾斯汀的回复。我可能误解了你的意思。但是当我尝试像“dataRDD.map(x => val a=x(1)).collect”这样的东西时,我得到了像 Array[Unit] = Array((), (), (), ( ), (), (), (), (), (), ())。我错过了什么吗? 对不起,我猜你对 Scala 不太熟悉。函数中的最后一条语句是返回值。在您列出的情况下,变量赋值的结果是 Unit 或 ()。我已经修改了我的代码,以便更清楚地了解如何完全进行更改 谢谢贾斯汀!是的,我是 Scala 的新手,所以不熟悉一些概念。对不起。 最后一行有错字。应该是 x +: newCol +: newCol2

以上是关于Spark RDD 上的列操作的主要内容,如果未能解决你的问题,请参考以下文章

[Spark快速大数据分析]Spark基础

spark算子 分为3大类

Spark笔记:RDD基本操作(上)

Spark Scala 根据另一个 RDD 的列删除一个 RDD 中的行

Learning Spark中文版--第三章--RDD编程

spark的Pair RDD的转化操作