如何在 spark 数据框中创建唯一的自动生成的 Id 列

Posted

技术标签:

【中文标题】如何在 spark 数据框中创建唯一的自动生成的 Id 列【英文标题】:How to create an unique autogenerated Id column in a spark dataframe 【发布时间】:2019-03-25 15:32:25 【问题描述】:

我有一个数据框,我必须在其中一列中生成一个唯一 ID。此 id 必须使用偏移量生成。 因为,我需要用自动生成的 id 持久化这个数据框,现在如果新数据进入自动生成的 id 不应该与现有的冲突。 我检查了单调递增函数,但它不接受任何 offset 。 这是我尝试过的:

df=df.coalesce(1);
df = df.withColumn(inputCol,functions.monotonically_increasing_id());

但是有没有办法让 monotonically_increasing_id() 从起始偏移量开始?

【问题讨论】:

为什么需要coalesce(1)?这会将所有数据放在一个分区中,这可能会导致 OOM 错误并阻止 spark 并行化任何内容。 与scala相关answer 【参考方案1】:

或者,如果您不想将程序限制在一个只有df.coalesce(1) 的分区中,您可以使用以 index = 0 开头的zipWithIndex 作为下一个:

lines = [["a1", "a2", "a3"],
            ["b1", "b2", "b3"],
            ["c1", "c2", "c3"]]

    cols = ["c1", "c2", "c3"]

    df = spark.createDataFrame(lines, cols)

    start_indx = 10
    df = df.rdd.zipWithIndex() \
           .map(lambda (r, indx): (indx + start_indx, r[0], r[1], r[2])) \
           .toDF(["id", "c1", "c2", "c3"])

    df.show(10, False)

在这种情况下,我设置了start_index = 10。这将是输出:

+---+---+---+---+
|id |c1 |c2 |c3 |
+---+---+---+---+
|10 |a1 |a2 |a3 |
|11 |b1 |b2 |b3 |
|12 |c1 |c2 |c3 |
+---+---+---+---+

【讨论】:

【参考方案2】:

您可以简单地添加它以提供 id 的最小值。请注意,不保证值将从最小值开始

.withColumn("id", monotonically_increasing_id + 123)

说明:+ 的列 https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Column.scala#L642 重载了运算符

【讨论】:

如果我将数据帧合并到 1 个分区,它会保证它从最小值开始并遵循一个序列吗? @AyanBiswas In Java .withColumn("id", functions.monotonically_increasing_id().plus(123)) 。至少对于当前的实现,如果只有一个分区,它似乎从 min 开始。如果是这种情况,如果你想自己控制行为,你也可以使用df.mapPartitions(_.zipWithIndex)...(对不起scala)之类的东西 谢谢,我可以确保它在单个分区上运行,所以它也应该与 monotonically_increasing_id 一起使用,对吧? 是的,至少在目前的实现中是这样。根据spark.apache.org/docs/2.4.0/api/java/org/apache/spark/sql/…,分区号用于高位,分区内的记录号用于低位。如果只有一个分区,高位将为 0,你得到 0,1,2,...【参考方案3】:

您可以向列添加行号,然后将其添加到最大现有标识列或偏移量。设置后删除 rownumber 属性。

from pyspark.sql import functions as sf
from pyspark.sql.window import Window

# Could also grab the exist max ID value
seed_value = 123

df = df.withColumn("row_number", sf.rowNumber().over(Window.partitionBy(sf.col("natural_key")).orderBy(sf.col("anything"))))

df = df.withColumn("id", sf.col("row_number")+seed_value)

记得删除 row_number 属性。

【讨论】:

你希望分区超过一个常数,而不是自然键!

以上是关于如何在 spark 数据框中创建唯一的自动生成的 Id 列的主要内容,如果未能解决你的问题,请参考以下文章

Spark:如何在每个执行程序中创建本地数据帧

在 spark 数据框中创建 StructType 的空列

如何在 pyspark 中创建数据框的副本?

火花在UDF中创建数据框

如何在 Spark 中创建 UDF 以支持自定义谓词

无法在 spark/pyspark 中创建数组文字