在 Apache Spark 中拆分 DataFrame

Posted

技术标签:

【中文标题】在 Apache Spark 中拆分 DataFrame【英文标题】:Splitting DataFrames in Apache Spark 【发布时间】:2016-08-15 20:16:51 【问题描述】:

将 Apache Spark 2.0 与 pyspark 结合使用,我有一个包含 1000 行数据的 DataFrame,并希望将该 DataFrame 拆分/切片为 2 个单独的 DataFrame;

第一个 DataFrame 应包含前 750 行 第二个 DataFrame 应该包含剩余的 250 行

注意:随机种子是不够的,因为我打算多次重复这种拆分方法,并希望控制第一个和第二个 DataFrame 使用哪些数据。

我发现 take(n) 方法对于生成第一个结果很有用。 但我似乎找不到正确的方式(或任何方式)来获取第二个 DataFrame。

任何正确方向的指针将不胜感激。

提前致谢。

更新:我现在设法通过再次排序和应用 take(n) 找到了解决方案。不过,这仍然是一个次优的解决方案:

# First DataFrame, simply take the first 750 rows
part1 = spark.createDataFrame(df.take(750))
# Second DataFrame, sort by key descending, then take 250 rows
part2 = spark.createDataFrame(df.rdd.sortByKey(False).toDF().take(250))
# Then reverse the order again, to maintain the original order
part2 = part2.rdd.sortByKey(True).toDF()
# Then rename the columns as they have been reset to "_1" and "_2" by the sorting process
part2 = part2.withColumnRenamed("_1", "label").withColumnRenamed("_2", "features")

【问题讨论】:

【参考方案1】:

您对使用 take 提出质疑是正确的,因为它将数据绘制到驱动程序,然后 createDataFrame 将其重新分配到集群中。这是低效的,如果您的驱动程序没有足够的内存来存储数据,则可能会失败。

这是一个在其上创建行索引列和切片的解决方案:

from pyspark.sql.functions import monotonicallyIncreasingId

idxDf = df.withColumn("idx", monotonicallyIncreasingId())
part1 = idxDf.filter('idx < 750')
part2 = idxDf.filter('idx >= 750')

【讨论】:

以上是关于在 Apache Spark 中拆分 DataFrame的主要内容,如果未能解决你的问题,请参考以下文章

火花火车测试拆分

spark之RDD详解----五大特性

将字符串拆分附加到 Pandas DataFrame [关闭]

Spark Window Functions 需要 HiveContext?

在 Spark 上使用 Scala 在 Dataframe 中拆分字符串

使用 spark python 拆分数据帧