Catch BigQuery HttpBadRequestError on a Dataflow streaming pipeline

Posted: 2021-02-23 10:25:05

【Question】:

Recently, my Dataflow streaming job started throwing HttpBadRequestError from the BigQuery API because the request size exceeds the limit.

    Error message from worker: generic::unknown: Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1246, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
  File "apache_beam/runners/common.py", line 514, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
  File "apache_beam/runners/common.py", line 520, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery.py", line 1288, in finish_bundle
    return self._flush_all_batches()
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery.py", line 1296, in _flush_all_batches
    *[
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery.py", line 1297, in <listcomp>
    self._flush_batch(destination)
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery.py", line 1326, in _flush_batch
    passed, errors = self.bigquery_wrapper.insert_rows(
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 984, in insert_rows
    result, errors = self._insert_all_rows(
  File "/usr/local/lib/python3.8/site-packages/apache_beam/utils/retry.py", line 236, in wrapper
    return fun(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 539, in _insert_all_rows
    response = self.client.tabledata.InsertAll(request)
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/internal/clients/bigquery/bigquery_v2_client.py", line 761, in InsertAll
    return self._RunMethod(
  File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 731, in _RunMethod
    return self.ProcessHttpResponse(method_config, http_response, request)
  File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 737, in ProcessHttpResponse
    self.__ProcessHttpResponse(method_config, http_response, request))
  File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 603, in __ProcessHttpResponse
    raise exceptions.HttpError.FromResponse(
apitools.base.py.exceptions.HttpBadRequestError: HttpError accessing <https://bigquery.googleapis.com/bigquery/v2/projects/my-projects/datasets/my-datasets/tables/my-tables/insertAll?alt=json>: response: <'vary': 'Origin, X-Origin, Referer', 'content-type': 'application/json; charset=UTF-8', 'date': 'Sat, 20 Feb 2021 14:21:53 GMT', 'server': 'ESF', 'cache-control': 'private', 'x-xss-protection': '0', 'x-frame-options': 'SAMEORIGIN', 'transfer-encoding': 'chunked', 'status': '400', 'content-length': '321', '-content-encoding': 'gzip'>, content <
  "error": 
    "code": 400,
    "message": "Request payload size exceeds the limit: 10485760 bytes.",
    "errors": [
      
        "message": "Request payload size exceeds the limit: 10485760 bytes.",
        "domain": "global",
        "reason": "badRequest"
      
    ],
    "status": "INVALID_ARGUMENT"
  

>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 258, in _execute
    response = task()
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 315, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 483, in do_instruction
    return getattr(self, request_type)(
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 519, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 990, in process_bundle
    op.finish()
  File "apache_beam/runners/worker/operations.py", line 730, in apache_beam.runners.worker.operations.DoOperation.finish
  File "apache_beam/runners/worker/operations.py", line 732, in apache_beam.runners.worker.operations.DoOperation.finish
  File "apache_beam/runners/worker/operations.py", line 733, in apache_beam.runners.worker.operations.DoOperation.finish
  File "apache_beam/runners/common.py", line 1267, in apache_beam.runners.common.DoFnRunner.finish
  File "apache_beam/runners/common.py", line 1248, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
  File "apache_beam/runners/common.py", line 1294, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "/usr/local/lib/python3.8/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
    raise exc.with_traceback(traceback)
  File "apache_beam/runners/common.py", line 1246, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
  File "apache_beam/runners/common.py", line 514, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
  File "apache_beam/runners/common.py", line 520, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery.py", line 1288, in finish_bundle
    return self._flush_all_batches()
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery.py", line 1296, in _flush_all_batches
    *[
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery.py", line 1297, in <listcomp>
    self._flush_batch(destination)
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery.py", line 1326, in _flush_batch
    passed, errors = self.bigquery_wrapper.insert_rows(
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 984, in insert_rows
    result, errors = self._insert_all_rows(
  File "/usr/local/lib/python3.8/site-packages/apache_beam/utils/retry.py", line 236, in wrapper
    return fun(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 539, in _insert_all_rows
    response = self.client.tabledata.InsertAll(request)
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/internal/clients/bigquery/bigquery_v2_client.py", line 761, in InsertAll
    return self._RunMethod(
  File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 731, in _RunMethod
    return self.ProcessHttpResponse(method_config, http_response, request)
  File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 737, in ProcessHttpResponse
    self.__ProcessHttpResponse(method_config, http_response, request))
  File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 603, in __ProcessHttpResponse
    raise exceptions.HttpError.FromResponse(
RuntimeError: apitools.base.py.exceptions.HttpBadRequestError: HttpError accessing <https://bigquery.googleapis.com/bigquery/v2/projects/my-projects/datasets/my-datasets/tables/my-tables/insertAll?alt=json>: response: <'vary': 'Origin, X-Origin, Referer', 'content-type': 'application/json; charset=UTF-8', 'date': 'Sat, 20 Feb 2021 14:21:53 GMT', 'server': 'ESF', 'cache-control': 'private', 'x-xss-protection': '0', 'x-frame-options': 'SAMEORIGIN', 'transfer-encoding': 'chunked', 'status': '400', 'content-length': '321', '-content-encoding': 'gzip'>, content <
  "error": 
    "code": 400,
    "message": "Request payload size exceeds the limit: 10485760 bytes.",
    "errors": [
      
        "message": "Request payload size exceeds the limit: 10485760 bytes.",
        "domain": "global",
        "reason": "badRequest"
      
    ],
    "status": "INVALID_ARGUMENT"
  

> [while running 'WriteBqTables/WriteBQ/_StreamToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn)-ptransform-25875']

passed through:
==>
    dist_proc/dax/workflow/worker/fnapi_service.cc:631

I would like to use this dead lettering pattern to mitigate this kind of error in case it happens again.

Does the BigQuery dead-letter pattern also work when an HttpBadRequestError occurs, or does it only work when individual row inserts fail because of a schema mismatch? I'm using Apache Beam SDK 2.27.0 for Python.

Thanks in advance.

Update 2021-02-24: I have added more stack trace snippets to show where the error occurs.

【Comments】:

Can you share more of the stack trace to show where this error happens?

@KennKnowles I've updated the stack trace of the error, in case it helps you further.

Thanks! That confirms it. I'll update my answer with some details.

【Answer 1】:

Yes, the pattern will work. In general it catches any failure that can be caught (sometimes a failure is severe enough that processing stops entirely).

In your specific case, the stack trace includes this region of BigQueryIO, and you can see the failing rows being output to the dead-letter PCollection just below, here.
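For reference, here is a minimal sketch of that dead-letter pattern in the Python SDK. The table specs, schema and field names are placeholders, and the key used to access the failed-rows output (`BigQueryWriteFn.FAILED_ROWS`, i.e. `'FailedRows'`) may vary slightly between SDK versions:

```python
import apache_beam as beam
from apache_beam.io.gcp.bigquery import BigQueryWriteFn

# Placeholder table specs and schema; replace with your own.
MAIN_TABLE = 'my-project:my_dataset.my_table'
DEAD_LETTER_TABLE = 'my-project:my_dataset.my_table_errors'

with beam.Pipeline() as p:
    rows = p | 'CreateRows' >> beam.Create([{'name': 'a', 'value': 1}])

    # Streaming inserts: the transform's result exposes the rows that
    # BigQuery rejected under the FAILED_ROWS tag.
    write_result = rows | 'WriteBQ' >> beam.io.WriteToBigQuery(
        MAIN_TABLE,
        schema='name:STRING,value:INTEGER',
        method=beam.io.WriteToBigQuery.Method.STREAMING_INSERTS)

    # Dead-letter branch: each failed element is a (destination, row) pair;
    # persist it somewhere durable instead of failing the bundle repeatedly.
    _ = (
        write_result[BigQueryWriteFn.FAILED_ROWS]
        | 'FormatFailed' >> beam.Map(
            lambda err: {'destination': str(err[0]), 'row': str(err[1])})
        | 'WriteDeadLetter' >> beam.io.WriteToBigQuery(
            DEAD_LETTER_TABLE,
            schema='destination:STRING,row:STRING'))
```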

【Discussion】:

Thanks! I'll add it to my pipeline. Just to confirm, should I set the retry strategy to RETRY_ALWAYS to catch this error?

I wouldn't recommend that. If you want records to be output to the dead letter, then you want retries to eventually give up; otherwise your pipeline may hang.
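To make sure failed rows actually reach the dead-letter output instead of being retried indefinitely, the `insert_retry_strategy` parameter can be set accordingly. A sketch continuing the example above (the `RetryStrategy` constants live in `apache_beam.io.gcp.bigquery_tools`; exact behaviour may differ slightly between SDK versions):

```python
from apache_beam.io.gcp.bigquery_tools import RetryStrategy

# RETRY_ALWAYS keeps retrying failed inserts, which can stall the pipeline.
# RETRY_ON_TRANSIENT_ERROR retries transient failures but gives up on
# permanent ones and routes them to FAILED_ROWS.
# RETRY_NEVER routes every failed row straight to FAILED_ROWS.
write_result = rows | 'WriteBQ' >> beam.io.WriteToBigQuery(
    MAIN_TABLE,
    schema='name:STRING,value:INTEGER',
    method=beam.io.WriteToBigQuery.Method.STREAMING_INSERTS,
    insert_retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR)
```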
