In pyspark, is it possible to do 2 aggregations using 1 groupBy?

Posted: 2019-06-21 23:06:23

[Question]:

I would like to know whether the following is possible with pyspark. Assume the following df:

+------+----+-----+-------+
| model|year|price|mileage|
+------+----+-----+-------+
|Galaxy|2017|27841|  17529|
|Galaxy|2017|29395|  11892|
|Novato|2018|35644|  22876|
|Novato|2018| 8765|  54817|
+------+----+-----+-------+


df.groupBy('model', 'year') \
  .agg({'price': 'sum', 'mileage': 'sum'}) \
  .withColumnRenamed('sum(price)', 'total_prices') \
  .withColumnRenamed('sum(mileage)', 'total_miles')

and end up with:

+------+----+-----+-------+------------+-----------+
| model|year|price|mileage|total_prices|total_miles|
+------+----+-----+-------+------------+-----------+
|Galaxy|2017|27841|  17529|       57236|      29421|
|Galaxy|2017|29395|  11892|       57236|      29421|
|Novato|2018|35644|  22876|       44409|      77693|
|Novato|2018| 8765|  54817|       44409|      77693|
+------+----+-----+-------+------------+-----------+

[Question comments]:

Also check this one: ***.com/questions/34409875/…

[Answer 1]:

You are not actually looking for groupBy here but for a window function or a join, because you want to extend each row with its group's aggregated values.
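
A quick aside on the literal question first: two aggregations in one groupBy are perfectly fine; the catch is that groupBy collapses each (model, year) group to a single row instead of keeping the original rows, which is why a window or a join is needed. A minimal sketch of the plain groupBy result, assuming the same data:

from pyspark.sql import functions as F

df = spark.createDataFrame(
    [('Galaxy', 2017, 27841, 17529),
     ('Galaxy', 2017, 29395, 11892),
     ('Novato', 2018, 35644, 22876),
     ('Novato', 2018, 8765,  54817)],
    ['model','year','price','mileage']
)

# valid: two aggregations in one groupBy, but only one row per group comes back
df.groupBy('model', 'year').agg(
    F.sum('price').alias('total_prices'),
    F.sum('mileage').alias('total_miles')
).show()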

Window:

from pyspark.sql import functions as F
from pyspark.sql import Window

df = spark.createDataFrame(
    [('Galaxy', 2017, 27841, 17529),
     ('Galaxy', 2017, 29395, 11892),
     ('Novato', 2018, 35644, 22876),
     ('Novato', 2018, 8765,  54817)],
    ['model','year','price','mileage']
)

# window over all rows that share the same (model, year); with no ordering,
# the sums run over the whole partition
w = Window.partitionBy('model', 'year')

df = df.withColumn('total_prices', F.sum('price').over(w))
df = df.withColumn('total_miles', F.sum('mileage').over(w))
df.show()
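
As an alternative to the two withColumn calls, both window sums can also be attached in a single select; a small sketch, reusing w from above and assuming df still has only the original four columns:

df = df.select(
    '*',
    F.sum('price').over(w).alias('total_prices'),
    F.sum('mileage').over(w).alias('total_miles')
)
df.show()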

Join:

from pyspark.sql import functions as F

df = spark.createDataFrame(
    [('Galaxy', 2017, 27841, 17529),
     ('Galaxy', 2017, 29395, 11892),
     ('Novato', 2018, 35644, 22876),
     ('Novato', 2018, 8765,  54817)],
    ['model','year','price','mileage']
)

df = df.join(
    df.groupby('model', 'year').agg(
        F.sum('price').alias('total_prices'),
        F.sum('mileage').alias('total_miles')
    ),
    ['model', 'year']
)
df.show()

Output:

+------+----+-----+-------+------------+-----------+ 
| model|year|price|mileage|total_prices|total_miles| 
+------+----+-----+-------+------------+-----------+ 
|Galaxy|2017|27841|  17529|       57236|      29421| 
|Galaxy|2017|29395|  11892|       57236|      29421| 
|Novato|2018|35644|  22876|       44409|      77693| 
|Novato|2018| 8765|  54817|       44409|      77693| 
+------+----+-----+-------+------------+-----------+
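
One design note on the join variant: the aggregated side has only one row per (model, year), so if it stays small relative to the full df, a broadcast hint avoids shuffling the large side for the join. A sketch, assuming the same df:

agg_df = df.groupby('model', 'year').agg(
    F.sum('price').alias('total_prices'),
    F.sum('mileage').alias('total_miles')
)
df = df.join(F.broadcast(agg_df), ['model', 'year'])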

[Comments]:

Thanks cronoik, the Window approach works perfectly!

[Answer 2]:

With a pandas UDF you can compute any number of aggregations:

import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType,StructType,StructField,StringType
import pandas as pd

agg_schema = StructType(
    [StructField("model", StringType(), True),
     StructField("year", IntegerType(), True),
     StructField("price", IntegerType(), True),
     StructField("mileage", IntegerType(), True),
     StructField("total_prices", IntegerType(), True),
     StructField("total_miles", IntegerType(), True)
     ]
)

# grouped-map pandas UDF: each (model, year) group arrives as a pandas
# DataFrame and is returned with the aggregate columns attached
@F.pandas_udf(agg_schema, F.PandasUDFType.GROUPED_MAP)
def agg(pdf):
    total_prices = pdf['price'].sum()
    total_miles = pdf['mileage'].sum()
    pdf['total_prices'] = total_prices
    pdf['total_miles'] = total_miles
    return pdf

df = spark.createDataFrame(
    [('Galaxy', 2017, 27841, 17529),
     ('Galaxy', 2017, 29395, 11892),
     ('Novato', 2018, 35644, 22876),
     ('Novato', 2018, 8765,  54817)],
    ['model','year','price','mileage']
)
df.groupBy('model','year').apply(agg).show()

which results in

+------+----+-----+-------+------------+-----------+
| model|year|price|mileage|total_prices|total_miles|
+------+----+-----+-------+------------+-----------+
|Galaxy|2017|27841|  17529|       57236|      29421|
|Galaxy|2017|29395|  11892|       57236|      29421|
|Novato|2018|35644|  22876|       44409|      77693|
|Novato|2018| 8765|  54817|       44409|      77693|
+------+----+-----+-------+------------+-----------+
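
A side note: the GROUPED_MAP pandas UDF type is deprecated as of Spark 3.0; in newer versions the same grouped-map logic is usually written with groupBy(...).applyInPandas. A minimal sketch under that assumption, reusing df and agg_schema from above:

def agg_fn(pdf):
    # same logic as the decorated agg above, as a plain function
    pdf['total_prices'] = pdf['price'].sum()
    pdf['total_miles'] = pdf['mileage'].sum()
    return pdf

df.groupBy('model', 'year').applyInPandas(agg_fn, schema=agg_schema).show()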

[Comments]:
