如何从 python 复制 pyspark / hadoop 中的文件

Posted

技术标签:

【中文标题】如何从 python 复制 pyspark / hadoop 中的文件【英文标题】:How to copy a file in pyspark / hadoop from python 【发布时间】:2019-02-05 02:49:33 【问题描述】:

我正在使用 pyspark 将数据框保存为 parquet 文件或 csv 文件:

def write_df_as_parquet_file(df, path, mode="overwrite"):
    df = df.repartition(1)  # join partitions to produce 1 parquet file
    dfw = df.write.format("parquet").mode(mode)
    dfw.save(path)

def write_df_as_csv_file(df, path, mode="overwrite", header=True):
    df = df.repartition(1)  # join partitions to produce 1 csv file
    header = "true" if header else "false"
    dfw = df.write.format("csv").option("header", header).mode(mode)
    dfw.save(path)

但这会将 parquet/csv 文件保存在一个名为 path 的文件夹中,并以这种方式保存一些我们不需要的其他文件:

图片:https://ibb.co/9c1D8RL

基本上,我想创建一些函数,使用上述方法将文件保存到一个位置,然后将 CSV 或 PARQUET 文件移动到一个新位置。喜欢:

def write_df_as_parquet_file(df, path, mode="overwrite"):
    # save df in one file inside tmp_folder
    df = df.repartition(1)  # join partitions to produce 1 parquet file
    dfw = df.write.format("parquet").mode(mode)
    tmp_folder = path + "TEMP"
    dfw.save(tmp_folder)

    # move parquet file from tmp_folder to path
    copy_file(tmp_folder + "*.parquet", path)
    remove_folder(tmp_folder)

我该怎么做?如何实现copy_fileremove_folder?我在 scala 中看到了一些使用 Hadoop api 的解决方案,但我无法在 python 中完成这项工作。我想我需要使用sparkContext,但是我还在学习Hadoop,还没有找到方法。

【问题讨论】:

【参考方案1】:

您可以使用 Python 的 HDFS 库之一连接到您的 HDFS 实例,然后执行所需的任何操作。

来自 hdfs3 文档(https://hdfs3.readthedocs.io/en/latest/quickstart.html):

from hdfs3 import HDFileSystem
hdfs = HDFileSystem(host=<host>, port=<port>)
hdfs.mv(tmp_folder + "*.parquet", path)

将上面的内容包装在一个函数中,你就可以开始了。

注意:我刚刚以 hdfs3 为例。您也可以使用 hdfsCLI。

【讨论】:

我已经找到了类似的东西,但我不知道在哪里可以找到主机和端口。我正在使用一个已经为我设置环境的数据块实例,所以我不知道它是如何配置的。也许我应该就此提出一个单独的问题。

以上是关于如何从 python 复制 pyspark / hadoop 中的文件的主要内容,如果未能解决你的问题,请参考以下文章

从 C++ 到 Python,如何写这个? [复制]

Pyspark:从 Python 到 Pyspark 实现 lambda 函数和 udf

如何在pyspark中压缩两列? [复制]

如何使用 pyspark 从 python 列表中选择随机文本值?

在 Python/PySpark 中 Spark 复制数据框列的最佳实践?

如何从 Pyspark / Python 数据集中先前计算的列中获取值