pyspark 在 for 循环下的每个进程之后附加非常大的多个数据帧(例如:在每日 ETL 之后附加)

Posted

技术标签:

【中文标题】pyspark 在 for 循环下的每个进程之后附加非常大的多个数据帧(例如:在每日 ETL 之后附加)【英文标题】:pyspark append very large multiple dataframes after each process under for loop (eg: append after daily ETL) 【发布时间】:2018-07-13 11:58:53 【问题描述】:

我必须每天进行 ETL,然后将其添加到单个数据帧中。 例如:每天 ETL 之后的输出如下..

df1: 
    id category quantity date
    1   abc       100    01-07-18
    2   deg       175    01-07-18
    .....
df2: 
    id category quantity date
    1   abc       50     02-07-18
    2   deg       300    02-07-18
    3   zzz       250    02-07-18
    .....
df3: 
    id category quantity date
    1   abc       500    03-07-18
    .....
df4: 
    id category quantity date
    5   jjj       200    04-07-18
    7   ddd       100    04-07-18
    .....

对于每一天的 ETL,需要创建一个数据框,如 df1、df2、df3、...,并且在每天 ETL 之后,该数据框应附加更早的 ETL 日期。

预期的最终输出:

After day 2 output should be:
 finaldf: 
        id category quantity date
        1   abc       100    01-07-18
        2   deg       175    01-07-18
        1   abc       50     02-07-18
        2   deg       300    02-07-18
        3   zzz       250    02-07-18
        .....


After day 4 output should be:
     finaldf: 
            id category quantity date
            1   abc       100    01-07-18
            2   deg       175    01-07-18
            1   abc       50     02-07-18
            2   deg       300    02-07-18
            3   zzz       250    02-07-18
            1   abc       500    03-07-18
            5   jjj       200    04-07-18
            7   ddd       100    04-07-18
            .....

我已经使用Pandas using append function 完成了这项工作,但由于数据量非常大,我收到了 MemoryError。

【问题讨论】:

您在使用spark 数据帧吗? 是的,火花@pault 【参考方案1】:

PySpark 的答案

把所有的DataFrames放到一个列表中

df_list = [df1, df2, df3, df4]
finaldf = reduce(lambda x, y: x.union(y), df_list)

finaldf 将包含所有数据。

【讨论】:

以上是关于pyspark 在 for 循环下的每个进程之后附加非常大的多个数据帧(例如:在每日 ETL 之后附加)的主要内容,如果未能解决你的问题,请参考以下文章

如何在 for 循环中附加 pyspark 数据帧?

重用pyspark缓存并在for循环中不持久

删除或加速 PySpark 中的显式 for 循环

如何使用 PySpark 并行化我的文件处理程序

我们可以在 pyspark 的 ParamGridBuilder 中使用 for 循环吗?

在 pyspark 中同时而不是按顺序运行 for 循环