带有窗口函数的 PySpark 数据偏度

Posted

技术标签:

【中文标题】带有窗口函数的 PySpark 数据偏度【英文标题】:PySpark data skewness with Window Functions 【发布时间】:2020-11-23 11:10:54 【问题描述】:

我有一个巨大的 PySpark 数据框,我正在对我的密钥定义的分区执行一系列窗口函数。

密钥的问题是,我的分区因此而倾斜,并导致事件时间线看起来像这样,

我知道在进行联接时可以使用加盐技术来解决此问题。但是当我使用窗口函数时,我该如何解决这个问题呢?

我在窗口函数中使用了滞后、超前等函数。我不能用加盐键来完成这个过程,因为我会得到错误的结果。

在这种情况下如何解决偏斜?

我正在寻找一种动态的方式来重新分区我的数据框而不会出现偏斜。

根据@jxc 的回答更新

我尝试创建一个示例 df 并尝试在其上运行代码,

df = pd.DataFrame()
df['id'] = np.random.randint(1, 1000, size=150000)
df['id'] = df['id'].map(lambda x: 100 if x % 2 == 0 else x)
df['timestamp'] = pd.date_range(start=pd.Timestamp('2020-01-01'), periods=len(df), freq='60s')
sdf = sc.createDataFrame(df)
sdf = sdf.withColumn("amt", F.rand()*100)
w = Window.partitionBy("id").orderBy("timestamp")

sdf = sdf.withColumn("new_col", F.lag("amt").over(w) + F.lead("amt").over(w))
x = sdf.toPandas()

这给了我一个这样的事件时间表,

我尝试了@jxc 答案中的代码,

sdf = sc.createDataFrame(df)
sdf = sdf.withColumn("amt", F.rand()*100)

N = 24*3600*365*2
sdf_1 = sdf.withColumn('pid', F.ceil(F.unix_timestamp('timestamp')/N))

w1 = Window.partitionBy('id', 'pid').orderBy('timestamp')
w2 = Window.partitionBy('id', 'pid')

sdf_2 = sdf_1.select(
    '*',
    F.count('*').over(w2).alias('cnt'),
    F.row_number().over(w1).alias('rn'),
    (F.lag('amt',1).over(w1) + F.lead('amt',1).over(w1)).alias('new_val')
)

sdf_3 = sdf_2.filter('rn in (1, 2, cnt-1, cnt)') \
    .withColumn('new_val', F.lag('amt',1).over(w) + F.lead('amt',1).over(w)) \
    .filter('rn in (1,cnt)')
    
df_new = sdf_2.filter('rn not in (1,cnt)').union(sdf_3)

x = df_new.toPandas()

我结束了一个额外的阶段,事件时间线看起来更加倾斜,

新代码也增加了运行时间

【问题讨论】:

如果预先知道倾斜键,您可以在其上拆分数据框,分别计算倾斜键的窗口函数,然后union 结果。 恐怕,Skewness 是事先不知道的。它可以不时改变。例如,对于特定的数据集,它们的键不会带来任何偏斜。但是对于其他一些数据(相同的列但更多的行);它们的键可能会导致偏斜 只要知道哪些键容易出现偏差,仍然可以独立拆分数据帧和进程集,不是吗? 我知道哪些键会带来偏斜,但不知道对于给定的数据集是否会存在。我同意,我仍然可以拆分和合并数据。但我不知道如何以这样一种方式进行拆分,即分区不会在拆分的数据帧中倾斜。 你是对的,现在我也认为它可能对窗口滞后/超前功能无效,因为它们仍然需要单分区交换。对不起... 【参考方案1】:

要处理大分区,您可以尝试根据 orderBy 列(很可能是可以转换为数字的数字列或日期/时间戳列)拆分它,以便所有新的子分区保持正确的行顺序。使用新的分区器处理行并使用laglead 函数进行计算,只需要对子分区之间边界周围的行进行后处理。 (下面也讨论了如何在task-2中合并小分区)

使用您的示例 sdf 并假设我们有以下 WinSpec 和一个简单的聚合函数:

w = Window.partitionBy('id').orderBy('timestamp')
df.withColumn('new_amt', F.lag('amt',1).over(w) + F.lead('amt',1).over(w))

Task-1:拆分大分区:

