Apache Spark - 根据列值添加增量 ID

Posted

技术标签:

【中文标题】Apache Spark - 根据列值添加增量 ID【英文标题】:Apache Spark - Adding a incremental Id in based on a column value 【发布时间】:2020-05-21 14:56:54 【问题描述】:

我想根据列值创建一个增量 ID。

例如,如果我有下表

-----------------------
| id |   value    |
-----------------------
| 3  |    a       |
| 2  |    a       | 
| 1  |    b       |
| 4  |    b       |
| 5  |    c       |
-----------------------

我想创建一个带有随机或增量标识符的新列,该标识符对于列值是唯一的,如下所示:

-----------------------------------------------
| id |   value    |    new_id    |
-----------------------------------------------
| 3  |    a       |     1        | 
| 2  |    a       |     1        |
| 1  |    b       |     2        |
| 4  |    b       |     2        |
| 5  |    c       |     3        |
-----------------------------------------------

除了使用 distinct 并稍后加入之外,还有其他选择吗?

谢谢!

【问题讨论】:

【参考方案1】:

您可以在 Window 中使用 dense_rank() 并按值排序,但是这会将所有数据移动到单个分区,因此对于大型 DataSet 的性能会很差。

val window =   Window.orderBy($"value")

df.withColumn("new_id", dense_rank.over(window))

编辑 - 似乎使用虚拟分区可确保将数据打乱到 spark.sql.shuffle.partitions 分区中

val window =   Window.partitionBy(lit(0)).orderBy($"value")

【讨论】:

【参考方案2】:

一种直接的方法是获取列值的哈希值。这应该是一个无冲突的散列,并且会阻止对整个数据集的扫描。

使用斯卡拉,

val sparkSession = ???
import sparkSession.implicits._
import org.apache.spark.sql.functions._

val df = ???
val dfModified = df.withColumn("new_id", hash(col("value")).cast("string"))

注意,如果你有一组固定的已知值,那么你应该事先创建一个value -> id 映射并使用广播连接或UDF 来放置新的ID。如果你不使用 scala,方法是一样的,你只需要使用不同的散列技术。

希望这会有所帮助,干杯。

【讨论】:

这符合要求,但 UDF 很慢,如果可能最好避免使用。 不,它们不是,它们基本上是map 操作。 ***.com/questions/43411234/… 这篇文章只是提到了一个众所周知的事实,即 UDF 大多无法优化。在这种情况下没关系,因为没有复杂的转换,所以有一个string => map 转换。尽管如此,我还是更新了使用内置 hash 函数的答案,该函数再次使用相同的实现。

以上是关于Apache Spark - 根据列值添加增量 ID的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Apache-Spark 2.x 中使用 java 进行增量序列

如何在 Apache Spark 中进行增量 MapReduce

根据最大 Spark Scala 替换列值

在脚本mysql中重置自动增量列值

根据列值有效地从宽 Spark Dataframe 中删除列

如果列值取决于文件路径,有没有办法在一次读取多个文件时将文字作为列添加到 spark 数据框中?