PySpark HDFS 数据流读/写
Posted
技术标签:
【中文标题】PySpark HDFS 数据流读/写【英文标题】:PySpark HDFS data streams reading/writing 【发布时间】:2019-11-15 16:34:28 【问题描述】:我有一个包含多个文件的 HDFS 目录,我想合并为一个。我不想使用 Spark DF 来执行此操作,而是使用数据流进行 HDFS 交互。到目前为止,这是我的代码:
...
sc = SparkContext()
hadoop = sc._jvm.org.apache.hadoop
hdfs = hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration())
out_stream = hdfs.create(hadoop.fs.Path('/tmp/combinedFile.parquet')) # FSDataOutputStream
for f in hdfs.listStatus(hadoop.fs.Path('/tmp/dirWithSeparatedFiles/')):
buffer = bytes(256)
in_stream = hdfs.open(f.getPath()) # FSDataInputStream
bytesRead = in_stream.read(buffer)
while (bytesRead > 0):
out_stream.writeBytes(bytesRead)
out_stream.flush()
in_stream.close()
out_stream.close()
此代码的第一个问题是,我不确定如何通过缓冲区从输入流中读取数据。第一个问题是,输出文件是在 HDFS 中创建的,但没有写入任何内容(即使我向其写入固定值)。
【问题讨论】:
【参考方案1】:经过一番调查,我找到了解决问题的方法。解决方案包括,通过 spark 上下文创建一些 JVM 对象并将它们用于缓冲 i/o 操作:
...
sc = SparkContext()
hadoop = sc._jvm.org.apache.hadoop
hdfs = hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration())
raw_out = hdfs.create(hadoop.fs.Path('/tmp/combinedFile.parquet')) # FSDataOutputStream
out_stream = sc._jvm.java.io.BufferedOutputStream(raw_out)
for f in hdfs.listStatus(hadoop.fs.Path('/tmp/dirWithSeparatedFiles/')):
raw_in = hdfs.open(f.getPath()) # FSDataInputStream
in_stream = sc._jvm.java.io.BufferedInputStream(raw_in)
while in_stream.available() > 0:
out_stream.write(in_stream.read())
out_stream.flush()
in_stream.close()
out_stream.close()
【讨论】:
以上是关于PySpark HDFS 数据流读/写的主要内容,如果未能解决你的问题,请参考以下文章