在 PySpark 中反转 Group By

Posted

技术标签:

【中文标题】在 PySpark 中反转 Group By【英文标题】:Reversing Group By in PySpark 【发布时间】:2020-09-21 17:19:44 【问题描述】:

我不确定问题本身的正确性。我为 SQL 找到的解决方案不适用于 Hive SQL 或禁止递归。 因此,我想在 Pyspark 中解决这个问题,并且需要一个解决方案,或者至少是想法,如何解决这个问题。

我有一个如下所示的原始表格:

+--------+----------+
|customer|nr_tickets|
+--------+----------+
|       A|         3|
|       B|         1|
|       C|         2|
+--------+----------+

这就是我想要的表格:

+--------+
|customer|
+--------+
|       A|
|       A|
|       A|
|       B|
|       C|
|       C|
+--------+

你有什么建议吗?

非常感谢您!

【问题讨论】:

【参考方案1】:

对于 Spark2.4+,请使用 array_repeatexplode

from pyspark.sql import functions as F

df.selectExpr("""explode(array_repeat(customer,cast(nr_tickets as int))) as customer""").show()

#+--------+
#|customer|
#+--------+
#|       A|
#|       A|
#|       A|
#|       B|
#|       C|
#|       C|
#+--------+

【讨论】:

您好,感谢您的解决方案。我已经尝试过了,但是它说“array_repeat”未知:未定义的函数:“array_repeat”。该函数既不是注册的临时函数,也不是在数据库“默认”中注册的永久函数。不幸的是,检查我的 Sparkversion 也不起作用。但它应该在 2 以上。 你的 spark 版本小于 2.4。这种方法对你不起作用 愚蠢的问题,但我如何找出我的火花版本? 'sc.version' 不起作用【参考方案2】:

您可以通过遍历行(组)来创建一个新的数据框。

使用range(int(a["nr_tickets"])) 为该客户重复nr_ticketscustomer (Row(customer=a["customer"])) 的行列表

df_list + [Row(customer=a["customer"]) for T in range(int(a["nr_tickets"]))]

您可以将它们存储并附加到一个列表中,然后用它创建一个数据框。

 df= spark.createDataFrame(df_list)

总体而言,

from pyspark.sql import Row

df_list = []
for a in df.select(["customer","nr_tickets"]).collect():
  df_list = df_list + [Row(customer=a["customer"]) for T in range(int(a["nr_tickets"]))]
df= spark.createDataFrame(df_list)
df.show()

你也可以用列表理解来做到这一点

from pyspark.sql import Row
from functools import reduce #python 3

df_list = [
[Row(customer=a["customer"])]*int(a["nr_tickets"]) 
for a in df.select(["customer","nr_tickets"]).collect() 
 ]

df= spark.createDataFrame(reduce(lambda x,y: x+y,df_list))
df.show()

生产

+--------+
|customer|
+--------+
|       A|
|       A|
|       A|
|       B|
|       C|
|       C|
+--------+

【讨论】:

您好,感谢您的帮助。您的代码的“整体”版本生成的列表确实是正确的。但是转换为 df 不知何故对我不起作用,并在我 df.show(): 时抛出此错误 Py4JJavaError:调用 o153.showString 时出错。 :org.apache.spark.SparkException:作业因阶段失败而中止:阶段 6.0 中的任务 0 失败 4 次,最近一次失败:阶段 6.0 中丢失任务 0.3(TID 12,sdeb-hdpdn-q3014a.sys.schwarz,执行程序2):org.apache.spark.api.python.PythonException:回溯(最近一次调用最后):文件“/hadoop/disk10/hadoop/yarn/local/usercache/bnem2103/appcache/application_1598371445148_44504/container_e247_1598371445148_44504_01_000003/pyspark.zip/ pyspark/worker.py",第 125 行,在 main ("%d.%d" % sys.version_info[:2], version)) 异常:worker 中的 Python 3.6 版本与驱动程序 3.8 中的版本不同,PySpark 无法使用不同的次要版本运行。请检查环境变量 PYSPARK_PYTHON 和 PYSPARK_DRIVER_PYTHON 是否设置正确。 我在使用“列表理解”版本时遇到相同/类似的错误。 你在哪里运行 spark?【参考方案3】:

同时我自己也找到了解决办法:

for i in range(1, max_nr_of_tickets):
    table = table.filter(F.col('nr_tickets') >= 1).union(test)
    table = table.withColumn('nr_tickets', F.col('nr_tickets') - 1)

解释:DF的“table”和“test”开头是一样的。 所以“max_nr_of_tickets”只是最高的“nr_tickets”。有用。 我只是在为最大数字的格式苦苦挣扎:

max_nr_of_tickets = df.select(F.max('nr_tickets')).collect()

我不能在 for 循环的范围内使用结果,因为它是一个列表。所以我手动输入最高的数字。 有什么想法可以让 max_nr_of_tickets 转换为正确的格式,以便循环范围接受它?

谢谢

【讨论】:

以上是关于在 PySpark 中反转 Group By的主要内容,如果未能解决你的问题,请参考以下文章

在pyspark数据框中根据group by连接行值

在jupyter中访问数据框元素pyspark

使用数据框在pyspark中获取列post group by

为啥 pyspark sql 不能正确计算 group by 子句?

分组并爆炸pyspark数组类型列

如何在 pyspark 数据帧上应用 group by 并对结果对象进行转换