避免 Spark 窗口函数中单一分区模式对性能的影响

Posted

技术标签:

【中文标题】避免 Spark 窗口函数中单一分区模式对性能的影响【英文标题】:Avoid performance impact of a single partition mode in Spark window functions 【发布时间】:2016-12-24 13:00:16 【问题描述】:

我的问题是由计算 spark 数据帧中连续行之间差异的用例触发的。

例如,我有:

>>> df.show()
+-----+----------+
|index|      col1|
+-----+----------+
|  0.0|0.58734024|
|  1.0|0.67304325|
|  2.0|0.85154736|
|  3.0| 0.5449719|
+-----+----------+

如果我选择使用“窗口”函数来计算这些,那么我可以这样做:

>>> winSpec = Window.partitionBy(df.index >= 0).orderBy(df.index.asc())
>>> import pyspark.sql.functions as f
>>> df.withColumn('diffs_col1', f.lag(df.col1, -1).over(winSpec) - df.col1).show()
+-----+----------+-----------+
|index|      col1| diffs_col1|
+-----+----------+-----------+
|  0.0|0.58734024|0.085703015|
|  1.0|0.67304325| 0.17850411|
|  2.0|0.85154736|-0.30657548|
|  3.0| 0.5449719|       null|
+-----+----------+-----------+

问题:我明确地将数据帧划分为单个分区。这对性能有什么影响,如果有,为什么会这样,我该如何避免?因为当我不指定分区时,会收到以下警告:

16/12/24 13:52:27 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.

【问题讨论】:

【参考方案1】:

在实践中,性能影响几乎与完全省略 partitionBy 子句相同。所有记录将被打乱到单个分区,在本地排序并按顺序逐一迭代。

区别仅在于总共创建的分区数量。让我们通过一个使用具有 10 个分区和 1000 条记录的简单数据集的示例来说明这一点:

df = spark.range(0, 1000, 1, 10).toDF("index").withColumn("col1", f.randn(42))

如果你定义了没有按子句划分的框架

w_unpart = Window.orderBy(f.col("index").asc())

并与lag一起使用

df_lag_unpart = df.withColumn(
    "diffs_col1", f.lag("col1", 1).over(w_unpart) - f.col("col1")
)

总共只有一个分区:

df_lag_unpart.rdd.glom().map(len).collect()
[1000]

与带有虚拟索引的帧定义相比(与您的代码相比简化了一点:

w_part = Window.partitionBy(f.lit(0)).orderBy(f.col("index").asc())

将使用等于spark.sql.shuffle.partitions的分区数:

spark.conf.set("spark.sql.shuffle.partitions", 11)

df_lag_part = df.withColumn(
    "diffs_col1", f.lag("col1", 1).over(w_part) - f.col("col1")
)

df_lag_part.rdd.glom().count()
11

只有一个非空分区:

df_lag_part.rdd.glom().filter(lambda x: x).count()
1

不幸的是,在 PySpark 中没有可用于解决此问题的通用解决方案。这只是结合分布式处理模型的一种内在机制。

由于index 列是连续的,您可以生成每个块的记录数固定的人工分区键:

rec_per_block  = df.count() // int(spark.conf.get("spark.sql.shuffle.partitions"))

df_with_block = df.withColumn(
    "block", (f.col("index") / rec_per_block).cast("int")
)

并用它来定义框架规范:

w_with_block = Window.partitionBy("block").orderBy("index")

df_lag_with_block = df_with_block.withColumn(
    "diffs_col1", f.lag("col1", 1).over(w_with_block) - f.col("col1")
)

这将使用预期的分区数:

df_lag_with_block.rdd.glom().count()
11

数据分布大致均匀(我们无法避免哈希冲突):

df_lag_with_block.rdd.glom().map(len).collect()
[0, 180, 0, 90, 90, 0, 90, 90, 100, 90, 270]

但在块边界上有许多间隙:

df_lag_with_block.where(f.col("diffs_col1").isNull()).count()
12

因为边界很容易计算:

from itertools import chain

boundary_idxs = sorted(chain.from_iterable(
    # Here we depend on sequential identifiers
    # This could be generalized to any monotonically increasing
    # id by taking min and max per block
    (idx - 1, idx) for idx in 
    df_lag_with_block.groupBy("block").min("index")
        .drop("block").rdd.flatMap(lambda x: x)
        .collect()))[2:]  # The first boundary doesn't carry useful inf.

你总是可以选择:

missing = df_with_block.where(f.col("index").isin(boundary_idxs))

并分别填写:

# We use window without partitions here. Since number of records
# will be small this won't be a performance issue
# but will generate "Moving all data to a single partition" warning
missing_with_lag = missing.withColumn(
    "diffs_col1", f.lag("col1", 1).over(w_unpart) - f.col("col1")
).select("index", f.col("diffs_col1").alias("diffs_fill"))

join:

combined = (df_lag_with_block
    .join(missing_with_lag, ["index"], "leftouter")
    .withColumn("diffs_col1", f.coalesce("diffs_col1", "diffs_fill")))

得到想要的结果:

mismatched = combined.join(df_lag_unpart, ["index"], "outer").where(
    combined["diffs_col1"] != df_lag_unpart["diffs_col1"]
)
assert mismatched.count() == 0

【讨论】:

你能解释一下 f.lit(0) 的用法吗? 和推论:为什么使用 partitionBy(f.lit(0)) 只会导致一个非空分区 (df_lag_part) 而使用 partitionBy (block) 会导致 8 个非空分区(df_lag_with_block) partitionBy 用于在应用函数之前对数据进行洗牌。由于lit(0) 是常量,所有记录都必须移动到同一个分区。

以上是关于避免 Spark 窗口函数中单一分区模式对性能的影响的主要内容,如果未能解决你的问题,请参考以下文章

Spark Streaming Dataframe 执行,有状态,分区本地 groupBy,避免洗牌

spark shuffle:分区原理及相关的疑问

重新分区 pyspark 数据帧失败以及如何避免初始分区大小

Spark窗口分区功能需要永远完成

没有 orderBy 的 Spark 窗口函数

Spark 重分区函数:coalesce和repartition区别与实现,可以优化Spark程序性能