将索引列添加到现有 Spark 的 DataFrame
Posted
技术标签:
【中文标题】将索引列添加到现有 Spark 的 DataFrame【英文标题】:Add index column to existing Spark's DataFrame 【发布时间】:2016-08-10 14:00:09 【问题描述】:我使用 Java 使用 Spark 1.5 进行操作。我需要将 ID/Index 列附加到现有的 DataFrame 中,例如:
+---------+--------+
| surname| name|
+---------+--------+
| Green| Jake|
| Anderson| Thomas|
| Corleone| Michael|
| Marsh| Randy|
| Montana| Tony|
| Green| Julia|
|Brenneman| Eady|
| Durden| Tyler|
| Corleone| Vito|
| Madiro| Mat|
+---------+--------+
我希望每一行都附加索引,范围在介于 1 和表记录数量之间。索引顺序无关紧要,任何行都必须只包含唯一的 ID/索引。它可以通过转换为RDD并附加索引行并转换为具有修改的StructType的DataFrame来完成,但是,如果我理解正确的话,这个操作会消耗大量的资源用于转换等,并且必须有另一种方法。 结果必须是这样的:
+---------+--------+---+
| surname| name| id|
+---------+--------+---+
| Green| Jake| 3|
| Anderson| Thomas| 5|
| Corleone| Michael| 2|
| Marsh| Randy| 10|
| Montana| Tony| 7|
| Green| Julia| 1|
|Brenneman| Eady| 2|
| Durden| Tyler| 9|
| Corleone| Vito| 4|
| Madiro| Mat| 6|
+---------+--------+---+
谢谢。
【问题讨论】:
Primary keys with Apache Spark的可能重复 他提出的第一个解决方案(如果我正确理解 Scala 语法)是转换为 RDD 等。其次 - 我不能在 Java 中调用这个函数,它会生成不是来自所需范围的唯一值,所以唯一可能的解决方案是使用哈希函数,但它有不可接受的缺点。 实际上我的意思是,鉴于您的要求,没有比 rdd -> zipWithIndex 更好的解决方案了。也不包括 Python sn-ps 每一段代码都应该是 Java 兼容的。 【参考方案1】:我知道这个问题可能是很久以前的问题了,但你可以这样做:
from pyspark.sql.window import Window
w = Window.orderBy("myColumn")
withIndexDF = originalDF.withColumn("index", row_number().over(w))
myColumn:数据框中的任何特定列。
originalDF:没有索引列的原始DataFrame。
【讨论】:
在使用没有分区子句的窗口时,将警告所有数据落入单个分区,可能会导致性能大幅下降。【参考方案2】:在 spark 数据框中最简洁的方法:
.withColumn("idx",monotonically_increasing_id())
完整文档:https://docs.databricks.com/spark/latest/sparkr/functions/withColumn.html
【讨论】:
来自问题:> 我希望每一行都附加索引,范围在 1 和表记录数量之间。来自monotonically_increasing_id()
的代码: > 生成的ID保证单调递增且唯一,但不连续。【参考方案3】:
伙计们,这是一个很好的方法:
DataFrame-ified zipWithIndex
从 RDD 模拟 ZipWithIndex 方法......第一个建议表现更好,但到目前为止与纯 Dataframes 解决方案没什么大不了(在我的场景中超过 100M 行表)。
【讨论】:
【参考方案4】:在 Scala 中,首先我们需要创建一个索引数组:
val indx_arr=(1 to your_df.count.toInt).toArray
indx_arr: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
现在,我们想将此列附加到我们的数据框中。
首先,我们打开 Dataframe 并将其作为数组获取,然后使用 indx_arr
对其进行压缩,然后将新创建的数组转换回 RDD。最后一步是将其作为 Dataframe 获取:
final_df = sc.parallelize((your_df.collect.map(
x=>(x(0),x(1))) zip indx_arr).map(
x=>(x._1._1.toString,x._1._2.toString,x._2))).toDF("surname","name","id")
这也是将任何类型的数组附加到我们的 Spark Dataframe 的简单直接的方法。
【讨论】:
【参考方案5】:您可以使用 withColumn 函数。用法应该类似于 Val myDF = existingDF.withColumn("index",express(random(1,existingDF.count())
【讨论】:
什么是express
?
Expr 表达表达式以上是关于将索引列添加到现有 Spark 的 DataFrame的主要内容,如果未能解决你的问题,请参考以下文章
如何在 Spark SQL 中向现有 Dataframe 添加新列