Dataflow unable to GET a reference to BigQuery tables when too many cores or more than one machine

Posted: 2018-11-25 15:15:24

【Question】:

My streaming Dataflow pipeline is supposed to read analytics hits from Pub/Sub and write them to BigQuery. If I use too many machines, or machines that are too big, it raises a rate-limit error while getting a reference to the table, more precisely while executing _get_or_create_table.

The rate limit being hit appears to be one of these: 100 API requests per second per user, or 300 concurrent API requests per user.

It does not block the pipeline (rows do get written after some point), but I have the feeling it blocks some threads and prevents me from taking full advantage of the parallelism. Switching from one machine with 4 CPUs to 5 machines with 8 CPUs each did not improve latency (it actually got worse).

How can I avoid this error and have a large number of machines writing to BQ?

Here is the log from the Dataflow monitoring interface. It appears regularly after I start the pipeline:

...
File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/gcp/bigquery.py", line 1087, in get_or_create_table
    found_table = self._get_table(project_id, dataset_id, table_id)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/utils/retry.py", line 184, in wrapper
    return fun(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/gcp/bigquery.py", line 925, in _get_table
    response = self.client.tables.Get(request)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/gcp/internal/clients/bigquery/bigquery_v2_client.py", line 611, in Get
    config, request, global_params=global_params)
  File "/usr/local/lib/python2.7/dist-packages/apitools/base/py/base_api.py", line 722, in _RunMethod
    return self.ProcessHttpResponse(method_config, http_response, request)
  File "/usr/local/lib/python2.7/dist-packages/apitools/base/py/base_api.py", line 728, in ProcessHttpResponse
    self.__ProcessHttpResponse(method_config, http_response, request))
  File "/usr/local/lib/python2.7/dist-packages/apitools/base/py/base_api.py", line 599, in __ProcessHttpResponse
    http_response, method_config=method_config, request=request)

HttpForbiddenError: HttpError accessing <https://www.googleapis.com/bigquery/v2/projects/<project_id>/datasets/<dataset_id>/tables/<table_id>?alt=json>: response: <{'status': '403', 'content-length': '577', 'x-xss-protection': '1; mode=block', 'x-content-type-options': 'nosniff', 'transfer-encoding': 'chunked', 'expires': 'Sun, 25 Nov 2018 14:36:24 GMT', 'vary': 'Origin, X-Origin', 'server': 'GSE', '-content-encoding': 'gzip', 'cache-control': 'private, max-age=0', 'date': 'Sun, 25 Nov 2018 14:36:24 GMT', 'x-frame-options': 'SAMEORIGIN', 'content-type': 'application/json; charset=UTF-8'}>, content <{
 "error": {
  "errors": [
   {
    "domain": "global",
    "reason": "rateLimitExceeded",
    "message": "Exceeded rate limits: Your user_method exceeded quota for api requests per user per method. For more information, see https://cloud.google.com/bigquery/troubleshooting-errors",
    "locationType": "other",
    "location": "helix_api.method_request"
   }
  ],
  "code": 403,
  "message": "Exceeded rate limits: Your user_method exceeded quota for api requests per user per method. For more information, see https://cloud.google.com/bigquery/troubleshooting-errors"
 }
}>

Here is the pipeline's code. I stripped almost everything out of it to see whether the error would still happen:

p = beam.Pipeline(options=options)

# Read analytics hits from Pub/Sub, deduplicated on the 'hit_id' attribute.
msgs = p | 'Read' >> beam.io.gcp.pubsub.ReadFromPubSub(
    topic='projects/{project}/topics/{topic}'.format(
        project=args.project, topic=args.hits_topic),
    id_label='hit_id',
    timestamp_attribute='time')

# Wrap each raw message in a single-column row.
lines = msgs | beam.Map(lambda x: {'content': x})

(lines
    | 'WriteToBQ' >> beam.io.gcp.bigquery.WriteToBigQuery(args.table,
                                                          dataset=args.dataset,
                                                          project=args.project))

【Comments】:

Can you share your Dataflow pipeline code? — OK, just added it.

【Answer 1】:

Try upgrading to the latest apache_beam library (2.12.0 as of this writing). https://github.com/apache/beam/commit/932e802279a2daa0ff7797a8fc81e952a4e4f252 introduced a table cache that avoids the rate limiting you may otherwise be hitting with older versions of the library.
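To pick up that fix, pinning the newer version should be enough, both locally and in the requirements file shipped to the Dataflow workers (for example a requirements.txt containing apache-beam[gcp]==2.12.0, passed with --requirements_file).

If upgrading is not immediately possible, here is a sketch of one possible workaround, under the assumption that your Beam version skips the table lookup when the create disposition is CREATE_NEVER: pre-create the destination table (for example in the BigQuery UI or with bq mk), then tell the sink never to create it, so the get-or-create round trip against tables.get is avoided:

# Workaround sketch (assumes the table already exists): with
# CREATE_NEVER, the sink should not need the get-or-create call
# that was hitting the tables.get rate limit.
(lines
    | 'WriteToBQ' >> beam.io.gcp.bigquery.WriteToBigQuery(
        args.table,
        dataset=args.dataset,
        project=args.project,
        create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER))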

【Discussion】:
