每个任务都无法在 Google Cloud Tasks 上执行
Posted
技术标签:
【中文标题】每个任务都无法在 Google Cloud Tasks 上执行【英文标题】:Every task failing to execute on Google Cloud Tasks 【发布时间】:2020-06-21 10:38:49 【问题描述】:我需要在 Django 应用程序中运行一些异步任务,我开始研究 Google Cloud Tasks。我想我已经遵循了所有的指示 - 以及我能想到的所有可能的变化,但到目前为止没有成功。
问题是所有创建的任务都进入队列,但无法执行。控制台和日志仅报告 http 代码 301(永久重定向)。为简单起见,我将相同的代码部署到 App Engine(标准)的两个服务中,并将任务请求仅路由到其中一个。
看起来代码本身运行良好。当我转到“https://[proj].appspot.com/api/v1/tasks”时,例程执行得很好,并且根据 DevTools/Network 没有重定向。当 Cloud Tasks 尝试调用“/api/v1/tasks”时,每次都会失败。
如果有人可以查看下面的代码并指出可能导致此失败的原因,我将不胜感激。
谢谢。
#--------------------------------
# [proj]/.../urls.py
#--------------------------------
from [proj].api import tasks
urlpatterns += [
# tasks api
path('api/v1/tasks', tasks, name='tasks'),
]
#--------------------------------
# [proj]/api.py:
#--------------------------------
from django.views.decorators.csrf import csrf_exempt
@csrf_exempt
def tasks(request):
print('Start api')
payload = request.body.decode("utf-8")
print (payload)
print('End api')
return HttpResponse('OK')
#--------------------------------
# [proj]/views/manut.py
#--------------------------------
from django.views.generic import View
from django.shortcuts import redirect
from [proj].tasks import TasksCreate
class ManutView(View):
template_name = '[proj]/manut.html'
def post(self, request, *args, **kwargs):
relative_url = '/api/v1/tasks'
testa_task = TasksCreate()
resp = testa_task.send_task(
url=relative_url,
schedule_time=5,
payload='task_type': 1, 'id': 21
)
print(resp)
return redirect(request.META['HTTP_REFERER'])
#--------------------------------
# [proj]/tasks/tasks.py:
#--------------------------------
from django.conf import settings
from google.cloud import tasks_v2
from google.protobuf import timestamp_pb2
from typing import Dict, Optional, Union
import json
import time
class TasksCreate:
def send_task(self,
url: str,
payload: Optional[Union[str, Dict]] = None,
schedule_time: Optional[int] = None, # in seconds
name: Optional[str] = None,
) -> None:
client = tasks_v2.CloudTasksClient()
parent = client.queue_path(
settings.GCP_PROJECT,
settings.GCP_LOCATION,
settings.GCP_QUEUE,
)
# App Engine task:
task =
'app_engine_http_request': # Specify the type of request.
'http_method': 'POST',
'relative_uri': url,
'app_engine_routing': 'service': 'tasks'
if name:
task['name'] = name
if isinstance(payload, dict):
payload = json.dumps(payload)
if payload is not None:
converted_payload = payload.encode()
# task['http_request']['body'] = converted_payload
task['app_engine_http_request']['body'] = converted_payload
if schedule_time is not None:
now = time.time() + schedule_time
seconds = int(now)
nanos = int((now - seconds) * 10 ** 9)
# Create Timestamp protobuf.
timestamp = timestamp_pb2.Timestamp(seconds=seconds, nanos=nanos)
# Add the timestamp to the tasks.
task['schedule_time'] = timestamp
resp = client.create_task(parent, task)
return resp
# --------------------------------
# [proj]/dispatch.yaml:
# --------------------------------
dispatch:
- url: "*/api/v1/tasks"
service: tasks
- url: "*/api/v1/tasks/"
service: tasks
- url: "*appspot.com/*"
service: default
#--------------------------------
# [proj]/app.yaml & tasks.yaml:
#--------------------------------
runtime: python37
instance_class: F1
automatic_scaling:
max_instances: 2
service: default
#handlers:
#- url: .*
# secure: always
# redirect_http_response_code: 301
# script: auto
entrypoint: gunicorn -b :$PORT --chdir src server.wsgi
env_variables:
...
更新:
这里是执行的日志:
insertId: "1lfs38fa9"
jsonPayload:
@type: "type.googleapis.com/google.cloud.tasks.logging.v1.TaskActivityLog"
attemptResponseLog:
attemptDuration: "0.008005s"
dispatchCount: "5"
maxAttempts: 0
responseCount: "5"
retryTime: "2020-03-09T21:50:33.557783Z"
scheduleTime: "2020-03-09T21:50:23.548409Z"
status: "UNAVAILABLE"
targetAddress: "POST /api/v1/tasks"
targetType: "APP_ENGINE_HTTP"
task: "projects/[proj]/locations/us-central1/queues/tectaq/tasks/09687434589619534431"
logName: "projects/[proj]/logs/cloudtasks.googleapis.com%2Ftask_operations_log"
receiveTimestamp: "2020-03-09T21:50:24.375681687Z"
resource:
labels:
project_id: "[proj]"
queue_id: "tectaq"
target_type: "APP_ENGINE_HTTP"
type: "cloud_tasks_queue"
severity: "ERROR"
timestamp: "2020-03-09T21:50:23.557842532Z"
【问题讨论】:
我不确定文件views/manut.py
的用途是什么,因为它似乎没有包含在任何地方。云任务队列默认禁用日志记录,因此请尝试为正在调度您的任务的队列启用它,也许它会在那里显示一些信息:gcloud beta tasks queues update <tasks-queue-name> --log-sampling-ratio=1.0
感谢@yedpodtrzitko 提供有关日志记录的提示。我启用了它,但情况并没有好转,因为错误“不可用”对我来说毫无意义。
您能否从您的dispatch.yaml
中删除规则*appspot.com/*
,以确保它不会引起任何问题?无论如何,一切都默认路由到default
服务,所以这应该是不必要的......
能加个简化的例子重现吗?
我从 dispatch.yaml 中删除了 *appspot.com/*
,但没有改变。我会尝试做一个非常简单的worker,这样问题就可以重现了,谢谢。
【参考方案1】:
最后我可以让 Cloud Tasks 工作,但只能使用 http_request 类型(带有绝对 url)。当任务被定义为 app_engine_http_request(相对 url)时,我无法让它们运行。
我已经尝试过使用 POST 的 http_request 类型,但那是在我免除 api 函数之前检查 csrf 令牌之前,这导致错误 Forbidden (Referer checking failed - no Referer.): /api/v1/tasks
,我无法连接到 csrf 遗漏。
如果将来有人偶然发现这个问题,并找到一种方法让 app_engine_http_request 使用 Django 在 Cloud Tasks 上工作,我仍然非常想知道解决方案。
【讨论】:
【参考方案2】:问题在于 App Engine 任务处理程序不遵循重定向,因此您必须找出请求被重定向的原因并对 App Engine 请求进行异常处理。在我的情况下,我将 http 重定向到 https 并且必须像这样做出异常:(Node Express)
app.use((req, res, next) =>
const protocol = req.headers['x-forwarded-proto']
const userAgent = req.headers['user-agent']
if (userAgent && userAgent.includes('AppEngine-Google'))
console.log('USER AGENT IS GAE, SKIPPING REDIRECT TO HTTPS.')
return next()
else if (protocol === 'http')
res.redirect(301, `https://$req.headers.host$req.url`)
else
next()
)
【讨论】:
【参考方案3】:问题是所有创建的任务都进入队列,但无法执行。控制台和日志仅报告 http 代码 301(永久重定向)。
也许您的任务端点的请求处理程序需要一个斜杠。
尝试改变这个:
class ManutView(View):
template_name = '[proj]/manut.html'
def post(self, request, *args, **kwargs):
relative_url = '/api/v1/tasks'
...
到这里:
class ManutView(View):
template_name = '[proj]/manut.html'
def post(self, request, *args, **kwargs):
relative_url = '/api/v1/tasks/'
...
也可以尝试自己点击任务 url,看看是否可以从 curl
运行任务
【讨论】:
是的,我什至包含了两个 url 模式(带和不带斜杠),所以没有一个会重定向。 Cloud Tasks 上的错误相同,从 curl 或浏览器调用时都成功。我也尝试过 GET(没有有效负载/正文)而不是 POST,一切都保持不变。【参考方案4】:如果以后有人偶然发现这个问题,并想办法 为了让 app_engine_http_request 使用 Django 在 Cloud Tasks 上工作,我会 还是很想知道解决办法。
@JCampos 我设法让它在我的 Django 应用程序上运行(我还使用了 DRF,但我认为它不会造成很大的不同)。
from google.cloud import tasks_v2
from google.protobuf import timestamp_pb2
import datetime
class CloudTasksMixin:
@property
def _cloud_task_client(self):
return tasks_v2.CloudTasksClient()
def send_to_cloud_tasks(self, url, http_method='POST', payload=None,in_seconds=None, name=None):
""" Send task to be executed """
parent = self._cloud_task_client.queue_path(settings.TASKS['PROJECT_NAME'], settings.TASKS['QUEUE_REGION'], queue=settings.TASKS['QUEUE_NAME'])
task =
'app_engine_http_request':
'http_method': http_method,
'relative_uri': url
...
然后我使用这样的视图:
class CloudTaskView(views.APIView):
authentication_classes = []
def post(self, request, *args, **kwargs):
# Do your stuff
return Response()
最后,我在 urls.py(来自 DRF)中使用 csrf_exempt(CloudTaskView.as_view())
实现了这个 url
起初我遇到 403 错误,但感谢您和您对 csrf_exempt 的评论,它现在可以正常工作了。
【讨论】:
【参考方案5】:似乎 Cloud Tasks 使用 HTTP url 调用 App Engine(这没关系,因为它们可能在同一个网络中),但如果您使用的是 HTTPs,Django 应该重定向(http -> https)收到的任何请求,包括您的处理程序端点。
要解决这个问题,你应该告诉 Django 不要重定向你的处理程序。 您可以使用settings.SECURE_REDIRECT_EXEMPT。
例如:
SECURE_REDIRECT_EXEMPT = [r"^api/v1/tasks/$"]
【讨论】:
以上是关于每个任务都无法在 Google Cloud Tasks 上执行的主要内容,如果未能解决你的问题,请参考以下文章
无法在 Google Cloud Compute Engine 上使用 GPU
Google Cloud Tasks ImportError:无法导入名称“resource_pb2”
Google Cloud Tasks 无法向 Cloud Run 进行身份验证
Google Cloud Tasks HTTP 触发器 - 如何禁用重试