在满足条件之前返回行值的排序行的火花聚合
Posted
技术标签:
【中文标题】在满足条件之前返回行值的排序行的火花聚合【英文标题】:spark aggregation with sorted rows that returns a row's value before a condition is met 【发布时间】:2020-06-24 18:08:12 【问题描述】:我有一些数据(发票数据)。假设 id ~ date
和 id
是我的排序依据:
fid, id, due, overdue
0, 1, 5, 0
0, 3, 5, 5
0, 13, 5, 10
0, 14, 5, 0
1, 5, 5, 0
1, 26, 5, 5
1, 27, 5, 10
1, 38, 5, 0
-
删除任意日期 ID
id = 20
下的所有行
group_by fid
并在组内按id
排序
(主要)聚合一个新列overdue_id
,即overdue
具有非零值的组中第一行之前的行的id
(次要)为每个 fid
填充一行,即使所有行都被 #0 过滤掉了
所以输出将是(给定默认值 null)
fid, overdue_id
0, 1
1, null
因为对于fid = 0
,非零overdue
的第一个id 是id = 3
,我想输出id-date
之前的行的id,即id = 1
。
我有group_by('fid').withColumn('overdue_id', ...)
,想使用agg
、min
、when
等函数,但之后我不确定,因为我对文档很陌生。
【问题讨论】:
【参考方案1】:您可以使用以下步骤来解决:
import pyspark.sql.functions as F
from pyspark.sql import *
#added fid=2 for overdue = 0 condition
fid = [0,1,2]*4
fid.sort()
dateId = [1,3,13,14,5,26,27,28]
dateId.extend(range(90,95))
due = [5]*12
overdue = [0,5,10,0]*2
overdue.extend([0,0,0,0])
data = zip(fid, dateId, due, overdue)
df = spark.createDataFrame(data, schema =["fid", "dateId", "due", "overdue"])
win = Window.partitionBy(df['fid']).orderBy(df['dateId'])
res = df\
.filter(F.col("dateId")!= 20)\
.withColumn("lag_id", F.lag(F.col("dateId"), 1).over(win))\
.withColumn("overdue_id", F.when(F.col("overdue")!=0, F.col("lag_id")).otherwise(None))\
.groupBy("fid")\
.agg(F.min("overdue_id").alias("min_overdue_id"))
>>> res.show()
+---+--------------+
|fid|min_overdue_id|
+---+--------------+
| 0| 1|
| 1| 5|
| 2| null|
+---+--------------+
【讨论】:
【参考方案2】:您需要使用滞后和窗口功能。在我们开始之前,为什么您的示例输出显示 fid 1 为空。第一个非零值是 id 26,所以之前的 id 是 5。所以不应该是 5?除非你需要别的东西,你可以试试这个。
tst=sqlContext.createDataFrame([(0, 1,5,0),(0,20,5,0),(0,30,5,5),(0,13,5,10),(0,14,5,0),(1,5,5,0),(1,26,5,5),(1,27,5,10),(1,38,5,0)],schema=["fid","id","due","overdue"])
# To filter data
tst_f = tst.where('id!=20')
# Define window function
w=Window.partitionBy('fid').orderBy('id')
tst_lag = tst_f.withColumn('overdue_id',F.lag('id').over(w))
# Remove rows with 0 overdue
tst_od = tst_lag.where('overdue!=0')
# Find the row before first non zero overdue
tst_res = tst_od.groupby('fid').agg(F.first('overdue_id').alias('overdue_id'))
tst_res.show()
+---+----------+
|fid|overdue_id|
+---+----------+
| 0| 1|
| 1| 5|
+---+----------+
如果您对使用第一个功能感到厌倦,或者只是为了避免出现重影问题,您可以尝试以下性能昂贵的选项
# Create a copy to avoid ambiguous join and select the minimum from non zero overdue rows
tst_min= tst_od.withColumn("dummy",F.lit('dummy')).groupby('fid').agg(F.min('id').alias('id_min'))
# Join this with the dataframe to get results
tst_join = tst_od.join(tst_min,on=tst_od.id==tst_min.id_min,how='right')
tst_join.show()
+---+---+---+-------+----------+---+------+
|fid| id|due|overdue|overdue_id|fid|id_min|
+---+---+---+-------+----------+---+------+
| 1| 26| 5| 5| 5| 1| 26|
| 0| 13| 5| 10| 1| 0| 13|
+---+---+---+-------+----------+---+------+
# This way you can see all the information
您可以使用 filter() 或 where() 方法从该数据帧中过滤相关信息
【讨论】:
以上是关于在满足条件之前返回行值的排序行的火花聚合的主要内容,如果未能解决你的问题,请参考以下文章
R语言data.table导入数据实战: .N函数和.I函数使用.N函数返回行数使用.I函数返回满足特定条件行的索引