删除或加速 PySpark 中的显式 for 循环

Posted

技术标签:

【中文标题】删除或加速 PySpark 中的显式 for 循环【英文标题】:remove, or speed-up an explicit for loop in PySpark 【发布时间】:2018-03-16 13:16:15 【问题描述】:

正如您在阅读问题后会理解的那样,我是 Spark 的新手。我正在尝试使用每个会话的操作列表创建一个新的 DataFrame,以最终调用 PySparks FP-Growth function

为了澄清我想要什么,我有:

+-----------+---------+
|sessionid  |event_col|
+-----------+---------+
|0          |        1|
|1          |        2|
|1          |        3|
|2          |        1|
|0          |        3|
+-----------+---------+

并且想要:

+-----------+---------+
|sessionid  |   events|
+-----------+---------+
|          0|   [1, 3]|
|          1|   [2, 3]|
|          2|      [1]|
+-----------+---------+

我在纯 Python 中使用 Pandas DataFrame 对此进行了原型设计,以获取事件列表。

sessions = []
for sess in df.sessionid.unique():
    session = []
    for action in df[df.sessionid == sess]["event_col"]:
        session.append(action)
    sessions.append(session)

我已经在 PySpark (2.0) 中重写了它,但实际上重新创建了 for 循环:

def sessionsbuilder(df):
  df = df.select(['sessionid', 'event_col'])
  sessions = []
  for sess in df.select('sessionid').distinct().collect():
      session = []
      for action in df.where(df.sessionid == sess[0][0]).select('event_col').collect():
          session.append(action)
      sessions.append(session)
  return sessions

正如预期的那样,这非常慢(超过 2 小时,而普通 python 和 pandas 需要 11 秒)。我检查了this post 关于 Spark 中嵌套 for 循环的信息。由于 for 循环中的列表启动,我很难为我的目的创建这种 lambdas。可能有一种方法可以在没有显式 for 循环的情况下创建这样的 DataFrame,或者可以更有效地执行此操作(可能是 udf),因为我没有以这种方式利用 Spark 的强大功能。

【问题讨论】:

如果您能解释一下您要做什么,将会很有帮助。我相信您正在尝试将所有items 组合为每个唯一的id。这是正确的吗?同样在代码 sn-p: sess[0][0] 中我相信是不正确的。我将其更改为:sess[0] 以使其正常工作。 我认为“尝试使用每个会话的操作列表创建一个新的 DataFrame”已经足够解释了。 sess[0][0]是从Row对象中获取sessionid的字符串(第一个切片给sessionid = id,第二个切片给id 您创建的数据框中没有 sessionid 和 event_col。它是怎么编译的? createDataFrame 来自 Spark 中 FP-Growth 类的文档,它是为了展示我希望数据最终如何。我编辑了数据框的名称以使其不会混淆 您可以针对您当时所做的更改编辑问题吗? 【参考方案1】:

如果您的数据框看起来像

+---------+---------+
|sessionid|event_col|
+---------+---------+
|0        |1        |
|1        |2        |
|1        |3        |
|2        |1        |
|0        |3        |
+---------+---------+

然后,Lokesh 在上面的评论中所说的 groupBy 和聚合应该足以获得输出为

from pyspark.sql import functions as F
df.groupBy("sessionid").agg(F.collect_list(F.col("event_col")).alias("events")).show(truncate=False)

你应该得到想要的输出

+---------+------+
|sessionid|events|
+---------+------+
|0        |[1, 3]|
|1        |[2, 3]|
|2        |[1]   |
+---------+------+

希望回答对你有帮助

【讨论】:

很高兴听到@Chielio

以上是关于删除或加速 PySpark 中的显式 for 循环的主要内容,如果未能解决你的问题,请参考以下文章

如何避免包含`a(i)= b(i,c(i))`的显式循环?

智能转换与 KOTLIN 中的显式转换有何不同

需要专业建议,关于非角度应用的量角器中的显式等待

"for" 循环只添加最后的 ggplot 层

运算符重载中的显式构造?

Scala 中的显式类型转换