


【中文标题】从谷歌云功能排队大量任务【英文标题】:Queue large number of tasks from Google cloud function 【发布时间】:2021-02-05 18:00:31 【问题描述】:

我正在尝试使用云功能通过每天调用一次外部 API 来更新数据。


云计划设置为调用函数 1

函数 1 - 遍历项目并为每个项目创建一个任务

任务 - 使用函数 1 提供的数据调用函数 2

函数 2 - 调用外部 API 来获取数据并更新我们的数据库

问题是每天有大约 2k 项要​​更新,并且云功能在它可以执行此操作之前会超时,因此我将它们放在队列中。但是,即使将项目放入队列中,云功能也需要很长时间,因此在将它们全部添加之前会超时。




函数 1 的代码:

def refresh(request):
    for i in items:
        # Create a client.
        client = tasks_v2.CloudTasksClient()

        # TODO(developer): Uncomment these lines and replace with your values.
        project = 'my-project'
        queue = 'refresh-queue'
        location = 'europe-west2'
        name = i['name'].replace(' ','')
        url = f"https://europe-west2-my-project.cloudfunctions.net/endpoint?name=name"

        # Construct the fully qualified queue name.
        parent = client.queue_path(project, location, queue)

        # Construct the request body.
        task = 
            "http_request":   # Specify the type of request.
                "http_method": tasks_v2.HttpMethod.GET,
                "url": url,  # The full url path that the task will be sent to.

        # Use the client to build and send the task.
        response = client.create_task(request="parent": parent, "task": task)


我了解到您已经提高了 Cloud Functions 的超时时间,这需要超过 9 分钟,您是否尝试过为 Cloud Functions 使用 higher tier?我的意思是使用一个 512Mb 的函数,它使用 800 MHz 的 CPU 或 1024 Mb,也许这有助于处理时间。 @Chris32 我认为问题不在于等待外部 API 的处理时间 【参考方案1】:

回答您的问题“有没有一种简单的方法可以一次将多个任务批量添加到队列中?”根据公众documentation 最好的方法是实现双注入模式。


此外,我建议您将500/50/5 模式用于冷队列。这将有助于任务队列和 Cloud Function 服务以安全比率上升。



Chris32 的回答是正确的,但我在您的代码 sn-p 中注意到的一件事是您应该在 for 循环之外创建客户端。

def refresh(request):
    # Create a client.
    client = tasks_v2.CloudTasksClient()

    # TODO(developer): Uncomment these lines and replace with your values.
    project = 'my-project'
    queue = 'refresh-queue'
    location = 'europe-west2'
    for i in items:
        name = i['name'].replace(' ','')
        url = f"https://europe-west2-my-project.cloudfunctions.net/endpoint?name=name"

        # Construct the fully qualified queue name.
        parent = client.queue_path(project, location, queue)

        # Construct the request body.
        task = 
            "http_request":   # Specify the type of request.
                "http_method": tasks_v2.HttpMethod.GET,
                "url": url,  # The full url path that the task will be sent to.

        # Use the client to build and send the task.
        response = client.create_task(request="parent": parent, "task": task)

在应用引擎中,我会在文件级别在def refresh 之外执行client = tasks_v2.CloudTasksClient(),但我不知道这对云功能是否重要。


修改“功能 2”以采用多个“名称”,而不仅仅是一个。然后在“功能 1”中,您可以一次向“功能 2”发送 10 个名称

BATCH_SIZE = 10  # send 10 names to Function 2

def refresh(request):
    # Create a client.
    client = tasks_v2.CloudTasksClient()
    # ...
    for i in range(0, len(items), BATCH_SIZE)]:
        items_batch = items[i:i + BATCH_SIZE]
        names = ','.join([i['name'].replace(' ','') for i in items_batch])
        url = f"https://europe-west2-my-project.cloudfunctions.net/endpoint?names=names"

        # Construct the fully qualified queue name.
        # ...

如果这 2 个快速修复不这样做,那么您将不得不将“功能 1”拆分为“功能 1A”和“功能 1B”

功能 1A:

BATCH_SIZE = 100  # send 100 names to Function 1B

def refresh(request):
    client = tasks_v2.CloudTasksClient()
    for i in range(0, len(items), BATCH_SIZE)]:
        items_batch = items[i:i + BATCH_SIZE]
        names = ','.join([i['name'].replace(' ','') for i in items_batch])
        url = f"https://europe-west2-my-project.cloudfunctions.net/endpoint-for-function-1b?names=names"

        # send the task.
        response = client.create_task(request=
            "parent": client.queue_path('my-project', 'europe-west2', 'refresh-queue'), 
                "http_request": "http_method": tasks_v2.HttpMethod.GET, "url": url

功能 1B:

BATCH_SIZE = 10  # send 10 names to Function 2

def refresh(request):
    # set `names` equal to the query param `names`
    client = tasks_v2.CloudTasksClient()
    for i in range(0, len(names), BATCH_SIZE)]:
        names_batch = ','.join(names[i:i + BATCH_SIZE])
        url = f"https://europe-west2-my-project.cloudfunctions.net/endpoint-for-function-2?names=names_batch"

        # send the task.
        response = client.create_task(request=
            "parent": client.queue_path('my-project', 'europe-west2', 'refresh-queue'), 
                "http_request": "http_method": tasks_v2.HttpMethod.GET, "url": url



从谷歌云上传多个 csv 到 bigquery

从谷歌云数据存储迁移到谷歌云 sql

使用 python 将历史数据从谷歌云存储移动到日期分区的 bigquery 表


从谷歌云存储获取状态 503
