使用 Scala 在 Spark appln 中 FlatMap 一个可变列表

Posted

技术标签:

【中文标题】使用 Scala 在 Spark appln 中 FlatMap 一个可变列表【英文标题】:FlatMap a mutable list in Spark appln using Scala 【发布时间】:2016-12-07 21:23:33 【问题描述】:

我是 Spark-Scala 开发的新手,并试图弄脏手,所以如果你觉得这个问题很愚蠢,请多多包涵。

Sample dataset

[29430500,1104296400000,1938,F,11,2131,
MutableList([123291654450,1440129600000,100121,0,1440734400000],[234564535,2345129600000,345121,1,14567734400000])
]

如果您看到最后一个字段是 Array[],我希望输出如下所示:-

Row 1:
    [29430500,1104296400000,1938,F,11,2131,
    123291654450,1440129600000,100121,0,1440734400000]

Row 2: 
    [29430500,1104296400000,1938,F,11,2131,
    234564535,2345129600000,345121,1,14567734400000]

我想我必须这样做 flatMap 但由于某种原因,以下代码给出了这个错误:

def getMasterRdd(sc: SparkContext, hiveContext: HiveContext, outputDatabase:String, jobId:String,MasterTableName:String, dataSourceType: DataSourceType, startDate:Long, endDate:Long):RDD[Row]=

val Rdd1= ClassName.getMasterRdd(sc, hiveContext, "xyz", "test123", "xyz.abc", DataSourceType.SS, 1435723200000L, 1451538000000L)
Rdd1: holds the sample dataset

val mapRdd1= Rdd1.map(Row => Row.get(6))
val flatmapRdd1 = mapPatientRdd.flatMap(_.split(","))

当我将鼠标悬停在 (_.split(",")) 上时,我收到一条建议,内容如下:

Type mismatch, expected:(Any) => TraversableOnce[NotInferedU], actual: (Any) =>Any 

【问题讨论】:

请在您的问题中包含确切的错误信息 Error:(29, 53) value split is not a member of Any val flatmapPatientRdd = mapPatientRdd.flatMap(_.split(", ")) 如果您提供更多信息,您将获得更多帮助,即。数据集是什么类型的? Rdd1 是如何构建的? 编辑了原始问题..希望这会有所帮助.. @LazyBones: 伙计,你需要以正确的方式使用不带大写字母的变量名,这有点令人困惑! 【参考方案1】:

我认为有更好的方法来构建它(也许使用元组而不是 Lists)但无论如何这对我有用:

scala>  val myRDD = sc.parallelize(Seq(Seq(29430500L,1104296400000L,1938L,"F",11L,2131L,Seq(Seq(123291654450L,1440129600000L,100121L,0L,1440734400000L),Seq(234564535L,2345129600000L,345121L,1L,14567734400000L)))))
myRDD: org.apache.spark.rdd.RDD[Seq[Any]] = ParallelCollectionRDD[11] at parallelize at <console>:27

scala> :pa
// Entering paste mode (ctrl-D to finish)

val myRDD2 = myRDD.flatMap(row => 
    val (beginning, end) = (row.dropRight(1), row.last)
    end.asInstanceOf[List[List[Any]]].map(beginning++_)
)

// Exiting paste mode, now interpreting.

myRDD2: org.apache.spark.rdd.RDD[Seq[Any]] = MapPartitionsRDD[10] at flatMap at <console>:29

scala> myRDD2.foreachprintln
List(29430500, 1104296400000, 1938, F, 11, 2131, 123291654450, 1440129600000, 100121, 0, 1440734400000)
List(29430500, 1104296400000, 1938, F, 11, 2131, 234564535, 2345129600000, 345121, 1, 14567734400000)

【讨论】:

感谢 @evan058.. 当我添加 val myRDD2 = myRDD.flatMap(row =&gt; val (beginning, end) = (row.dropRight(1), row.last) end.asInstanceOf[List[List[Any]]].map(beginning++_)) 时,它说 无法解析符号 dropRight、last 和 ++ 你的RDD是否在org.apache.spark.rdd.RDD[Seq[Any]]类型的开头?这就是您的示例数据集似乎表明的内容 我明白了,它是类型RDD[Row]。如果你先做一个RDD1.map(_.toSeq),那么我的回答会起作用 嗨@evan058 同样的错误.. 无法解析符号(_.toSeq) 如果RDD1RDD[Row] 类型,那么它应该可以工作,因为Row has the toSeq method。让我知道RDD1 的类型是什么(也许将其添加到原始问题中)。【参考方案2】:

用途:

rdd.flatMap(row => row.getSeq[String](6).map(_.split(","))

【讨论】:

不起作用 val mapRdd1= Rdd1.map(Row =&gt; Row.get(6)) val flatmapRdd1= mapRdd1.flatMap(row =&gt; row.getSeq[String](6).map(_.split(",")) 类型不匹配,预期:(Any) => TraversableOnce[NotInferedU],实际:(Any) =>任意

以上是关于使用 Scala 在 Spark appln 中 FlatMap 一个可变列表的主要内容,如果未能解决你的问题,请参考以下文章

java.lang.NoSuchMethodError: Scala.Predef$.refArrayOps 在 Spark 作业中使用 Scala

使用 Spark 和 Scala 进行字数统计

如何在 Windows 中使用 Scala 将 Cassandra 与 Spark 连接起来

如何使用 Scala 在 Spark 中进行滑动窗口排名?

如何在窗口 scala/spark 中使用 partitionBy 函数

在 Spark 上使用 Scala 在 Dataframe 中拆分字符串