在 Apache Spark 中拆分 DataFrame
Posted
技术标签:
【中文标题】在 Apache Spark 中拆分 DataFrame【英文标题】:Splitting DataFrames in Apache Spark 【发布时间】:2016-08-15 20:16:51 【问题描述】:将 Apache Spark 2.0 与 pyspark 结合使用,我有一个包含 1000 行数据的 DataFrame,并希望将该 DataFrame 拆分/切片为 2 个单独的 DataFrame;
第一个 DataFrame 应包含前 750 行 第二个 DataFrame 应该包含剩余的 250 行注意:随机种子是不够的,因为我打算多次重复这种拆分方法,并希望控制第一个和第二个 DataFrame 使用哪些数据。
我发现 take(n) 方法对于生成第一个结果很有用。 但我似乎找不到正确的方式(或任何方式)来获取第二个 DataFrame。
任何正确方向的指针将不胜感激。
提前致谢。
更新:我现在设法通过再次排序和应用 take(n) 找到了解决方案。不过,这仍然是一个次优的解决方案:
# First DataFrame, simply take the first 750 rows
part1 = spark.createDataFrame(df.take(750))
# Second DataFrame, sort by key descending, then take 250 rows
part2 = spark.createDataFrame(df.rdd.sortByKey(False).toDF().take(250))
# Then reverse the order again, to maintain the original order
part2 = part2.rdd.sortByKey(True).toDF()
# Then rename the columns as they have been reset to "_1" and "_2" by the sorting process
part2 = part2.withColumnRenamed("_1", "label").withColumnRenamed("_2", "features")
【问题讨论】:
【参考方案1】:您对使用 take 提出质疑是正确的,因为它将数据绘制到驱动程序,然后 createDataFrame 将其重新分配到集群中。这是低效的,如果您的驱动程序没有足够的内存来存储数据,则可能会失败。
这是一个在其上创建行索引列和切片的解决方案:
from pyspark.sql.functions import monotonicallyIncreasingId
idxDf = df.withColumn("idx", monotonicallyIncreasingId())
part1 = idxDf.filter('idx < 750')
part2 = idxDf.filter('idx >= 750')
【讨论】:
以上是关于在 Apache Spark 中拆分 DataFrame的主要内容,如果未能解决你的问题,请参考以下文章
将字符串拆分附加到 Pandas DataFrame [关闭]
Spark Window Functions 需要 HiveContext?