通过 celery 向 django-channels 发送消息

Posted

技术标签:

【中文标题】通过 celery 向 django-channels 发送消息【英文标题】:Sending messages to django-channels via celery 【发布时间】:2017-06-09 05:03:57 【问题描述】:

所以我有一个预定的 celery beat 任务(celery.py):

@app.on_after_configure.connect 
def setup_periodic_tasks(sender,
**kwargs):
    sender.add_periodic_task(10.0, test_event, name='test')

还有任务(events/tasks.py):

@shared_task
def test_event():
    from .models import Event
    Event.objects.create()

当事件被创建时,接收器被触发,它应该向通道组发送消息(events/receivers.py):

@receiver(post_save, sender=Event)
def event_post_add(sender, instance, created, *args, **kwargs):
    if created:
        print("receiver fired")
        Group("test").send(
            "text": json.dumps(
                'type': 'test',
            )
        )

主要问题是接收器在 celery beat 过程中被触发,并且没有通过 django 通道发送任何内容。没有错误消息,什么都没有,只是没有发送。

如何整合这两者,以便能够从 celery 后台进程向通道发送消息?

【问题讨论】:

【参考方案1】:

您好,我不知道您是否找到了解决方案。但是由于我自己被困在这个问题上,所以我尝试了解决方法。 我为需要通过 websocket 发送的消息创建了一个视图,并从 celery beat 向它发出请求 观点:

def send_message(request,uuid,name):
print('lamo')
ty = f"uuid_uuid"
data=
    'message':f'name Driver is now Avaliable',
    'sender':'HQ',
    'id':str(uuid),
    'to':str(uuid),
    'type':'DriverAvailable',

channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)(
    ty,
    'type':'chat_message',
    'message':data,
    
)

和任务:

def my_task():
list=[]
for d in Driver_api.objects.all():
    if d.available_on !=None:
        if d.available_on <= timezone.now():
            d.available_on = None
            d.save()
        uuid = str(d.user.uuid)
        requests.get(f'DOMAINmessage/sendMessage/uuid/d.name')
    logger.info('Adding 0'.format(d.user.uuid))

return list

对于我解决问题的方法中的任何疏忽或疏忽深表歉意。

【讨论】:

【参考方案2】:

信号在 Django 中实际上并不是异步的。 所以在:

@shared_task
def test_event():
    from .models import Event
    Event.objects.create() # This will fire a signal and the signal will 
                           # still be interpreted by celery

以下链接详细描述了此问题: https://githubmemory.com/repo/CJWorkbench/channels_rabbitmq/issues/37

我已经检查了 redis_channels 重新连接和次优性能的声明(如链接中所述),但我找不到它发生。

这是我正在运行的代码,Celery 正在向 django 频道发送消息。

class DjangoView(APIView):
    def get(request):
        send_message_to_channels.delay()

@shared_task
def send_message_to_channels():
    send_test_message("Hello from celery", 200)
def send_test_message(message: str, code: int):
    channel_layer = get_channel_layer()
    channel_name = "celery-channels-test"
    async_to_sync(channel_layer.group_send)(channel_name, 
        'type': 'consumer.test.message',
        'message': message,
        'code': code
    )

使我的代码工作的先决条件

安装channels_redis(其他软件包只是失败) 在 django settings.py 中定义 channel_layer 指向 redis 数据库

如果这不起作用,我想简单但非常讨厌的方法是向 django 中的视图发出请求,就像在@cosmo_boi 回答中一样

【讨论】:

以上是关于通过 celery 向 django-channels 发送消息的主要内容,如果未能解决你的问题,请参考以下文章

从 Celery 任务向 Channels 发送消息

如何向 Celery (celerybeat) 动态添加/删除周期性任务

如何向 Django Celery Flower Monitoring 添加身份验证和端点?

Celery-Supervisor:如何重新启动主管工作以使新更新的 celery-tasks 工作?

RuntimeError:永远不要在任务 Celery 中调用 result.get()

celery