如何从 Python 循环创建 PySpark DataFrame

Posted

技术标签:

【中文标题】如何从 Python 循环创建 PySpark DataFrame【英文标题】:How to create a PySpark DataFrame from a Python loop 【发布时间】:2018-10-11 18:54:57 【问题描述】:

我正在循环访问多个运行良好的网络服务

customers= json.loads(GetCustomers())

for o in customers["result"]:
  if o["customerId"] is not None:
    custRoles = GetCustomersRoles(o["customerId"])
    custRolesObj = json.loads(custRoles)

    if custRolesObj["result"] is not None:
      for l in custRolesObj["result"]:
        print str(l["custId"]) + ", " + str(o["salesAmount"])

这行得通,我的 print 输出也是正确的。但是,现在我需要从中创建一个 DataFrame。我读到,我们不能“创建一个包含两列的 DataFrame 并在循环时逐行添加”。

但是我该如何解决呢?

更新

我希望这是创建列表的正确方法?

customers= json.loads(GetCustomers())
result = []

for o in customers["result"]:
  if o["customerId"] is not None:
    custRoles = GetCustomersRoles(o["customerId"])
    custRolesObj = json.loads(custRoles)

    if custRolesObj["result"] is not None:
      for l in custRolesObj["result"]:
          result.append(make_opportunity(str(l["customerId"]), str(o["salesAmount"])))

如果正确,如何从中创建 Dataframe?

【问题讨论】:

将结果存储在元组(或列表)列表中,然后在最后创建 spark DataFrame。您可以在循环中添加一行,但效率极低 正如@pault 所说,我绝对不会将行添加(或附加)到 for 循环内的数据帧中。这将是非常低效的。组装数据后,在循环之外一次创建数据帧的性能要高得多。请注意,您应该在 OP 中包含数据样本。 @pault:你能给我一个两列场景的样本吗? df = spark.createDataFrame([('a', 1), ('b', 2), ('c', 3)], ["letter", "number"])。也看看this post。 这就是我现在所知道的并且知道如何在理论上实施。但是实际上如何通过一个小代码sn-p来做到这一点。这将是答案和我正在寻找的。​​span> 【参考方案1】:

我使用以下代码解决了我的问题

customers= json.loads(GetCustomers())
result = []

for o in customers["result"]:
  if o["customerId"] is not None:
    custRoles = GetCustomersRoles(o["customerId"])
    custRolesObj = json.loads(custRoles)

    if custRolesObj["result"] is not None:
      for l in custRolesObj["result"]:
          result.append([str(l["customerId"]), str(o["salesAmount"])])

from pyspark.sql import *

df = spark.createDataFrame(result,['customerId', 'salesAmount'])

【讨论】:

以上是关于如何从 Python 循环创建 PySpark DataFrame的主要内容,如果未能解决你的问题,请参考以下文章

pyspark - 使用最大值为一列创建一个从 0 到该值的行值循环,并为其重复其他列值

pyspark:从现有列创建 MapType 列

如何在不使用for循环的情况下从pyspark中的列表创建数据框?

如何在 PySpark 中广播 RDD?

如何使用 pyspark 在数据块中循环数据框列

ipython怎么安装pyspark