Cloud Tasks 条件执行

Posted

技术标签:

【中文标题】Cloud Tasks 条件执行【英文标题】:Cloud Tasks Conditional Execution 【发布时间】:2020-07-28 02:53:29 【问题描述】:

我正在使用 Cloud Tasks。只有在任务 A 和任务 B 成功完成后,我才需要触发任务 C 的执行。所以我需要某种方式来阅读/被通知触发的任务状态。但是我在 GCP 的文档中看不到这样做的方法。如果有帮助的话,使用 Node.js SDK 创建任务和 Cloud Functions 作为任务处理程序。

编辑:

根据要求,这里有更多关于我们正在做的事情的信息:

任务 1 到 10 分别发出 HTTP 请求、获取数据、根据这些数据更新 Firestore 中的各个集合。这 10 个任务可以并行运行,并且没有特定的顺序,因为它们彼此之间没有任何依赖关系。所有这些任务实际上都是在 GCF 内部实现的。

Task 11实际上依赖于Tasks 1-10更新的Firestore集合数据,所以只有Tasks 1-10成功完成后才能运行。

我们确实会发出一个 RunID 作为通用标识符来对所有任务 (1 - 11) 的特定运行进行分组。

【问题讨论】:

如果您添加更多关于任务的作用以及它们如何相互依赖的上下文,我们将能够更好地为您指出。 问题已编辑为您提供更多信息 【参考方案1】:

Cloud Task 只触发任务,只能定义时间条件。您必须在任务 C 运行时手动编写检查代码。

这里是一个流程示例:

任务 A 正在运行,最后,写入 firestore 的任务已完成 任务 B 正在运行,最后,写入 firestore 的任务已完成 任务 C 启动并检查 A 和 B 是否在 firestore 中完成。 如果不是,任务退出错误 是,继续这个过程

您必须自定义 C 任务队列,以便在出错时重试任务。

另一个昂贵的解决方案是使用 Cloud Composer 来处理此工作流

目前没有其他关于工作流管理的解决方案。

【讨论】:

如果使用这种方法,您可以从 Cloud Firestore onWrite 侦听器触发任务 C,该侦听器侦听任务 A 和 B 写入的位置。 “检查”功能将为每对运行两次(第一次仅在 A 或 B 完成时运行,第二次在两者中的最后一个完成时运行 - 触发任务 C)。如果任务 A 和 B 具有“作业 ID”的概念,您可以写入同一文档 /async/job10 并使用 onUpdate 侦听器,该侦听器仅在文档被较长运行的任务更改时才会触发。 是的,这是一个不错的选择。这一切都取决于您的上下文、要同步的任务数量和并行任务的数量。但是,是的,好主意!【参考方案2】:

Cloud Tasks 不是您想在这种情况下使用的工具。查看Cloud Composer,它内置于 Apache Airflow for GCP 之上。

编辑:您可以创建一个 GCF 来处理这些请求的状态

import requests
from concurrent.futures import ThreadPoolExecutor, as_completed

################ TASK A
taskA_list = [
    "https://via.placeholder.com/400",
    "https://via.placeholder.com/410",
    "https://via.placeholder.com/420",
    "https://via.placeholder.com/430",
    "https://via.placeholder.com/440",
    "https://via.placeholder.com/450",
    "https://via.placeholder.com/460",
    "https://via.placeholder.com/470",
    "https://via.placeholder.com/480",
    "https://via.placeholder.com/490",
]

def call2TaskA(url):
    html = requests.get(url, stream=True)
    return (url,html.status_code)


processes = []
results = []
with ThreadPoolExecutor(max_workers=10) as executor:
    for url in taskA_list:
        processes.append(executor.submit(call2TaskA, url))

isOkayToDoTaskB = True
for taskA in as_completed(processes):
    result = taskA.result()
    if result[1] != 200: # your validation on taskA
        isOkayToDoTaskB = False
    results.append(result)

if not isOkayToDoTaskB:
    raise ValueError('Problems: '.format(results))

################ TASK B
def doTaskB():
    pass

doTaskB()

【讨论】:

Cloud Composer 在这方面有点过头了

以上是关于Cloud Tasks 条件执行的主要内容,如果未能解决你的问题,请参考以下文章

ansible-playbook之条件判断

Cloud Tasks + Cloud Functions - 重复执行

每个任务都无法在 Google Cloud Tasks 上执行

Ansible — 编程 — 条件与循环

Ansible 日常使用技巧 (下)

ansible