如何在 Spark 数据框中添加具有序列值的列?

Posted

技术标签:

【中文标题】如何在 Spark 数据框中添加具有序列值的列?【英文标题】:How to add column with sequence value in Spark dataframe? 【发布时间】:2018-08-15 06:07:03 【问题描述】:

如何在 PySpark 数据框中添加具有特定数字的序列值的列?

当前数据集:

Col1    Col2    Flag
Val1    Val2    F
Val3    Val4    T

但我希望数据集是这样的:

Col1    Col2    Flag    New_Col
Val1    Val2    F       11F
Val3    Val4    T       12T

我在 Python 中使用以下代码。

from pyspark.sql import functions as F
from pyspark.sql import types as T

seq = 10

def fn_increment_id(flag):
    global seq
    seq += 1
    return str(seq) + flag

if __name__ == "__main__":
    df = spark.loadFromMapRDB("path/to/table")
    my_udf = F.UserDefinedFunction(fn_increment_id, T.StringType())
    df = df.withColumn("New_Col", my_udf("Flag"))
    print(df.show(10))

但是,我最终得到了结果:

Received Dataset:

Col1    Col2    Flag    New_Col
Val1    Val2    F       11F
Val3    Val4    T       11T

因此,它对所有行都增加了一次。如何增加每一行? 提前致谢。

【问题讨论】:

你有一个列来排序数据框吗? @Shaido,不,我没有。事实上,不需要按 DF 的顺序订购。 所以哪一行得到哪个序列值都没有关系?只要它们不同就可以吗? @Shaido,是的,确切地说,哪一行得到哪个序列值并不重要......值应该不同。另外,如果行是按顺序排序的,请告诉我是否有任何解决方案(尽管在当前项目/场景中不需要这样做)。 【参考方案1】:

可以使用Window 添加具有顺序值的列。只要数据框不太大就可以了,对于较大的数据框,您应该考虑在窗口上使用partitionBy,但值不会是连续的。

以下代码为每一行创建序列号,将其加 10,然后将值与Flag 列连接以创建一个新列。这里的行按Col1 排序,但可以使用任何列。

from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number, concat

w = Window().orderBy("Col1")
df = df.withColumn("New_Col", concat(row_number().over(w) + 10, col(Flag)))

【讨论】:

以上是关于如何在 Spark 数据框中添加具有序列值的列?的主要内容,如果未能解决你的问题,请参考以下文章

如何将具有值的列添加到 Spark Java 中的新数据集?

如何在 Spark/Scala 中查找具有许多空值的列

如何使用pyspark将具有多个可能值的Json数组列表转换为数据框中的列

Spark 仅获取具有一个或多个空值的列

使用pyspark,spark + databricks时如何将完全不相关的列添加到数据框中

如何在 Spark 中对包含日期和时间值的列进行排序?