Spark上的Scala [自加入后过滤掉重复的行]

Posted

技术标签:

【中文标题】Spark上的Scala [自加入后过滤掉重复的行]【英文标题】:Scala on Spark [filtering out duplicate rows after Self Join] 【发布时间】:2019-01-25 10:47:56 【问题描述】:

我尝试过使用 python 过滤掉数据

|name_x | age_x | salary_x | name_y | age_y | salary_y | age_diff
| James | 23    | 200000   | Jack   | 24    | 210040   |  1
| Jack  | 24    | 210040   | James  | 23    | 200000   |  1
| Irene | 25    | 200012   | John   | 25    | 210000   |  0
| Johny | 26    | 21090    | Elon   | 29    | 210012   |  3
| Josh  | 24    | 21090    | David  | 23    | 213012   |  1
| John  | 25    | 210000   | Irene  | 25    | 200012   |  0

row1row2 也是重复的 row3row6 重复为 name_x == name_y, age_x == age_y, salary_x == salary_y 并且不考虑 age_diff 这是 输出。 我需要将它们过滤掉,[重复行之一]。

需要最终输出。:如下过滤掉重复项

|name_x | age_x | salary_x | name_y | age_y | salary_y | age_diff
| James | 23    | 200000   | Jack   | 24    | 210040   |  1
| Irene | 25    | 200012   | John   | 25    | 210000   |  0
| Johny | 26    | 21090    | Elon   | 29    | 210012   |  3
| Josh  | 24    | 21090    | David  | 23    | 213012   |  1

在python上实现如下,返回重复项的索引,也太慢了。

def duplicate_index(df):
    length = len(df.columns) - 1 # -1 for removing the time difference
    length = length//2
    nrows = df.shape[0]
    duplicate_index = [] 
    for row in range(nrows-1):
        count  = 0
        for frow in range(row+1,nrows):
            if (list(df.iloc[row][:length]) == list(df.iloc[frow][length:-1])):
                if (list(df.iloc[row][length:-1]) == list(df.iloc[frow][:length])):
                    duplicate_index.append(frow)
                    #print(row, frow)
                    count = count + 1
            if count == 1:
                break
    return duplicate_index
del_index = duplicate_index(df)
final_df  = df.drop(index = del_index)

但是现在我必须使用 spark 在 Scala 上执行这些操作,是否有任何更快的方法来接近这些过滤器,或者类似于 python 中的 shift。或 window 在 Scala 上

【问题讨论】:

你能展示原始数据框是如何制作的吗? @Vamsi Prabhala val df = sparkSession.read.format("org.apache.spark.csv").option("header", true).option("inferSchema",true).csv (路径+"data.csv") 问题说你使用了自连接..你能证明这个说法吗? @Vamsi Prabhala val df_Id = dff.join(dff, Seq("ID"), "inner") @Vamsi Prabhala plus 仍然需要与其他两个相同 df 的键进行自我连接。所以创建了三个数据框,它们改变了列名,合并它们。这就是数据框的创建方式。 【参考方案1】:

您可以向仅保留两行之一的联接添加附加条件,例如 name_x

示例数据框:

  val rowsRdd: RDD[Row] = spark.sparkContext.parallelize(
    Seq(
      Row(1, "James",  1, 10),
      Row(1, "Jack",   2, 20),
      Row(2, "Tom",    3, 30),
      Row(2, "Eva",    4, 40)
    )
  )

  val schema: StructType = new StructType()
    .add(StructField("id",      IntegerType,  false))
    .add(StructField("name",    StringType,  false))
    .add(StructField("age",     IntegerType, false))
    .add(StructField("salary",  IntegerType, false))

  val df0: DataFrame = spark.createDataFrame(rowsRdd, schema)

  df0.sort("id").show()

这给出了:

+---+-----+---+------+
| id| name|age|salary|
+---+-----+---+------+
|  1|James|  1|    10|
|  1| Jack|  2|    20|
|  2|  Eva|  4|    40|
|  2|  Tom|  3|    30|
+---+-----+---+------+

重命名数据框的列:

val df1 = df0.columns.foldLeft(df0)((acc, x) => acc.withColumnRenamed(x, x+"_x"))
val df2 = df0.columns.foldLeft(df0)((acc, x) => acc.withColumnRenamed(x, x+"_y"))

然后用三个条件进行连接:

val df3 = df1.join(df2,
    col("id_x") === col("id_y") and
    col("name_x") =!= col("name_y") and
    col("name_x") < col("name_y"),
    "inner")
df3.show()

返回

+----+------+-----+--------+----+------+-----+--------+                         
|id_x|name_x|age_x|salary_x|id_y|name_y|age_y|salary_y|
+----+------+-----+--------+----+------+-----+--------+
|   1|  Jack|    2|      20|   1| James|    1|      10|
|   2|   Eva|    4|      40|   2|   Tom|    3|      30|
+----+------+-----+--------+----+------+-----+--------+

根据您在数据中定义重复的方式,区分两个重复的条件会有所不同。

【讨论】:

感谢您的回答。 [这是必需的输出] 但是假设名称相同,或者如果我们有一些相同的属性,它就不起作用。例如:这里用“James”替换“Jack”,我们只得到一列。但它应该返回两列,只删除attribute_x和attribute_y中完全相同的行。 如果名字不是唯一的,我认为你最好在一开始就为每个人提供一个唯一的 ID 并使用它而不是名字。如果可用,唯一 ID 的最佳来源通常是数据源中的主键。【参考方案2】:

我认为 astro_asz 的答案是更简洁的方法,但为了完整起见,这里是使用窗口的方法:

编辑:我更改了数据集,所以两个人有相同的名字,并根据每行的内容添加了一个唯一的 ID

val people = Seq(
  ("1", "James", 23, 200000),
  ("1", "James", 24, 210040),  // two people with same name
  ("2", "Irene", 25, 200012),
  ("2", "John",  25, 210000),
  ("3", "Johny", 26,  21090),
  ("3", "Elon",  29, 200000),
  ("4", "Josh",  24, 200000),
  ("4", "David", 23, 200000))

val columns = Seq("ID", "name", "age", "salary")
val df = people.toDF(columns:_*)

// In general you want to use the primary key from the underlying data store
// as your unique keys.  If for some weird reason the primary key is not 
// available or does not exist, you can try to create your own.  This
// is fraught with danger.  If you are willing to make the (dangerous)
// assumption a unique row is enough to uniquely identify the entity in
// that row, you can use a md5 hash of the contents of the row to create
// your id
val withKey = df.withColumn("key", md5(concat(columns.map(c => col(c)):_*)))

val x = withKey.toDF(withKey.columns.map(c => if (c == "ID") c else "x_" + c):_*)
val y = withKey.toDF(withKey.columns.map(c => if (c == "ID") c else "y_" + c):_*)

val partition = Window.partitionBy("ID").orderBy("x_key")
val df2 = x.join(y, Seq("ID"))
  .where('x_key =!= 'y_key)
  .withColumn("rank", rank over partition)
  .where('rank === 1)
  .drop("rank", "x_key", "y_key")

df2.show
/*-+------+-----+--------+------+-----+--------+                         
|ID|x_name|x_age|x_salary|y_name|y_age|y_salary|
+--+------+-----+--------+------+-----+--------+
| 3|  Elon|   29|  200000| Johny|   26|   21090|
| 1| James|   24|  210040| James|   23|  200000|
| 4| David|   23|  200000|  Josh|   24|  200000|
| 2| Irene|   25|  200012|  John|   25|  210000|
+--+------+-----+--------+------+-----+-------*/

【讨论】:

以上是关于Spark上的Scala [自加入后过滤掉重复的行]的主要内容,如果未能解决你的问题,请参考以下文章

scala 根据具有相同值的 2 列过滤掉连接 df 中的行 - 最佳方式

Spark Window Functions:过滤掉开始和结束日期在另一行开始和结束日期范围内的行

过滤包含Scala Spark数据帧中数组的列中的数组长度[重复]

Spark Scala 根据另一个 RDD 的列删除一个 RDD 中的行

如何使用过滤器从scala中的数据框中获取包含空值的行集

火花可重复/确定性结果