如何使用 PySpark 的 Window 函数来模拟指数衰减?

Posted

技术标签:

【中文标题】如何使用 PySpark 的 Window 函数来模拟指数衰减?【英文标题】:How can I use PySpark's Window function to model exponential decay? 【发布时间】:2020-05-27 00:55:22 【问题描述】:

我正在尝试应用 PySpark Window 函数来执行“指数衰减”。公式是

todays_score = yesterdays_score * (weight) + todays_raw_score

例如,假设我们有一个按天排序的数据框,并且每天的得分为 1:

+---+----+---------+
|day|user|raw_score|
+---+----+---------+
|  0|   a|        1|
|  1|   a|        1|
|  2|   a|        1|
|  3|   a|        1|
+---+----+---------+

如果我要计算 todays_score,应该是这样的:

+---+----+---------+------------+
|day|user|raw_score|todays_score| # Here's the math:
+---+----+---------+------------+
|  0|   a|        1|         1.0| (0 * .90) + 1
|  1|   a|        1|         1.9| (1.0 * .90) + 1
|  2|   a|        1|        2.71| (1.9 * .90) + 1
|  3|   a|        1|       3.439| (2.71 * .90) + 1
+---+----+---------+------------+

我尝试过使用窗口函数;但是根据我所见,他们只能使用原始数据框中的“静态值”,而不是我们刚刚计算的值。我什至尝试创建一个“虚拟列”来启动该过程;但是这也不起作用。

我尝试的代码:

df = sqlContext.createDataFrame([
                                 (0, 'a', 1),
                                 (1, 'a', 1),
                                 (2, 'a', 1),
                                 (3, 'a', 1)],
    ['day', 'user', 'raw_score']
)
df.show()

# Create a "dummy column" (weighted score) so we can use it.
df2 = df.select('*', col('raw_score').alias('todays_score'))
df2.show()

w = Window.partitionBy('user') 

df2.withColumn('todays_score', 
              F.lag(F.col('todays_score'), count=1, default=0).over(w.orderBy('day'))* 0.9 + F.col('raw_score')) \
  .show()

这个(不需要的)输出是:

+---+----+---------+------------+
|day|user|raw_score|todays_score|
+---+----+---------+------------+
|  0|   a|        1|         1.0|
|  1|   a|        1|         1.9|
|  2|   a|        1|         1.9|
|  3|   a|        1|         1.9|
+---+----+---------+------------+

它只取前一个值 * (.90),而不是刚刚计算的值。

如何访问刚刚由窗口函数计算的值?

【问题讨论】:

你应该使用 pandas 分组地图 udaf。计算中的 +1 是否取自 raw_score 列?还是 +1 只是一个静态值,你的 spark 版本是什么? @murtihash - 我想提两件重要的事情:(1)性能对我来说是个大问题;我将与成千上万的用户打交道,数百天,数百个分数......所以我有点犹豫是否使用 udaf。如果我错了,请纠正我,但它比原生 Spark SQL 函数慢,不是吗? (2) 是的,+1 取自原始分数列。我还有另一个计算分数的步骤。对于实际值,这些值每天都会有所不同,并且使用起来不太好。 所以是的,你是对的,它会比 spark 内置函数慢,但它会比普通 udf 快得多,因为它是一个矢量化的 udf,可以在数据组上执行(groupby用户。地图)。我看到的唯一其他选择是按收集列表分组,使用高阶函数来获得分数,然后分解列表。除此之外,我认为任何其他 spark 函数都不能完成对每一行都是动态的任务。你也可以告诉你的火花版本,因为pandas udaf是2.3+,高阶函数是2.4+ @murtihash - 我在下面看到了你的答案,它远远超出了头脑。我想知道你是否可以用熊猫分组地图 udaf 解释如何做到这一点。 *此外,根据我的研究,pandas grouped-map UDF 不适用于有界窗口。我的代码也一直失败。 你说的“它过头了”是什么意思,是不是因为 groupby 和爆炸太慢了?还是你不明白逻辑?我也用 pandas 分组地图 udaf 更新了解决方案。请投票/接受答案以关闭线程,干杯 【参考方案1】:

对于Spark2.4+,可以使用高阶函数transformaggregatefilterarrays_zip 像这样。 它适用于 raw_score 的任何组合,并且比 pandas_udaf 更快。(假设数据已按每个用户按天排序,如示例所示)

df.show() #sample dataframe
#+---+----+---------+
#|day|user|raw_score|
#+---+----+---------+
#|  0|   a|        1|
#|  1|   a|        1|
#|  2|   a|        1|
#|  3|   a|        1|
#+---+----+---------+


from pyspark.sql import functions as F

df\
  .groupBy("user").agg(F.collect_list("raw_score").alias("raw_score"),F.collect_list("day").alias("day"))\
   .withColumn("raw_score1", F.expr("""transform(raw_score,(x,i)-> struct(x as raw,i as index))"""))\
   .withColumn("todays_score", F.expr("""transform(raw_score1, x-> aggregate(filter(raw_score1,z-> z.index<=x.index)\
                                             ,cast(0 as double),(acc,y)->(acc*0.9)+y.raw))"""))\
   .withColumn("zip", F.explode(F.arrays_zip("day","raw_score","todays_score")))\
   .select("user", "zip.*")\
   .show(truncate=False)


#+----+---+---------+------------+
#|user|day|raw_score|todays_score|
#+----+---+---------+------------+
#|a   |0  |1        |1.0         |
#|a   |1  |1        |1.9         |
#|a   |2  |1        |2.71        |
#|a   |3  |1        |3.439       |
#+----+---+---------+------------+

UPDATE:

假设数据如示例所示按天排序,您可以像这样使用Pandas Grouped Map UDAF

import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf, PandasUDFType


@pandas_udf(df.withColumn("raw_score", F.lit(1.2456)).schema, PandasUDFType.GROUPED_MAP)
def grouped_map(df):
     for i in range(1,len(df)):
          df.loc[i,'raw_score']=(df.loc[i-1,'raw_score'] * 0.9)+1   

     return df
df\
  .groupby("user").apply(grouped_map).show()

#+---+----+---------+
#|day|user|raw_score|
#+---+----+---------+
#|  0|   a|      1.0|
#|  1|   a|      1.9|
#|  2|   a|     2.71|
#|  3|   a|    3.439|
#+---+----+---------+

【讨论】:

以上是关于如何使用 PySpark 的 Window 函数来模拟指数衰减?的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark基础sql获取user最近3次使用的item

使用 Window() 计算 PySpark 中数组的滚动总和?

如何在 Pyspark 2.1 中使用窗口函数来计算星期几的出现次数

对两列应用 Window.partitionBy 以在 pyspark 中获取 n-core 数据集

toPandas() 会随着 pyspark 数据框变小而加快速度吗?

pyspark 使用过滤器应用 DataFrame 窗口函数