对 Spark 数据框中的行进行洗牌

Posted

技术标签:

【中文标题】对 Spark 数据框中的行进行洗牌【英文标题】:Shuffling the rows in a Spark data frame 【发布时间】:2019-06-29 02:28:34 【问题描述】:

我正在尝试在数据框中创建一个新列,该列只是现有列的改组版本。我可以使用How to shuffle the rows in a Spark dataframe? 中描述的方法对数据框中的行进行随机排序,但是当我尝试将列的洗牌版本添加到数据框中时,它似乎没有执行洗牌。

import pyspark
import pyspark.sql.functions as F

spark = pyspark.sql.SparkSession.builder.getOrCreate()

df = spark.range(5).toDF("x")
df.show()
#> +---+
#> |  x|
#> +---+
#> |  0|
#> |  1|
#> |  2|
#> |  3|
#> |  4|
#> +---+

# the rows appear to be shuffled
ordered_df = df.orderBy(F.rand())
ordered_df.show()
#> +---+
#> |  x|
#> +---+
#> |  0|
#> |  2|
#> |  3|
#> |  4|
#> |  1|
#> +---+

# ...but when i try to add this column to the df, they are no longer shuffled
df.withColumn('y', ordered_df.x).show()
#> +---+---+
#> |  x|  y|
#> +---+---+
#> |  0|  0|
#> |  1|  1|
#> |  2|  2|
#> |  3|  3|
#> |  4|  4|
#> +---+---+

由reprexpy package于 2019-06-28 创建

几点说明:

我想找到一种解决方案,将数据保留在 Spark 中。例如,我不想使用需要将数据移出 JVM 的用户定义函数。 PySpark: Randomize rows in dataframe 中的解决方案对我不起作用(见下文)。

df = spark.sparkContext.parallelize(range(5)).map(lambda x: (x, )).toDF(["x"])

df.withColumn('y', df.orderBy(F.rand()).x).show()
#> +---+---+
#> |  x|  y|
#> +---+---+
#> |  0|  0|
#> |  1|  1|
#> |  2|  2|
#> |  3|  3|
#> |  4|  4|
#> +---+---+
我必须对多列中的行进行洗牌,并且每一列都必须独立于其他列进行洗牌。因此,我不希望在https://***.com/a/45889539 中使用zipWithIndex() 解决方案,因为该解决方案需要我对数据运行许多连接(我假设这会很耗时)。

【问题讨论】:

看执行计划:df.withColumn('y', ordered_df.x).explain()。 Spark 是惰性的,所以ordered_df 不会被缓存或保存在任何地方。当您调用withColumn 时,它会再次计算。独立地洗牌并不是 spark 设计的好东西。每一行都作为一个原子单元处理 - 这就是允许 spark 跨执行器并行化行操作的原因。我认为没有任何方法可以避免加入。 为什么当我调用df.withColumn('y', ordered_df.x).show() 时必须重新运行排序步骤 (df.orderBy(F.rand())) 会导致根本没有完成重新排序(或者至少看起来是这样)?换句话说,我不明白为什么 Spark 是懒惰的事实会影响这个案例。 例如,如果你我添加了对ordered_df.cache()的调用,我仍然会遇到我描述的问题。 我觉得你对惰性执行有一些误解。一个例子是多次运行ordered_df.show() - 你会看到每次都得到不同的结果。 Spark 不会“保存”任何值 - 只有有关如何创建这些值的说明。 我知道 Spark 在调用某个操作之前实际上不会执行此工作 - 我的观点是,不应该因为我没有得到预期的结果而责怪延迟执行。例如,为什么df.withColumn('y', df.orderBy(F.rand()).x).show() 不将列 (y) 显示为重新排序?调用了一个动作 (show()),所以 y 应该是随机排序的,但不是。 【参考方案1】:

您可以使用窗口函数为每一行分配一个随机索引来完成此操作,在单独的 DF 中再次执行此操作,然后加入索引:

>>> from pyspark.sql.window import Window
>>> import pyspark.sql.functions as F
>>> df = spark.range(5).toDF("x")
>>> left = df.withColumn("rnd", F.row_number().over(Window.orderBy(F.rand())))
>>> right = df.withColumnRenamed("x", "y").withColumn("rnd", F.row_number().over(Window.orderBy(F.rand()))) 
>>> dff = left.join(right, left.rnd == right.rnd).drop("rnd")
>>> dff.show()
19/06/29 13:17:04 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
19/06/29 13:17:04 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
+---+---+                                                                       
|  x|  y|
+---+---+
|  3|  3|
|  2|  0|
|  0|  2|
|  1|  1|
|  4|  4|
+---+---+

正如警告所暗示的,这在实践中可能不是一个好主意。

【讨论】:

以上是关于对 Spark 数据框中的行进行洗牌的主要内容,如果未能解决你的问题,请参考以下文章

在不计算的情况下获取 Spark 数据框中的行数

Spark基于其他数据框中的列对数据框中的列进行重复数据删除

在 spark 中比较数据框中的行,以根据行的比较为列分配值

R - 如何使用 sparklyr 复制火花数据框中的行

Spark - 如何使用列对数据框中的字符串进行切片[重复]

在 Spark 数据框中聚合时访问窗口外的行