按组查找金额

Posted

技术标签:

【中文标题】按组查找金额【英文标题】:Find amount by group 【发布时间】:2021-12-27 17:04:19 【问题描述】:

请帮助我理解 PySpark

我有一个数据框,如何通过 PySpark 找到每个参数的总和?

+----+-----+-----+-----+-----+-----+-----+
|o_id|col_1|col_3|col_9|ser_1|ser_2|ser_9|
+----+-----+-----+-----+-----+-----+-----+
|   0|  103|   17|   73|   c1|   c2|   c2|
|   1|  323|  245|   66|   c2|   c1|   c3|
|   2|  112|   96| 1452|   c3|   c2|   c1|
|   3|   46|  746|    4|   c3|   c2|   c3|
|   4|   82|  379|   78|   c1|   c1|   c2|
+----+-----+-----+-----+-----+-----+-----+

如何求所有出现的 c1 等的总和。

如何获得这种类型的数据透视表?

+----+-----+
| ser|  sum|
+----+-----+
|  c1| 2261| 
|  c2| 1333| 
|  c3|  228| 
+----+-----+

如果列更少,那么我会很容易地做.groupby(),但是有很多列(更多),我不明白如何在不使用 python 循环的情况下对所有内容进行分组,这将显着降低代码速度。

df_1 = (df.groupby('ser_1').sum('col_1'))
...
df_x = (df.groupby('ser_x').sum('col_x'))

还有更优雅的解决方案吗?

【问题讨论】:

您能解释一下如何将sum 作为2261 获得c1 吗? ser_1 - 第一天的服务类型 col_1 - 相应的第一天服务的费用,所有服务的费用 ser_1 将等于col_1 [0] + col_1 [4] + col_3 [1] + col_3 [4] + col_9 [2] 而我不想为每对列手动创建一个单独的数据框df_1 = (df.groupby ('ser_1'). sum ('col_1')) ...也许有更好的方法来做到这一点。喜欢df_1 = (df.groupby (col (col ("ser_1) + col (" ser_2 "))). sum ((col (col (" col_1) + col ("col_2")))) 但是栏目很多,我觉得这不是一个合理的解决方案。 【参考方案1】:

可以使用以下策略优化计算。

    创建ser_*col_* 列之间的映射以创建结构数组。 将结构数组分解为行。 按key 分组,并对分解后的行的value 求和。

工作示例

数据设置

from pyspark.sql import functions as F

data = [(0, 103, 17, 73, "c1", "c2", "c2",),
        (1, 323, 245, 66, "c2", "c1", "c3",),
        (2, 112, 96, 1452, "c3", "c2", "c1",),
        (3, 46, 746, 4, "c3", "c2", "c3",),
        (4, 82, 379, 78, "c1", "c1", "c2",), ]

df = spark.createDataFrame(data, ("o_id", "col_1", "col_3", "col_9", "ser_1", "ser_2", "ser_9",))

ser_*col_* 列之间创建映射

这里的映射是硬编码的,因为colser 之间没有基于它们的索引的一对一映射,因为col_3 从问题映射到ser_2。如果存在一对一映射,则可以通过代码生成组合。

ser_arrays = F.array(F.col("ser_1"), F.col("ser_2"), F.col("ser_9")).alias("ser")
col_arrays = F.array(F.col("col_1"), F.col("col_3"), F.col("col_9")).alias("col_val")

df_col_ser_map = df.select(F.arrays_zip(ser_arrays, col_arrays).alias("mapped_col_ser"))

df_col_ser_map.show(truncate=False)

#+---------------------------------+
#|mapped_col_ser                   |
#+---------------------------------+
#|[c1, 103, c2, 17, c2, 73]  |
#|[c2, 323, c1, 245, c3, 66] |
#|[c3, 112, c2, 96, c1, 1452]|
#|[c3, 46, c2, 746, c3, 4]   |
#|[c1, 82, c1, 379, c2, 78]  |
#+---------------------------------+

分解映射列

df_col_ser_exploded = df_col_ser_map.select(F.explode("mapped_col_ser").alias("exploded_col_ser"))

df_col_ser_exploded.show()

#+----------------+
# |exploded_col_ser|
# +----------------+
# |       c1, 103|
# |        c2, 17|
# |        c2, 73|
# |       c2, 323|
# |       c1, 245|
# |        c3, 66|
# |       c3, 112|
# |        c2, 96|
# |      c1, 1452|
# |        c3, 46|
# |       c2, 746|
# |         c3, 4|
# |        c1, 82|
# |       c1, 379|
# |        c2, 78|
# +----------------+

按键和聚合值分组

df_col_ser_exploded.groupBy((df_col_ser_exploded["exploded_col_ser"]["ser"]).alias("ser")) \
    .agg(F.sum(df_col_ser_exploded["exploded_col_ser"]["col_val"]).alias("sum")).show()


# +---+----+
# |ser| sum|
# +---+----+
# | c1|2261|
# | c3| 228|
# | c2|1333|
# +---+----+

【讨论】:

谢谢!很好的解决方案!但我无法摆脱手动输入所有列?他们有很多。 ser_arrays = F.array (F.col ("ser_1"), F.col ("ser_2"), F.col ("ser_9"))。 alias ("ser") 这意味着我需要先手动输入整个范围...( @Dima 来自您的问题,col_3 映射到ser_2,这是正确的吗?如果ser_2映射到col_2则可以生成数组。 是的,没错。我的意思是如果我需要映射超过 100 列,是否全部手动完成?当我尝试通过 python 中的循环执行此操作时,火花会生成大量作业。 如果sercol 之间的映射不能从循环中导出,则必须对映射进行硬编码。但是,使用我发布的解决方案不会创建很多工作,因为聚合是在转换之后计算的。

以上是关于按组查找金额的主要内容,如果未能解决你的问题,请参考以下文章

按组和列之间查找最小值

如何在mysql中按组查找累积值?

在具有重复行的 SQL Server 表中按组查找行号

按组查找第二大元素

按组查找最近的日期和输出值

数据重复时如何按组查找最小日期