如何在 spark 数据框中创建唯一的自动生成的 Id 列
Posted
技术标签:
【中文标题】如何在 spark 数据框中创建唯一的自动生成的 Id 列【英文标题】:How to create an unique autogenerated Id column in a spark dataframe 【发布时间】:2019-03-25 15:32:25 【问题描述】:我有一个数据框,我必须在其中一列中生成一个唯一 ID。此 id 必须使用偏移量生成。 因为,我需要用自动生成的 id 持久化这个数据框,现在如果新数据进入自动生成的 id 不应该与现有的冲突。 我检查了单调递增函数,但它不接受任何 offset 。 这是我尝试过的:
df=df.coalesce(1);
df = df.withColumn(inputCol,functions.monotonically_increasing_id());
但是有没有办法让 monotonically_increasing_id() 从起始偏移量开始?
【问题讨论】:
为什么需要coalesce(1)
?这会将所有数据放在一个分区中,这可能会导致 OOM 错误并阻止 spark 并行化任何内容。
与scala相关answer
【参考方案1】:
或者,如果您不想将程序限制在一个只有df.coalesce(1)
的分区中,您可以使用以 index = 0 开头的zipWithIndex
作为下一个:
lines = [["a1", "a2", "a3"],
["b1", "b2", "b3"],
["c1", "c2", "c3"]]
cols = ["c1", "c2", "c3"]
df = spark.createDataFrame(lines, cols)
start_indx = 10
df = df.rdd.zipWithIndex() \
.map(lambda (r, indx): (indx + start_indx, r[0], r[1], r[2])) \
.toDF(["id", "c1", "c2", "c3"])
df.show(10, False)
在这种情况下,我设置了start_index = 10
。这将是输出:
+---+---+---+---+
|id |c1 |c2 |c3 |
+---+---+---+---+
|10 |a1 |a2 |a3 |
|11 |b1 |b2 |b3 |
|12 |c1 |c2 |c3 |
+---+---+---+---+
【讨论】:
【参考方案2】:您可以简单地添加它以提供 id 的最小值。请注意,不保证值将从最小值开始
.withColumn("id", monotonically_increasing_id + 123)
说明:+
的列 https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Column.scala#L642 重载了运算符
【讨论】:
如果我将数据帧合并到 1 个分区,它会保证它从最小值开始并遵循一个序列吗? @AyanBiswas In Java.withColumn("id", functions.monotonically_increasing_id().plus(123))
。至少对于当前的实现,如果只有一个分区,它似乎从 min 开始。如果是这种情况,如果你想自己控制行为,你也可以使用df.mapPartitions(_.zipWithIndex)...
(对不起scala)之类的东西
谢谢,我可以确保它在单个分区上运行,所以它也应该与 monotonically_increasing_id 一起使用,对吧?
是的,至少在目前的实现中是这样。根据spark.apache.org/docs/2.4.0/api/java/org/apache/spark/sql/…,分区号用于高位,分区内的记录号用于低位。如果只有一个分区,高位将为 0,你得到 0,1,2,...【参考方案3】:
您可以向列添加行号,然后将其添加到最大现有标识列或偏移量。设置后删除 rownumber 属性。
from pyspark.sql import functions as sf
from pyspark.sql.window import Window
# Could also grab the exist max ID value
seed_value = 123
df = df.withColumn("row_number", sf.rowNumber().over(Window.partitionBy(sf.col("natural_key")).orderBy(sf.col("anything"))))
df = df.withColumn("id", sf.col("row_number")+seed_value)
记得删除 row_number 属性。
【讨论】:
你希望分区超过一个常数,而不是自然键!以上是关于如何在 spark 数据框中创建唯一的自动生成的 Id 列的主要内容,如果未能解决你的问题,请参考以下文章