Pyspark/Hive 中带条件的加权运行总计

Posted

技术标签:

【中文标题】Pyspark/Hive 中带条件的加权运行总计【英文标题】:Weighted running total with conditional in Pyspark/Hive 【发布时间】:2021-03-21 16:18:41 【问题描述】:

我有产品、品牌、百分比和价格列。我想计算与当前行具有不同品牌的行以及与当前行具有相同品牌的行的百分比列的总和。我想按价格衡量它们。如果当前行上方的产品价格高于当前行,我想通过将其乘以 0.8 来降低权重。如何在 PySpark 中或使用 spark.sql 中执行此操作?不使用乘以权重的答案是here。

df = pd.DataFrame('a': ['a1','a2','a3','a4','a5','a6'],
          'brand':['b1','b2','b1', 'b3', 'b2','b1'],
      'pct': [40, 30, 10, 8,7,5], 
       'price':[0.6, 1, 0.5, 0.8, 1, 0.5])
df = spark.createDataFrame(df)

我在寻找什么

product  brand  pct  pct_same_brand  pct_different_brand
a1       b1     40     null            null
a2       b2     30     null            40
a3       b1     10     32              30
a4       b3     8      null            80
a5       b2     7      24              58
a6       b1     5      40              45  

更新: 我添加了以下数据点以帮助澄清问题。可以看出,一行可以在一行中乘以 0.8,在另一行中乘以 1.0。

  product   brand   pct  price  pct_same_brand  pct_different_brand
  a1        b1      30   0.6    null                 null
  a2        b2      20   1.3    null                 30
  a3        b1      10   0.5    30*0.8               20
  a4        b3      8    0.8    null                 60
  a5        b2      7    0.5    20*0.8               48
  a6        b1      6    0.8    30*1 + 10*1          35
  a7        b2      5    1.5    20*1 + 7*1           54

Update2:在我上面提供的数据中,每行的权重是相同的数字(0.8 或 1),但也可以是 1 和 0.8(某些行为 0.8,而 1对于其他行)

以下数据框中的示例,例如,最后一行的乘数应该是 a6 的 0.8 和品牌 b1 的其余部分的 1.0。 :

  df = pd.DataFrame('a': ['a1','a2','a3','a4','a5','a6', 'a7', 'a8', 'a9', 'a10'],
          'brand':['b1','b2','b1', 'b3', 'b2','b1','b2', 'b1', 'b1', 'b1'],
          'pct': [30, 20, 10, 8, 7,6,5,4,3,2],
          'price':[0.6, 1.3, 0.5, 0.8, 0.5, 0.8, 1.5, 0.5, 0.65, 0.7]
         
         )
df = spark.createDataFrame(df)

【问题讨论】:

【参考方案1】:

可以添加weight列方便计算:

from pyspark.sql import functions as F, Window

df2 = df.withColumn(
    'weight', 
    F.when(
        F.col('price') <= F.lag('price').over(
            Window.partitionBy('brand')
                  .orderBy(F.desc('pct'))
        ), 
        0.8
    ).otherwise(1.0)
).withColumn(
    'pct_same_brand', 
    F.col('weight')*F.sum('pct').over(
        Window.partitionBy('brand')
              .orderBy(F.desc('pct'))
              .rowsBetween(Window.unboundedPreceding, -1)
    )
).withColumn(
    'pct_different_brand', 
    F.sum('pct').over(
        Window.orderBy(F.desc('pct'))
              .rowsBetween(Window.unboundedPreceding, -1)
    ) - F.coalesce(F.col('pct_same_brand'), F.lit(0)) / F.col('weight')
)

df2.show()
+---+-----+---+-----+------+--------------+-------------------+
|  a|brand|pct|price|weight|pct_same_brand|pct_different_brand|
+---+-----+---+-----+------+--------------+-------------------+
| a1|   b1| 40|  0.6|   1.0|          null|               null|
| a2|   b2| 30|  1.0|   1.0|          null|               40.0|
| a3|   b1| 10|  0.5|   0.8|          32.0|               30.0|
| a4|   b3|  8|  0.8|   1.0|          null|               80.0|
| a5|   b2|  7|  1.0|   0.8|          24.0|               58.0|
| a6|   b1|  5|  0.5|   0.8|          40.0|               45.0|
+---+-----+---+-----+------+--------------+-------------------+

已编辑问题的输出:

