如何通过不同级别的枢轴聚合然后在pyspark中进行内部连接?

Posted

技术标签:

【中文标题】如何通过不同级别的枢轴聚合然后在pyspark中进行内部连接?【英文标题】:How can I aggregate by different levels pivot then inner join in pyspark? 【发布时间】:2021-12-08 09:56:56 【问题描述】:

我是 spark 新手,我有一个包含事务数据的数据框。我想按人员 ID 进行分组,但按不同的属性进行分组,例如店铺类型和教育程度。

%%spark
from pyspark.sql.functions import lit
from functools import reduce
from pyspark.sql import DataFrame
from pyspark.sql import functions as sf 

df = spark.sql("SELECT * FROM  df  limit 10")

shoptypes = df.select('shoptype').distinct().rdd.map(lambda r: r[0]).collect()

edulevel = df.select('edulevel').distinct().rdd.map(lambda r: r[0]).collect()

pivot_1 = df.groupBy("id").pivot("shoptype", shoptypes).sum("amount")
pivot_1 .show()


pivot_2 = df.groupBy("id").pivot("edulevel", edulevel).count()
pivot_2 .show()

alldfs = pivot_1.join(pivot_2, pivot_2.id == pivot_1.id, how='inner').drop(pivot_2.id)

在我内部加入后,alldfs 为空。当枢轴 1 和枢轴 2 具有相同的 id 时,怎么会这样?

但我觉得奇怪的是,pivot_1 和 pivot_2 没有显示相同的人员 ID - 我认为它们会显示相同的来源,因为它们是从相同的来源创建的?我不知道发生了什么。有人可以帮忙吗?我本质上想在不同的 attrubytes 上进行聚合,然后在个人 ID 上水平连接。所以我最终将行作为我的 id,将列作为枢轴属性。

【问题讨论】:

你对pivot的使用和实现是错误的。请检查sparkbyexamples.com/pyspark/pyspark-pivot-and-unpivot-dataframe 链接以正确使用它。还要在此处提供示例数据和预期输出。 已编辑添加输入。 我认为在 id 1 的输出中,tesco 将有 4000 和 Asos 1500。或者您正在使用 dropDuplicates 随机消除重复的 id? 我没有考虑内容,我真的认为枢轴是不言自明的......:P。删除了输入和示例,因为它可能令人困惑。但是像在 excel/python 中一样旋转表格。 只是为了确定。你原来的选择没有限制吧?如果您的原始选择有限制,那么这就是问题所在,因为限制不是确定性的。 【参考方案1】:

Select 语句中的 Limit 子句不是确定性的。由于 Spark 是惰性求值的,因此 SQL 语句将执行两次,您将在 pivot_1 和 pivot_2 中获得不同的 id。

【讨论】:

感谢您的理解

以上是关于如何通过不同级别的枢轴聚合然后在pyspark中进行内部连接?的主要内容,如果未能解决你的问题,请参考以下文章

pyspark 聚合的不同列的不同操作

如何根据 PySpark 中窗口聚合的条件计算不同值?

具有组间聚合结果的 Pyspark 窗口

没有聚合的 Spark 数据帧枢轴

如何在 pyspark 中对需要在聚合中聚合的分组数据应用窗口函数?

Pyspark - 一次聚合数据框的所有列[重复]