倾斜的窗口函数和 Hive 源分区?

Posted

技术标签:

【中文标题】倾斜的窗口函数和 Hive 源分区?【英文标题】:Skewed Window Function & Hive Source Partitions? 【发布时间】:2019-06-24 17:04:32 【问题描述】:

我通过 Spark 读取的数据是高度倾斜的 Hive 表,具有以下统计信息。

(MIN、25TH、MEDIAN、75TH、MAX)通过 Spark UI:

1506.0 B / 0 232.4 KB / 27288 247.3 KB / 29025 371.0 KB / 42669 269.0 MB / 27197137

我相信当我执行一些Window FuncsPivots 时,它会在工作的下游引起问题。

我尝试探索这个参数来限制分区大小,但是没有任何改变,并且分区在读取时仍然倾斜。

spark.conf.set("spark.sql.files.maxPartitionBytes")

另外,当我使用 Hive 表作为源缓存这个 DF 时,它需要几分钟,甚至在 Spark UI 中导致一些 GC,这很可能也是因为偏斜。

spark.sql.files.maxPartitionBytes 是否适用于 Hive 表或仅适用于文件?

处理这种倾斜的 Hive 源的最佳做法是什么?

写到 parquet 或 Salting 的舞台屏障之类的东西是否适合这个问题?

我想避免读取.repartition(),因为它会在已经数据过山车的作业中添加另一层。

谢谢

================================================ ===

经过进一步研究,Window Function 似乎也导致数据倾斜,这就是Spark Job 挂起的地方。

我正在通过双重Window Function 执行一些time series 填充(向前然后向后填充以估算所有null 传感器读数)并尝试按照本文尝试salt 方法来均匀分布.. . 但是下面的代码会产生所有null 值,所以salt 方法不起作用。

不知道为什么我在Window 之后得到skews,因为我分区的每个度量项在通过.groupBy() 检查后具有大致相同数量的记录......因此为什么需要salt

+--------------------+-------+
|          measure   |  count|
+--------------------+-------+
|    v1              |5030265|
|      v2            |5009780|
|     v3             |5030526|
| v4                 |5030504|
...

盐帖 => https://medium.com/appsflyer/salting-your-spark-to-scale-e6f1c87dd18

nSaltBins = 300 # based off number of "measure" values
df_fill = df_fill.withColumn("salt", (F.rand() * nSaltBins).cast("int"))

# FILLS [FORWARD + BACKWARD]
window = Window.partitionBy('measure')\
               .orderBy('measure', 'date')\
               .rowsBetween(Window.unboundedPreceding, 0)

# FORWARD FILLING IMPUTER
ffill_imputer = F.last(df_fill['new_value'], ignorenulls=True)\
.over(window)
fill_measure_DF = df_fill.withColumn('value_impute_temp', ffill_imputer)\
.drop("value", "new_value")

window = Window.partitionBy('measure')\
               .orderBy('measure', 'date')\
               .rowsBetween(0,Window.unboundedFollowing)

# BACKWARD FILLING IMPUTER
bfill_imputer = F.first(df_fill['value_impute_temp'], ignorenulls=True)\
.over(window)
df_fill = df_fill.withColumn('value_impute_final', bfill_imputer)\
.drop("value_impute_temp")

【问题讨论】:

我相信当我执行一些 Window Funcs 和 Pivots 时,它会导致下游作业出现问题 - 这不太可能。具有较高概率的窗口函数和枢轴是导致性能问题的那些,因为两者都非常昂贵。但是,如果不了解更多有关表属性和执行计划的信息,则无法确定。 可能...但问题出在最终的write 上,该rdd 捆绑了skewed input, window, and pivot transformations 的特定阶段,其中所有数据都在一个分区中,其余分区中是空的,所以工作就挂了。 我也怀疑所有人都理解障碍。但关键是你的散文太少了。 @thePurplePython。你能分享一些代码n表结构吗? @vikrantrana 谢谢你,但我使用 spark 作为处理引擎而不是配置单元......将尝试分享一些东西......倾斜的源是一个问题,但根本原因似乎是带有倾斜的窗口函数分区......这里也处理时间序列数据......我正在尝试salt方法 【参考方案1】:

Hive based solution :

您可以使用配置单元配置启用倾斜连接优化。适用的设置有:

set hive.optimize.skewjoin=true;
set hive.skewjoin.key=500000;
set hive.skewjoin.mapjoin.map.tasks=10000;
set hive.skewjoin.mapjoin.min.split=33554432;

请参阅 databricks 提示:

skew hints 在这种情况下可能会起作用

【讨论】:

倾斜提示是专有的 Databricks 扩展,不是吗?如果我没记错的话,Spark 会忽略 Hive 倾斜选项。 是的,此功能特定于 databricks,尚未针对开源 spark 发布 如果你没事请关注accept answer as owner【参考方案2】:

在单个分区大到无法放入单个执行程序的内存的情况下,加盐可能会有所帮助。即使所有密钥也平均分布(如您的情况),这也可能发生。

您必须在您用于创建窗口的 partitionBy 子句中包含 salt 列。

window = Window.partitionBy('measure', 'salt')\
               .orderBy('measure', 'date')\
               .rowsBetween(Window.unboundedPreceding, 0)

您必须再次创建另一个窗口,该窗口将对中间结果进行操作

window1 = Window.partitionBy('measure')\
                   .orderBy('measure', 'date')\
                   .rowsBetween(Window.unboundedPreceding, 0)

【讨论】:

以上是关于倾斜的窗口函数和 Hive 源分区?的主要内容,如果未能解决你的问题,请参考以下文章

hive关于窗口函数的使用

Hive中的窗口函数

hive函数之~窗口函数与分析函数

Hive 窗口与分析型函数

hive窗口函数极速入门及在拉链表上的运用案例

Hive碎碎念(2):分析函数和窗口函数