Apache Beam: ReadFromText versus ReadAllFromText
Posted: 2019-01-06 21:59:14
Question:
I am running an Apache Beam pipeline that reads text files from Google Cloud Storage, performs some parsing on those files, and writes the parsed data to BigQuery.
To keep things short, the parsing and google_cloud_options are left out here. My code looks like this (apache-beam 2.5.0 with the GCP extras, and Dataflow as the runner):
import apache_beam as beam

p = beam.Pipeline(options=options)
(p
 | 'read from file' >> beam.io.ReadFromText('some_gcs_bucket_path*')
 # XmlToDictFn is the parsing DoFn named in the traceback; its definition is omitted.
 | 'parse xml to dict' >> beam.ParDo(XmlToDictFn())
 | 'write to bigquery' >> beam.io.WriteToBigQuery(
     'my_table',
     write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
     create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED))
p.run()
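This runs fine for a small number of input files and appends the relevant data to my BigQuery table. However, when I increase the number of input files to roughly 800k, I get this error:
"Total size of the BoundedSource objects returned by BoundedSource.split() operation is larger than the allowable limit."
I found Troubleshooting apache beam pipeline import errors [BoundedSource objects is larger than the allowable limit], which suggests using ReadAllFromText instead of ReadFromText. However, when I swap one for the other, I get the following error: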
Traceback (most recent call last):
File "/Users/richardtbenade/Repos/de_020/main_isolated.py", line 240, in <module>
xmltobigquery.run_dataflow()
File "/Users/richardtbenade/Repos/de_020/main_isolated.py", line 220, in run_dataflow
'parse xml to dict' >> beam.ParDo(XmlToDictFn(), job_spec=self.job_spec) | \
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py", line 831, in __ror__
return self.transform.__ror__(pvalueish, self.label)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py", line 488, in __ror__
result = p.apply(self, pvalueish, label)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pipeline.py", line 464, in apply
return self.apply(transform, pvalueish)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pipeline.py", line 500, in apply
pvalueish_result = self.runner.apply(transform, pvalueish)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 187, in apply
return m(transform, input)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 193, in apply_PTransform
return transform.expand(input)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/io/textio.py", line 470, in expand
return pvalue | 'ReadAllFiles' >> self._read_all_files
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pvalue.py", line 109, in __or__
return self.pipeline.apply(ptransform, self)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pipeline.py", line 454, in apply
label or transform.label)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pipeline.py", line 464, in apply
return self.apply(transform, pvalueish)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pipeline.py", line 500, in apply
pvalueish_result = self.runner.apply(transform, pvalueish)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 187, in apply
return m(transform, input)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 193, in apply_PTransform
return transform.expand(input)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/io/filebasedsource.py", line 416, in expand
| 'ReadRange' >> ParDo(_ReadRange(self._source_from_file)))
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pvalue.py", line 109, in __or__
return self.pipeline.apply(ptransform, self)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pipeline.py", line 454, in apply
label or transform.label)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pipeline.py", line 464, in apply
return self.apply(transform, pvalueish)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pipeline.py", line 500, in apply
pvalueish_result = self.runner.apply(transform, pvalueish)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 187, in apply
return m(transform, input)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 193, in apply_PTransform
return transform.expand(input)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/transforms/util.py", line 568, in expand
| 'RemoveRandomKeys' >> Map(lambda t: t[1]))
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pvalue.py", line 109, in __or__
return self.pipeline.apply(ptransform, self)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pipeline.py", line 500, in apply
pvalueish_result = self.runner.apply(transform, pvalueish)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 187, in apply
return m(transform, input)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 193, in apply_PTransform
return transform.expand(input)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/transforms/util.py", line 494, in expand
windowing_saved = pcoll.windowing
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pvalue.py", line 130, in windowing
self.producer.inputs)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py", line 443, in get_windowing
return inputs[0].windowing
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pvalue.py", line 130, in windowing
self.producer.inputs)
File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py", line 443, in get_windowing
return inputs[0].windowing
AttributeError: 'PBegin' object has no attribute 'windowing'.
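Any suggestions?
Comments:
Please also share the final version of the code that produces this error, so that we have the actual context. Also, I found this answer, which mentions the same error and where adding a PCollection fixed it.
It turns out that when using "ReadAllFromText", a "Create" step also has to be added (it is not included automatically, as it is in the "ReadFromText" example). That solved my problem, thanks.
@richardt-benade-rezco, cool! Feel free to share what your code ended up looking like.
@Richardt Benade REZCO Could you post the solution and the working code for the benefit of the community?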
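Answer 1:
I ran into the same problem. As Richardt mentioned, beam.Create has to be called explicitly. Another challenge is how to use this pattern together with template parameters, because beam.Create only supports in-memory data, as stated in the documentation.
Google Cloud Support helped me with this case, and I would like to share the solution with you. The trick is to create the pipeline from a dummy string and then use a Map with a lambda to read the real input at run time: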
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class AggregateOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        # ValueProvider arguments are resolved at run time, as templates require.
        parser.add_value_provider_argument(
            '--input',
            help='Path of the files to read from')
        parser.add_value_provider_argument(
            '--output',
            help='Output files to write results to')


def run():
    logging.info('Starting main function')
    pipeline_options = PipelineOptions()
    pipeline = beam.Pipeline(options=pipeline_options)
    options = pipeline_options.view_as(AggregateOptions)
    steps = (
        pipeline
        | 'Create' >> beam.Create(['Start'])  # workaround to kickstart the pipeline
        | 'Read Input Parameter' >> beam.Map(lambda x: options.input.get())  # get the real input param
        | 'Read Data' >> beam.io.ReadAllFromText()
        # | ... other steps
    )
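Hope this answer helps.
Comments:
Where do you specify the input/output files? As parser arguments when running the pipeline script?
@Luiscri For testing purposes, you can pass the arguments when running the code locally. To run on Google Cloud Dataflow, however, generate a custom template by creating a metadata file and running the python command described there. Upload the metadata file, named with the suffix _metadata, to the same Cloud Storage location where the template is saved. The template is then selectable in Dataflow.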
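As a rough illustration of the metadata file mentioned in the last comment, the sketch below writes one locally so that it can then be uploaded next to the template. The field names (name, description, parameters, label, helpText, isOptional) follow the Dataflow classic-template metadata format as I understand it, so check them against the current Dataflow documentation. The template name, the description and the helper function names are hypothetical. The two parameters mirror the --input and --output options from the answer.

```python
import json


def build_template_metadata():
    """Describe the --input and --output template parameters."""
    return {
        'name': 'Aggregate',  # hypothetical display name
        'description': 'Reads text files and writes results.',  # hypothetical
        'parameters': [
            {
                'name': 'input',
                'label': 'Input files',
                'helpText': 'Path of the files to read from',
                'isOptional': False,
            },
            {
                'name': 'output',
                'label': 'Output files',
                'helpText': 'Output files to write results to',
                'isOptional': False,
            },
        ],
    }


def write_template_metadata(template_path):
    """Write the metadata to <template_path>_metadata and return that path."""
    metadata_path = template_path + '_metadata'
    with open(metadata_path, 'w') as f:
        json.dump(build_template_metadata(), f, indent=2)
    return metadata_path
```

Calling write_template_metadata('my_template') produces a local file my_template_metadata, which would then be copied to the same Cloud Storage folder as the staged template.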