对 Spark 数据框中的行进行洗牌
Posted
技术标签:
【中文标题】对 Spark 数据框中的行进行洗牌【英文标题】:Shuffling the rows in a Spark data frame 【发布时间】:2019-06-29 02:28:34 【问题描述】:我正在尝试在数据框中创建一个新列,该列只是现有列的改组版本。我可以使用How to shuffle the rows in a Spark dataframe? 中描述的方法对数据框中的行进行随机排序,但是当我尝试将列的洗牌版本添加到数据框中时,它似乎没有执行洗牌。
import pyspark
import pyspark.sql.functions as F
spark = pyspark.sql.SparkSession.builder.getOrCreate()
df = spark.range(5).toDF("x")
df.show()
#> +---+
#> | x|
#> +---+
#> | 0|
#> | 1|
#> | 2|
#> | 3|
#> | 4|
#> +---+
# the rows appear to be shuffled
ordered_df = df.orderBy(F.rand())
ordered_df.show()
#> +---+
#> | x|
#> +---+
#> | 0|
#> | 2|
#> | 3|
#> | 4|
#> | 1|
#> +---+
# ...but when i try to add this column to the df, they are no longer shuffled
df.withColumn('y', ordered_df.x).show()
#> +---+---+
#> | x| y|
#> +---+---+
#> | 0| 0|
#> | 1| 1|
#> | 2| 2|
#> | 3| 3|
#> | 4| 4|
#> +---+---+
由reprexpy package于 2019-06-28 创建
几点说明:
我想找到一种解决方案,将数据保留在 Spark 中。例如,我不想使用需要将数据移出 JVM 的用户定义函数。 PySpark: Randomize rows in dataframe 中的解决方案对我不起作用(见下文)。df = spark.sparkContext.parallelize(range(5)).map(lambda x: (x, )).toDF(["x"])
df.withColumn('y', df.orderBy(F.rand()).x).show()
#> +---+---+
#> | x| y|
#> +---+---+
#> | 0| 0|
#> | 1| 1|
#> | 2| 2|
#> | 3| 3|
#> | 4| 4|
#> +---+---+
我必须对多列中的行进行洗牌,并且每一列都必须独立于其他列进行洗牌。因此,我不希望在https://***.com/a/45889539 中使用zipWithIndex()
解决方案,因为该解决方案需要我对数据运行许多连接(我假设这会很耗时)。
【问题讨论】:
看执行计划:df.withColumn('y', ordered_df.x).explain()
。 Spark 是惰性的,所以ordered_df
不会被缓存或保存在任何地方。当您调用withColumn
时,它会再次计算。独立地洗牌并不是 spark 设计的好东西。每一行都作为一个原子单元处理 - 这就是允许 spark 跨执行器并行化行操作的原因。我认为没有任何方法可以避免加入。
为什么当我调用df.withColumn('y', ordered_df.x).show()
时必须重新运行排序步骤 (df.orderBy(F.rand())
) 会导致根本没有完成重新排序(或者至少看起来是这样)?换句话说,我不明白为什么 Spark 是懒惰的事实会影响这个案例。
例如,如果你我添加了对ordered_df.cache()
的调用,我仍然会遇到我描述的问题。
我觉得你对惰性执行有一些误解。一个例子是多次运行ordered_df.show()
- 你会看到每次都得到不同的结果。 Spark 不会“保存”任何值 - 只有有关如何创建这些值的说明。
我知道 Spark 在调用某个操作之前实际上不会执行此工作 - 我的观点是,不应该因为我没有得到预期的结果而责怪延迟执行。例如,为什么df.withColumn('y', df.orderBy(F.rand()).x).show()
不将列 (y
) 显示为重新排序?调用了一个动作 (show()
),所以 y
应该是随机排序的,但不是。
【参考方案1】:
您可以使用窗口函数为每一行分配一个随机索引来完成此操作,在单独的 DF 中再次执行此操作,然后加入索引:
>>> from pyspark.sql.window import Window
>>> import pyspark.sql.functions as F
>>> df = spark.range(5).toDF("x")
>>> left = df.withColumn("rnd", F.row_number().over(Window.orderBy(F.rand())))
>>> right = df.withColumnRenamed("x", "y").withColumn("rnd", F.row_number().over(Window.orderBy(F.rand())))
>>> dff = left.join(right, left.rnd == right.rnd).drop("rnd")
>>> dff.show()
19/06/29 13:17:04 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
19/06/29 13:17:04 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
+---+---+
| x| y|
+---+---+
| 3| 3|
| 2| 0|
| 0| 2|
| 1| 1|
| 4| 4|
+---+---+
正如警告所暗示的,这在实践中可能不是一个好主意。
【讨论】:
以上是关于对 Spark 数据框中的行进行洗牌的主要内容,如果未能解决你的问题,请参考以下文章
Spark基于其他数据框中的列对数据框中的列进行重复数据删除
在 spark 中比较数据框中的行,以根据行的比较为列分配值