如何使用 PySpark 的 Window 函数来模拟指数衰减?
Posted
技术标签:
【中文标题】如何使用 PySpark 的 Window 函数来模拟指数衰减?【英文标题】:How can I use PySpark's Window function to model exponential decay? 【发布时间】:2020-05-27 00:55:22 【问题描述】:我正在尝试应用 PySpark Window 函数来执行“指数衰减”。公式是
todays_score = yesterdays_score * (weight) + todays_raw_score
例如,假设我们有一个按天排序的数据框,并且每天的得分为 1:
+---+----+---------+
|day|user|raw_score|
+---+----+---------+
| 0| a| 1|
| 1| a| 1|
| 2| a| 1|
| 3| a| 1|
+---+----+---------+
如果我要计算 todays_score,应该是这样的:
+---+----+---------+------------+
|day|user|raw_score|todays_score| # Here's the math:
+---+----+---------+------------+
| 0| a| 1| 1.0| (0 * .90) + 1
| 1| a| 1| 1.9| (1.0 * .90) + 1
| 2| a| 1| 2.71| (1.9 * .90) + 1
| 3| a| 1| 3.439| (2.71 * .90) + 1
+---+----+---------+------------+
我尝试过使用窗口函数;但是根据我所见,他们只能使用原始数据框中的“静态值”,而不是我们刚刚计算的值。我什至尝试创建一个“虚拟列”来启动该过程;但是这也不起作用。
我尝试的代码:
df = sqlContext.createDataFrame([
(0, 'a', 1),
(1, 'a', 1),
(2, 'a', 1),
(3, 'a', 1)],
['day', 'user', 'raw_score']
)
df.show()
# Create a "dummy column" (weighted score) so we can use it.
df2 = df.select('*', col('raw_score').alias('todays_score'))
df2.show()
w = Window.partitionBy('user')
df2.withColumn('todays_score',
F.lag(F.col('todays_score'), count=1, default=0).over(w.orderBy('day'))* 0.9 + F.col('raw_score')) \
.show()
这个(不需要的)输出是:
+---+----+---------+------------+
|day|user|raw_score|todays_score|
+---+----+---------+------------+
| 0| a| 1| 1.0|
| 1| a| 1| 1.9|
| 2| a| 1| 1.9|
| 3| a| 1| 1.9|
+---+----+---------+------------+
它只取前一个值 * (.90),而不是刚刚计算的值。
如何访问刚刚由窗口函数计算的值?
【问题讨论】:
你应该使用 pandas 分组地图 udaf。计算中的 +1 是否取自 raw_score 列?还是 +1 只是一个静态值,你的 spark 版本是什么? @murtihash - 我想提两件重要的事情:(1)性能对我来说是个大问题;我将与成千上万的用户打交道,数百天,数百个分数......所以我有点犹豫是否使用 udaf。如果我错了,请纠正我,但它比原生 Spark SQL 函数慢,不是吗? (2) 是的,+1 取自原始分数列。我还有另一个计算分数的步骤。对于实际值,这些值每天都会有所不同,并且使用起来不太好。 所以是的,你是对的,它会比 spark 内置函数慢,但它会比普通 udf 快得多,因为它是一个矢量化的 udf,可以在数据组上执行(groupby用户。地图)。我看到的唯一其他选择是按收集列表分组,使用高阶函数来获得分数,然后分解列表。除此之外,我认为任何其他 spark 函数都不能完成对每一行都是动态的任务。你也可以告诉你的火花版本,因为pandas udaf是2.3+,高阶函数是2.4+ @murtihash - 我在下面看到了你的答案,它远远超出了头脑。我想知道你是否可以用熊猫分组地图 udaf 解释如何做到这一点。 *此外,根据我的研究,pandas grouped-map UDF 不适用于有界窗口。我的代码也一直失败。 你说的“它过头了”是什么意思,是不是因为 groupby 和爆炸太慢了?还是你不明白逻辑?我也用 pandas 分组地图 udaf 更新了解决方案。请投票/接受答案以关闭线程,干杯 【参考方案1】:对于Spark2.4+
,可以使用高阶函数transform
、aggregate
、filter
和 arrays_zip
像这样。 它适用于 raw_score 的任何组合,并且比 pandas_udaf 更快。(假设数据已按每个用户按天排序,如示例所示)
df.show() #sample dataframe
#+---+----+---------+
#|day|user|raw_score|
#+---+----+---------+
#| 0| a| 1|
#| 1| a| 1|
#| 2| a| 1|
#| 3| a| 1|
#+---+----+---------+
from pyspark.sql import functions as F
df\
.groupBy("user").agg(F.collect_list("raw_score").alias("raw_score"),F.collect_list("day").alias("day"))\
.withColumn("raw_score1", F.expr("""transform(raw_score,(x,i)-> struct(x as raw,i as index))"""))\
.withColumn("todays_score", F.expr("""transform(raw_score1, x-> aggregate(filter(raw_score1,z-> z.index<=x.index)\
,cast(0 as double),(acc,y)->(acc*0.9)+y.raw))"""))\
.withColumn("zip", F.explode(F.arrays_zip("day","raw_score","todays_score")))\
.select("user", "zip.*")\
.show(truncate=False)
#+----+---+---------+------------+
#|user|day|raw_score|todays_score|
#+----+---+---------+------------+
#|a |0 |1 |1.0 |
#|a |1 |1 |1.9 |
#|a |2 |1 |2.71 |
#|a |3 |1 |3.439 |
#+----+---+---------+------------+
UPDATE:
假设数据如示例所示按天排序,您可以像这样使用Pandas Grouped Map UDAF
:
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf, PandasUDFType
@pandas_udf(df.withColumn("raw_score", F.lit(1.2456)).schema, PandasUDFType.GROUPED_MAP)
def grouped_map(df):
for i in range(1,len(df)):
df.loc[i,'raw_score']=(df.loc[i-1,'raw_score'] * 0.9)+1
return df
df\
.groupby("user").apply(grouped_map).show()
#+---+----+---------+
#|day|user|raw_score|
#+---+----+---------+
#| 0| a| 1.0|
#| 1| a| 1.9|
#| 2| a| 2.71|
#| 3| a| 3.439|
#+---+----+---------+
【讨论】:
以上是关于如何使用 PySpark 的 Window 函数来模拟指数衰减?的主要内容,如果未能解决你的问题,请参考以下文章
使用 Window() 计算 PySpark 中数组的滚动总和?
如何在 Pyspark 2.1 中使用窗口函数来计算星期几的出现次数
对两列应用 Window.partitionBy 以在 pyspark 中获取 n-core 数据集