如何在 Celery-Django 应用程序中监视来自工作人员的事件?

Posted

技术标签:

【中文标题】如何在 Celery-Django 应用程序中监视来自工作人员的事件?【英文标题】:How to monitor events from workers in a Celery-Django application? 【发布时间】:2013-02-16 01:15:17 【问题描述】:

根据有关real-time monitoring of celery workers 的 celery 教程,还可以通过编程方式捕获工人产生的事件并采取相应的行动。

我的问题是如何将监视器作为 this 示例中的监视器集成到 Celery-Django 应用程序中?

编辑: 教程中的代码示例如下:

from celery import Celery

def my_monitor(app):
    state = app.events.State()

    def announce_failed_tasks(event):
        state.event(event)
        task_id = event['uuid']

        print('TASK FAILED: %s[%s] %s' % (
            event['name'], task_id, state[task_id].info(), ))
    with app.connection() as connection:
        recv = app.events.Receiver(connection, handlers=
                'task-failed': announce_failed_tasks,
                'worker-heartbeat': announce_dead_workers,
        )
        recv.capture(limit=None, timeout=None, wakeup=True)

if __name__ == '__main__':
    celery = Celery(broker='amqp://guest@localhost//')
    my_monitor(celery)

所以我想捕获工作人员发送的 task_failed 事件,并像教程所示那样获取它的 task_id,从为我的应用程序配置的结果后端获取此任务的结果并进一步处理它。我的问题是如何获取应用程序对我来说并不明显,因为在 django-celery 项目中,Celery 库的实例化对我来说并不透明。

我也愿意接受任何其他关于如何在工作人员完成执行任务时处理结果的想法。

【问题讨论】:

我认为您必须更具体一点,需要捕获哪些事件?有示例代码吗? 【参考方案1】:

好的,我找到了一种方法,虽然我不确定这是否是解决方案,但它对我有用。监视器功能基本上直接连接到代理并监听不同类型的事件。我的代码如下所示:

from celery.events import EventReceiver
from kombu import Connection as BrokerConnection

def my_monitor:
    connection = BrokerConnection('amqp://guest:guest@localhost:5672//')

    def on_event(event):
        print "EVENT HAPPENED: ", event

    def on_task_failed(event):
        exception = event['exception']
        print "TASK FAILED!", event, " EXCEPTION: ", exception

    while True:
        try:
            with connection as conn:
                recv = EventReceiver(conn,
                                 handlers='task-failed' : on_task_failed,
                                           'task-succeeded' : on_event,
                                           'task-sent' : on_event,
                                           'task-received' : on_event,
                                           'task-revoked' : on_event,
                                           'task-started' : on_event,
                                           # OR: '*' : on_event
                                           )
            recv.capture(limit=None, timeout=None)
    except (KeyboardInterrupt, SystemExit):
        print "EXCEPTION KEYBOARD INTERRUPT"
        sys.exit()

这就是全部。我在与普通应用程序不同的进程中运行它,这意味着我创建了我的 celery 应用程序的子进程,它只运行这个函数。 高温

【讨论】:

嗨,谢谢,你的问题基本上就是我现在想要做的。你在哪里把这段代码放在你的 Django 项目中?您能解释一下创建 celery 应用程序的子进程吗?目前我的芹菜应用程序在myproj/myproj/celery.py中配置(根据docs.celeryproject.org/en/latest/django/…) 嗨!我已经很长时间没有在这方面工作了,所以在最近的 n 个版本中,Celery 本身的情况可能已经发生了变化。基本上我正在启动一个 Python 守护进程,例如: daemon_process = Process(target=results_processing.my_monitor) daemon_process.daemon = True daemon_process.start() 在应用程序启动时调用的模块之一中 我使用 Django 并通过它启动了这个监视器。在 Django proj/proj/celery.py 文件中开始监视,只需在定义 celery 应用程序后通过 my_monitor(app) 即可。现在在 Django 1.9 中导致 AppRegistryNotReady exc(我认为现在不允许在应用程序的 __init__.py 中导入模型 --- 我应该注意我的监视器依赖于某些模型)。我最终在 django 应用程序的 AppConfig.ready() 方法中启动了监视器,我的监视器依赖于它的模型(这确保了应用程序已完成注册)。 HTH【参考方案2】:

提防几个陷阱

    您需要在 celery 配置中将 CELERY_SEND_EVENTS 标志设置为 true。 您还可以在工作线程的新线程中设置事件监视器。

这是我的实现:

class MonitorThread(object):
    def __init__(self, celery_app, interval=1):
        self.celery_app = celery_app
        self.interval = interval

        self.state = self.celery_app.events.State()

        self.thread = threading.Thread(target=self.run, args=())
        self.thread.daemon = True
        self.thread.start()

    def catchall(self, event):
        if event['type'] != 'worker-heartbeat':
            self.state.event(event)

        # logic here

    def run(self):
        while True:
            try:
                with self.celery_app.connection() as connection:
                    recv = self.celery_app.events.Receiver(connection, handlers=
                        '*': self.catchall
                    )
                    recv.capture(limit=None, timeout=None, wakeup=True)

            except (KeyboardInterrupt, SystemExit):
                raise

            except Exception:
                # unable to capture
                pass

            time.sleep(self.interval)

if __name__ == '__main__':
    app = get_celery_app() # returns app
    MonitorThread(app)
    app.start()

【讨论】:

以上是关于如何在 Celery-Django 应用程序中监视来自工作人员的事件?的主要内容,如果未能解决你的问题,请参考以下文章

如何在 C++ 中监视目录中的文件?

如何监视 WPF 中的所有窗口,在所有窗口中订阅事件或者附加 UI

如何检查 Celery 中的任务状态?

如何使用 TCP 在 c# 控制台应用程序中监视 syslog 消息

如何监视自己计算机上运行的网络数据

如何在WinForm应用程序中持续监视用户输入