Apache Beam Python gcsio upload method is decorated with @retry.no_retries: does this cause data loss?

【Posted】: 2021-08-30 12:11:14 【Question】:

I have a Python Apache Beam streaming pipeline running on Dataflow. It reads from Pub/Sub and writes to GCS. Occasionally I get errors such as "Error in _start_upload while inserting file ...", which originate from:

File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 603, in __ProcessHttpResponse
    raise exceptions.HttpError.FromResponse(
RuntimeError: apitools.base.py.exceptions.HttpError: HttpError accessing <https://www.googleapis.com/resumable/upload/storage/v1/b/<bucket-name>/o?alt=json&name=tmp%2F.tempaf83360e-673f-4f9a-b15a-5be45081c335%2F3919075269125806430_9535790f-f57d-430f-9631-f121966e5ca4&uploadType=resumable&upload_id=<id>>: response: <'content-type': 'text/plain; charset=utf-8', 'x-guploader-uploadid': '<id>', 'content-length': '0', 'date': 'Thu, 10 Jun 2021 14:58:51 GMT', 'server': 'UploadServer', 'status': '503'>, content <> [while running 'Write to GCS/ParDo(_WriteShardedRecordsFn)-ptransform-50705']
passed through:
==> dist_proc/dax/workflow/worker/fnapi_service.cc:631

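The root problem seems to be that no retry logic is applied in the _start_upload method (in apache_beam/io/gcp/gcsio.py), so when an HttpError occurs (a 503 in this case) it is not handled: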

  # TODO(silviuc): Refactor so that retry logic can be applied.
  # There is retry logic in the underlying transfer library but we should make
  # it more explicit so we can control the retry parameters.
  @retry.no_retries  # Using no_retries marks this as an integration point.
  def _start_upload(self):

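As I see it, whenever one of these errors is thrown, the upload has failed because of a server-side problem and is not even retried, so the data is lost. Am I missing something, or is that what is actually happening? If it is, I find it odd that nobody has noticed this before.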
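To make the concern concrete, here is a minimal, standard-library-only sketch of the difference between a call that is attempted exactly once and the same call wrapped in exponential backoff for 5xx responses. The names TransientHttpError, with_backoff and make_flaky_upload are hypothetical and exist only for this illustration; they are not part of Beam or apitools, and this is not how gcsio is implemented.

```python
import time
from functools import wraps


class TransientHttpError(Exception):
    """Hypothetical stand-in for an HTTP error that carries a status code."""

    def __init__(self, status):
        super().__init__(f"HTTP {status}")
        self.status = status


def with_backoff(num_retries=3, initial_delay_secs=0.01, sleep=time.sleep):
    """Retry the wrapped call on 5xx errors, doubling the delay each time."""

    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            delay = initial_delay_secs
            for attempt in range(num_retries + 1):
                try:
                    return fn(*args, **kwargs)
                except TransientHttpError as err:
                    # Re-raise client errors, or once retries are exhausted.
                    if err.status < 500 or attempt == num_retries:
                        raise
                    sleep(delay)
                    delay *= 2

        return wrapper

    return decorator


def make_flaky_upload(failures):
    """Return an upload function that raises a 503 for its first `failures` calls."""
    state = {"calls": 0}

    def upload(payload):
        state["calls"] += 1
        if state["calls"] <= failures:
            raise TransientHttpError(503)
        return len(payload)

    upload.state = state
    return upload
```

Called directly, an upload that hits one 503 raises immediately, which is the behaviour the question describes. Wrapped with with_backoff, the same upload succeeds once the simulated server recovers. Whether a single failed attempt actually loses data depends on what the caller does with the exception.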

The failing code uses WriteToFiles from io.fileio. It looks like this (destination_partitioning_naming is a custom method):

from apache_beam.io.fileio import WriteToFiles

...
    | "Write to GCS" >> WriteToFiles(
        path=output_path,
        shards=1,
        max_writers_per_bundle=0,
        destination=lambda record: record['topic_kafka'],
        sink=JsonSink(),
        file_naming=destination_partitioning_naming(extension="json", topics=topics),
    )
)
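The custom destination_partitioning_naming helper is not shown in the question. As a rough idea of its shape, the sketch below is a hypothetical reconstruction: it returns a callable with the six-argument signature that fileio's built-in naming functions use (window, pane, shard_index, total_shards, compression, destination). The catch-all "unknown" prefix and the file name pattern are assumptions made for illustration, not the asker's actual logic.

```python
def destination_partitioning_naming(extension, topics):
    """Hypothetical reconstruction; the asker's real helper is not shown."""
    allowed = set(topics)

    def _inner(window, pane, shard_index, total_shards, compression, destination):
        # Assumption: destinations outside the known topics share one prefix.
        prefix = destination if destination in allowed else "unknown"
        # shard_index / total_shards may be None when sharding is left to the runner.
        shard = 0 if shard_index is None else shard_index
        total = 1 if total_shards is None else total_shards
        return f"{prefix}/part-{shard:05d}-of-{total:05d}.{extension}"

    return _inner
```

A production version for a streaming pipeline would probably also encode the window bounds in the name so that files from different windows do not overwrite each other.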

I have also raised this issue in JIRA.

Here is the full stack trace:

2021-06-10 16:58:55.104 CEST
Error message from worker: generic::unknown: Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 768, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 891, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/fileio.py", line 620, in process
    writer.close()
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/filesystemio.py", line 220, in close
    self._uploader.finish()
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/gcsio.py", line 676, in finish
    raise self._upload_thread.last_error  # pylint: disable=raising-bad-type
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/gcsio.py", line 651, in _start_upload
    self._client.objects.Insert(self._insert_request, upload=self._upload)
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py", line 1154, in Insert
    return self._RunMethod(
  File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 731, in _RunMethod
    return self.ProcessHttpResponse(method_config, http_response, request)
  File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 737, in ProcessHttpResponse
    self.__ProcessHttpResponse(method_config, http_response, request))
  File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 603, in __ProcessHttpResponse
    raise exceptions.HttpError.FromResponse(
apitools.base.py.exceptions.HttpError: HttpError accessing <https://www.googleapis.com/resumable/upload/storage/v1/b/<bucket-name>/o?alt=json&name=tmp%2F.tempaf83360e-673f-4f9a-b15a-5be45081c335%2F3919075269125806430_9535790f-f57d-430f-9631-f121966e5ca4&uploadType=resumable&upload_id=<uploadid>>: response: <'content-type': 'text/plain; charset=utf-8', 'x-guploader-uploadid': '<id>', 'content-length': '0', 'date': 'Thu, 10 Jun 2021 14:58:51 GMT', 'server': 'UploadServer', 'status': '503'>, content <>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 289, in _execute
    response = task()
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 362, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 606, in do_instruction
    return getattr(self, request_type)(
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 644, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 999, in process_bundle
    input_op_by_transform_id[element.transform_id].process_encoded(
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 228, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 357, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1321, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "/usr/local/lib/python3.8/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
    raise exc.with_traceback(traceback)
  File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 768, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 891, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/fileio.py", line 620, in process
    writer.close()
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/filesystemio.py", line 220, in close
    self._uploader.finish()
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/gcsio.py", line 676, in finish
    raise self._upload_thread.last_error  # pylint: disable=raising-bad-type
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/gcsio.py", line 651, in _start_upload
    self._client.objects.Insert(self._insert_request, upload=self._upload)
  File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py", line 1154, in Insert
    return self._RunMethod(
  File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 731, in _RunMethod
    return self.ProcessHttpResponse(method_config, http_response, request)
  File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 737, in ProcessHttpResponse
    self.__ProcessHttpResponse(method_config, http_response, request))
  File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 603, in __ProcessHttpResponse
    raise exceptions.HttpError.FromResponse(
RuntimeError: apitools.base.py.exceptions.HttpError: HttpError accessing <https://www.googleapis.com/resumable/upload/storage/v1/b/<bucket-name>/o?alt=json&name=tmp%2F.tempaf83360e-673f-4f9a-b15a-5be45081c335%2F3919075269125806430_9535790f-f57d-430f-9631-f121966e5ca4&uploadType=resumable&upload_id=<id>>: response: <'content-type': 'text/plain; charset=utf-8', 'x-guploader-uploadid': '<id>', 'content-length': '0', 'date': 'Thu, 10 Jun 2021 14:58:51 GMT', 'server': 'UploadServer', 'status': '503'>, content <> [while running 'Write to GCS/ParDo(_WriteShardedRecordsFn)-ptransform-50705']
passed through:
==> dist_proc/dax/workflow/worker/fnapi_service.cc:631

【Answer 1】:

In streaming pipelines, Dataflow retries work items that encounter errors indefinitely.

The code itself does not need to contain retry logic.
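The point of the answer can be illustrated with a toy model. The sketch below is not Dataflow's implementation: run_streaming_work_item, make_write_bundle and UploadFailed are hypothetical names, and the model assumes that the runner keeps a work item's input pending until processing returns without an exception. Under that assumption, a failed upload means the same records are processed again, not dropped.

```python
class UploadFailed(Exception):
    """Hypothetical stand-in for the 503 HttpError raised from writer.close()."""


def make_write_bundle(sink, fail_first_n):
    """Return a bundle processor whose first `fail_first_n` attempts fail."""
    attempts = {"count": 0}

    def write_bundle(records):
        attempts["count"] += 1
        staged = list(records)  # records buffered for a temporary file
        if attempts["count"] <= fail_first_n:
            raise UploadFailed("503")  # nothing is committed on failure
        sink.extend(staged)  # commit only after the upload succeeds

    write_bundle.attempts = attempts
    return write_bundle


def run_streaming_work_item(process, records, max_attempts=None):
    """Toy runner loop: re-run the same work item until it succeeds.

    With max_attempts=None the loop retries indefinitely, mirroring the
    behaviour described for streaming pipelines.
    """
    attempt = 0
    while max_attempts is None or attempt < max_attempts:
        attempt += 1
        try:
            process(records)
            return attempt  # only now would the input be acknowledged
        except UploadFailed:
            continue  # input stays pending and is retried
    raise RuntimeError("work item did not succeed within max_attempts")


if __name__ == "__main__":
    sink = []
    attempts_needed = run_streaming_work_item(
        make_write_bundle(sink, fail_first_n=2), ["a", "b", "c"])
    print(attempts_needed, sink)
```

In this model, the 503 shows up in the logs on every failed attempt, but the records still reach the sink once an attempt succeeds. A side effect of retrying whole work items is that processing is at-least-once, so the write step must tolerate being re-run.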
