将大量 Spark 数据帧合并为一个

Posted

技术标签:

【中文标题】将大量 Spark 数据帧合并为一个【英文标题】:Merge large number of spark dataframes into one 【发布时间】:2017-04-07 03:55:28 【问题描述】:

我正在使用满足不同条件的不同查询在 for 循环内查询缓存的配置单元临时表超过 1500 次。我需要在循环内使用 unionAll 将它们全部合并。但是由于 spark 无法跟上 RDD 血统,我得到了 *** 错误。

伪代码:

df=[from a hive table]
tableA=[from a hive table]
tableA.registerTempTable("tableA")
HiveContext.sql('CACHE TABLE tableA')

for i in range(0,2000):
    if (list[0]['column1']=='xyz'):
        df1=query something from tableA
        df=df.unionAll(df1)
    elif ():
        df1=query something from tableA
        df=df.unionAll(df1)
    elif ():
        df1=query something from tableA
        df=df.unionAll(df1)
    elif ():
        df1=query something from tableA
        df=df.unionAll(df1)
    else:
        df1=query something from tableA
        df=df.unionAll(df1)

由于 RDD 沿袭变硬,这会引发 *** 错误。所以我尝试如下检查点:

for i in range(0,2000):
    if (list[0]['column1']=='xyz'):
        df1=query something from tableA
        df=df.unionAll(df1)
    elif ():
        df1=query something from tableA
        df=df.unionAll(df1)
    else:
        df1=query something from tableA
        df=df.unionAll(df1)
    df.rdd.checkpoint
    df = sqlContext.createDataFrame(df.rdd, df.schema)

我遇到了同样的错误。所以我尝试了 SaveAsTable ,因为每个 hql 查询和循环内的 hive io 之间的作业提交滞后,我一直想避免这种情况。但这种方法效果很好。

for i in range(0,2000):
    if (list[0]['column1']=='xyz'):
        df=query something from tableA
        df.write.saveAsTable('output', mode='append')
    elif ():
        df=query something from tableA
        df.write.saveAsTable('output', mode='append') 

我需要帮助来避免将数据框保存到循环内的配置单元中。我想以某种内存和高效的方式合并 dfs。我尝试的其他选项之一是将查询结果直接插入临时表中,但出现错误:无法插入基于 RDD 的表中。

【问题讨论】:

一般来说,这种循环和联合操作总是会导致 Spark 出现问题。您正在运行哪种查询?也许有一种更聪明的方法来重构你的代码,它不需要循环。还有,条件是什么? 条件并不复杂——一些正则表达式匹配和一些直接整数匹配。但问题是我已经使最终用户能够创建这些条件,他们只能编写基于 sql 的条件并将它们导入到 spark 中来处理数据。简单来说,我的应用程序就像一个 sql 工作台一样工作 - 不同之处在于它运行所有查询并将结果存储在一个表中。 【参考方案1】:

也许,结果的临时表会起作用。

df1="query something from tableA".registerTempTable("result")
sqlContext.sql("Insert into result query something from tableA")

【讨论】:

就像我在帖子中提到的那样,它会抛出一个错误:无法插入基于 RDD 的表中。

以上是关于将大量 Spark 数据帧合并为一个的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Python 中合并 Spark SQL 数据帧

具有不匹配模式的 Spark 合并数据帧,无需额外的磁盘 IO

spark:合并两个数据帧,如果两个数据帧中的ID重复,则df1中的行覆盖df2中的行

Pandas:如何将两个不完整的数据帧合并或合并为一个完整的数据帧

如何在没有公共密钥的情况下合并 Apache Spark 中的两个数据帧?

使用熊猫循环合并大量csv文件[重复]