删除或加速 PySpark 中的显式 for 循环
Posted
技术标签:
【中文标题】删除或加速 PySpark 中的显式 for 循环【英文标题】:remove, or speed-up an explicit for loop in PySpark 【发布时间】:2018-03-16 13:16:15 【问题描述】:正如您在阅读问题后会理解的那样,我是 Spark 的新手。我正在尝试使用每个会话的操作列表创建一个新的 DataFrame,以最终调用 PySparks FP-Growth function
为了澄清我想要什么,我有:
+-----------+---------+
|sessionid |event_col|
+-----------+---------+
|0 | 1|
|1 | 2|
|1 | 3|
|2 | 1|
|0 | 3|
+-----------+---------+
并且想要:
+-----------+---------+
|sessionid | events|
+-----------+---------+
| 0| [1, 3]|
| 1| [2, 3]|
| 2| [1]|
+-----------+---------+
我在纯 Python 中使用 Pandas DataFrame 对此进行了原型设计,以获取事件列表。
sessions = []
for sess in df.sessionid.unique():
session = []
for action in df[df.sessionid == sess]["event_col"]:
session.append(action)
sessions.append(session)
我已经在 PySpark (2.0) 中重写了它,但实际上重新创建了 for 循环:
def sessionsbuilder(df):
df = df.select(['sessionid', 'event_col'])
sessions = []
for sess in df.select('sessionid').distinct().collect():
session = []
for action in df.where(df.sessionid == sess[0][0]).select('event_col').collect():
session.append(action)
sessions.append(session)
return sessions
正如预期的那样,这非常慢(超过 2 小时,而普通 python 和 pandas 需要 11 秒)。我检查了this post 关于 Spark 中嵌套 for 循环的信息。由于 for 循环中的列表启动,我很难为我的目的创建这种 lambdas。可能有一种方法可以在没有显式 for 循环的情况下创建这样的 DataFrame,或者可以更有效地执行此操作(可能是 udf),因为我没有以这种方式利用 Spark 的强大功能。
【问题讨论】:
如果您能解释一下您要做什么,将会很有帮助。我相信您正在尝试将所有items
组合为每个唯一的id
。这是正确的吗?同样在代码 sn-p: sess[0][0]
中我相信是不正确的。我将其更改为:sess[0]
以使其正常工作。
我认为“尝试使用每个会话的操作列表创建一个新的 DataFrame”已经足够解释了。 sess[0][0]
是从Row对象中获取sessionid的字符串(第一个切片给sessionid = id
,第二个切片给id
)
您创建的数据框中没有 sessionid 和 event_col。它是怎么编译的?
createDataFrame
来自 Spark 中 FP-Growth 类的文档,它是为了展示我希望数据最终如何。我编辑了数据框的名称以使其不会混淆
您可以针对您当时所做的更改编辑问题吗?
【参考方案1】:
如果您的数据框看起来像
+---------+---------+
|sessionid|event_col|
+---------+---------+
|0 |1 |
|1 |2 |
|1 |3 |
|2 |1 |
|0 |3 |
+---------+---------+
然后,Lokesh 在上面的评论中所说的 groupBy 和聚合应该足以获得输出为
from pyspark.sql import functions as F
df.groupBy("sessionid").agg(F.collect_list(F.col("event_col")).alias("events")).show(truncate=False)
你应该得到想要的输出
+---------+------+
|sessionid|events|
+---------+------+
|0 |[1, 3]|
|1 |[2, 3]|
|2 |[1] |
+---------+------+
希望回答对你有帮助
【讨论】:
很高兴听到@Chielio以上是关于删除或加速 PySpark 中的显式 for 循环的主要内容,如果未能解决你的问题,请参考以下文章