如何在 Pyspark 中应用 groupby 和 transpose?
Posted
技术标签:
【中文标题】如何在 Pyspark 中应用 groupby 和 transpose?【英文标题】:How to apply groupby and transpose in Pyspark? 【发布时间】:2019-10-22 04:22:23 【问题描述】:我有一个如下所示的数据框
df = pd.DataFrame(
'subject_id':[1,1,1,1,2,2,2,2,3,3,4,4,4,4,4],
'readings' : ['READ_1','READ_2','READ_1','READ_3','READ_1','READ_5','READ_6','READ_8','READ_10','READ_12','READ_11','READ_14','READ_09','READ_08','READ_07'],
'val' :[5,6,7,11,5,7,16,12,13,56,32,13,45,43,46],
)
我上面的输入数据框是这样的
虽然下面的代码在 Python pandas 中运行良好(感谢 Jezrael),但当我将其应用于真实数据(超过 4M 条记录)时,它会运行很长时间。所以我试图使用 pyspark
。请注意,我已经尝试过Dask
,modin
,pandarallel
,它们相当于 pandas 进行大规模处理,但也没有帮助。以下代码的作用是it generates the summary statistics for each subject for each reading
。你可以看看下面的预期输出来了解一下
df_op = (df.groupby(['subject_id','readings'])['val']
.describe()
.unstack()
.swaplevel(0,1,axis=1)
.reindex(df['readings'].unique(), axis=1, level=0))
df_op.columns = df_op.columns.map('_'.join)
df_op = df_op.reset_index()
你能帮我在pyspark中实现上述操作吗?当我尝试以下时,它抛出了一个错误
df.groupby(['subject_id','readings'])['val']
例如 - subject_id = 1 有 4 个读数,但有 3 个唯一读数。因此,对于 subject_id = 1,我们得到 3 * 8 = 24 列。为什么是 8?因为它是MIN,MAX,COUNT,Std,MEAN,25%percentile,50th percentile,75th percentile
。希望这会有所帮助
当我在 pyspark 中开始使用它时,它返回以下错误
TypeError: 'GroupedData' 对象不可下标
我希望我的输出如下所示
【问题讨论】:
您到底想做什么?该代码不是一个很好的参考,因为它不起作用 @pissall - 不,它是一个工作代码。你能再试一次吗?我想做的是得到summary statistics for each subject for each reading
@pissall - 更新了帖子中subject_id = 1
的示例。如果您还有任何疑问,请告诉我
您想要什么所有的汇总统计信息?您必须使用 groupby 编码聚合
我会为你解决的
【参考方案1】:
您需要先进行分组并获取每个读数的统计信息,然后进行调整以获得预期结果
import pyspark.sql.functions as F
agg_df = df.groupby("subject_id", "readings").agg(F.mean(F.col("val")), F.min(F.col("val")), F.max(F.col("val")),
F.count(F.col("val")),
F.expr('percentile_approx(val, 0.25)').alias("quantile_25"),
F.expr('percentile_approx(val, 0.75)').alias("quantile_75"))
这将为您提供以下输出:
+----------+--------+--------+--------+--------+----------+-----------+-----------+
|subject_id|readings|avg(val)|min(val)|max(val)|count(val)|quantile_25|quantile_75|
+----------+--------+--------+--------+--------+----------+-----------+-----------+
| 2| READ_1| 5.0| 5| 5| 1| 5| 5|
| 2| READ_5| 7.0| 7| 7| 1| 7| 7|
| 2| READ_8| 12.0| 12| 12| 1| 12| 12|
| 4| READ_08| 43.0| 43| 43| 1| 43| 43|
| 1| READ_2| 6.0| 6| 6| 1| 6| 6|
| 1| READ_1| 6.0| 5| 7| 2| 5| 7|
| 2| READ_6| 16.0| 16| 16| 1| 16| 16|
| 1| READ_3| 11.0| 11| 11| 1| 11| 11|
| 4| READ_11| 32.0| 32| 32| 1| 32| 32|
| 3| READ_10| 13.0| 13| 13| 1| 13| 13|
| 3| READ_12| 56.0| 56| 56| 1| 56| 56|
| 4| READ_14| 13.0| 13| 13| 1| 13| 13|
| 4| READ_07| 46.0| 46| 46| 1| 46| 46|
| 4| READ_09| 45.0| 45| 45| 1| 45| 45|
+----------+--------+--------+--------+--------+----------+-----------+-----------+
使用 groupby subject_id
如果你旋转readings
,你会得到预期的输出:
agg_df2 = df.groupby("subject_id").pivot("readings").agg(F.mean(F.col("val")), F.min(F.col("val")), F.max(F.col("val")),
F.count(F.col("val")),
F.expr('percentile_approx(val, 0.25)').alias("quantile_25"),
F.expr('percentile_approx(val, 0.75)').alias("quantile_75"))
for i in agg_df2.columns:
agg_df2 = agg_df2.withColumnRenamed(i, i.replace("(val)", ""))
agg_df2.show()

|subject_id|READ_07_avg(val)|READ_07_min(val)|READ_07_max(val)|READ_07_count(val)|READ_07_quantile_25|READ_07_quantile_75|READ_08_avg(val)|READ_08_min(val)|READ_08_max(val)|READ_08_count(val)|READ_08_quantile_25|READ_08_quantile_75|READ_09_avg(val)|READ_09_min(val)|READ_09_max(val)|READ_09_count(val)|READ_09_quantile_25|READ_09_quantile_75|READ_1_avg(val)|READ_1_min(val)|READ_1_max(val)|READ_1_count(val)|READ_1_quantile_25|READ_1_quantile_75|READ_10_avg(val)|READ_10_min(val)|READ_10_max(val)|READ_10_count(val)|READ_10_quantile_25|READ_10_quantile_75|READ_11_avg(val)|READ_11_min(val)|READ_11_max(val)|READ_11_count(val)|READ_11_quantile_25|READ_11_quantile_75|READ_12_avg(val)|READ_12_min(val)|READ_12_max(val)|READ_12_count(val)|READ_12_quantile_25|READ_12_quantile_75|READ_14_avg(val)|READ_14_min(val)|READ_14_max(val)|READ_14_count(val)|READ_14_quantile_25|READ_14_quantile_75|READ_2_avg(val)|READ_2_min(val)|READ_2_max(val)|READ_2_count(val)|READ_2_quantile_25|READ_2_quantile_75|READ_3_avg(val)|READ_3_min(val)|READ_3_max(val)|READ_3_count(val)|READ_3_quantile_25|READ_3_quantile_75|READ_5_avg(val)|READ_5_min(val)|READ_5_max(val)|READ_5_count(val)|READ_5_quantile_25|READ_5_quantile_75|READ_6_avg(val)|READ_6_min(val)|READ_6_max(val)|READ_6_count(val)|READ_6_quantile_25|READ_6_quantile_75|READ_8_avg(val)|READ_8_min(val)|READ_8_max(val)|READ_8_count(val)|READ_8_quantile_25|READ_8_quantile_75|

| 1| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| 6.0| 5| 7| 2| 5| 7| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| 6.0| 6| 6| 1| 6| 6| 11.0| 11| 11| 1| 11| 11| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null|
| 3| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| 13.0| 13| 13| 1| 13| 13| null| null| null| null| null| null| 56.0| 56| 56| 1| 56| 56| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null|
| 2| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| 5.0| 5| 5| 1| 5| 5| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| 7.0| 7| 7| 1| 7| 7| 16.0| 16| 16| 1| 16| 16| 12.0| 12| 12| 1| 12| 12|
| 4| 46.0| 46| 46| 1| 46| 46| 43.0| 43| 43| 1| 43| 43| 45.0| 45| 45| 1| 45| 45| null| null| null| null| null| null| null| null| null| null| null| null| 32.0| 32| 32| 1| 32| 32| null| null| null| null| null| null| 13.0| 13| 13| 1| 13| 13| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null| null|

【讨论】:
就用第二步吧兄弟。第一步是演示如何获取统计信息。 哦,是的。刚刚注意到数据框名称 实际上我正在尝试使用agg_df2.show()
查看输出。由于我的真实数据为空值,无法验证show
输出。是否可以像熊猫数据框一样以漂亮的表格形式查看它?我试过这个result_pdf = agg_df2.select("*").toPandas()
,但它运行了很长时间。
最后还是报错了。我是说显示器。可能是由于大数据集?如何以表格形式查看输出,如预期输出部分所示?
是的,toPandas()
在大型数据集上非常昂贵,因为它会将您的所有数据都带到执行程序上。检查执行器内存是否相同。以上是关于如何在 Pyspark 中应用 groupby 和 transpose?的主要内容,如果未能解决你的问题,请参考以下文章
如何将 groupBy 和聚合函数应用于 PySpark DataFrame 中的特定窗口?
如何在 Pyspark 中使用 groupby 和数组元素?
如何在pyspark中运行多个k意味着集群和使用groupBy
如何使用 groupby 和聚合将 pyspark 数据框中的行与多列连接起来