In PySpark, is it possible to do 2 aggregations using 1 groupBy?
Posted: 2019-06-21 23:06:23

Problem description: I would like to know whether the following is possible in PySpark. Given the following df:
+-------+------+-------+---------+
| model | year | price | mileage |
+-------+------+-------+---------+
|Galaxy | 2017 | 27841 |   17529 |
|Galaxy | 2017 | 29395 |   11892 |
|Novato | 2018 | 35644 |   22876 |
|Novato | 2018 |  8765 |   54817 |
+-------+------+-------+---------+
df.groupBy('model', 'year')\
    .agg({'price': 'sum'})\
    .agg({'mileage': 'sum'})\
    .withColumnRenamed('sum(price)', 'total_prices')\
    .withColumnRenamed('sum(mileage)', 'total_miles')
hoping to get a result like:
+-------+------+-------+---------+--------------+-------------+
| model | year | price | mileage | total_prices | total_miles |
+-------+------+-------+---------+--------------+-------------+
|Galaxy | 2017 | 27841 |   17529 |        57236 |       29421 |
|Galaxy | 2017 | 29395 |   11892 |        57236 |       29421 |
|Novato | 2018 | 35644 |   22876 |        44409 |       77693 |
|Novato | 2018 |  8765 |   54817 |        44409 |       77693 |
+-------+------+-------+---------+--------------+-------------+
Comments:
Also check: ***.com/questions/34409875/…

Answer 1:
You are not actually looking for groupBy here but for a window function or a join, because you want to extend each row with aggregated values.
Window:
from pyspark.sql import functions as F
from pyspark.sql import Window
df = spark.createDataFrame(
    [('Galaxy', 2017, 27841, 17529),
     ('Galaxy', 2017, 29395, 11892),
     ('Novato', 2018, 35644, 22876),
     ('Novato', 2018, 8765, 54817)],
    ['model', 'year', 'price', 'mileage']
)
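# every row that shares the same (model, year) falls into one window partition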
w = Window.partitionBy('model', 'year')
df = df.withColumn('total_prices', F.sum('price').over(w))
df = df.withColumn('total_miles', F.sum('mileage').over(w))
df.show()
Join:
from pyspark.sql import functions as F
df = spark.createDataFrame(
    [('Galaxy', 2017, 27841, 17529),
     ('Galaxy', 2017, 29395, 11892),
     ('Novato', 2018, 35644, 22876),
     ('Novato', 2018, 8765, 54817)],
    ['model', 'year', 'price', 'mileage']
)
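# aggregate the totals once per (model, year), then join them back onto every original row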
df = df.join(
    df.groupby('model', 'year').agg(
        F.sum('price').alias('total_prices'),
        F.sum('mileage').alias('total_miles')),
    ['model', 'year'])
df.show()
Output:
+------+----+-----+-------+------------+-----------+
| model|year|price|mileage|total_prices|total_miles|
+------+----+-----+-------+------------+-----------+
|Galaxy|2017|27841| 17529| 57236| 29421|
|Galaxy|2017|29395| 11892| 57236| 29421|
|Novato|2018|35644| 22876| 44409| 77693|
|Novato|2018| 8765| 54817| 44409| 77693|
+------+----+-----+-------+------------+-----------+
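For reference, the two aggregations can also be expressed with a single groupBy and one agg call, but that collapses each group to a single row instead of keeping the original rows, which is why the join above attaches the totals back onto df. A minimal sketch, reusing the df defined above:

df.groupBy('model', 'year').agg(
    F.sum('price').alias('total_prices'),
    F.sum('mileage').alias('total_miles')).show()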
Comments:
Thanks cronoik, the Window approach works perfectly!

Answer 2:
Using a pandas UDF, you can get any number of aggregations.
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType,StructType,StructField,StringType
import pandas as pd
agg_schema = StructType(
    [StructField("model", StringType(), True),
     StructField("year", IntegerType(), True),
     StructField("price", IntegerType(), True),
     StructField("mileage", IntegerType(), True),
     StructField("total_prices", IntegerType(), True),
     StructField("total_miles", IntegerType(), True)
    ]
)
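# GROUPED_MAP: the function receives the full pandas DataFrame for each (model, year) group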
@F.pandas_udf(agg_schema, F.PandasUDFType.GROUPED_MAP)
def agg(pdf):
    total_prices = pdf['price'].sum()
    total_miles = pdf['mileage'].sum()
    pdf['total_prices'] = total_prices
    pdf['total_miles'] = total_miles
    return pdf
df = spark.createDataFrame(
    [('Galaxy', 2017, 27841, 17529),
     ('Galaxy', 2017, 29395, 11892),
     ('Novato', 2018, 35644, 22876),
     ('Novato', 2018, 8765, 54817)],
    ['model', 'year', 'price', 'mileage']
)
df.groupBy('model','year').apply(agg).show()
resulting in:
+------+----+-----+-------+------------+-----------+
| model|year|price|mileage|total_prices|total_miles|
+------+----+-----+-------+------------+-----------+
|Galaxy|2017|27841| 17529| 57236| 29421|
|Galaxy|2017|29395| 11892| 57236| 29421|
|Novato|2018|35644| 22876| 44409| 77693|
|Novato|2018| 8765| 54817| 44409| 77693|
+------+----+-----+-------+------------+-----------+
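Note (not part of the original answer): on Spark 3.0+ the GROUPED_MAP pandas UDF style shown above is deprecated in favor of applyInPandas. A minimal sketch of the equivalent call, reusing df and agg_schema from above and a hypothetical add_totals helper:

def add_totals(pdf):
    # plain pandas function applied per group; no pandas_udf decorator needed
    pdf['total_prices'] = pdf['price'].sum()
    pdf['total_miles'] = pdf['mileage'].sum()
    return pdf

df.groupBy('model', 'year').applyInPandas(add_totals, schema=agg_schema).show()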