如何通过不同级别的枢轴聚合然后在pyspark中进行内部连接?
Posted
技术标签:
【中文标题】如何通过不同级别的枢轴聚合然后在pyspark中进行内部连接?【英文标题】:How can I aggregate by different levels pivot then inner join in pyspark? 【发布时间】:2021-12-08 09:56:56 【问题描述】:我是 spark 新手,我有一个包含事务数据的数据框。我想按人员 ID 进行分组,但按不同的属性进行分组,例如店铺类型和教育程度。
%%spark
from pyspark.sql.functions import lit
from functools import reduce
from pyspark.sql import DataFrame
from pyspark.sql import functions as sf
df = spark.sql("SELECT * FROM df limit 10")
shoptypes = df.select('shoptype').distinct().rdd.map(lambda r: r[0]).collect()
edulevel = df.select('edulevel').distinct().rdd.map(lambda r: r[0]).collect()
pivot_1 = df.groupBy("id").pivot("shoptype", shoptypes).sum("amount")
pivot_1 .show()
pivot_2 = df.groupBy("id").pivot("edulevel", edulevel).count()
pivot_2 .show()
alldfs = pivot_1.join(pivot_2, pivot_2.id == pivot_1.id, how='inner').drop(pivot_2.id)
在我内部加入后,alldfs 为空。当枢轴 1 和枢轴 2 具有相同的 id 时,怎么会这样?
但我觉得奇怪的是,pivot_1 和 pivot_2 没有显示相同的人员 ID - 我认为它们会显示相同的来源,因为它们是从相同的来源创建的?我不知道发生了什么。有人可以帮忙吗?我本质上想在不同的 attrubytes 上进行聚合,然后在个人 ID 上水平连接。所以我最终将行作为我的 id,将列作为枢轴属性。
【问题讨论】:
你对pivot的使用和实现是错误的。请检查sparkbyexamples.com/pyspark/pyspark-pivot-and-unpivot-dataframe 链接以正确使用它。还要在此处提供示例数据和预期输出。 已编辑添加输入。 我认为在 id 1 的输出中,tesco 将有 4000 和 Asos 1500。或者您正在使用 dropDuplicates 随机消除重复的 id? 我没有考虑内容,我真的认为枢轴是不言自明的......:P。删除了输入和示例,因为它可能令人困惑。但是像在 excel/python 中一样旋转表格。 只是为了确定。你原来的选择没有限制吧?如果您的原始选择有限制,那么这就是问题所在,因为限制不是确定性的。 【参考方案1】:Select 语句中的 Limit 子句不是确定性的。由于 Spark 是惰性求值的,因此 SQL 语句将执行两次,您将在 pivot_1 和 pivot_2 中获得不同的 id。
【讨论】:
感谢您的理解以上是关于如何通过不同级别的枢轴聚合然后在pyspark中进行内部连接?的主要内容,如果未能解决你的问题,请参考以下文章