如何使用 scala 根据 spark 中的条件获取 row_number()

Posted

技术标签:

【中文标题】如何使用 scala 根据 spark 中的条件获取 row_number()【英文标题】:How to take row_number() based on a condition in spark with scala 【发布时间】:2020-12-15 14:51:44 【问题描述】:

我有以下数据框 -

+----+-----+---+
| val|count| id|
+----+-----+---+
|   a|   10| m1|
|   b|   20| m1|
|null|   30| m1|
|   b|   30| m2|
|   c|   40| m2|
|null|   50| m2|
+----+-----+---+

创建者-

 val df1=Seq(
 ("a","10","m1"),
 ("b","20","m1"),
 (null,"30","m1"),
 ("b","30","m2"),
 ("c","40","m2"),
 (null,"50","m2")
 )toDF("val","count","id")

我正在尝试在 row_number() 和窗口函数的帮助下进行排名,如下所示。

df1.withColumn("rannk_num", row_number() over Window.partitionBy("id").orderBy("count")).show
+----+-----+---+---------+
| val|count| id|rannk_num|
+----+-----+---+---------+
|   a|   10| m1|        1|
|   b|   20| m1|        2|
|null|   30| m1|        3|
|   b|   30| m2|        1|
|   c|   40| m2|        2|
|null|   50| m2|        3|
+----+-----+---+---------+

但我必须过滤那些列 - val 值为空的记录。

预期输出--

+----+-----+---+---------+
| val|count| id|rannk_num|
+----+-----+---+---------+
|   a|   10| m1|        1|
|   b|   20| m1|        2|
|null|   30| m1|     NULL|
|   b|   30| m2|        1|
|   c|   40| m2|        2|
|null|   50| m2|     NULL|
+----+-----+---+---------+

想知道这是否可以通过最小的更改来实现。 val 和 count 列也可以有 'n' 个值。

【问题讨论】:

【参考方案1】:

用 null val 过滤这些行,为它们分配一个 null 行号,然后联合回原始数据帧。

val df1=Seq(
 ("a","10","m1"),
 ("b","20","m1"),
 (null,"30","m1"),
 ("b","30","m2"),
 ("c","40","m2"),
 (null,"50","m2")
 ).toDF("val","count","id")

df1.filter("val is not null").withColumn(
    "rannk_num", row_number() over Window.partitionBy("id").orderBy("count")
).union(
    df1.filter("val is null").withColumn("rannk_num", lit(null))
).show
+----+-----+---+---------+
| val|count| id|rannk_num|
+----+-----+---+---------+
|   a|   10| m1|        1|
|   b|   20| m1|        2|
|   b|   30| m2|        1|
|   c|   40| m2|        2|
|null|   30| m1|     null|
|null|   50| m2|     null|
+----+-----+---+---------+

【讨论】:

在创建 row_number() 本身时是否在数据帧级别有任何默认过滤机制 不执行过滤,因为 row_number 应该为每一行分配一个行号。

以上是关于如何使用 scala 根据 spark 中的条件获取 row_number()的主要内容,如果未能解决你的问题,请参考以下文章

如何在 if-else 条件下的列中使用 Spark 值 - Scala

如何使用 spark(scala)读取和写入(更新)同一个文件

如何根据 Spark Scala 中其他数据帧中的多列匹配过滤数据帧

如何使用 Scala 从 Spark 中的列表或数组创建行

使用 Scala 在以 Spark 中的列值为条件的广播 Map 上执行查找

scala - Spark:如何在 groupedData 中获取带有条件的结果集