在满足条件之前返回行值的排序行的火花聚合

Posted

技术标签:

【中文标题】在满足条件之前返回行值的排序行的火花聚合【英文标题】:spark aggregation with sorted rows that returns a row's value before a condition is met 【发布时间】:2020-06-24 18:08:12 【问题描述】:

我有一些数据(发票数据)。假设 id ~ dateid 是我的排序依据:

fid, id, due, overdue
  0,  1,   5,       0
  0,  3,   5,       5
  0, 13,   5,      10
  0, 14,   5,       0

  1,  5,   5,       0
  1, 26,   5,       5
  1, 27,   5,       10
  1, 38,   5,       0
    删除任意日期 ID id = 20 下的所有行 group_by fid 并在组内按id 排序 (主要)聚合一个新列overdue_id,即overdue 具有非零值的组中第一行之前的行的id (次要)为每个 fid 填充一行,即使所有行都被 #0 过滤掉了

所以输出将是(给定默认值 null)

fid, overdue_id
  0,          1
  1,       null

因为对于fid = 0,非零overdue 的第一个id 是id = 3,我想输出id-date 之前的行的id,即id = 1

我有group_by('fid').withColumn('overdue_id', ...),想使用aggminwhen 等函数,但之后我不确定,因为我对文档很陌生。

【问题讨论】:

【参考方案1】:

您可以使用以下步骤来解决:

import pyspark.sql.functions as F
from pyspark.sql import *

#added  fid=2 for overdue = 0 condition 
fid = [0,1,2]*4
fid.sort()

dateId = [1,3,13,14,5,26,27,28]
dateId.extend(range(90,95))
due = [5]*12
overdue = [0,5,10,0]*2
overdue.extend([0,0,0,0])

data = zip(fid, dateId, due, overdue)

df = spark.createDataFrame(data, schema =["fid", "dateId", "due", "overdue"])

win = Window.partitionBy(df['fid']).orderBy(df['dateId'])

res = df\
.filter(F.col("dateId")!= 20)\
.withColumn("lag_id", F.lag(F.col("dateId"), 1).over(win))\
.withColumn("overdue_id", F.when(F.col("overdue")!=0, F.col("lag_id")).otherwise(None))\
.groupBy("fid")\
.agg(F.min("overdue_id").alias("min_overdue_id"))

>>> res.show()
+---+--------------+
|fid|min_overdue_id|
+---+--------------+
|  0|             1|
|  1|             5|
|  2|          null|
+---+--------------+

【讨论】:

【参考方案2】:

您需要使用滞后和窗口功能。在我们开始之前,为什么您的示例输出显示 fid 1 为空。第一个非零值是 id 26,所以之前的 id 是 5。所以不应该是 5?除非你需要别的东西,你可以试试这个。

tst=sqlContext.createDataFrame([(0, 1,5,0),(0,20,5,0),(0,30,5,5),(0,13,5,10),(0,14,5,0),(1,5,5,0),(1,26,5,5),(1,27,5,10),(1,38,5,0)],schema=["fid","id","due","overdue"])
# To filter data
tst_f = tst.where('id!=20')
# Define window function
w=Window.partitionBy('fid').orderBy('id')
tst_lag = tst_f.withColumn('overdue_id',F.lag('id').over(w))
# Remove rows with 0 overdue
tst_od = tst_lag.where('overdue!=0')
# Find the row before first non zero overdue
tst_res = tst_od.groupby('fid').agg(F.first('overdue_id').alias('overdue_id'))
tst_res.show()
+---+----------+
|fid|overdue_id|
+---+----------+
|  0|         1|
|  1|         5|
+---+----------+

如果您对使用第一个功能感到厌倦,或者只是为了避免出现重影问题,您可以尝试以下性能昂贵的选项

# Create a copy to avoid ambiguous join and select the minimum from non zero overdue rows
tst_min= tst_od.withColumn("dummy",F.lit('dummy')).groupby('fid').agg(F.min('id').alias('id_min'))
# Join this with the dataframe to get results
tst_join = tst_od.join(tst_min,on=tst_od.id==tst_min.id_min,how='right')
tst_join.show()
+---+---+---+-------+----------+---+------+
|fid| id|due|overdue|overdue_id|fid|id_min|
+---+---+---+-------+----------+---+------+
|  1| 26|  5|      5|         5|  1|    26|
|  0| 13|  5|     10|         1|  0|    13|
+---+---+---+-------+----------+---+------+
# This way you can see all the information

您可以使用 filter() 或 where() 方法从该数据帧中过滤相关信息

【讨论】:

以上是关于在满足条件之前返回行值的排序行的火花聚合的主要内容,如果未能解决你的问题,请参考以下文章

如果满足任何(不是全部)条件,如何执行火花连接

满足条件时更改表格中整行的颜色?

8.聚合函数

R语言data.table导入数据实战: .N函数和.I函数使用.N函数返回行数使用.I函数返回满足特定条件行的索引

excel中if函数若满足条件返回“是”,不满足条件无返回值怎么写

MongoDB 轻松搞定统计 —— 聚合函数使用