从谷歌云功能排队大量任务

Posted

技术标签:

【中文标题】从谷歌云功能排队大量任务【英文标题】:Queue large number of tasks from Google cloud function 【发布时间】:2021-02-05 18:00:31 【问题描述】:

我正在尝试使用云功能通过每天调用一次外部 API 来更新数据。

到目前为止,我有:

云计划设置为调用函数 1

函数 1 - 遍历项目并为每个项目创建一个任务

任务 - 使用函数 1 提供的数据调用函数 2

函数 2 - 调用外部 API 来获取数据并更新我们的数据库

问题是每天有大约 2k 项要​​更新,并且云功能在它可以执行此操作之前会超时,因此我将它们放在队列中。但是,即使将项目放入队列中,云功能也需要很长时间,因此在将它们全部添加之前会超时。

有没有一种简单的方法可以一次将多个任务批量添加到队列中?

如果做不到这一点,是否有更好的解决方案?

全部用python编写

函数 1 的代码:

def refresh(request):
    for i in items:
        # Create a client.
        client = tasks_v2.CloudTasksClient()

        # TODO(developer): Uncomment these lines and replace with your values.
        project = 'my-project'
        queue = 'refresh-queue'
        location = 'europe-west2'
        name = i['name'].replace(' ','')
        url = f"https://europe-west2-my-project.cloudfunctions.net/endpoint?name=name"

        # Construct the fully qualified queue name.
        parent = client.queue_path(project, location, queue)

        # Construct the request body.
        task = 
            "http_request":   # Specify the type of request.
                "http_method": tasks_v2.HttpMethod.GET,
                "url": url,  # The full url path that the task will be sent to.
            
        
        

        # Use the client to build and send the task.
        response = client.create_task(request="parent": parent, "task": task)

【问题讨论】:

我了解到您已经提高了 Cloud Functions 的超时时间,这需要超过 9 分钟,您是否尝试过为 Cloud Functions 使用 higher tier?我的意思是使用一个 512Mb 的函数,它使用 800 MHz 的 CPU 或 1024 Mb,也许这有助于处理时间。 @Chris32 我认为问题不在于等待外部 API 的处理时间 【参考方案1】:

回答您的问题“有没有一种简单的方法可以一次将多个任务批量添加到队列中?”根据公众documentation 最好的方法是实现双注入模式。

为此,您将有一个新队列,您将在其中添加一个包含原始队列的多个任务的单个任务,然后在此队列的接收端,您将有一个服务来获取此数据任务并在第二个队列中为每个条目创建一个任务。

此外,我建议您将500/50/5 模式用于冷队列。这将有助于任务队列和 Cloud Function 服务以安全比率上升。

【讨论】:

【参考方案2】:

Chris32 的回答是正确的,但我在您的代码 sn-p 中注意到的一件事是您应该在 for 循环之外创建客户端。

def refresh(request):
    # Create a client.
    client = tasks_v2.CloudTasksClient()

    # TODO(developer): Uncomment these lines and replace with your values.
    project = 'my-project'
    queue = 'refresh-queue'
    location = 'europe-west2'
    for i in items:
        name = i['name'].replace(' ','')
        url = f"https://europe-west2-my-project.cloudfunctions.net/endpoint?name=name"

        # Construct the fully qualified queue name.
        parent = client.queue_path(project, location, queue)

        # Construct the request body.
        task = 
            "http_request":   # Specify the type of request.
                "http_method": tasks_v2.HttpMethod.GET,
                "url": url,  # The full url path that the task will be sent to.
            
        
        

        # Use the client to build and send the task.
        response = client.create_task(request="parent": parent, "task": task)

在应用引擎中,我会在文件级别在def refresh 之外执行client = tasks_v2.CloudTasksClient(),但我不知道这对云功能是否重要。

第二件事,

修改“功能 2”以采用多个“名称”,而不仅仅是一个。然后在“功能 1”中,您可以一次向“功能 2”发送 10 个名称

BATCH_SIZE = 10  # send 10 names to Function 2

def refresh(request):
    # Create a client.
    client = tasks_v2.CloudTasksClient()
    # ...
    for i in range(0, len(items), BATCH_SIZE)]:
        items_batch = items[i:i + BATCH_SIZE]
        names = ','.join([i['name'].replace(' ','') for i in items_batch])
        url = f"https://europe-west2-my-project.cloudfunctions.net/endpoint?names=names"

        # Construct the fully qualified queue name.
        # ...

如果这 2 个快速修复不这样做,那么您将不得不将“功能 1”拆分为“功能 1A”和“功能 1B”

功能 1A:

BATCH_SIZE = 100  # send 100 names to Function 1B

def refresh(request):
    client = tasks_v2.CloudTasksClient()
    for i in range(0, len(items), BATCH_SIZE)]:
        items_batch = items[i:i + BATCH_SIZE]
        names = ','.join([i['name'].replace(' ','') for i in items_batch])
        url = f"https://europe-west2-my-project.cloudfunctions.net/endpoint-for-function-1b?names=names"

        # send the task.
        response = client.create_task(request=
            "parent": client.queue_path('my-project', 'europe-west2', 'refresh-queue'), 
            "task": 
                "http_request": "http_method": tasks_v2.HttpMethod.GET, "url": url
        )

功能 1B:

BATCH_SIZE = 10  # send 10 names to Function 2

def refresh(request):
    # set `names` equal to the query param `names`
    client = tasks_v2.CloudTasksClient()
    for i in range(0, len(names), BATCH_SIZE)]:
        names_batch = ','.join(names[i:i + BATCH_SIZE])
        url = f"https://europe-west2-my-project.cloudfunctions.net/endpoint-for-function-2?names=names_batch"

        # send the task.
        response = client.create_task(request=
            "parent": client.queue_path('my-project', 'europe-west2', 'refresh-queue'), 
            "task": 
                "http_request": "http_method": tasks_v2.HttpMethod.GET, "url": url
        )

【讨论】:

以上是关于从谷歌云功能排队大量任务的主要内容,如果未能解决你的问题,请参考以下文章

从谷歌云上传多个 csv 到 bigquery

从谷歌云数据存储迁移到谷歌云 sql

使用 python 将历史数据从谷歌云存储移动到日期分区的 bigquery 表

从谷歌云存储桶中读取文件

从谷歌云存储获取状态 503

从谷歌云中的虚拟机锁定