Exception Handling in Apache Beam pipelines using Python

Posted: 2019-06-22 21:04:57

Question:

I'm building a simple pipeline with Apache Beam in Python (on GCP Dataflow) that reads from PubSub and writes to BigQuery, but I can't handle exceptions in the pipeline to create an alternative flow.

Take a simple WriteToBigQuery example:

output = json_output | 'Write to BigQuery' >> beam.io.WriteToBigQuery('some-project:dataset.table_name')

I tried to wrap it in a try/except block, but that doesn't work. When the write fails, the exception seems to be thrown in the Java layer, outside my Python execution:

INFO:root:2019-01-29T15:49:46.516Z: JOB_MESSAGE_ERROR: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction -87: Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 135, in _execute
    response = task()
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 170, in <lambda>
    self._execute(lambda: worker.do_instruction(work), work)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 221, in do_instruction
    request.instruction_id)
...
...
...
    self.signature.finish_bundle_method.method_value())
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/gcp/bigquery.py", line 1368, in finish_bundle
    self._flush_batch()
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/gcp/bigquery.py", line 1380, in _flush_batch
    self.table_id, errors))
RuntimeError: Could not successfully insert rows to BigQuery table [<myproject:datasetname.tablename>]. Errors: [<InsertErrorsValueListEntry
 errors: [<ErrorProto
 debugInfo: u''
 location: u''
 message: u'Missing required field: object.teste.'
 reason: u'invalid'>]
 index: 0>] [while running 'generatedPtransform-63']

        java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
        org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)
        org.apache.beam.runners.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.finish(RegisterAndProcessBundleOperation.java:276)
        org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:84)
        org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:119)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1228)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:143)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:967)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction -87: Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 135, in _execute
    response = task()
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 170, in <lambda>
    self._execute(lambda: worker.do_instruction(work), work)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 221, in do_instruction
    request.instruction_id)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 237, in process_bundle
    bundle_processor.process_bundle(instruction_id)
...
...
...
    self.signature.finish_bundle_method.method_value())
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/gcp/bigquery.py", line 1368, in finish_bundle
    self._flush_batch()
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/gcp/bigquery.py", line 1380, in _flush_batch
    self.table_id, errors))

I also tried to handle this specific error:

RuntimeError: Could not successfully insert rows to BigQuery table [<myproject:datasetname.tablename>]. Errors: [<InsertErrorsValueListEntry
 errors: [<ErrorProto
 debugInfo: u''
 location: u''
 message: u'Missing required field: object.teste.'
 reason: u'invalid'>]
 index: 0>] [while running 'generatedPtransform-63']

with:

try:
 ...
except RuntimeError as e:
 ...

Neither that nor catching a generic Exception works.
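
A note on why the try/except cannot work here. In Beam, `json_output | 'Write to BigQuery' >> beam.io.WriteToBigQuery(...)` only adds a step to the pipeline graph. Nothing is written until the runner executes the job on the Dataflow workers, long after the code that built the graph has returned. Judging by the log, the error itself is a Python traceback from the SDK harness (`sdk_worker.py`), which the Java worker then reports. The sketch below illustrates this deferred execution in plain Python. The `ToyPipeline` class and `failing_write` function are hypothetical and are not Beam's API:

```python
from typing import Callable, List


class ToyPipeline:
    """Hypothetical stand-in for a Beam pipeline: steps are recorded, not run."""

    def __init__(self) -> None:
        self.steps: List[Callable[[], None]] = []

    def add_step(self, step: Callable[[], None]) -> None:
        # Like `pcoll | transform`: only records the step in the graph
        self.steps.append(step)

    def run(self) -> None:
        # Like the runner executing the job: the steps actually run here
        for step in self.steps:
            step()


def failing_write() -> None:
    # Hypothetical write step that always fails, like the rejected insert
    raise RuntimeError("Could not successfully insert rows to BigQuery table")


def build_and_run() -> str:
    p = ToyPipeline()
    try:
        p.add_step(failing_write)  # nothing fails while building the graph
    except RuntimeError:
        return "caught at construction time"
    try:
        p.run()
    except RuntimeError:
        return "raised at run time"
    return "no error"


outcome = build_and_run()
```

On Dataflow the run-time half happens on remote workers. The practical place to catch a per-element error is therefore inside the transform code itself, which is the approach both answers take.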

I can find plenty of examples of error handling in Apache Beam with Java, but none for Python.

Does anyone know how to do this?

Answer 1:

I was only able to catch exceptions at the DoFn level, like this:

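import apache_beam as beam
from apache_beam import pvalue

class MyPipelineStep(beam.DoFn):

    def process(self, element, *args, **kwargs):
        try:
            # do stuff... (placeholder: here the element is passed through unchanged)
            output_element = element
            yield pvalue.TaggedOutput('main_output', output_element)
        except Exception as e:
            # Send the error to a separate 'exception' output instead of failing the bundle
            yield pvalue.TaggedOutput('exception', str(e))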
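
To use both outputs in a pipeline, the ParDo is typically applied with `.with_outputs(...)`. For example, `results = messages | beam.ParDo(MyPipelineStep()).with_outputs('exception', 'main_output')` gives `results.main_output` and `results.exception` as separate PCollections, where `messages` is a hypothetical input PCollection. The try/yield logic itself can be checked without Beam. The sketch below swaps `pvalue.TaggedOutput` for plain `(tag, value)` tuples. Its validation step is a hypothetical check modelled on the question's `Missing required field: object.teste.` error, so bad rows would be diverted before they ever reach `WriteToBigQuery`:

```python
import json
from typing import Dict, Iterable, Iterator, List, Tuple

MAIN_TAG = 'main_output'
EXCEPTION_TAG = 'exception'


def process(element: str) -> Iterator[Tuple[str, object]]:
    # Same shape as MyPipelineStep.process, but yields plain (tag, value)
    # tuples instead of pvalue.TaggedOutput so it runs without Beam
    try:
        row = json.loads(element)
        # Hypothetical validation mirroring the BigQuery error in the question
        if 'teste' not in row.get('object', {}):
            raise ValueError('Missing required field: object.teste')
        yield MAIN_TAG, row
    except Exception as e:
        yield EXCEPTION_TAG, str(e)


def route(elements: Iterable[str]) -> Dict[str, List[object]]:
    # Rough local equivalent of reading results.main_output / results.exception
    outputs: Dict[str, List[object]] = {MAIN_TAG: [], EXCEPTION_TAG: []}
    for element in elements:
        for tag, value in process(element):
            outputs[tag].append(value)
    return outputs


result = route(['{"object": {"teste": 1}}', '{"object": {}}', 'not json'])
```

Keeping the per-element logic in a plain function like this makes it easy to unit-test separately from the DoFn wrapper.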

But WriteToBigQuery is a PTransform that wraps the DoFn BigQueryWriteFn.

So you may need to do something like this:

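from apache_beam.io.gcp.bigquery import BigQueryWriteFn, WriteToBigQuery

class MyBigQueryWriteFn(BigQueryWriteFn):

    def process(self, *args, **kwargs):
        try:
            # super() must receive this subclass, or BigQueryWriteFn.process is skipped
            return super(MyBigQueryWriteFn, self).process(*args, **kwargs)
        except Exception as e:
            pass  # Do something here, e.g. log e

    def finish_bundle(self):
        # In the traceback above, the insert error is raised from finish_bundle()
        try:
            return super(MyBigQueryWriteFn, self).finish_bundle()
        except Exception as e:
            pass  # Do something here as well

class MyWriteToBigQuery(WriteToBigQuery):
    # Copy the source code of `WriteToBigQuery` here,
    # but replace `BigQueryWriteFn` with `MyBigQueryWriteFn`
    pass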

https://beam.apache.org/releases/pydoc/2.9.0/_modules/apache_beam/io/gcp/bigquery.html#WriteToBigQuery
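
Wrapping `process` may not be enough on its own. In the question's traceback the `RuntimeError` comes from `finish_bundle` → `_flush_batch`. That suggests rows are buffered in `process` and inserted in batches, so a bad row may only fail when the bundle is flushed. The sketch below models this in plain Python. The `BufferedWriter` and `SafeWriter` classes are hypothetical and are not Beam's API; their method names only loosely mirror the traceback:

```python
from typing import List


class BufferedWriter:
    """Hypothetical batching sink (not Beam's BigQueryWriteFn): rows are buffered
    in process() and only sent when the batch fills up or in finish_bundle()."""

    def __init__(self, batch_size: int = 500) -> None:
        self.batch_size = batch_size
        self.buffer: List[dict] = []

    def process(self, row: dict) -> None:
        self.buffer.append(row)
        if len(self.buffer) >= self.batch_size:
            self._flush_batch()

    def finish_bundle(self) -> None:
        self._flush_batch()

    def _flush_batch(self) -> None:
        rows, self.buffer = self.buffer, []
        # Toy "insert": rows without the required field are rejected
        bad = [r for r in rows if 'teste' not in r]
        if bad:
            raise RuntimeError(f'Could not successfully insert {len(bad)} row(s)')


class SafeWriter(BufferedWriter):
    """Wraps both entry points; errors are collected instead of failing the bundle."""

    def __init__(self, batch_size: int = 500) -> None:
        super(SafeWriter, self).__init__(batch_size)
        self.errors: List[str] = []

    def process(self, row: dict) -> None:
        try:
            super(SafeWriter, self).process(row)
        except RuntimeError as e:
            self.errors.append(str(e))

    def finish_bundle(self) -> None:
        try:
            super(SafeWriter, self).finish_bundle()
        except RuntimeError as e:
            self.errors.append(str(e))


writer = SafeWriter(batch_size=10)
writer.process({'teste': 1})
writer.process({})  # bad row, but only buffered: no error yet
errors_after_process = list(writer.errors)
writer.finish_bundle()  # the batch is sent here, so the error surfaces here
```

Passing the subclass itself to `super()` (or using bare `super()` on Python 3) is what makes each call reach the parent's method. Note that this toy records one message per failed batch, not which row caused it.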

Answer 2:

You can also use the generator flavor of FlatMap, where the function yields zero or more outputs per input.

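This is similar to the other answer in that you use a DoFn in place of something else, such as a CombineFn. The DoFn then simply produces no output when an exception or some other failed precondition occurs.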

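import logging
from typing import Generator, Iterable

import apache_beam as beam


def sum_values(values: Iterable[int]) -> Generator[int, None, None]:
    # GroupByKey hands over an iterable; materialize it so len() works
    values = list(values)
    if len(values) < 10:
        # Failed precondition: log it and yield nothing instead of raising
        logging.error(f'received invalid inputs: ...')
        return
    yield sum(values)


# Use this instead of CombinePerKey
# (assumes each element x has a .key and that the grouped values are ints)
(inputs
  | 'WithKey' >> beam.Map(lambda x: (x.key, x))
  | 'GroupByKey' >> beam.GroupByKey()
  | 'Values' >> beam.Values()
  | 'MaybeSum' >> beam.FlatMap(sum_values))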
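
The same generator trick covers exceptions as well as preconditions: catch the error, log it, and `return` without yielding. In the sketch below, `safe_parse` is a hypothetical example function. `flat_map` is a Beam-free stand-in for `beam.FlatMap` so the behaviour can be checked locally; in a pipeline you would write `lines | beam.FlatMap(safe_parse)`:

```python
import logging
from itertools import chain
from typing import Callable, Iterable, Iterator, List, TypeVar

T = TypeVar('T')
U = TypeVar('U')


def safe_parse(line: str) -> Iterator[int]:
    """Hypothetical FlatMap function: yields one int, or nothing if parsing fails."""
    try:
        value = int(line)
    except ValueError:
        logging.error('could not parse %r', line)
        return  # yield nothing, so the bad element simply drops out
    yield value


def flat_map(fn: Callable[[T], Iterable[U]], elements: Iterable[T]) -> List[U]:
    """Beam-free stand-in for `elements | beam.FlatMap(fn)`."""
    return list(chain.from_iterable(map(fn, elements)))


parsed = flat_map(safe_parse, ['1', 'x', '3'])
```

Compared with the tagged-output DoFn in the other answer, this drops bad elements rather than routing them to a second output. It therefore suits cases where a log line is enough.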
