Pyspark/Hive 中带条件的加权运行总计
Posted
技术标签:
【中文标题】Pyspark/Hive 中带条件的加权运行总计【英文标题】:Weighted running total with conditional in Pyspark/Hive 【发布时间】:2021-03-21 16:18:41 【问题描述】:我有产品、品牌、百分比和价格列。我想计算与当前行具有不同品牌的行以及与当前行具有相同品牌的行的百分比列的总和。我想按价格衡量它们。如果当前行上方的产品价格高于当前行,我想通过将其乘以 0.8 来降低权重。如何在 PySpark 中或使用 spark.sql 中执行此操作?不使用乘以权重的答案是here。
df = pd.DataFrame('a': ['a1','a2','a3','a4','a5','a6'],
'brand':['b1','b2','b1', 'b3', 'b2','b1'],
'pct': [40, 30, 10, 8,7,5],
'price':[0.6, 1, 0.5, 0.8, 1, 0.5])
df = spark.createDataFrame(df)
我在寻找什么
product brand pct pct_same_brand pct_different_brand
a1 b1 40 null null
a2 b2 30 null 40
a3 b1 10 32 30
a4 b3 8 null 80
a5 b2 7 24 58
a6 b1 5 40 45
更新: 我添加了以下数据点以帮助澄清问题。可以看出,一行可以在一行中乘以 0.8,在另一行中乘以 1.0。
product brand pct price pct_same_brand pct_different_brand
a1 b1 30 0.6 null null
a2 b2 20 1.3 null 30
a3 b1 10 0.5 30*0.8 20
a4 b3 8 0.8 null 60
a5 b2 7 0.5 20*0.8 48
a6 b1 6 0.8 30*1 + 10*1 35
a7 b2 5 1.5 20*1 + 7*1 54
Update2:在我上面提供的数据中,每行的权重是相同的数字(0.8 或 1),但也可以是 1 和 0.8(某些行为 0.8,而 1对于其他行)
以下数据框中的示例,例如,最后一行的乘数应该是 a6 的 0.8 和品牌 b1 的其余部分的 1.0。 :
df = pd.DataFrame('a': ['a1','a2','a3','a4','a5','a6', 'a7', 'a8', 'a9', 'a10'],
'brand':['b1','b2','b1', 'b3', 'b2','b1','b2', 'b1', 'b1', 'b1'],
'pct': [30, 20, 10, 8, 7,6,5,4,3,2],
'price':[0.6, 1.3, 0.5, 0.8, 0.5, 0.8, 1.5, 0.5, 0.65, 0.7]
)
df = spark.createDataFrame(df)
【问题讨论】:
【参考方案1】:可以添加weight
列方便计算:
from pyspark.sql import functions as F, Window
df2 = df.withColumn(
'weight',
F.when(
F.col('price') <= F.lag('price').over(
Window.partitionBy('brand')
.orderBy(F.desc('pct'))
),
0.8
).otherwise(1.0)
).withColumn(
'pct_same_brand',
F.col('weight')*F.sum('pct').over(
Window.partitionBy('brand')
.orderBy(F.desc('pct'))
.rowsBetween(Window.unboundedPreceding, -1)
)
).withColumn(
'pct_different_brand',
F.sum('pct').over(
Window.orderBy(F.desc('pct'))
.rowsBetween(Window.unboundedPreceding, -1)
) - F.coalesce(F.col('pct_same_brand'), F.lit(0)) / F.col('weight')
)
df2.show()
+---+-----+---+-----+------+--------------+-------------------+
| a|brand|pct|price|weight|pct_same_brand|pct_different_brand|
+---+-----+---+-----+------+--------------+-------------------+
| a1| b1| 40| 0.6| 1.0| null| null|
| a2| b2| 30| 1.0| 1.0| null| 40.0|
| a3| b1| 10| 0.5| 0.8| 32.0| 30.0|
| a4| b3| 8| 0.8| 1.0| null| 80.0|
| a5| b2| 7| 1.0| 0.8| 24.0| 58.0|
| a6| b1| 5| 0.5| 0.8| 40.0| 45.0|
+---+-----+---+-----+------+--------------+-------------------+
已编辑问题的输出:
+---+-----+---+-----+------+--------------+-------------------+
| a|brand|pct|price|weight|pct_same_brand|pct_different_brand|
+---+-----+---+-----+------+--------------+-------------------+
| a1| b1| 30| 0.6| 1.0| null| null|
| a2| b2| 20| 1.3| 1.0| null| 30.0|
| a3| b1| 10| 0.5| 0.8| 24.0| 20.0|
| a4| b3| 8| 0.8| 1.0| null| 60.0|
| a5| b2| 7| 0.5| 0.8| 16.0| 48.0|
| a6| b1| 6| 0.8| 1.0| 40.0| 35.0|
| a7| b2| 5| 1.5| 1.0| 27.0| 54.0|
+---+-----+---+-----+------+--------------+-------------------+
【讨论】:
谢谢。但是一个价格可能低于其中一些价格,同时又高于其他价格。假设最后一行的价格是 0.7,那么第 3 行的第 1 行的权重为 0.8,第 6 行的权重为 1。 “第 3 行的第 1 行的权重为 0.8”是什么意思?你的意思是第 1 行或第 3 行的 0.8? 你能解释一下加权规则吗?一个完整的例子会很有帮助,如果你可以编辑你的问题来添加它 我添加了更多数据点,如您所见,同一行可以在一行中乘以 0.8,在另一行中乘以 1。 @FissehaBehane 我在已编辑问题的数据框中检查了我的代码输出,它与您的预期输出一致...该代码适合您吗?【参考方案2】:如果有人有类似的问题,这对我有用。 基本上,我使用了数据框的外连接并分配了权重。最后,我使用了窗口函数。
df_copy = df.withColumnRenamed('a', 'asin')\
.withColumnRenamed('brand', 'brandd')\
.withColumnRenamed('pct', 'pct2')\
.withColumnRenamed('price', 'price2')
df2 = df.join(df_copy, on = [df.brand == df_copy.brandd], how = 'outer').orderBy('brand')
df3 = df2.filter(~((df2.a == df2.asin) & (df2.brand == df2.brandd))
& (df2.pct <= df2.pct2))
df3 = df3.withColumn('weight', F.when(df3.price2 > df3.price, 0.8).otherwise(1))
df4 = df3.groupBy(['a', 'brand', 'pct', 'price']).agg(F.sum(df3.pct2*df3.weight)
.alias('same_brand_pct'))
df5 = df.join(df4, on = ['a', 'brand', 'pct', 'price'], how = 'left')
df6 = df5.withColumn(
'pct_same_brand_unscaled',
F.sum('pct').over(
Window.partitionBy('brand')
.orderBy(F.desc('pct'))
.rowsBetween(Window.unboundedPreceding, -1)
)
).withColumn(
'pct_different_brand',
F.sum('pct').over(
Window.orderBy(F.desc('pct'))
.rowsBetween(Window.unboundedPreceding, -1)
) - F.coalesce(F.col('pct_same_brand_unscaled'), F.lit(0))
).drop('pct_same_brand_unscaled')
给予:
+---+-----+---+-----+--------------+-------------------+
| a|brand|pct|price|same_brand_pct|pct_different_brand|
+---+-----+---+-----+--------------+-------------------+
| a1| b1| 30| 0.6| null| null|
| a2| b2| 20| 1.3| null| 30|
| a3| b1| 10| 0.5| 24.0| 20|
| a4| b3| 8| 0.8| null| 60|
| a5| b2| 7| 0.5| 16.0| 48|
| a6| b1| 6| 0.8| 40.0| 35|
| a7| b2| 5| 1.5| 27.0| 54|
| a8| b1| 4| 0.5| 38.8| 40|
| a9| b1| 3| 0.65| 48.8| 40|
|a10| b1| 2| 0.7| 51.8| 40|```
【讨论】:
以上是关于Pyspark/Hive 中带条件的加权运行总计的主要内容,如果未能解决你的问题,请参考以下文章
满足其他列中的条件后,如何在 SQL BigQuery 中重置运行总计?