如何使用 Python / Pyspark 在 Databricks 笔记本中合并数据帧
Posted
技术标签:
【中文标题】如何使用 Python / Pyspark 在 Databricks 笔记本中合并数据帧【英文标题】:How to merge dataframes in Databricks notebook using Python / Pyspark 【发布时间】:2021-02-10 16:32:53 【问题描述】:我正在使用 Databricks 笔记本来提取 gz 压缩的 csv 文件并加载到数据框对象中。我在下面的第 2 部分遇到问题。
第 1 部分:将压缩文件加载到数据框中运行良好...
%python
df1 = spark.read.option("header",True).option("delimiter", "|").csv("dbfs:/model/.../file_1.csv.gz")
df2 = spark.read.option("header",True).option("delimiter", "|").csv("dbfs:/model/.../file_2.csv.gz")
第 2 部分:尝试合并数据框...
%python
import pandas as pd
df =pd.concat([df1, df2], ignore_index=True)
df.show(truncate=False)
...返回以下错误:
TypeError: 无法连接类型为 '
对于尝试修改我如何合并数据框有什么建议吗?我最多可以合并 20 个文件,其中所有列都相同。
【问题讨论】:
现在通过将对象转换为 Pandas 数据帧重新尝试: df1 = df1.select("*").toPandas().... 希望这可行,但对象很大,集群很小'什。我希望很快待定。 concat 在 pandas 中受支持,但在 spark 中不支持,您应该改为探索df1.union(df2)
。 pandas 和 pyspark 不一样,.toPandas()
可能有效,但效率不高/当您的 df 大小变大时可能不适合内存
对于多个 dfs 尝试减少 from pyspark.sql import DataFrame
然后 reduce(DataFrame.unionAll, [df1,df2,df3])
相关spark unionAll multiple dataframes
【参考方案1】:
如果对象很大,我认为最好的方法不是从pyspark
转换为pandas
,而是在pyspark
中执行与concat
等效的操作。
请注意 unionAll()
自 Spark “2.0.0” 版本以来已弃用,并替换为 union()
https://sparkbyexamples.com/pyspark/pyspark-union-and-unionall/
我相信你可以做到:
from functools import reduce
from pyspark.sql import DataFrame
dfs = [df1,df2]
merged = reduce(DataFrame.union, dfs)
当然要查看它:
merged.show(truncate=False) # or display(merged)
【讨论】:
你说得对,转换成熊猫惨遭失败。上面的解决方案很好用,谢谢。以上是关于如何使用 Python / Pyspark 在 Databricks 笔记本中合并数据帧的主要内容,如果未能解决你的问题,请参考以下文章
如何在 pyspark 中使用 df.write.csv 附加到 csv 文件?
Apache Spark:如何在Python 3中使用pyspark