Apache Spark - 根据列值添加增量 ID
Posted
技术标签:
【中文标题】Apache Spark - 根据列值添加增量 ID【英文标题】:Apache Spark - Adding a incremental Id in based on a column value 【发布时间】:2020-05-21 14:56:54 【问题描述】:我想根据列值创建一个增量 ID。
例如,如果我有下表
-----------------------
| id | value |
-----------------------
| 3 | a |
| 2 | a |
| 1 | b |
| 4 | b |
| 5 | c |
-----------------------
我想创建一个带有随机或增量标识符的新列,该标识符对于列值是唯一的,如下所示:
-----------------------------------------------
| id | value | new_id |
-----------------------------------------------
| 3 | a | 1 |
| 2 | a | 1 |
| 1 | b | 2 |
| 4 | b | 2 |
| 5 | c | 3 |
-----------------------------------------------
除了使用 distinct 并稍后加入之外,还有其他选择吗?
谢谢!
【问题讨论】:
【参考方案1】:您可以在 Window 中使用 dense_rank() 并按值排序,但是这会将所有数据移动到单个分区,因此对于大型 DataSet 的性能会很差。
val window = Window.orderBy($"value")
df.withColumn("new_id", dense_rank.over(window))
编辑 - 似乎使用虚拟分区可确保将数据打乱到 spark.sql.shuffle.partitions 分区中
val window = Window.partitionBy(lit(0)).orderBy($"value")
【讨论】:
【参考方案2】:一种直接的方法是获取列值的哈希值。这应该是一个无冲突的散列,并且会阻止对整个数据集的扫描。
使用斯卡拉,
val sparkSession = ???
import sparkSession.implicits._
import org.apache.spark.sql.functions._
val df = ???
val dfModified = df.withColumn("new_id", hash(col("value")).cast("string"))
注意,如果你有一组固定的已知值,那么你应该事先创建一个value -> id
映射并使用广播连接或UDF 来放置新的ID。如果你不使用 scala,方法是一样的,你只需要使用不同的散列技术。
希望这会有所帮助,干杯。
【讨论】:
这符合要求,但 UDF 很慢,如果可能最好避免使用。 不,它们不是,它们基本上是map
操作。
***.com/questions/43411234/…
这篇文章只是提到了一个众所周知的事实,即 UDF 大多无法优化。在这种情况下没关系,因为没有复杂的转换,所以有一个string => map
转换。尽管如此,我还是更新了使用内置 hash
函数的答案,该函数再次使用相同的实现。以上是关于Apache Spark - 根据列值添加增量 ID的主要内容,如果未能解决你的问题,请参考以下文章
如何在 Apache-Spark 2.x 中使用 java 进行增量序列
如何在 Apache Spark 中进行增量 MapReduce