Opening a gzip file in Python Apache Beam

Posted: 2016-12-29 09:05:40

Question: Is it currently possible to read gzip files in Python with Apache Beam? My pipeline is pulling gzip files from GCS with this line of code:
beam.io.Read(beam.io.TextFileSource('gs://bucket/file.gz', compression_type='GZIP'))
But I get this error:
UnicodeDecodeError: 'utf8' codec can't decode byte 0x8b in position 1: invalid start byte
We noticed in the Python Beam source code that compressed files only seem to be handled when writing to a sink: https://github.com/apache/incubator-beam/blob/python-sdk/sdks/python/apache_beam/io/fileio.py#L445
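The byte in the error is telling: 0x8b is the second byte of the gzip magic number (1f 8b), so the coder is decoding the still-compressed stream as UTF-8. A minimal stdlib reproduction, independent of Beam:

```python
# Minimal reproduction with the stdlib: a gzip stream begins with the
# magic number 0x1f 0x8b, and 0x8b is not a valid UTF-8 start byte --
# exactly the byte at "position 1" in the error above.
import gzip

raw = gzip.compress(b"hello world\n")
assert raw[:2] == b"\x1f\x8b"  # gzip magic number

try:
    raw.decode("utf-8")  # decoding compressed bytes fails, as in the traceback
except UnicodeDecodeError as err:
    print(err)  # 'utf-8' codec can't decode byte 0x8b in position 1 ...

# Decompressing first, then decoding, works fine:
print(gzip.decompress(raw).decode("utf-8"))  # hello world
```

This is why the pipeline fails: the source hands compressed bytes straight to the UTF-8 coder without decompressing them first.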
A more detailed traceback:
Traceback (most recent call last):
  File "beam-playground.py", line 11, in <module>
    p.run()
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/pipeline.py", line 159, in run
    return self.runner.run(self)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/runners/direct_runner.py", line 103, in run
    super(DirectPipelineRunner, self).run(pipeline)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 98, in run
    pipeline.visit(RunVisitor(self))
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/pipeline.py", line 182, in visit
    self._root_transform().visit(visitor, self, visited)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/pipeline.py", line 419, in visit
    part.visit(visitor, pipeline, visited)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/pipeline.py", line 422, in visit
    visitor.visit_transform(self)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 93, in visit_transform
    self.runner.run_transform(transform_node)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 168, in run_transform
    return m(transform_node)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/runners/direct_runner.py", line 99, in func_wrapper
    func(self, pvalue, *args, **kwargs)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/runners/direct_runner.py", line 258, in run_Read
    read_values(reader)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/runners/direct_runner.py", line 245, in read_values
    read_result = [GlobalWindows.windowed_value(e) for e in reader]
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/io/fileio.py", line 807, in __iter__
    yield self.source.coder.decode(line)
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/coders/coders.py", line 187, in decode
    return value.decode('utf-8')
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/encodings/utf_8.py", line 16, in decode
    return codecs.utf_8_decode(input, errors, True)
UnicodeDecodeError: 'utf8' codec can't decode byte 0x8b in position 1: invalid start byte
Answer 1:

UPDATE: TextIO in the Python SDK now supports reading from compressed files.

Original answer: as of today, TextIO in the Python SDK does not actually support reading from compressed files.
Comments:

Thanks for the quick reply! I'll look through this. I've added the full traceback to the question. The data is UTF-8 under the compression; I verified this by decoding the raw text from bytes to unicode without any errors. Correct me if I'm wrong, but looking at the Beam source, it seems TextFileSource doesn't handle compressed files at all.

I just dug into it - I was wrong about the depth of support. The class accepts a compression_type argument in preparation for actual support.

Thanks for verifying. Could you give a rough estimate of when this feature will be implemented?

There are JIRA issues tracking support in Apache Beam (incubating) for adding a new source for reading text files and, on top of that, support for reading compressed files: issues.apache.org/jira/browse/BEAM-553 and issues.apache.org/jira/browse/BEAM-577. We hope to have these implemented within a few months.

Answer 2:

I ran into a similar problem. I had a custom binary source that I wanted to parse and grab data from. The problem is that the file.io API is based on CSV or AVRO, and no matter what I tried, it would not give me the lines without trying to split them on newline characters. As you can imagine, binary files don't handle this well.
At first I tried a custom source, which ended up needing three classes to implement and duplicated core Dataflow/Beam code. In the end I wrote this WONDERFUL little monkey patch to do what I needed (testing here at the deep source level).
import apache_beam as beam
from apache_beam.io.fileio import coders

def _TextFileReader__iter(self):
    # The full data file is available here and can be read like normal.
    # You can even limit how many bytes to read. (I read 9 to grab the
    # file format.)
    data = self._file.read()
    # Now you can either yield the whole file as a single data entry
    # and run a ParDo to split it, or you can iterate in here and
    # yield each row. I chose the latter, but I'm showing an example
    # of the former.
    yield data

# This monkey patch works!
beam.io.fileio.TextFileReader.__iter__ = _TextFileReader__iter
To invoke this source, and to make sure it was read as BINARY, I did the following:
pipeline | 'start_3' >> beam.io.Read(
    beam.io.TextFileSource('gs://MY_BUCKET/sample.bin',
                           coder=coders.BytesCoder()))
Notice the coders.BytesCoder()? Without it, the reader tried to convert the bytes to something non-binary, which was bad for my parsing engine. ;)
It took me a good day to figure this out. However, if you use this method you can do almost anything with the file.io classes in Dataflow. ;)
Answer 3:

I ran into the same problem. I was trying to read binary GZ files from GCS, decompress them, and then ship them off somewhere else for processing. I solved it in two steps.

First, make sure you're using the right Python library; my original library was out of date (I'm on at least v0.4 now): pip install --upgrade google-cloud-dataflow.
Second, I built my pipeline as follows:
import apache_beam as beam
from apache_beam import (coders, io, transforms)

raw_logs = (p
            | io.Read("ReadLogsFromGCS", beam.io.TextFileSource(
                "gs://my-bucket/logs-*.gz",
                coder=coders.BytesCoder()))
            | transforms.Map(lambda x: x)
            | io.Write("WriteToLocalhost", io.textio.WriteToText(
                "/tmp/flattened-logs",
                file_name_suffix=".json")))
p.run()
After running the pipeline, you should have a file called /tmp/flattened-logs.json.