PySpark 一次性对分区执行第一个和最后一个函数

Posted

技术标签:

【中文标题】PySpark 一次性对分区执行第一个和最后一个函数【英文标题】:PySpark first and last function over a partition in one go 【发布时间】:2020-02-10 13:41:33 【问题描述】:

我有这样的 pyspark 代码,

spark_df = spark_df.orderBy('id', 'a1', 'c1')
out_df = spark_df.groupBy('id', 'a1', 'a2').agg(
    F.first('c1').alias('c1'),
    F.last('c2').alias('c2'),
    F.first('c3').alias('c3'))

我需要保持数据按照 id、a1 和 c1 的顺序排列。然后在键 id、a1 和 c1 上定义的组上选择如上所示的列。

由于第一个和最后一个不确定性,我将代码更改为这个看起来很丑的代码,它可以工作,但我不确定它是否有效。

w_first = Window.partitionBy('id', 'a1', 'a2').orderBy('c1')
w_last = Window.partitionBy('id', 'a1', 'a2').orderBy(F.desc('c1'))

out_first = spark_df.withColumn('Rank_First', F.rank().over(w_first)).filter(F.col('Rank_First') == 1).drop(
    'Rank_First')
out_last = spark_df.withColumn('Rank_Last', F.rank().over(w_last)).filter(F.col('Rank_First') == 1).drop(
    'Rank_Last')

out_first = out_first.withColumnRenamed('c1', 'First_c1') \
    .withColumnRenamed('c2', 'First_c2') \
    .withColumnRenamed('c3', 'First_c3')

out_last = out_last.withColumnRenamed('c1', 'Last_c1') \
    .withColumnRenamed('c2', 'Last_c2') \
    .withColumnRenamed('c3', 'Last_c3')

out_df = out_first.join(out_last, ['id', 'a1', 'a2']) \
    .select('id', 'a1', 'a2', F.col('First_c1').alias('c1'),
            F.col('Last_c2').alias('c2'),
            F.col('First_c3').alias('c3'))

我正在尝试一种更好、更有效的替代方案。当数据量很大时,我会遇到性能瓶颈。

有没有更好的替代方法,可以一次性在按特定顺序排序的窗口上执行第一个和最后一个。

【问题讨论】:

【参考方案1】:

orderBy 与Window 一起使用时,您需要将帧边界指定为ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING,否则last 函数将仅获得UNBOUNDED PRECEDINGCURRENT ROW 之间的最后一个值(指定order by 时的默认帧边界) .

试试这个:

w = Window.partitionBy('id', 'a1', 'a2').orderBy('c1') \
          .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

df = df.withColumn("First_c1", first("c1").over(w)) \
      .withColumn("First_c3", first("c3").over(w)) \
      .withColumn("Last_c2", last("c2").over(w))

df.groupby("id", "a1", "a2")\
  .agg(first("First_c1").alias("c1"),
       first("Last_c2").alias("c2"),
       first("First_c3").alias("c3")
  ).show()

【讨论】:

当分区的数据太大时,我仍然会得到不同的结果。有什么解决办法吗?

以上是关于PySpark 一次性对分区执行第一个和最后一个函数的主要内容,如果未能解决你的问题,请参考以下文章

PySpark 中 JDBC 上的自定义分区

使用 pyspark 对 parquet 文件进行分区和重新分区

如何在pyspark中使用第一个和最后一个函数?

手动选择镶木地板分区与在 pyspark 中过滤它们

一次从多个分区中选择

Pyspark - 从数据框中删除重复项,保持最后一次出现