将索引列添加到现有 Spark 的 DataFrame

Posted

技术标签:

【中文标题】将索引列添加到现有 Spark 的 DataFrame【英文标题】:Add index column to existing Spark's DataFrame 【发布时间】:2016-08-10 14:00:09 【问题描述】:

我使用 Java 使用 Spark 1.5 进行操作。我需要将 ID/Index 列附加到现有的 DataFrame 中,例如:

+---------+--------+
|  surname|    name|
+---------+--------+
|    Green|    Jake|
| Anderson|  Thomas|
| Corleone| Michael|
|    Marsh|   Randy|
|  Montana|    Tony|
|    Green|   Julia|
|Brenneman|    Eady|
|   Durden|   Tyler|
| Corleone|    Vito|
|   Madiro|     Mat|
+---------+--------+

我希望每一行都附加索引,范围在介于 1 和表记录数量之间。索引顺序无关紧要,任何行都必须只包含唯一的 ID/索引。它可以通过转换为RDD并附加索引行并转换为具有修改的StructType的DataFrame来完成,但是,如果我理解正确的话,这个操作会消耗大量的资源用于转换等,并且必须有另一种方法。 结果必须是这样的:

+---------+--------+---+
|  surname|    name| id|
+---------+--------+---+
|    Green|    Jake|  3|
| Anderson|  Thomas|  5|
| Corleone| Michael|  2|
|    Marsh|   Randy| 10|
|  Montana|    Tony|  7|
|    Green|   Julia|  1|
|Brenneman|    Eady|  2|
|   Durden|   Tyler|  9|
| Corleone|    Vito|  4|
|   Madiro|     Mat|  6|
+---------+--------+---+

谢谢。

【问题讨论】:

Primary keys with Apache Spark的可能重复 他提出的第一个解决方案(如果我正确理解 Scala 语法)是转换为 RDD 等。其次 - 我不能在 Java 中调用这个函数,它会生成不是来自所需范围的唯一值,所以唯一可能的解决方案是使用哈希函数,但它有不可接受的缺点。 实际上我的意思是,鉴于您的要求,没有比 rdd -> zipWithIndex 更好的解决方案了。也不包括 Python sn-ps 每一段代码都应该是 Java 兼容的。 【参考方案1】:

我知道这个问题可能是很久以前的问题了,但你可以这样做:

from pyspark.sql.window import Window  
w = Window.orderBy("myColumn") 
withIndexDF = originalDF.withColumn("index", row_number().over(w))
myColumn:数据框中的任何特定列。 originalDF:没有索引列的原始DataFrame。

【讨论】:

在使用没有分区子句的窗口时,将警告所有数据落入单个分区,可能会导致性能大幅下降。【参考方案2】:

在 spark 数据框中最简洁的方法:

.withColumn("idx",monotonically_increasing_id())

完整文档:https://docs.databricks.com/spark/latest/sparkr/functions/withColumn.html

【讨论】:

来自问题:> 我希望每一行都附加索引,范围在 1 和表记录数量之间。来自monotonically_increasing_id()的代码: > 生成的ID保证单调递增且唯一,但不连续。【参考方案3】:

伙计们,这是一个很好的方法:

DataFrame-ified zipWithIndex

从 RDD 模拟 ZipWithIndex 方法......第一个建议表现更好,但到目前为止与纯 Dataframes 解决方案没什么大不了(在我的场景中超过 100M 行表)。

【讨论】:

【参考方案4】:

在 Scala 中,首先我们需要创建一个索引数组:

val indx_arr=(1 to your_df.count.toInt).toArray

indx_arr: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

现在,我们想将此列附加到我们的数据框中。 首先,我们打开 Dataframe 并将其作为数组获取,然后使用 indx_arr 对其进行压缩,然后将新创建的数组转换回 RDD。最后一步是将其作为 Dataframe 获取:

final_df = sc.parallelize((your_df.collect.map(
    x=>(x(0),x(1))) zip indx_arr).map(
    x=>(x._1._1.toString,x._1._2.toString,x._2))).toDF("surname","name","id")

这也是将任何类型的数组附加到我们的 Spark Dataframe 的简单直接的方法。

【讨论】:

【参考方案5】:

您可以使用 withColumn 函数。用法应该类似于 Val myDF = existingDF.withColumn("index",express(random(1,existingDF.count())

【讨论】:

什么是express Expr 表达表达式

以上是关于将索引列添加到现有 Spark 的 DataFrame的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Spark SQL 中向现有 Dataframe 添加新列

使用其他现有列 Spark/Scala 添加新列

使用 Spark Scala 使用现有列添加新列

如何使用 Scala/Spark 添加不基于数据框中现有列的新列? [复制]

Spark scala将数据框列复制到新数据框

Spark 根据现有列的映射值创建新列