与 celery 正在进行的任务交互
Posted
技术标签:
【中文标题】与 celery 正在进行的任务交互【英文标题】:Interact with celery ongoing task 【发布时间】:2015-05-27 12:09:38 【问题描述】:我们有一个基于rabbitMQ
和Celery
的分布式架构。
我们可以毫无问题地并行启动多个任务。扩展性好。
现在我们需要远程控制任务:暂停、恢复、取消。 我们找到的唯一解决方案是在 Celery 任务中对另一个任务进行 RPC 调用,该任务在 DB 请求后回复命令。 Celery 任务和 RPC 任务不在同一台机器上,只有 RPC 任务可以访问数据库。
您对如何改进它并轻松与正在进行的任务进行沟通有什么建议吗? 谢谢
编辑:
事实上,我们想做如下图所示的事情。进行Blue
配置或Orange
配置很容易,但我们不知道如何同时进行。
工人正在订阅一个共同的Jobs queue
,每个工人都有自己的Admin queue
在交易所声明。
编辑:
如果 Celery
无法做到这一点,我愿意接受 python-rq
等其他框架的解决方案。
【问题讨论】:
这个问题很老了,是的,但在今天同样重要。我在这里打开了一个相关但更一般的通信问题:***.com/questions/59796397/… 但我想知道你是否曾经在这方面用 celery 或 python-rq 取得进展? @BerndWechner 迟到了,但我想我可能有一个实用的解决方案 - 发布了一个答案。 【参考方案1】:看起来像Control Bus pattern
。
为了更好的可扩展性和减少 RPC 调用,我建议反转逻辑。当状态发生变化时,PAUSE, RESUME, CANCEL
命令通过控制总线推送到 Celery 任务。 Celery 应用程序会将 Celery 应用程序的当前状态存储在存储中(可能在内存中,在文件系统中......)。如果即使在应用程序停止/启动后也必须保持任务状态,这将涉及更多工作以保持两个应用程序同步(例如启动时同步)。
【讨论】:
谢谢,但是有没有办法用celery实现管理通道? 出色但无用的模糊回复,抱歉。我会重复 Julio 的问题:具体来说,如何通过控制总线推送到 Celery 任务?好主意。但一个具体的例子就是挽救生命。【参考方案2】:我想展示一种通过工作流模式实现可暂停(和可恢复)正在进行 celery 任务的通用方法。注意:原始答案写为here。由于这篇文章非常相关,因此在此处重新编写。
概念
使用celery workflows - 您可以将整个操作设计为分为chain
的任务。它不一定必须是纯粹的链,但它应该遵循一个任务在另一个任务(或任务group
)完成后完成的一般概念。
一旦您有了这样的工作流程,您就可以最终定义点以在整个工作流程中暂停。在每个这些点,您可以检查前端用户是否请求操作暂停并相应地继续。概念是这样的:-
一个复杂且耗时的操作 O 被拆分为 5 个 celery 任务——T1、T2、T3、T4 和 T5——每个任务(第一个除外)都取决于前一个任务的返回值。
假设我们定义了在每个任务之后暂停的点,所以工作流看起来像-
T1 执行 T1 完成,检查用户是否请求暂停 如果用户没有请求暂停 - 继续 如果用户请求暂停,序列化剩余的工作流链并将其存储在某个地方以便以后继续... 等等。由于每个任务之后都有一个暂停点,因此在每个任务之后都会执行该检查(当然最后一个除外)。
但这只是理论,我很难在网上任何地方找到它的实现,所以这就是我想出的-
实施
from typing import Any, Optional
from celery import shared_task
from celery.canvas import Signature, chain, signature
@shared_task(bind=True)
def pause_or_continue(
self, retval: Optional[Any] = None, clause: dict = None, callback: dict = None
):
# Task to use for deciding whether to pause the operation chain
if signature(clause)(retval):
# Pause requested, call given callback with retval and remaining chain
# chain should be reversed as the order of execution follows from end to start
signature(callback)(retval, self.request.chain[::-1])
self.request.chain = None
else:
# Continue to the next task in chain
return retval
def tappable(ch: chain, clause: Signature, callback: Signature, nth: Optional[int] = 1):
'''
Make a operation workflow chain pause-able/resume-able by inserting
the pause_or_continue task for every nth task in given chain
ch: chain
The workflow chain
clause: Signature
Signature of a task that takes one argument - return value of
last executed task in workflow (if any - othewise `None` is passsed)
- and returns a boolean, indicating whether or not the operation should continue
Should return True if operation should continue normally, or be paused
callback: Signature
Signature of a task that takes 2 arguments - return value of
last executed task in workflow (if any - othewise `None` is passsed) and
remaining chain of the operation workflow as a json dict object
No return value is expected
This task will be called when `clause` returns `True` (i.e task is pausing)
The return value and the remaining chain can be handled accordingly by this task
nth: Int
Check `clause` after every nth task in the chain
Default value is 1, i.e check `clause` after every task
Hence, by default, user given `clause` is called and checked
after every task
NOTE: The passed in chain is mutated in place
Returns the mutated chain
'''
newch = []
for n, sig in enumerate(ch.tasks):
if n != 0 and n % nth == nth - 1:
newch.append(pause_or_continue.s(clause=clause, callback=callback))
newch.append(sig)
ch.tasks = tuple(newch)
return ch
解释 - pause_or_continue
这里pause_or_continue
是前面提到的暂停点。这是一项将以特定时间间隔调用的任务(时间间隔为任务间隔,而不是时间间隔)。这个任务然后调用一个用户提供的函数(实际上是一个任务) - clause
- 来检查这个任务是否应该继续。
如果clause
函数(实际上是一个任务)返回True
,用户提供的callback
函数被调用,最新的返回值(如果有的话-None
否则)被传递给这个回调,以及作为剩余的任务链。 callback
做它需要做的事情,pause_or_continue
将 self.request.chain
设置为 None
,这告诉 celery “任务链现在是空的 - 一切都完成了”。
如果clause
函数(实际上是一个任务)返回False
,则返回上一个任务的返回值(如果有的话 - None
否则)返回给下一个要接收的任务 - 并且链继续.因此工作流程继续。
为什么是clause
和callback
任务签名而不是常规函数?
clause
和callback
都被直接调用 - 没有delay
或apply_async
。它在当前进程中,在当前上下文中执行。所以它的行为与普通函数完全一样,那为什么要使用signatures
?
答案是序列化。您不能方便地将常规函数对象传递给 celery 任务。但是您可以传递任务签名。这正是我在这里所做的。 clause
和 callback
都应该是 celery 任务的常规 signature
对象。
什么是self.request.chain
?
self.request.chain
存储 dicts 列表(将 jsons 表示为 celery 任务序列化程序,默认情况下为 json) - 每个都代表一个任务签名。此列表中的每个任务都以相反的顺序执行。这就是为什么,在传递给用户提供的callback
函数(实际上是一个任务)之前,列表是颠倒的——用户可能希望任务的顺序是从左到右。
快速说明:与本次讨论无关,但如果您使用来自apply_async
的link
参数来构造链而不是chain
原语本身。 self.request.callback
是要修改的属性(即设置为None
以删除回调和停止链)而不是self.request.chain
解释 - tappable
tappable
只是一个基本函数,它接受一个链(为简洁起见,这是此处介绍的唯一工作流原语)并在每个 nth
任务之后插入 pause_or_continue
。您可以将它们插入您真正想要的任何位置,由您在操作中定义暂停点。这只是一个例子!
对于每个chain
对象,任务的实际签名(按顺序,从左到右)存储在.tasks
属性中。这是一个任务签名的元组。所以我们所要做的就是获取这个元组,转换成一个列表,插入暂停点并转换回一个元组以分配给链。然后返回修改后的链对象。
clause
和 callback
也附加到 pause_or_continue
签名上。普通的芹菜。
这涵盖了主要概念,但为了展示使用此模式的真实项目(以及展示暂停任务的恢复部分),这里有一个所有必要资源的小演示
用法
此示例使用假设具有数据库的基本 Web 服务器的概念。每当启动操作(即工作流链)时,它就会分配一个 id 并存储到数据库中。该表的架构看起来像-
-- Create operations table
-- Keeps track of operations and the users that started them
CREATE TABLE operations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
requester_id INTEGER NOT NULL,
completion TEXT NOT NULL,
workflow_store TEXT,
result TEXT,
FOREIGN KEY (requester_id) REFERENCES user (id)
);
现在唯一需要知道的字段是completion
。它只是存储操作的状态-
IN PROGRESS
当用户请求暂停时,路由控制器(即视图)将其修改为REQUESTING PAUSE
当操作实际暂停并调用callback
(来自tappable
,内部pause_or_continue
)时,callback
应将其修改为PAUSED
任务完成后修改为COMPLETED
clause
的一个例子
@celery.task()
def should_pause(_, operation_id: int):
# This is the `clause` to be used for `tappable`
# i.e it lets celery know whether to pause or continue
db = get_db()
# Check the database to see if user has requested pause on the operation
operation = db.execute(
"SELECT * FROM operations WHERE id = ?", (operation_id,)
).fetchone()
return operation["completion"] == "REQUESTING PAUSE"
这是在暂停点调用的任务,以确定是否暂停。这是一个带有 2 个参数的函数……嗯。第一个是强制性的,tappable
要求 clause
有一个(并且恰好是一个)参数 - 因此它可以将前一个任务的返回值传递给它(即使该返回值是 @ 987654385@)。在这个例子中,不需要使用返回值——所以我们可以忽略它。
第二个参数是操作id。看,clause
所做的一切 - 是检查数据库中的操作(工作流)条目并查看它是否具有状态 REQUESTING PAUSE
。为此,它需要知道操作 ID。但是clause
应该是一个只有一个参数的任务,什么给出?
好吧,好东西签名可以是部分的。首次启动任务并创建 tappable
链时。操作 id 是已知的,因此我们可以使用should_pause.s(operation_id)
来获取带有 one 参数的任务的签名,即前一个任务的返回值。这符合clause
!
callback
的一个例子
import os
import json
from typing import Any, List
@celery.task()
def save_state(retval: Any, chains: dict, operation_id: int):
# This is the `callback` to be used for `tappable`
# i.e this is called when an operation is pausing
db = get_db()
# Prepare directories to store the workflow
operation_dir = os.path.join(app.config["OPERATIONS"], f"operation_id")
workflow_file = os.path.join(operation_dir, "workflow.json")
if not os.path.isdir(operation_dir):
os.makedirs(operation_dir, exist_ok=True)
# Store the remaining workflow chain, serialized into json
with open(workflow_file, "w") as f:
json.dump(chains, f)
# Store the result from the last task and the workflow json path
db.execute(
"""
UPDATE operations
SET completion = ?,
workflow_store = ?,
result = ?
WHERE id = ?
""",
("PAUSED", workflow_file, f"retval", operation_id),
)
db.commit()
这是在任务暂停时要调用的任务。请记住,这应该采用最后执行的任务的返回值和剩余的签名列表(按顺序,从左到右)。还有一个额外的参数 - operation_id
- 再次。对此的解释与clause
的解释相同。
此函数将剩余的链存储在一个 json 文件中(因为它是一个字典列表)。请记住,您可以使用不同的序列化程序 - 我使用的是 json,因为它是 celery 使用的默认任务序列化程序。
存储剩余链后,将completion
状态更新为PAUSED
,并将json文件的路径记录到db中。
现在,让我们看看这些在行动-
启动工作流的示例
def start_operation(user_id, *operation_args, **operation_kwargs):
db = get_db()
operation_id: int = db.execute(
"INSERT INTO operations (requester_id, completion) VALUES (?, ?)",
(user_id, "IN PROGRESS"),
).lastrowid
# Convert a regular workflow chain to a tappable one
tappable_workflow = tappable(
(T1.s() | T2.s() | T3.s() | T4.s() | T5.s(operation_id)),
should_pause.s(operation_id),
save_state.s(operation_id),
)
# Start the chain (i.e send task to celery to run asynchronously)
tappable_workflow(*operation_args, **operation_kwargs)
db.commit()
return operation_id
接受用户 ID 并启动操作工作流的函数。这或多或少是一个围绕视图/路由控制器建模的不切实际的虚拟函数。但我认为它可以通过总体思路。
假设T[1-4]
是操作的所有单元任务,每一个都以前一个任务的返回作为参数。只是普通芹菜链的一个例子,你可以随意使用你的链子。
T5
是将最终结果(T4
的结果)保存到数据库的任务。因此,除了来自T4
的返回值,它还需要operation_id
。哪个被传递到签名中。
暂停工作流的示例
def pause(operation_id):
db = get_db()
operation = db.execute(
"SELECT * FROM operations WHERE id = ?", (operation_id,)
).fetchone()
if operation and operation["completion"] == "IN PROGRESS":
# Pause only if the operation is in progress
db.execute(
"""
UPDATE operations
SET completion = ?
WHERE id = ?
""",
("REQUESTING PAUSE", operation_id),
)
db.commit()
return 'success'
return 'invalid id'
这采用了前面提到的修改数据库条目以将completion
更改为REQUESTING PAUSE
的概念。一旦提交,下次pause_or_continue
调用should_pause
时,它会知道用户已请求暂停操作,并会相应地这样做。
恢复工作流程的示例
def resume(operation_id):
db = get_db()
operation = db.execute(
"SELECT * FROM operations WHERE id = ?", (operation_id,)
).fetchone()
if operation and operation["completion"] == "PAUSED":
# Resume only if the operation is paused
with open(operation["workflow_store"]) as f:
# Load the remaining workflow from the json
workflow_json = json.load(f)
# Load the chain from the json (i.e deserialize)
workflow_chain = chain(signature(x) for x in serialized_ch)
# Start the chain and feed in the last executed task result
workflow_chain(operation["result"])
db.execute(
"""
UPDATE operations
SET completion = ?
WHERE id = ?
""",
("IN PROGRESS", operation_id),
)
db.commit()
return 'success'
return 'invalid id'
回想一下,当操作暂停时 - 剩余的工作流存储在 json 中。由于我们目前将工作流限制为 chain
对象。我们知道这个 json 是一个签名列表,应该变成一个chain
。因此,我们相应地对其进行反序列化并将其发送给 celery worker。
请注意,这个剩余的工作流仍然具有原来的 pause_or_continue
任务 - 所以这个工作流本身再次可以暂停/恢复。当它暂停时,workflow.json
将被更新。
【讨论】:
以上是关于与 celery 正在进行的任务交互的主要内容,如果未能解决你的问题,请参考以下文章
如何设置 Celery 在运行任务之前调用自定义初始化函数?