如何使用 Python / Pyspark 在 Databricks 笔记本中合并数据帧

Posted

技术标签:

【中文标题】如何使用 Python / Pyspark 在 Databricks 笔记本中合并数据帧【英文标题】:How to merge dataframes in Databricks notebook using Python / Pyspark 【发布时间】:2021-02-10 16:32:53 【问题描述】:

我正在使用 Databricks 笔记本来提取 gz 压缩的 csv 文件并加载到数据框对象中。我在下面的第 2 部分遇到问题。

第 1 部分:将压缩文件加载到数据框中运行良好...

    %python
    df1 = spark.read.option("header",True).option("delimiter", "|").csv("dbfs:/model/.../file_1.csv.gz")
    df2 = spark.read.option("header",True).option("delimiter", "|").csv("dbfs:/model/.../file_2.csv.gz")
    

第 2 部分:尝试合并数据框...

    %python
    import pandas as pd
    df =pd.concat([df1, df2], ignore_index=True)
    df.show(truncate=False)
    

...返回以下错误:

TypeError: 无法连接类型为 '' 的对象;只有 Series 和 DataFrame obj 是有效的

对于尝试修改我如何合并数据框有什么建议吗?我最多可以合并 20 个文件,其中所有列都相同。

【问题讨论】:

现在通过将对象转换为 Pandas 数据帧重新尝试: df1 = df1.select("*").toPandas().... 希望这可行,但对象很大,集群很小'什。我希望很快待定。 concat 在 pandas 中受支持,但在 spark 中不支持,您应该改为探索 df1.union(df2)。 pandas 和 pyspark 不一样,.toPandas() 可能有效,但效率不高/当您的 df 大小变大时可能不适合内存 对于多个 dfs 尝试减少 from pyspark.sql import DataFrame 然后 reduce(DataFrame.unionAll, [df1,df2,df3]) 相关spark unionAll multiple dataframes 【参考方案1】:

如果对象很大,我认为最好的方法不是从pyspark 转换为pandas,而是在pyspark 中执行与concat 等效的操作。

请注意 unionAll() 自 Spark “2.0.0” 版本以来已弃用,并替换为 union() https://sparkbyexamples.com/pyspark/pyspark-union-and-unionall/

我相信你可以做到:

from functools import reduce
from pyspark.sql import DataFrame

dfs = [df1,df2]
merged = reduce(DataFrame.union, dfs)

当然要查看它:

merged.show(truncate=False) # or display(merged)

【讨论】:

你说得对,转换成熊猫惨遭失败。上面的解决方案很好用,谢谢。

以上是关于如何使用 Python / Pyspark 在 Databricks 笔记本中合并数据帧的主要内容,如果未能解决你的问题,请参考以下文章

如何在 pyspark 中使用 df.write.csv 附加到 csv 文件?

Apache Spark:如何在Python 3中使用pyspark

在 python 或 Pyspark 数据框中使用特殊字符重命名列

python pyspark入门篇

python pyspark入门篇

如何将 json 转换为 pyspark 数据帧(更快的实现)[重复]