特定于队列的芹菜事件

Posted

技术标签:

【中文标题】特定于队列的芹菜事件【英文标题】:Celery events specific to a queue 【发布时间】:2014-11-06 13:50:42 【问题描述】:

我有两个 Django 项目,每个项目都有一个 Celery 应用程序:

- fooproj.celery_app
- barproj.celery_app

每个应用都运行自己的 Celery worker:

celery worker -A fooproj.celery_app -l info -E -Q foo_queue
celery worker -A barproj.celery_app -l info -E -Q bar_queue

以下是我配置 Celery 应用程序的方式:

import os
from celery import Celery
from django.conf import settings


# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings.local')


app = Celery('celery_app', broker=settings.BROKER_URL)
app.conf.update(
    CELERY_ACCEPT_CONTENT=['json'],
    CELERY_TASK_SERIALIZER='json',
    CELERY_RESULT_SERIALIZER='json',
    CELERY_RESULT_BACKEND='djcelery.backends.database:DatabaseBackend',
    CELERY_SEND_EVENTS=True,
    CELERY_DEFAULT_QUEUE=settings.CELERY_DEFAULT_QUEUE,
    CELERY_DEFAULT_EXCHANGE=settings.CELERY_DEFAULT_EXCHANGE,
    CELERY_DEFAULT_ROUTING_KEY=settings.CELERY_DEFAULT_ROUTING_KEY,
    CELERY_DEFAULT_EXCHANGE_TYPE='direct',
    CELERY_ROUTES = ('proj.celeryrouters.MainRouter', ),
    CELERY_IMPORTS=(
        'apps.qux.tasks',
        'apps.lorem.tasks',
        'apps.ipsum.tasks',
        'apps.sit.tasks'
    ),
)

我的路由器类:

from django.conf import settings


class MainRouter(object):
    """
    Routes Celery tasks to a proper exchange and queue
    """
    def route_for_task(self, task, args=None, kwargs=None):
        return 
            'exchange': settings.CELERY_DEFAULT_EXCHANGE,
            'exchange_type': 'direct',
            'queue': settings.CELERY_DEFAULT_QUEUE,
            'routing_key': settings.CELERY_DEFAULT_ROUTING_KEY,
        

fooproj 有设置:

BROKER_URL = redis://localhost:6379/0
CELERY_DEFAULT_EXCHANGE = 'foo_exchange'
CELERY_DEFAULT_QUEUE = 'foo_queue'
CELERY_DEFAULT_ROUTING_KEY = 'foo_routing_key'

barproj 有设置:

BROKER_URL = redis://localhost:6379/1
CELERY_DEFAULT_EXCHANGE = 'foo_exchange'
CELERY_DEFAULT_QUEUE = 'foo_queue'
CELERY_DEFAULT_ROUTING_KEY = 'foo_routing_key'

如您所见,两个项目都使用自己的 Redis 数据库作为代理,使用自己的 mysql 数据库作为结果后端,使用自己的交换、队列和路由键。

我正在尝试运行两个 Celery 事件进程,每个应用程序一个:

celery events -A fooproj.celery_app -l info -c djcelery.snapshot.Camera
celery events -A barproj.celery_app -l info -c djcelery.snapshot.Camera

问题是,两个 celery 事件进程都在从我所有的 Celery 工人那里接任务!所以在 fooproj 数据库中,我可以看到来自 barproj 数据库的任务结果。

知道如何解决这个问题吗?

【问题讨论】:

或许对你有帮助***.com/questions/5460276/… 【参考方案1】:

来自http://celery.readthedocs.org/en/latest/getting-started/brokers/redis.html

监控事件(由花和其他工具使用)是全局的,并且 不受虚拟主机设置的影响。

这是由 Redis 的限制引起的。 Redis PUB/SUB 通道是全局的,不受数据库编号的影响。

这似乎是 Redis 的警告之一 :(

【讨论】:

以上是关于特定于队列的芹菜事件的主要内容,如果未能解决你的问题,请参考以下文章

丢弃 child 会触发 parent 的“drop”事件侦听器。我想用特定于子的“drop”覆盖该事件侦听器

是否有可用的事件(在 xamarin 或特定于平台的)可用于收听用户设备时间的任何变化?

php TEC档案页面(不是单个事件):添加特定于Google日历的第二个导出/订阅按钮 - https://cl.ly/2j2d2d1B3s1E

php TEC档案页面(不是单个事件):添加特定于Google日历的第二个导出/订阅按钮 - https://cl.ly/2j2d2d1B3s1E

grails中特定于环境的web.xml?

Flask入门之触发器,事件,数据迁移