如何从 python 复制 pyspark / hadoop 中的文件
Posted
技术标签:
【中文标题】如何从 python 复制 pyspark / hadoop 中的文件【英文标题】:How to copy a file in pyspark / hadoop from python 【发布时间】:2019-02-05 02:49:33 【问题描述】:我正在使用 pyspark 将数据框保存为 parquet 文件或 csv 文件:
def write_df_as_parquet_file(df, path, mode="overwrite"):
df = df.repartition(1) # join partitions to produce 1 parquet file
dfw = df.write.format("parquet").mode(mode)
dfw.save(path)
def write_df_as_csv_file(df, path, mode="overwrite", header=True):
df = df.repartition(1) # join partitions to produce 1 csv file
header = "true" if header else "false"
dfw = df.write.format("csv").option("header", header).mode(mode)
dfw.save(path)
但这会将 parquet/csv 文件保存在一个名为 path
的文件夹中,并以这种方式保存一些我们不需要的其他文件:
图片:https://ibb.co/9c1D8RL
基本上,我想创建一些函数,使用上述方法将文件保存到一个位置,然后将 CSV 或 PARQUET 文件移动到一个新位置。喜欢:
def write_df_as_parquet_file(df, path, mode="overwrite"):
# save df in one file inside tmp_folder
df = df.repartition(1) # join partitions to produce 1 parquet file
dfw = df.write.format("parquet").mode(mode)
tmp_folder = path + "TEMP"
dfw.save(tmp_folder)
# move parquet file from tmp_folder to path
copy_file(tmp_folder + "*.parquet", path)
remove_folder(tmp_folder)
我该怎么做?如何实现copy_file
或remove_folder
?我在 scala 中看到了一些使用 Hadoop api 的解决方案,但我无法在 python 中完成这项工作。我想我需要使用sparkContext,但是我还在学习Hadoop,还没有找到方法。
【问题讨论】:
【参考方案1】:您可以使用 Python 的 HDFS 库之一连接到您的 HDFS 实例,然后执行所需的任何操作。
来自 hdfs3 文档(https://hdfs3.readthedocs.io/en/latest/quickstart.html):
from hdfs3 import HDFileSystem
hdfs = HDFileSystem(host=<host>, port=<port>)
hdfs.mv(tmp_folder + "*.parquet", path)
将上面的内容包装在一个函数中,你就可以开始了。
注意:我刚刚以 hdfs3 为例。您也可以使用 hdfsCLI。
【讨论】:
我已经找到了类似的东西,但我不知道在哪里可以找到主机和端口。我正在使用一个已经为我设置环境的数据块实例,所以我不知道它是如何配置的。也许我应该就此提出一个单独的问题。以上是关于如何从 python 复制 pyspark / hadoop 中的文件的主要内容,如果未能解决你的问题,请参考以下文章
Pyspark:从 Python 到 Pyspark 实现 lambda 函数和 udf
如何使用 pyspark 从 python 列表中选择随机文本值?