Spark:scala - 如何将集合从 RDD 转换为另一个 RDD
Posted
技术标签:
【中文标题】Spark:scala - 如何将集合从 RDD 转换为另一个 RDD【英文标题】:Spark: scala - how to convert collection from RDD to another RDD 【发布时间】:2016-06-13 02:03:04 【问题描述】:如何将调用 take(5)
后返回的集合转换为另一个 RDD,以便将前 5 条记录保存在输出文件中?
如果我使用saveAsTextfile
,它不会让我同时使用take
和saveAsTextFile
(这就是为什么您会看到下面评论的那一行)。它按排序顺序存储来自 RDD 的所有记录,因此前 5 个记录是前 5 个国家,但我只想存储前 5 个记录 - 是否可以在 RDD 中转换集合 [take(5)]?
val Strips = txtFileLines.map(_.split(","))
.map(line => (line(0) + "," + (line(7).toInt + line(8).toInt)))
.sortBy(x => x.split(",")(1).trim().toInt, ascending=false)
.take(5)
//.saveAsTextFile("output\\country\\byStripsBar")
解决方案:
sc.parallelize(Strips, 1).saveAsTextFile("output\\country\\byStripsBar")
【问题讨论】:
【参考方案1】:val rowsArray: Array[Row] = rdd.take(5)
val slicedRdd = sparkContext.parallelize(rowsArray, 1)
slicedRdd.savesTextFile("specify path here")
【讨论】:
【参考方案2】:除非您绝对需要 saveAsTextFile
格式,否则我只会使用简单的 IO(如 File
)将 take(5)
输出打印到文件中。
否则,这里是罗嗦的RDD
唯一解决方案:
scala> val rdd = sc.parallelize(5 to 1 by -1 mapx => (x, x*x))
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[71] at parallelize at <console>:27
scala> rdd.collect
res1: Array[(Int, Int)] = Array((5,25), (4,16), (3,9), (2,4), (1,1))
scala> val top2 = rdd.sortBy(_._1).zipWithIndex.collectcase x if (x._2 < 2) => x._1
top2: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[79] at collect at <console>:29
scala> top2.collect
res2: Array[(Int, Int)] = Array((1,1), (2,4))
【讨论】:
以上是关于Spark:scala - 如何将集合从 RDD 转换为另一个 RDD的主要内容,如果未能解决你的问题,请参考以下文章