Spark Scala 根据另一个 RDD 的列删除一个 RDD 中的行

Posted

技术标签:

【中文标题】Spark Scala 根据另一个 RDD 的列删除一个 RDD 中的行【英文标题】:Spark Scala Delete rows in one RDD based on columns of another RDD 【发布时间】:2017-10-19 21:00:33 【问题描述】:

我对 scala 和 spark 很陌生,不知道如何开始。

我有一个如下所示的 RDD:

1,2,3,11
2,1,4,12
1,4,5,13
3,5,6,12

另一个看起来像这样:

2,1
1,2

我想过滤第一个 RDD,以便删除与第二个 RDD 的前两列匹配的所有行。输出应如下所示:

 1,4,5,13
 3,5,6,12

【问题讨论】:

【参考方案1】:
// input rdds
val rdd1 = spark.sparkContext.makeRDD(Seq((1,2,3,11), (2,1,3,12), (1,4,5,13), (3,5,6,12)))
val rdd2 = spark.sparkContext.makeRDD(Seq((1,2), (2,1)))

// manipulate the 2 rdds as a key, val pair
// the key of the first rdd is a tuple pair of first two fields, the val contains all the fields
// the key of the second rdd is a tuple of first two fields, the val is just null
// then we could perform joins on their key
val rdd1_key = rdd1.map(record => ((record._1, record._2), record))
val rdd2_key = rdd2.map(record => (record, null))

// 1. perform left outer join, the record become (key, (val1, val2))
// 2. filter, keep those records which do not have a join
// if there is no join, val2 will be None, otherwise val2 will be null, which is the value we hardcoded from previous step
// 3. get val1 
rdd1_key.leftOuterJoin(rdd2_key)
  .filter(record => record._2._2 == None)
  .map(record => record._2._1)
  .collect().foreach(println(_))

// result
(1,4,5,13)
(3,5,6,12)

谢谢

【讨论】:

【参考方案2】:

我个人更喜欢dataframe/dataset 方式,因为它们是rdd 的优化形式,并且具有更多内置功能,并且类似于传统数据库。

下面是dataframe方式:

第一步是将rdds 都转换为dataframes

import sqlContext.implicits._
val df1 = rdd1.toDF("col1", "col2", "col3", "col4")
val df2 = rdd2.toDF("col1", "col2")

第二步是在dataframe2 中添加一个新的column 用于过滤条件检查

import org.apache.spark.sql.functions._
val tempdf2 = df2.withColumn("check", lit("check"))

最后一步是join 两个dataframesfilterdrop 不必要的rowscolumns

val finalDF = df1.join(tempdf2, Seq("col1", "col2"), "left")
                          .filter($"check".isNull)
                          .drop($"check")

你应该有最终的dataframe

+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
|3   |5   |6   |12  |
|1   |4   |5   |13  |
+----+----+----+----+

现在您可以使用finalDF.rdd 转换为rdd,也可以使用dataframe 本身继续进行进一步处理。

希望回答对你有帮助

【讨论】:

以上是关于Spark Scala 根据另一个 RDD 的列删除一个 RDD 中的行的主要内容,如果未能解决你的问题,请参考以下文章

Spark Scala:如何转换 DF 中的列

spark:根据另一个 rdd 的序列加入 rdd

scala spark 机器学习初探

Spark / Scala - RDD填充最后一个非空值

Spark RDD API(scala)

从 Scala 上的 Spark RDD 对象构建 RDD LabeledPoint