芹菜作为网络发布/订阅事件
Posted
技术标签:
【中文标题】芹菜作为网络发布/订阅事件【英文标题】:Celery as networked pub/sub events 【发布时间】:2015-09-12 20:07:37 【问题描述】:我想建立一个网络发布/订阅事件系统,但还需要能够异步运行任务。我曾尝试让 celery 来完成繁重的工作,但我觉得我正在尝试填充一大堆东西以使其正常工作。
我有两台机器(输入和输出),它们都可以访问 RabbitMQ。我想让一个主程序启动一个等待输入的循环(网络摄像头检测到移动)。我已经设置了 input_machine 启动 main.py ,它启动了一个 celery 任务,该任务由 input_machine 上的工作人员监控,该任务被分配到“输入”队列。这个任务只是运行一个 while True 循环,直到检测到一些输入,然后它调用另一个名为('project.entered_room' 什么都不做)芹菜任务到“输出”队列。
同时在 output_machine 上,我有一个 celery 实例正在监视“输出”队列,其中包含一个名为 ('project.entered_room' 响应有人进入房间的任务)。
因此,当在 input_machine 上检测到输入时,任务会在输出机器上运行。我可以让它工作,但遇到很多导入问题和其他令人头疼的问题。有没有更简单的方法来实现这一点?我做错了?我是否使用了错误的工具?
我研究了许多不同的框架,包括电路和扭曲。 Twisted 非常复杂,我觉得我会用手提钻敲钉子。
【问题讨论】:
【参考方案1】:我建议跳过 Celery 并直接使用 Redis 及其 pub/sub 功能。例如,您可以通过运行 Docker image 来启动 Redis。然后在您的输入机器上,当检测到某些东西时,您将消息发布到通道。在您的输出机器上,您订阅该频道并对事件采取行动。
例如,您的输入机器可以使用如下内容:
import redis
def publish(message):
r = redis.Redis(host="redis")
r.publish("test-channel", message)
然后在输出端:
import time
import redis
def main():
r = redis.Redis(host="redis", decode_responses=True)
p = r.pubsub(ignore_subscribe_messages=True)
p.subscribe("test-channel")
while True:
message = p.get_message()
if message:
print(message.get("data", ""))
# Do more things...
time.sleep(0.001)
通过这种方式,您可以在输入和输出机器之间发送纯文本或 JSON 数据。
在此处查找示例实现:https://github.com/moritz-biersack/simple-async-pub-sub
【讨论】:
既然RabbitMQ可以做pubsub(OP用的是RabbitMQ),那么Redis相对RabbitMQ有什么优势呢?【参考方案2】:Celery 只是一个任务管理器。
RabbitMQ 是您的消息代理。 我会在你的两台机器之间实现一个 RabbitMQ 通道,并使用发布/订阅来管理你的输入。
也许这个link可以帮助你
【讨论】:
以上是关于芹菜作为网络发布/订阅事件的主要内容,如果未能解决你的问题,请参考以下文章
EventBus手写实现事件通信框架 ( 订阅类-订阅方法缓存集合 | 事件类型-订阅者集合 | 订阅对象-事件类型集合 )