Cloud Tasks 条件执行
Posted
技术标签:
【中文标题】Cloud Tasks 条件执行【英文标题】:Cloud Tasks Conditional Execution 【发布时间】:2020-07-28 02:53:29 【问题描述】:我正在使用 Cloud Tasks。只有在任务 A 和任务 B 成功完成后,我才需要触发任务 C 的执行。所以我需要某种方式来阅读/被通知触发的任务状态。但是我在 GCP 的文档中看不到这样做的方法。如果有帮助的话,使用 Node.js SDK 创建任务和 Cloud Functions 作为任务处理程序。
编辑:
根据要求,这里有更多关于我们正在做的事情的信息:
任务 1 到 10 分别发出 HTTP 请求、获取数据、根据这些数据更新 Firestore 中的各个集合。这 10 个任务可以并行运行,并且没有特定的顺序,因为它们彼此之间没有任何依赖关系。所有这些任务实际上都是在 GCF 内部实现的。
Task 11实际上依赖于Tasks 1-10更新的Firestore集合数据,所以只有Tasks 1-10成功完成后才能运行。
我们确实会发出一个 RunID 作为通用标识符来对所有任务 (1 - 11) 的特定运行进行分组。
【问题讨论】:
如果您添加更多关于任务的作用以及它们如何相互依赖的上下文,我们将能够更好地为您指出。 问题已编辑为您提供更多信息 【参考方案1】:Cloud Task 只触发任务,只能定义时间条件。您必须在任务 C 运行时手动编写检查代码。
这里是一个流程示例:
任务 A 正在运行,最后,写入 firestore 的任务已完成 任务 B 正在运行,最后,写入 firestore 的任务已完成 任务 C 启动并检查 A 和 B 是否在 firestore 中完成。 如果不是,任务退出错误 是,继续这个过程您必须自定义 C 任务队列,以便在出错时重试任务。
另一个昂贵的解决方案是使用 Cloud Composer 来处理此工作流
目前没有其他关于工作流管理的解决方案。
【讨论】:
如果使用这种方法,您可以从 Cloud FirestoreonWrite
侦听器触发任务 C,该侦听器侦听任务 A 和 B 写入的位置。 “检查”功能将为每对运行两次(第一次仅在 A 或 B 完成时运行,第二次在两者中的最后一个完成时运行 - 触发任务 C)。如果任务 A 和 B 具有“作业 ID”的概念,您可以写入同一文档 /async/job10
并使用 onUpdate
侦听器,该侦听器仅在文档被较长运行的任务更改时才会触发。
是的,这是一个不错的选择。这一切都取决于您的上下文、要同步的任务数量和并行任务的数量。但是,是的,好主意!【参考方案2】:
Cloud Tasks 不是您想在这种情况下使用的工具。查看Cloud Composer,它内置于 Apache Airflow for GCP 之上。
编辑:您可以创建一个 GCF 来处理这些请求的状态
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
################ TASK A
taskA_list = [
"https://via.placeholder.com/400",
"https://via.placeholder.com/410",
"https://via.placeholder.com/420",
"https://via.placeholder.com/430",
"https://via.placeholder.com/440",
"https://via.placeholder.com/450",
"https://via.placeholder.com/460",
"https://via.placeholder.com/470",
"https://via.placeholder.com/480",
"https://via.placeholder.com/490",
]
def call2TaskA(url):
html = requests.get(url, stream=True)
return (url,html.status_code)
processes = []
results = []
with ThreadPoolExecutor(max_workers=10) as executor:
for url in taskA_list:
processes.append(executor.submit(call2TaskA, url))
isOkayToDoTaskB = True
for taskA in as_completed(processes):
result = taskA.result()
if result[1] != 200: # your validation on taskA
isOkayToDoTaskB = False
results.append(result)
if not isOkayToDoTaskB:
raise ValueError('Problems: '.format(results))
################ TASK B
def doTaskB():
pass
doTaskB()
【讨论】:
Cloud Composer 在这方面有点过头了以上是关于Cloud Tasks 条件执行的主要内容,如果未能解决你的问题,请参考以下文章
Cloud Tasks + Cloud Functions - 重复执行