如何从 Python 循环创建 PySpark DataFrame
Posted
技术标签:
【中文标题】如何从 Python 循环创建 PySpark DataFrame【英文标题】:How to create a PySpark DataFrame from a Python loop 【发布时间】:2018-10-11 18:54:57 【问题描述】:我正在循环访问多个运行良好的网络服务
customers= json.loads(GetCustomers())
for o in customers["result"]:
if o["customerId"] is not None:
custRoles = GetCustomersRoles(o["customerId"])
custRolesObj = json.loads(custRoles)
if custRolesObj["result"] is not None:
for l in custRolesObj["result"]:
print str(l["custId"]) + ", " + str(o["salesAmount"])
这行得通,我的 print 输出也是正确的。但是,现在我需要从中创建一个 DataFrame。我读到,我们不能“创建一个包含两列的 DataFrame 并在循环时逐行添加”。
但是我该如何解决呢?
更新
我希望这是创建列表的正确方法?
customers= json.loads(GetCustomers())
result = []
for o in customers["result"]:
if o["customerId"] is not None:
custRoles = GetCustomersRoles(o["customerId"])
custRolesObj = json.loads(custRoles)
if custRolesObj["result"] is not None:
for l in custRolesObj["result"]:
result.append(make_opportunity(str(l["customerId"]), str(o["salesAmount"])))
如果正确,如何从中创建 Dataframe?
【问题讨论】:
将结果存储在元组(或列表)列表中,然后在最后创建 spark DataFrame。您可以在循环中添加一行,但效率极低 正如@pault 所说,我绝对不会将行添加(或附加)到 for 循环内的数据帧中。这将是非常低效的。组装数据后,在循环之外一次创建数据帧的性能要高得多。请注意,您应该在 OP 中包含数据样本。 @pault:你能给我一个两列场景的样本吗?df = spark.createDataFrame([('a', 1), ('b', 2), ('c', 3)], ["letter", "number"])
。也看看this post。
这就是我现在所知道的并且知道如何在理论上实施。但是实际上如何通过一个小代码sn-p来做到这一点。这将是答案和我正在寻找的。span>
【参考方案1】:
我使用以下代码解决了我的问题
customers= json.loads(GetCustomers())
result = []
for o in customers["result"]:
if o["customerId"] is not None:
custRoles = GetCustomersRoles(o["customerId"])
custRolesObj = json.loads(custRoles)
if custRolesObj["result"] is not None:
for l in custRolesObj["result"]:
result.append([str(l["customerId"]), str(o["salesAmount"])])
from pyspark.sql import *
df = spark.createDataFrame(result,['customerId', 'salesAmount'])
【讨论】:
以上是关于如何从 Python 循环创建 PySpark DataFrame的主要内容,如果未能解决你的问题,请参考以下文章
pyspark - 使用最大值为一列创建一个从 0 到该值的行值循环,并为其重复其他列值