如何在pyspark的循环中合并数据帧

Posted

技术标签:

【中文标题】如何在pyspark的循环中合并数据帧【英文标题】:how to merge dataframes in a loop in pyspark 【发布时间】:2020-11-23 18:42:16 【问题描述】:

我的 pyspark 代码遇到问题。

我必须遍历一个时间范围并计算给定时间范围内每个月的某些指标,并将结果写入 S3。

示例代码:

for date in daterange(startdate, enddate):
   df = spark.read.parquet(PATH + "/" + date)
   df_final = applylogic(df)

问题是,我无法一一写入数据帧,因为每次都应该覆盖 S3 路径。所以我需要一种方法将循环中的数据帧组合成一个数据帧并将其写入 S3。

请帮助我为 S3 编写相同的逻辑

【问题讨论】:

【参考方案1】:

您可以在 for 循环中对 df_finals 执行 union

from functools import reduce

df_list = []
for date in daterange(startdate, enddate):
    df = spark.read.parquet(PATH + "/" + date)
    df_final = applylogic(df)
    df_list.append(df_final)
output_df = reduce(lambda x, y: x.union(y), df_list)

【讨论】:

谢谢你,mck,看起来它正在工作......我可以知道作为列表的 df_list 是如何在这里转换为数据框的吗? @AswinKs df_list 中的数据框列表在最后一行合并在一起。最后一行相当于df_list[0].union(df_list[1]).union(df_list[2])... @mick,感谢您的回复。我现在面临另一个问题。在执行这段代码时,由于延迟评估代码并且仅在减少操作期间执行操作,因此出现内存不足错误。我试图通过收集数据框来稍微更改代码: df_list.append(df_final.collect()) ;但是当我这样做时,reduce 操作失败并出现错误 - 属性错误:列表对象没有属性联合

以上是关于如何在pyspark的循环中合并数据帧的主要内容,如果未能解决你的问题,请参考以下文章

如何在 for 循环中附加 pyspark 数据帧?

Scala:如何在循环中合并数据帧

pyspark - 将两个数据帧与目标中的额外列合并

使用 pyspark 在循环中附加 Spark DataFrames 的有效方法

如何避免pyspark中加入操作中的过度洗牌?

如何使用 for 循环组合多个数据帧?