+---+-----+---+-----+------+--------------+-------------------+
|  a|brand|pct|price|weight|pct_same_brand|pct_different_brand|
+---+-----+---+-----+------+--------------+-------------------+
| a1|   b1| 30|  0.6|   1.0|          null|               null|
| a2|   b2| 20|  1.3|   1.0|          null|               30.0|
| a3|   b1| 10|  0.5|   0.8|          24.0|               20.0|
| a4|   b3|  8|  0.8|   1.0|          null|               60.0|
| a5|   b2|  7|  0.5|   0.8|          16.0|               48.0|
| a6|   b1|  6|  0.8|   1.0|          40.0|               35.0|
| a7|   b2|  5|  1.5|   1.0|          27.0|               54.0|
+---+-----+---+-----+------+--------------+-------------------+

【讨论】:

谢谢。但是一个价格可能低于其中一些价格,同时又高于其他价格。假设最后一行的价格是 0.7,那么第 3 行的第 1 行的权重为 0.8,第 6 行的权重为 1。 “第 3 行的第 1 行的权重为 0.8”是什么意思?你的意思是第 1 行或第 3 行的 0.8? 你能解释一下加权规则吗?一个完整的例子会很有帮助,如果你可以编辑你的问题来添加它 我添加了更多数据点,如您所见,同一行可以在一行中乘以 0.8,在另一行中乘以 1。 @FissehaBehane 我在已编辑问题的数据框中检查了我的代码输出,它与您的预期输出一致...该代码适合您吗?【参考方案2】:

如果有人有类似的问题,这对我有用。 基本上,我使用了数据框的外连接并分配了权重。最后,我使用了窗口函数。

df_copy = df.withColumnRenamed('a', 'asin')\
        .withColumnRenamed('brand', 'brandd')\
         .withColumnRenamed('pct', 'pct2')\
         .withColumnRenamed('price', 'price2')
 df2 = df.join(df_copy, on = [df.brand == df_copy.brandd], how = 'outer').orderBy('brand')
 df3 = df2.filter(~((df2.a == df2.asin) & (df2.brand == df2.brandd))
             & (df2.pct <= df2.pct2))
 df3 = df3.withColumn('weight', F.when(df3.price2 > df3.price, 0.8).otherwise(1))

 df4 = df3.groupBy(['a', 'brand', 'pct', 'price']).agg(F.sum(df3.pct2*df3.weight)
                                        .alias('same_brand_pct'))

 df5 = df.join(df4, on = ['a', 'brand', 'pct', 'price'], how = 'left')
 df6 = df5.withColumn(
   'pct_same_brand_unscaled', 
    F.sum('pct').over(
    Window.partitionBy('brand')
          .orderBy(F.desc('pct'))
          .rowsBetween(Window.unboundedPreceding, -1)
  )
  ).withColumn(
'pct_different_brand', 
F.sum('pct').over(
    Window.orderBy(F.desc('pct'))
          .rowsBetween(Window.unboundedPreceding, -1)
  ) - F.coalesce(F.col('pct_same_brand_unscaled'), F.lit(0))
 ).drop('pct_same_brand_unscaled')

给予:

+---+-----+---+-----+--------------+-------------------+
|  a|brand|pct|price|same_brand_pct|pct_different_brand|
+---+-----+---+-----+--------------+-------------------+
| a1|   b1| 30|  0.6|          null|               null|
| a2|   b2| 20|  1.3|          null|                 30|
| a3|   b1| 10|  0.5|          24.0|                 20|
| a4|   b3|  8|  0.8|          null|                 60|
| a5|   b2|  7|  0.5|          16.0|                 48|
| a6|   b1|  6|  0.8|          40.0|                 35|
| a7|   b2|  5|  1.5|          27.0|                 54|
| a8|   b1|  4|  0.5|          38.8|                 40|
| a9|   b1|  3| 0.65|          48.8|                 40|
|a10|   b1|  2|  0.7|          51.8|                 40|```
  

【讨论】:

以上是关于Pyspark/Hive 中带条件的加权运行总计的主要内容,如果未能解决你的问题,请参考以下文章

使用 BigQuery 中的条件计算运行总计

基于条件的运行总计

满足其他列中的条件后,如何在 SQL BigQuery 中重置运行总计?

数据可视化之DAX篇(二十四)Power BI应用技巧:在总计行实现条件格式

CakePHP 2中带有分页分页类的大小为f数组的问题

PySpark/HIVE:附加到现有表