Exception Handling in Apache Beam pipelines using Python
Posted 2019-06-22 21:04:57

I'm building a simple pipeline with Apache Beam in Python (on GCP Dataflow) that reads from PubSub and writes to BigQuery, but I can't handle exceptions in the pipeline to create alternative flows.
Take a simple WriteToBigQuery example:
output = json_output | 'Write to BigQuery' >> beam.io.WriteToBigQuery('some-project:dataset.table_name')
I tried wrapping it in a try/except block, but it doesn't work: when the write fails, the exception seems to be thrown in a Java layer outside of my Python execution:
INFO:root:2019-01-29T15:49:46.516Z: JOB_MESSAGE_ERROR: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction -87: Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 135, in _execute
response = task()
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 170, in <lambda>
self._execute(lambda: worker.do_instruction(work), work)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 221, in do_instruction
request.instruction_id)
...
...
...
self.signature.finish_bundle_method.method_value())
File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/gcp/bigquery.py", line 1368, in finish_bundle
self._flush_batch()
File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/gcp/bigquery.py", line 1380, in _flush_batch
self.table_id, errors))
RuntimeError: Could not successfully insert rows to BigQuery table [<myproject:datasetname.tablename>]. Errors: [<InsertErrorsValueListEntry
errors: [<ErrorProto
debugInfo: u''
location: u''
message: u'Missing required field: object.teste.'
reason: u'invalid'>]
index: 0>] [while running 'generatedPtransform-63']
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)
org.apache.beam.runners.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.finish(RegisterAndProcessBundleOperation.java:276)
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:84)
org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:119)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1228)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:143)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:967)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction -87: Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 135, in _execute
response = task()
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 170, in <lambda>
self._execute(lambda: worker.do_instruction(work), work)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 221, in do_instruction
request.instruction_id)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 237, in process_bundle
bundle_processor.process_bundle(instruction_id)
...
...
...
self.signature.finish_bundle_method.method_value())
File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/gcp/bigquery.py", line 1368, in finish_bundle
self._flush_batch()
File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/gcp/bigquery.py", line 1380, in _flush_batch
self.table_id, errors))
Even trying to handle this error:
RuntimeError: Could not successfully insert rows to BigQuery table [<myproject:datasetname.tablename>]. Errors: [<InsertErrorsValueListEntry
errors: [<ErrorProto
debugInfo: u''
location: u''
message: u'Missing required field: object.teste.'
reason: u'invalid'>]
index: 0>] [while running 'generatedPtransform-63']
with:
try:
    ...
except RuntimeError as e:
    ...
or catching the generic Exception doesn't work either.
I can find plenty of examples of error handling in Apache Beam using Java, but not a single one for handling errors in Python.
Does anyone know how to do this?
Answer 1: I have only been able to catch exceptions at the DoFn level, so something like this:
import apache_beam as beam
from apache_beam import pvalue

class MyPipelineStep(beam.DoFn):
    def process(self, element, *args, **kwargs):
        try:
            # do stuff... (compute output_element from element)
            yield pvalue.TaggedOutput('main_output', output_element)
        except Exception as e:
            # Route the failure to a separate tagged output instead of
            # letting it crash the bundle.
            yield pvalue.TaggedOutput('exception', str(e))
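For completeness, a minimal sketch of how those tagged outputs might be consumed downstream (the PCollection and step names here are illustrative, not from the original answer):

results = (
    input_pcoll
    | 'Process' >> beam.ParDo(MyPipelineStep()).with_outputs(
        'exception', 'main_output'))

good = results.main_output  # elements that processed cleanly
bad = results.exception     # stringified errors, e.g. for a dead-letter sink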
But WriteToBigQuery is a PTransform that wraps the DoFn BigQueryWriteFn, so you may need to do something like this:
from apache_beam.io.gcp.bigquery import BigQueryWriteFn, WriteToBigQuery

class MyBigQueryWriteFn(BigQueryWriteFn):
    def process(self, *args, **kwargs):
        try:
            # Note: super() must be given the subclass, not the parent,
            # or BigQueryWriteFn.process itself gets skipped.
            return super(MyBigQueryWriteFn, self).process(*args, **kwargs)
        except Exception as e:
            # Do something here
            pass

class MyWriteToBigQuery(WriteToBigQuery):
    # Copy the source code of `WriteToBigQuery` here,
    # but replace `BigQueryWriteFn` with `MyBigQueryWriteFn`
    pass
https://beam.apache.org/releases/pydoc/2.9.0/_modules/apache_beam/io/gcp/bigquery.html#WriteToBigQuery
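As an aside for readers on newer SDKs: more recent Beam releases can surface failed streaming inserts directly, with no subclassing. A sketch, assuming a release where insert_retry_strategy and the 'FailedRows' output are available (roughly 2.15+):

import apache_beam as beam
from apache_beam.io.gcp.bigquery_tools import RetryStrategy

result = (
    json_output
    | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
        'some-project:dataset.table_name',
        # Retry transient failures, but emit permanently invalid rows
        # (like the missing required field above) on the FailedRows output.
        insert_retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR))

failed_rows = result['FailedRows']  # PCollection of (table, row) pairs to dead-letter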
Answer 2: You can also use the generator flavor of FlatMap:

This is similar to the other answer, in that you can use a DoFn in place of something else, e.g. a CombineFn, and produce no output when there is an exception or some other kind of failed precondition.
import logging
from typing import Generator, List

import apache_beam as beam

def sum_values(values: List[int]) -> Generator[int, None, None]:
    if not values or len(values) < 10:
        # Log and yield nothing: the invalid element is simply dropped.
        logging.error(f'received invalid inputs: ...')
        return
    yield sum(values)

# Now, instead of using CombinePerKey:
(inputs
 | 'WithKey' >> beam.Map(lambda x: (x.key, x))
 | 'GroupByKey' >> beam.GroupByKey()
 | 'Values' >> beam.Values()
 | 'MaybeSum' >> beam.FlatMap(sum_values))
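If you need to keep the bad inputs rather than silently dropping them, the same generator style combines with tagged outputs. A sketch with illustrative names (not part of the original answer):

from apache_beam import pvalue

def sum_or_dead_letter(values):
    # Route invalid groups to a 'dead_letter' output instead of dropping them.
    if not values or len(values) < 10:
        yield pvalue.TaggedOutput('dead_letter', values)
        return
    yield sum(values)

results = grouped_values | beam.FlatMap(sum_or_dead_letter).with_outputs(
    'dead_letter', main='sums')
sums, dead_letters = results.sums, results.dead_letter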