在 PySpark 中反转 Group By
Posted
技术标签:
【中文标题】在 PySpark 中反转 Group By【英文标题】:Reversing Group By in PySpark 【发布时间】:2020-09-21 17:19:44 【问题描述】:我不确定问题本身的正确性。我为 SQL 找到的解决方案不适用于 Hive SQL 或禁止递归。 因此,我想在 Pyspark 中解决这个问题,并且需要一个解决方案,或者至少是想法,如何解决这个问题。
我有一个如下所示的原始表格:
+--------+----------+
|customer|nr_tickets|
+--------+----------+
| A| 3|
| B| 1|
| C| 2|
+--------+----------+
这就是我想要的表格:
+--------+
|customer|
+--------+
| A|
| A|
| A|
| B|
| C|
| C|
+--------+
你有什么建议吗?
非常感谢您!
【问题讨论】:
【参考方案1】:对于 Spark2.4+
,请使用 array_repeat
和 explode
。
from pyspark.sql import functions as F
df.selectExpr("""explode(array_repeat(customer,cast(nr_tickets as int))) as customer""").show()
#+--------+
#|customer|
#+--------+
#| A|
#| A|
#| A|
#| B|
#| C|
#| C|
#+--------+
【讨论】:
您好,感谢您的解决方案。我已经尝试过了,但是它说“array_repeat”未知:未定义的函数:“array_repeat”。该函数既不是注册的临时函数,也不是在数据库“默认”中注册的永久函数。不幸的是,检查我的 Sparkversion 也不起作用。但它应该在 2 以上。 你的 spark 版本小于 2.4。这种方法对你不起作用 愚蠢的问题,但我如何找出我的火花版本? 'sc.version' 不起作用【参考方案2】:您可以通过遍历行(组)来创建一个新的数据框。
使用range(int(a["nr_tickets"]))
为该客户重复nr_tickets
次customer
(Row(customer=a["customer"])
) 的行列表
df_list + [Row(customer=a["customer"]) for T in range(int(a["nr_tickets"]))]
您可以将它们存储并附加到一个列表中,然后用它创建一个数据框。
df= spark.createDataFrame(df_list)
总体而言,
from pyspark.sql import Row
df_list = []
for a in df.select(["customer","nr_tickets"]).collect():
df_list = df_list + [Row(customer=a["customer"]) for T in range(int(a["nr_tickets"]))]
df= spark.createDataFrame(df_list)
df.show()
你也可以用列表理解来做到这一点
from pyspark.sql import Row
from functools import reduce #python 3
df_list = [
[Row(customer=a["customer"])]*int(a["nr_tickets"])
for a in df.select(["customer","nr_tickets"]).collect()
]
df= spark.createDataFrame(reduce(lambda x,y: x+y,df_list))
df.show()
生产
+--------+
|customer|
+--------+
| A|
| A|
| A|
| B|
| C|
| C|
+--------+
【讨论】:
您好,感谢您的帮助。您的代码的“整体”版本生成的列表确实是正确的。但是转换为 df 不知何故对我不起作用,并在我 df.show(): 时抛出此错误 Py4JJavaError:调用 o153.showString 时出错。 :org.apache.spark.SparkException:作业因阶段失败而中止:阶段 6.0 中的任务 0 失败 4 次,最近一次失败:阶段 6.0 中丢失任务 0.3(TID 12,sdeb-hdpdn-q3014a.sys.schwarz,执行程序2):org.apache.spark.api.python.PythonException:回溯(最近一次调用最后):文件“/hadoop/disk10/hadoop/yarn/local/usercache/bnem2103/appcache/application_1598371445148_44504/container_e247_1598371445148_44504_01_000003/pyspark.zip/ pyspark/worker.py",第 125 行,在 main ("%d.%d" % sys.version_info[:2], version)) 异常:worker 中的 Python 3.6 版本与驱动程序 3.8 中的版本不同,PySpark 无法使用不同的次要版本运行。请检查环境变量 PYSPARK_PYTHON 和 PYSPARK_DRIVER_PYTHON 是否设置正确。 我在使用“列表理解”版本时遇到相同/类似的错误。 你在哪里运行 spark?【参考方案3】:同时我自己也找到了解决办法:
for i in range(1, max_nr_of_tickets):
table = table.filter(F.col('nr_tickets') >= 1).union(test)
table = table.withColumn('nr_tickets', F.col('nr_tickets') - 1)
解释:DF的“table”和“test”开头是一样的。 所以“max_nr_of_tickets”只是最高的“nr_tickets”。有用。 我只是在为最大数字的格式苦苦挣扎:
max_nr_of_tickets = df.select(F.max('nr_tickets')).collect()
我不能在 for 循环的范围内使用结果,因为它是一个列表。所以我手动输入最高的数字。 有什么想法可以让 max_nr_of_tickets 转换为正确的格式,以便循环范围接受它?
谢谢
【讨论】:
以上是关于在 PySpark 中反转 Group By的主要内容,如果未能解决你的问题,请参考以下文章
使用数据框在pyspark中获取列post group by