尝试以下方法:

    选择一个N来拆分timestamp并设置一个额外的partitionBy列pid(使用ceilint、@987654333 @等):

    # N to cover 35-days' intervals
    N = 24*3600*35
    df1 = sdf.withColumn('pid', F.ceil(F.unix_timestamp('timestamp')/N))
    

    pid 添加到 partitionBy(参见 w1),然后在 w1 上调用 row_number()lag()lead()。还可以查找每个新分区中的行数 (cnt),以帮助识别分区的结尾 (rn == cnt)。生成的 new_val 将适用于大多数行,每个分区边界上的行除外。

    w1 = Window.partitionBy('id', 'pid').orderBy('timestamp')
    w2 = Window.partitionBy('id', 'pid')
    
    df2 = df1.select(
        '*',
        F.count('*').over(w2).alias('cnt'),
        F.row_number().over(w1).alias('rn'),
        (F.lag('amt',1).over(w1) + F.lead('amt',1).over(w1)).alias('new_amt')
    )
    

    下面是一个示例df2,显示了边界行。

    处理边界:选择边界上的行rn in (1, cnt)加上那些在计算rn in (2, cnt-1)中使用的值,对w进行同样的new_val计算strong> 并仅保存边界行的结果。

    df3 = df2.filter('rn in (1, 2, cnt-1, cnt)') \
        .withColumn('new_amt', F.lag('amt',1).over(w) + F.lead('amt',1).over(w)) \
        .filter('rn in (1,cnt)')
    

    下面显示了从上面 df2

    得到的 df3

    df3 合并回 df2 以更新边界行rn in (1,cnt)

    df_new = df2.filter('rn not in (1,cnt)').union(df3)
    

    下面的屏幕截图显示了边界行周围的最终 df_new

    # drop columns which are used to implement logic only
    df_new = df_new.drop('cnt', 'rn')
    

一些注意事项:

    定义了以下3个WindowSpec:

    w = Window.partitionBy('id').orderBy('timestamp')          <-- fix boundary rows
    w1 = Window.partitionBy('id', 'pid').orderBy('timestamp')  <-- calculate internal rows
    w2 = Window.partitionBy('id', 'pid')                       <-- find #rows in a partition
    

    注意:严格来说,我们最好使用以下w 来修复边界行,以避免在边界周围绑定timestamp 出现问题。

    w = Window.partitionBy('id').orderBy('pid', 'rn')          <-- fix boundary rows
    

    如果您知道哪些分区是倾斜的,只需划分它们并跳过其他分区。如果分布稀疏,现有方法可能会将一个小分区分成 2 个甚至更多

    df1 = df.withColumn('pid', F.when(F.col('id').isin('a','b'), F.ceil(F.unix_timestamp('timestamp')/N)).otherwise(1))
    

    如果对于每个分区,您可以检索count(行数)和min_ts=min(时间戳),然后为pid(低于M 是阈值行数)尝试更动态的方法分裂):

    F.expr(f"IF(count>M, ceil((unix_timestamp(timestamp)-unix_timestamp(min_ts))/N), 1)")
    

    注意:对于分区内的偏斜,将需要更复杂的函数来生成pid

    如果仅使用lag(1) 函数,则只需对左边界进行后处理,按rn in (1, cnt) 过滤并仅更新rn == 1

    df3 = df1.filter('rn in (1, cnt)') \
        .withColumn('new_amt', F.lag('amt',1).over(w)) \
        .filter('rn = 1')
    

    当我们只需要修复右边界并更新rn == cnt时,类似于lead函数

    如果只使用lag(2),则使用df3过滤和更新更多行:

    df3 = df1.filter('rn in (1, 2, cnt-1, cnt)') \
        .withColumn('new_amt', F.lag('amt',2).over(w)) \
        .filter('rn in (1,2)')
    

    您可以将相同的方法扩展到 laglead 具有不同偏移量的混合情况。

Task-2:合并小分区:

根据一个分区count的记录数,我们可以设置一个阈值M,这样如果count&gt;M,则id持有自己的分区,否则我们合并分区所以总记录的#of 小于M(下面的方法有一个边缘情况2*M-2)。

M = 20000

# create pandas df with columns `id`, `count` and `f`, sort rows so that rows with count>=M are located on top
d2 = pd.DataFrame([ e.asDict() for e in sdf.groupby('id').count().collect() ]) \
    .assign(f=lambda x: x['count'].lt(M)) \
    .sort_values('f')    

# add pid column to merge smaller partitions but the total row-count in partition should be less than or around M 
# potentially there could be at most `2*M-2` records for the same pid, to make sure strictly count<M, use a for-loop to iterate d1 and set pid:
d2['pid'] = (d2.mask(d2['count'].gt(M),M)['count'].shift(fill_value=0).cumsum()/M).astype(int)

