使用 pyspark 在循环中附加 Spark DataFrames 的有效方法
Posted
技术标签:
【中文标题】使用 pyspark 在循环中附加 Spark DataFrames 的有效方法【英文标题】:Efficient way of appending Spark DataFrames in a loop using pyspark 【发布时间】:2020-03-27 10:51:29 【问题描述】:我有'|'分隔巨大的文本文件,我想合并所有文本文件并创建一个巨大的火花数据框,稍后将用于 ETL 过程,使用 pyspark。
低效的方式
1) 创建一个空的 spark 数据框,df
2) 在一个循环中,读取文本文件作为火花数据帧 df1 并将其附加到空火花数据帧 df
df = spark.createDataFrame([],schema)
for x in os.listdir(textfiles_dir):
filepath = '/'.format(textfiles_dir,x)
df1 = spark.read.format("csv") \
.option("header", "true") \
.option("delimiter", "|") \
.option("inferSchema","true") \
.load(filepath)
df = df.union(df1)
这不是一种有效的火花方式。
谁能提出一种有效的方法来做到这一点? 如果用示例代码解释那就太好了。
谢谢 :)
【问题讨论】:
一次加载路径列表就可以了 【参考方案1】:filepath = 多个文件所在目录的文件路径
dataframe = spark.read.format("csv").option("header", "true").option("delimiter", "|").load(filepath )
【讨论】:
【参考方案2】:-
df1 = spark.read。 ... .load("pathFolder/") - 读取文件夹中的所有文件
df1 保存为表 db 或文件
【讨论】:
【参考方案3】:正如其他人所指出的,您需要将整个文本文件目录作为数据框读取,而不是迭代地读取每个单独的目录:
df = spark.read.format("csv") \
.option("header", "true") \
.option("delimiter", "|") \
.option("inferSchema","true") \
.load(textfiles_dir)
如果你真的想走联合路线,我建议使用 SparkContext (http://spark.apache.org/docs/latest/api/python/pyspark.html?highlight=union#pyspark.SparkContext.union) 中的 union
函数,而不是 DataFrame 中的联合函数:
dfs = []
for x in os.listdir(textfiles_dir):
filepath = '/'.format(textfiles_dir,x)
df1 = spark.read.format("csv") \
.option("header", "true") \
.option("delimiter", "|") \
.option("inferSchema","true") \
.load(filepath)
dfs.append(df1)
df = spark.sparkContext.union(dfs)
【讨论】:
以上是关于使用 pyspark 在循环中附加 Spark DataFrames 的有效方法的主要内容,如果未能解决你的问题,请参考以下文章
pyspark 在 for 循环下的每个进程之后附加非常大的多个数据帧(例如:在每日 ETL 之后附加)