PySpark HDFS 数据流读/写

Posted

技术标签:

【中文标题】PySpark HDFS 数据流读/写【英文标题】:PySpark HDFS data streams reading/writing 【发布时间】:2019-11-15 16:34:28 【问题描述】:

我有一个包含多个文件的 HDFS 目录,我想合并为一个。我不想使用 Spark DF 来执行此操作,而是使用数据流进行 HDFS 交互。到目前为止,这是我的代码:

...
sc = SparkContext()
hadoop = sc._jvm.org.apache.hadoop
hdfs = hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration())

out_stream = hdfs.create(hadoop.fs.Path('/tmp/combinedFile.parquet'))  # FSDataOutputStream

for f in hdfs.listStatus(hadoop.fs.Path('/tmp/dirWithSeparatedFiles/')):

    buffer = bytes(256)
    in_stream = hdfs.open(f.getPath())  # FSDataInputStream

    bytesRead = in_stream.read(buffer)
    while (bytesRead > 0):
        out_stream.writeBytes(bytesRead)
        out_stream.flush()
    in_stream.close()

out_stream.close()

此代码的第一个问题是,我不确定如何通过缓冲区从输入流中读取数据。第一个问题是,输出文件是在 HDFS 中创建的,但没有写入任何内容(即使我向其写入固定值)。

【问题讨论】:

【参考方案1】:

经过一番调查,我找到了解决问题的方法。解决方案包括,通过 spark 上下文创建一些 JVM 对象并将它们用于缓冲 i/o 操作:

...
sc = SparkContext()
hadoop = sc._jvm.org.apache.hadoop
hdfs = hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration())

raw_out = hdfs.create(hadoop.fs.Path('/tmp/combinedFile.parquet'))  # FSDataOutputStream
out_stream = sc._jvm.java.io.BufferedOutputStream(raw_out)

for f in hdfs.listStatus(hadoop.fs.Path('/tmp/dirWithSeparatedFiles/')):

    raw_in = hdfs.open(f.getPath())  # FSDataInputStream
    in_stream = sc._jvm.java.io.BufferedInputStream(raw_in)

    while in_stream.available() > 0:
        out_stream.write(in_stream.read()) 
        out_stream.flush()
    in_stream.close()
out_stream.close()

【讨论】:

以上是关于PySpark HDFS 数据流读/写的主要内容,如果未能解决你的问题,请参考以下文章

初学HDFS的读过程和写过程的分析

HDFS解析 | HDFS短路读详解

HDFS短路读详解

如何创建 Pyspark 应用程序

HDFS 读流程和写流程

Java往hbase写数据