Spark:scala - 如何将集合从 RDD 转换为另一个 RDD

Posted

技术标签:

【中文标题】Spark:scala - 如何将集合从 RDD 转换为另一个 RDD【英文标题】:Spark: scala - how to convert collection from RDD to another RDD 【发布时间】:2016-06-13 02:03:04 【问题描述】:

如何将调用 take(5) 后返回的集合转换为另一个 RDD,以便将前 5 条记录保存在输出文件中?

如果我使用saveAsTextfile,它不会让我同时使用takesaveAsTextFile(这就是为什么您会看到下面评论的那一行)。它按排序顺序存储来自 RDD 的所有记录,因此前 5 个记录是前 5 个国家,但我只想存储前 5 个记录 - 是否可以在 RDD 中转换集合 [take(5)]?

val Strips =  txtFileLines.map(_.split(","))
                         .map(line => (line(0) + "," + (line(7).toInt + line(8).toInt)))
                         .sortBy(x => x.split(",")(1).trim().toInt, ascending=false)
                         .take(5)
                       //.saveAsTextFile("output\\country\\byStripsBar")

解决方案: sc.parallelize(Strips, 1).saveAsTextFile("output\\country\\byStripsBar")

【问题讨论】:

【参考方案1】:
val rowsArray: Array[Row] = rdd.take(5)
val slicedRdd = sparkContext.parallelize(rowsArray, 1)

slicedRdd.savesTextFile("specify path here")

【讨论】:

【参考方案2】:

除非您绝对需要 saveAsTextFile 格式,否则我只会使用简单的 IO(如 File)将 take(5) 输出打印到文件中。

否则,这里是罗嗦的RDD唯一解决方案:

scala> val rdd = sc.parallelize(5 to 1 by -1 mapx => (x, x*x))
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[71] at parallelize at <console>:27

scala> rdd.collect
res1: Array[(Int, Int)] = Array((5,25), (4,16), (3,9), (2,4), (1,1))

scala> val top2 = rdd.sortBy(_._1).zipWithIndex.collectcase x if (x._2 < 2) => x._1
top2: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[79] at collect at <console>:29

scala> top2.collect
res2: Array[(Int, Int)] = Array((1,1), (2,4))

【讨论】:

以上是关于Spark:scala - 如何将集合从 RDD 转换为另一个 RDD的主要内容,如果未能解决你的问题,请参考以下文章

在scala中将Spark Dataframe转换为RDD

Scala和Spark,rdd从字典创建数据框

[Spark]-RDD之创建

spark知识点_RDD

修改 Spark RDD foreach 中的集合

Scala Spark 中的 NullPointerException,似乎是由集合类型引起的?