每个任务都无法在 Google Cloud Tasks 上执行

Posted

技术标签:

【中文标题】每个任务都无法在 Google Cloud Tasks 上执行【英文标题】:Every task failing to execute on Google Cloud Tasks 【发布时间】:2020-06-21 10:38:49 【问题描述】:

我需要在 Django 应用程序中运行一些异步任务,我开始研究 Google Cloud Tasks。我想我已经遵循了所有的指示 - 以及我能想到的所有可能的变化,但到目前为止没有成功。

问题是所有创建的任务都进入队列,但无法执行。控制台和日志仅报告 http 代码 301(永久重定向)。为简单起见,我将相同的代码部署到 App Engine(标准)的两个服务中,并将任务请求仅路由到其中一个。

看起来代码本身运行良好。当我转到“https://[proj].appspot.com/api/v1/tasks”时,例程执行得很好,并且根据 DevTools/Network 没有重定向。当 Cloud Tasks 尝试调用“/api/v1/tasks”时,每次都会失败。

如果有人可以查看下面的代码并指出可能导致此失败的原因,我将不胜感激。

谢谢。

#--------------------------------
# [proj]/.../urls.py
#--------------------------------
from [proj].api import tasks

urlpatterns += [
    # tasks api
    path('api/v1/tasks', tasks, name='tasks'),
]
#--------------------------------
#   [proj]/api.py:
#--------------------------------
from django.views.decorators.csrf import csrf_exempt

@csrf_exempt
def tasks(request):
    print('Start api')
    payload = request.body.decode("utf-8")
    print (payload)
    print('End api')
    return HttpResponse('OK')

#--------------------------------
# [proj]/views/manut.py
#--------------------------------
from django.views.generic import View
from django.shortcuts import redirect
from [proj].tasks import TasksCreate

class ManutView(View):
    template_name = '[proj]/manut.html'

    def post(self, request, *args, **kwargs):
        relative_url = '/api/v1/tasks'
        testa_task = TasksCreate()
        resp = testa_task.send_task(
            url=relative_url,
            schedule_time=5,
            payload='task_type': 1, 'id': 21
        )
        print(resp)
        return redirect(request.META['HTTP_REFERER'])

#--------------------------------
# [proj]/tasks/tasks.py:
#--------------------------------
from django.conf import settings
from google.cloud import tasks_v2
from google.protobuf import timestamp_pb2
from typing import Dict, Optional, Union
import json
import time

class TasksCreate:

    def send_task(self,
        url: str,
        payload: Optional[Union[str, Dict]] = None,
        schedule_time: Optional[int] = None,    # in seconds
        name: Optional[str] = None,
        ) -> None:

        client = tasks_v2.CloudTasksClient()
        parent = client.queue_path(
            settings.GCP_PROJECT,
            settings.GCP_LOCATION,
            settings.GCP_QUEUE,
        )

        # App Engine task:
        task = 
            'app_engine_http_request':   # Specify the type of request.
                'http_method': 'POST',
                'relative_uri': url,
                'app_engine_routing': 'service': 'tasks'
            
        

        if name:
            task['name'] = name
        if isinstance(payload, dict):
            payload = json.dumps(payload)
        if payload is not None:
            converted_payload = payload.encode()
            # task['http_request']['body'] = converted_payload
            task['app_engine_http_request']['body'] = converted_payload
        if schedule_time is not None:
            now = time.time() + schedule_time
            seconds = int(now)
            nanos = int((now - seconds) * 10 ** 9)
            # Create Timestamp protobuf.
            timestamp = timestamp_pb2.Timestamp(seconds=seconds, nanos=nanos)
            # Add the timestamp to the tasks.
            task['schedule_time'] = timestamp

        resp = client.create_task(parent, task)

        return resp

# --------------------------------
# [proj]/dispatch.yaml:
# --------------------------------
dispatch:
  - url: "*/api/v1/tasks"
    service: tasks

  - url: "*/api/v1/tasks/"
    service: tasks

  - url: "*appspot.com/*"
    service: default

#--------------------------------
# [proj]/app.yaml & tasks.yaml:
#--------------------------------
runtime: python37

instance_class: F1

automatic_scaling:
  max_instances: 2

service: default

#handlers:
#- url: .*
#  secure: always
#  redirect_http_response_code: 301
#  script: auto

entrypoint: gunicorn -b :$PORT --chdir src server.wsgi

env_variables:
...

更新:

这里是执行的日志:


 insertId: "1lfs38fa9"  
 jsonPayload: 
  @type: "type.googleapis.com/google.cloud.tasks.logging.v1.TaskActivityLog"   
  attemptResponseLog: 
   attemptDuration: "0.008005s"    
   dispatchCount: "5"    
   maxAttempts: 0    
   responseCount: "5"    
   retryTime: "2020-03-09T21:50:33.557783Z"    
   scheduleTime: "2020-03-09T21:50:23.548409Z"    
   status: "UNAVAILABLE"    
   targetAddress: "POST /api/v1/tasks"    
   targetType: "APP_ENGINE_HTTP"    
  
  task: "projects/[proj]/locations/us-central1/queues/tectaq/tasks/09687434589619534431"   
 
 logName: "projects/[proj]/logs/cloudtasks.googleapis.com%2Ftask_operations_log"  
 receiveTimestamp: "2020-03-09T21:50:24.375681687Z"  
 resource: 
  labels: 
   project_id: "[proj]"    
   queue_id: "tectaq"    
   target_type: "APP_ENGINE_HTTP"    
  
  type: "cloud_tasks_queue"   
 
 severity: "ERROR"  
 timestamp: "2020-03-09T21:50:23.557842532Z"  

【问题讨论】:

