如何在 Google App Engine 上为 BigQuery 设置截止日期

Posted

技术标签:

【中文标题】如何在 Google App Engine 上为 BigQuery 设置截止日期【英文标题】:How to set deadline for BigQuery on Google App Engine 【发布时间】:2014-07-16 19:14:33 【问题描述】:

我有一个调用 BigQuery 获取数据的 Google App Engine 程序。

查询通常需要 3 - 4.5 秒,还可以,但有时需要超过 5 秒并抛出此错误:

DeadlineExceededError:API 调用 urlfetch.Fetch() 响应时间过长并被取消。

这个article 显示了截止日期和不同类型的截止日期错误。

有没有办法将 BigQuery 作业的截止日期设置为 5 秒以上?在 BigQuery API 文档中找不到它。

【问题讨论】:

【参考方案1】:

BigQuery 查询速度很快,但通常比默认的 App Engine urlfetch 超时时间更长。 BigQuery API 是异步的,因此您需要将步骤分解为每个短于 5 秒的 API 调用。

对于这种情况,我会使用App Engine Task Queue:

    调用 BigQuery API 以插入您的作业。这会返回一个 JobID。

    在 App Engine 任务队列中放置一个任务,以检查该 ID 处 BigQuery 查询作业的状态。

    如果 BigQuery 作业状态不是“DONE”,请将新任务放入队列以再次检查。

    如果状态为“DONE”,则使用 urlfetch 进行调用以检索结果。

【讨论】:

完美。谢谢迈克尔!【参考方案2】:

请注意,我会接受 Michael 的建议,因为这是最可靠的。我只是想指出,您可以将 urlfetch 超时时间增加到 60 秒,这对于大多数查询来说应该足够了。

How to set timeout for urlfetch in Google App Engine?

【讨论】:

谢谢乔丹。我要和 Michaels 一起去,但很高兴知道我也可以这样做,作为开发时的快速解决方法。干杯!【参考方案3】:

我无法将 urlfetch.set_default_fetch_deadline() 方法应用于 Big Query API,但在授权大查询会话时能够增加超时,如下所示:

from apiclient.discovery import build
from oauth2client.service_account import ServiceAccountCredentials

credentials = ServiceAccountCredentials.from_json_keyfile_dict(credentials_dict, scopes)

# Create an authorized session and set the url fetch timeout.
http_auth = credentials.authorize(Http(timeout=60))

# Build the service.
service =  build(service_name, version, http=http_auth)

# Make the query
request = service.jobs().query(body=query_body).execute()

或者使用jobs().insert的异步方法

query_response = service.jobs().insert(body=query_body).execute()

big_query_job_id = query_response['jobReference']['jobId']

# poll the job.get endpoint until the job is complete 
while True:

    job_status_response = service.jobs()\
        .get(jobId=big_query_job_id).execute()

    if job_status_response['status']['state'] == done:
        break

    time.sleep(1)   

results_respone = service.jobs()\
    .getQueryResults(**query_params)\
    .execute()

我们最终采用了类似于 Michael 上面建议的方法,但是即使在使用异步调用时,getQueryResults 方法(使用小的 maxResults 参数分页)在 url 获取时超时,引发错误张贴在问题中。

因此,为了增加 Big Query / App Engine 中 URL Fetch 的超时时间,请在授权会话时相应地设置超时时间。

【讨论】:

【参考方案4】:

要在 AppEngine 中发出 HTTP 请求,您可以使用 urlliburllib2httpliburlfetch。但是,无论您选择什么库,AppEngine 都会使用App Engine's URL Fetch service 执行 HTTP 请求。

googleapiclientuseshttplib2。看起来httplib2.Http 将超时传递给 urlfetch。由于它的默认值为 None,因此无论您使用 urlfetch.set_default_fetch_deadline 设置什么,urlfetch 都会将该请求的截止日期设置为 5 秒。

httplib2 uses socket 库用于 HTTP 请求。

要设置超时,您可以执行以下操作:

import socket
socket.setdefaulttimeout(30)

你应该也可以这样做,但我还没有测试过:

http = httplib2.Http(timeout=30)

如果您没有现有代码来为请求计时,您可以像这样包装您的查询:

import time
start_query = time.time()

<your query code>

end_query = time.time()
print(end_query - start_query)

【讨论】:

【参考方案5】:

这是在 AppEngine for Go 中解决 bigquery 超时问题的一种方法。只需将查询上的TimeoutMs 设置为远低于 5000。bigquery 查询的默认超时时间为 10000 毫秒,这超过了 AppEngine 中传出请求的默认 5 秒截止时间。

问题是必须在初始请求中设置超时:bigquery.service.Jobs.Query(…) 和用于轮询查询结果的后续 b.service.Jobs.GetQueryResults(…)

例子:

query := &gbigquery.QueryRequest
    DefaultDataset: &gbigquery.DatasetReference
        DatasetId: "mydatasetid",
        ProjectId: "myprojectid",
    ,
    Kind:       "json",
    Query:      "<insert query here>",
    TimeoutMs:  3000, // <- important!


queryResponse := bigquery.service.Jobs.Query("myprojectid", query).Do()

// determine if queryResponse is a completed job and if not start to poll

queryResponseResults := bigquery.service.Jobs.
        GetQueryResults("myprojectid", res.JobRef.JobId).
        TimeoutMs(DefaultTimeoutMS) // <- important!

// determine if queryResponseResults is a completed job and if not continue to poll

这样做的好处是您可以保持整个请求的默认请求截止日期(正常请求为 60 秒,任务和 cronjobs 为 10 分钟),同时避免将传出请求的截止日期设置为任意较大的值。

【讨论】:

以上是关于如何在 Google App Engine 上为 BigQuery 设置截止日期的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Google App Engine(灵活环境)上为应用创建开发服务器?

在 Google App Engine 上为用 Python 编写的 Qualtrics 创建 Web 服务

我如何在 google app 引擎上为 html5 创建 websocket

如何在 Google Cloud App Engine 上使用 PubSub 创建订阅者,该订阅者通过 Publisher 从 Google Cloud App Engine Flex 收听消息?

如何在 Google App Engine app.yaml 中处理尾部斜线

如何在 Google Cloud Functions 和 Google App Engine 之间进行选择?