# add pid to sdf. In case join is too heavy, try using Map
sdf_1 = sdf.join(spark.createDataFrame(d2).alias('d2'), ["id"]) \
    .select(sdf["*"], F.col("d2.pid"))

# check pid: # of records and # of distinct ids
sdf_1.groupby('pid').agg(F.count('*').alias('count'), F.countDistinct('id').alias('cnt_ids')).orderBy('pid').show()
+---+-----+-------+                                                             
|pid|count|cnt_ids|
+---+-----+-------+
|  0|74837|      1|
|  1|20036|    133|
|  2|20052|    134|
|  3|20010|    133|
|  4|15065|    100|
+---+-----+-------+

现在,新的 Window 应该被单独的 pid 分区,并将 id 移动到 orderBy,如下所示:

w3 = Window.partitionBy('pid').orderBy('id','timestamp')

根据上述w3 WinSpec自定义lag/lead函数,然后计算new_val

lag_w3  = lambda col,n=1: F.when(F.lag('id',n).over(w3) == F.col('id'), F.lag(col,n).over(w3))
lead_w3 = lambda col,n=1: F.when(F.lead('id',n).over(w3) == F.col('id'), F.lead(col,n).over(w3))

sdf_new = sdf_1.withColumn('new_val', lag_w3('amt',1) + lead_w3('amt',1))

【讨论】:

感谢您的详细回答。我将通过通读并在我身边进行实验来确保我完全理解它。 我想知道是否可以在任何其他操作上完成类似的操作。例如,max、min、when(sql函数)在windowspec上应用? max, min 不适用于此方法,您可能需要分别对它们进行预处理(拆分分区,计数,然后求和),然后将这些值连接回来。 @SreeramTP,我建议的方法是处理无法加载到任何执行程序内存中的非常大的分区,从而导致 OOM 问题。如果您的数据都很小,除了 3-4 个相对较大的分区。这肯定会矫枉过正。正如我在Notes item-(2)中提到的,我们可以只在那些大分区中添加变量pid,例如,为number_of_rows_in_partition设置一个阈值,并找到min(timestamp),以便子-可以切割分区以减少潜在的小第一个子分区等。 此外,这个答案提供了更多的方法,而不是针对所有潜在情况的解决方案。您可能需要进行一些手动调整以适合您的实际数据。任何自动化设置都会产生一些成本。例如,如果您想根据日期范围和每个分区中的行数等自动查找N。我在考虑更多关于大数据的偏度问题,以避免加入、分组等,但您可能有不同的关注。我会在中午或晚上在我的时间检查这个..【参考方案2】:

要处理这种倾斜的数据,您可以尝试一些方法。

    如果您使用 Databricks 来运行您的作业,并且您知道哪一列会出现倾斜,那么您可以尝试一个名为 skew hint 的选项

    我建议迁移到 Spark 3.0,因为您可以选择使用 Adaptive Query Execution (AQE),它可以处理大多数问题,改善您的工作健康并可能更快地运行它们.

    通常,我建议在任何宽操作之前使您的数据更均匀大小的分区,增加集群大小确实有帮助,但我不确定这是否适合您。

【讨论】:

我没有使用数据块。我正在使用cloudera。我使用的是较旧的 spark 版本,所以 aqe 不适合我。如何使我的数据在大小均匀的分区中?您的意思是根据数据的大小对数据进行重新分区并将其均匀分布在各个分区中吗? 您当前的集群配置和数据负载大小和格式是什么?您可以将数据以 parquet 格式写入中间位置,最好对它们进行重新分区,这样您的部分文件每个大约不到 GB。 集群很大。 128 个执行器,每个执行器 32gb。我正在从已经根据大小分区的配置单元加载数据。我尝试了不同的 spark 并行性,发现 1000 个适用于我的数据和 spark 配置的作品。因此,我在某些列上从 hive、partitionBy 加载数据,并在某些列上找到滞后、领先等。对于某些数据集,我的键没有倾斜,我可以在几秒钟内得到结果。但是当出现倾斜时,我必须等待几分钟。通过结果,我的意思是在使用窗口函数计算所需列之后写回配置单元。

以上是关于带有窗口函数的 PySpark 数据偏度的主要内容,如果未能解决你的问题,请参考以下文章

PySpark - 窗口函数导致新列

如何在 PySpark 中使用窗口函数?

如何在 pyspark 中对需要在聚合中聚合的分组数据应用窗口函数?

pyspark 使用过滤器应用 DataFrame 窗口函数

如何在 Pyspark 2.1 中使用窗口函数来计算星期几的出现次数

使用窗口函数计算 PySpark 中的累积和