我不确定文件views/manut.py 的用途是什么,因为它似乎没有包含在任何地方。云任务队列默认禁用日志记录,因此请尝试为正在调度您的任务的队列启用它,也许它会在那里显示一些信息:gcloud beta tasks queues update <tasks-queue-name> --log-sampling-ratio=1.0 感谢@yedpodtrzitko 提供有关日志记录的提示。我启用了它,但情况并没有好转,因为错误“不可用”对我来说毫无意义。 您能否从您的dispatch.yaml 中删除规则*appspot.com/*,以确保它不会引起任何问题?无论如何,一切都默认路由到default服务,所以这应该是不必要的...... 能加个简化的例子重现吗? 我从 dispatch.yaml 中删除了 *appspot.com/*,但没有改变。我会尝试做一个非常简单的worker,这样问题就可以重现了,谢谢。 【参考方案1】:

最后我可以让 Cloud Tasks 工作,但只能使用 http_request 类型(带有绝对 url)。当任务被定义为 app_engine_http_request(相对 url)时,我无法让它们运行。

我已经尝试过使用 POST 的 http_request 类型,但那是在我免除 api 函数之前检查 csrf 令牌之前,这导致错误 Forbidden (Referer checking failed - no Referer.): /api/v1/tasks,我无法连接到 csrf 遗漏。

如果将来有人偶然发现这个问题,并找到一种方法让 app_engine_http_request 使用 Django 在 Cloud Tasks 上工作,我仍然非常想知道解决方案。

【讨论】:

【参考方案2】:

问题在于 App Engine 任务处理程序不遵循重定向,因此您必须找出请求被重定向的原因并对 App Engine 请求进行异常处理。在我的情况下,我将 http 重定向到 https 并且必须像这样做出异常:(Node Express)

app.use((req, res, next) => 

   const protocol = req.headers['x-forwarded-proto']
   const userAgent = req.headers['user-agent']

   if (userAgent && userAgent.includes('AppEngine-Google')) 
      console.log('USER AGENT IS GAE, SKIPPING REDIRECT TO HTTPS.')
      return next()
    else if (protocol === 'http') 
      res.redirect(301, `https://$req.headers.host$req.url`)
    else 
      next()
   

)

【讨论】:

【参考方案3】:

问题是所有创建的任务都进入队列,但无法执行。控制台和日志仅报告 http 代码 301(永久重定向)。

也许您的任务端点的请求处理程序需要一个斜杠。

尝试改变这个:

class ManutView(View):
    template_name = '[proj]/manut.html'

    def post(self, request, *args, **kwargs):
        relative_url = '/api/v1/tasks'
        ...

到这里:

class ManutView(View):
    template_name = '[proj]/manut.html'

    def post(self, request, *args, **kwargs):
        relative_url = '/api/v1/tasks/'
        ...

也可以尝试自己点击任务 url,看看是否可以从 curl 运行任务

【讨论】:

是的,我什至包含了两个 url 模式(带和不带斜杠),所以没有一个会重定向。 Cloud Tasks 上的错误相同,从 curl 或浏览器调用时都成功。我也尝试过 GET(没有有效负载/正文)而不是 POST,一切都保持不变。【参考方案4】:

如果以后有人偶然发现这个问题,并想办法 为了让 app_engine_http_request 使用 Django 在 Cloud Tasks 上工作,我会 还是很想知道解决办法。

@JCampos 我设法让它在我的 Django 应用程序上运行(我还使用了 DRF,但我认为它不会造成很大的不同)。

from google.cloud import tasks_v2
from google.protobuf import timestamp_pb2
import datetime

class CloudTasksMixin:
@property
def _cloud_task_client(self):
        return tasks_v2.CloudTasksClient()

def send_to_cloud_tasks(self, url, http_method='POST', payload=None,in_seconds=None, name=None):
    """ Send task to be executed """
    parent = self._cloud_task_client.queue_path(settings.TASKS['PROJECT_NAME'], settings.TASKS['QUEUE_REGION'], queue=settings.TASKS['QUEUE_NAME'])

    task = 
        'app_engine_http_request': 
            'http_method': http_method,
            'relative_uri': url
        
    

    ...

然后我使用这样的视图:

class CloudTaskView(views.APIView):
    authentication_classes = []

    def post(self, request, *args, **kwargs):
        # Do your stuff
        return Response()

最后,我在 urls.py(来自 DRF)中使用 csrf_exempt(CloudTaskView.as_view()) 实现了这个 url

起初我遇到 403 错误,但感谢您和您对 csrf_exempt 的评论,它现在可以正常工作了。

【讨论】:

【参考方案5】:

似乎 Cloud Tasks 使用 HTTP url 调用 App Engine(这没关系,因为它们可能在同一个网络中),但如果您使用的是 HTTPs,Django 应该重定向(http -> https)收到的任何请求,包括您的处理程序端点。

要解决这个问题,你应该告诉 Django 不要重定向你的处理程序。 您可以使用settings.SECURE_REDIRECT_EXEMPT。

例如:

SECURE_REDIRECT_EXEMPT = [r"^api/v1/tasks/$"]

【讨论】:

以上是关于每个任务都无法在 Google Cloud Tasks 上执行的主要内容,如果未能解决你的问题,请参考以下文章

无法在 Google Cloud Compute Engine 上使用 GPU

Google Cloud Tasks ImportError:无法导入名称“resource_pb2”

Google Cloud Tasks 无法向 Cloud Run 进行身份验证

Google Cloud Tasks HTTP 触发器 - 如何禁用重试

如何使用 Google Cloud PubSub 和 Run 处理资源密集型长时间运行的任务?

更改Google Cloud SQL中的存储类型