芹菜作为网络发布/订阅事件

Posted

技术标签:

【中文标题】芹菜作为网络发布/订阅事件【英文标题】:Celery as networked pub/sub events 【发布时间】:2015-09-12 20:07:37 【问题描述】:

我想建立一个网络发布/订阅事件系统,但还需要能够异步运行任务。我曾尝试让 celery 来完成繁重的工作,但我觉得我正在尝试填充一大堆东西以使其正常工作。

我有两台机器(输入和输出),它们都可以访问 RabbitMQ。我想让一个主程序启动一个等待输入的循环(网络摄像头检测到移动)。我已经设置了 input_machine 启动 main.py ,它启动了一个 celery 任务,该任务由 input_machine 上的工作人员监控,该任务被分配到“输入”队列。这个任务只是运行一个 while True 循环,直到检测到一些输入,然后它调用另一个名为('project.entered_room' 什么都不做)芹菜任务到“输出”队列。

同时在 output_machine 上,我有一个 celery 实例正在监视“输出”队列,其中包含一个名为 ('project.entered_room' 响应有人进入房间的任务)。

因此,当在 input_machine 上检测到输入时,任务会在输出机器上运行。我可以让它工作,但遇到很多导入问题和其他令人头疼的问题。有没有更简单的方法来实现这一点?我做错了?我是否使用了错误的工具?

我研究了许多不同的框架,包括电路和扭曲。 Twisted 非常复杂,我觉得我会用手提钻敲钉子。

【问题讨论】:

【参考方案1】:

我建议跳过 Celery 并直接使用 Redis 及其 pub/sub 功能。例如,您可以通过运行 Docker image 来启动 Redis。然后在您的输入机器上,当检测到某些东西时,您将消息发布到通道。在您的输出机器上,您订阅该频道并对事件采取行动。

例如,您的输入机器可以使用如下内容:

import redis

def publish(message):
    r = redis.Redis(host="redis")
    r.publish("test-channel", message)

然后在输出端:

import time
import redis

def main():
    r = redis.Redis(host="redis", decode_responses=True)
    p = r.pubsub(ignore_subscribe_messages=True)
    p.subscribe("test-channel")

    while True:
        message = p.get_message()
        if message:
            print(message.get("data", ""))
            # Do more things...
        time.sleep(0.001)

通过这种方式,您可以在输入和输出机器之间发送纯文本或 JSON 数据。

在此处查找示例实现:https://github.com/moritz-biersack/simple-async-pub-sub

【讨论】:

既然RabbitMQ可以做pubsub(OP用的是RabbitMQ),那么Redis相对RabbitMQ有什么优势呢?【参考方案2】:

Celery 只是一个任务管理器。

RabbitMQ 是您的消息代理。 我会在你的两台机器之间实现一个 RabbitMQ 通道,并使用发布/订阅来管理你的输入。

也许这个link可以帮助你

【讨论】:

以上是关于芹菜作为网络发布/订阅事件的主要内容,如果未能解决你的问题,请参考以下文章

EventBus手写实现事件通信框架 ( 订阅类-订阅方法缓存集合 | 事件类型-订阅者集合 | 订阅对象-事件类型集合 )

Terraform 模块 azure 事件订阅可选字段

publish/subscribe

js 事件发布订阅销毁

EventBus事件通信框架 ( 发送事件 | 根据事件类型获取订阅者 | 调用订阅方法 )

事件订阅者是不是按订阅顺序调用?