使用 pyspark 在循环中附加 Spark DataFrames 的有效方法

Posted

技术标签:

【中文标题】使用 pyspark 在循环中附加 Spark DataFrames 的有效方法【英文标题】:Efficient way of appending Spark DataFrames in a loop using pyspark 【发布时间】:2020-03-27 10:51:29 【问题描述】:

我有'|'分隔巨大的文本文件,我想合并所有文本文件并创建一个巨大的火花数据框,稍后将用于 ETL 过程,使用 pyspark。

低效的方式

1) 创建一个空的 spark 数据框,df

2) 在一个循环中,读取文本文件作为火花数据帧 df1 并将其附加到空火花数据帧 df

df = spark.createDataFrame([],schema)
for x in os.listdir(textfiles_dir):
    filepath = '/'.format(textfiles_dir,x)
    df1 = spark.read.format("csv") \
                    .option("header", "true") \
                    .option("delimiter", "|") \
                    .option("inferSchema","true") \
                    .load(filepath)
    df = df.union(df1)

这不是一种有效的火花方式。

谁能提出一种有效的方法来做到这一点? 如果用示例代码解释那就太好了。

谢谢 :)

【问题讨论】:

一次加载路径列表就可以了 【参考方案1】:

filepath = 多个文件所在目录的文件路径

dataframe = spark.read.format("csv").option("header", "true").option("delimiter", "|").load(filepath )

【讨论】:

【参考方案2】:
    df1 = spark.read。 ... .load("pathFolder/") - 读取文件夹中的所有文件 df1 保存为表 db 或文件

【讨论】:

【参考方案3】:

正如其他人所指出的,您需要将整个文本文件目录作为数据框读取,而不是迭代地读取每个单独的目录:

df = spark.read.format("csv") \
                    .option("header", "true") \
                    .option("delimiter", "|") \
                    .option("inferSchema","true") \
                    .load(textfiles_dir)

如果你真的想走联合路线,我建议使用 SparkContext (http://spark.apache.org/docs/latest/api/python/pyspark.html?highlight=union#pyspark.SparkContext.union) 中的 union 函数,而不是 DataFrame 中的联合函数:

dfs = []
for x in os.listdir(textfiles_dir):
   filepath = '/'.format(textfiles_dir,x)
   df1 = spark.read.format("csv") \
                .option("header", "true") \
                .option("delimiter", "|") \
                .option("inferSchema","true") \
                .load(filepath)
   dfs.append(df1)
df = spark.sparkContext.union(dfs)

【讨论】:

以上是关于使用 pyspark 在循环中附加 Spark DataFrames 的有效方法的主要内容,如果未能解决你的问题,请参考以下文章

如何在 for 循环中附加 pyspark 数据帧?

pyspark 在 for 循环下的每个进程之后附加非常大的多个数据帧(例如:在每日 ETL 之后附加)

Pyspark 追加执行程序环境变量

如何使用 Spark/PySpark 删除雪花目标表

在 for 循环中使用 udf 在 Pyspark 中创建多个列

PySpark:将字典数据附加到 PySpark DataFrame