从 Celery 任务向 Channels 发送消息

Posted

技术标签:

【中文标题】从 Celery 任务向 Channels 发送消息【英文标题】:Sending message from Celery task to Channels 【发布时间】:2019-08-15 19:24:32 【问题描述】:

Django 2.1.1, Django 频道 2.1.3, 芹菜 4.2.1

我在 Celery 中设置了一个任务,在任务结束时,我需要向客户端发送一条 websocket 消息。但是,永远不会发送 websocket 消息。没有抛出任何错误,只是根本不发送。

我已经设置了一个使用 Redis 作为后端的通道层。从普通的 Django 视图中执行此操作可以正常工作。但是当在 Celery 任务中运行时,它会将消息发送到 Channels,我可以看到 Channels 确实运行了下面我的 consumer.py 代码中显示的代码,但客户端从未收到 websocket 消息。

tasks.py

def import_job(self):
    # (do task calculations, store in data dict)
    message = 'type': 'send_my_data',
               'data': json.dumps(thecalcs) 
    channel_layer = get_channel_layer()
    async_to_sync(channel_layer.group_send)('core-data', message)

consumers.py

class AsyncDataConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        self.channel_group_name = 'core-data'

        # Join the group
        await self.channel_layer.group_add(
            self.channel_group_name,
            self.channel_name
        )
        await self.accept()

    async def disconnect(self, close_code):
        # Leave the group
        await self.channel_layer.group_discard(
            self.channel_group_name,
            self.channel_name
        )

    # Receive message from WebSocket
    async def receive(self, text_data=None, bytes_data=None):
        pass

    # Receive message from the group
    async def send_my_data(self, event):
        text = event['data']
        # Send message to WebSocket
        await self.send(text_data=text)

settings.py

CHANNEL_LAYERS = 
    'default': 
        'BACKEND': 'channels_redis.core.RedisChannelLayer',
        'CONFIG': 
            "hosts": [('127.0.0.1', 6379)],
        ,
    ,

由于没有异常/错误,我完全不知道这个过程的哪一部分失败了。

    Celery 触发任务?是的 任务运行并向通道层发送消息?是的 消费者收到来自群组的消息并执行send()?是的 客户端收到 websocket 消息?没有

这是 Channels 和 Redis 之间的问题吗?是Channels和客户端的问题吗?

【问题讨论】:

不太确定,但我的猜测是,由于持有 websocket 连接的进程与您运行任务的进程(芹菜工作者)不同,因此它没有发送消息的连接跨度> 没关系,我读过一些从 celery 任务发送频道消息的案例,所以我错了 请出示完整的consumers.py 或完整的消费者类。你有没有在ws_connect 中添加频道到群组?如果不通过任务,您是否能够发送和接收消息? self.group_name 是什么?您的频道是否已添加到此群组? @spiritsree 我已经用完整的consumers.py 文件更新了这个。是的,当从常规 Django 代码执行时,组发送工作正常。只有在 Celery 任务中它才不会发送 websocket 消息。几个月来我一直在使用这个消费者和 websocket 设置,没有任何问题。只有当我尝试在 Celery 任务中使用它时,我才发现问题。 没关系。你检查过芹菜日志吗?有什么错误吗?你是如何实现并发的? eventletgeventprefork? 【参考方案1】:

事实证明,Celery 在执行任务期间在我的代码中吞下了一个异常。我需要实现更彻底的日志记录来捕获这些异常。

【讨论】:

嗨,查尔斯,我遇到了和你一样的问题。我可以在 Redis Cli 内部看到一条消息正在从 Celery 后台任务的通道层某处发送,但我的消费者没有处理该事件并将其通过套接字传递给客户端。事实上,有时套接字会在消息到达通道层时自行断开连接。你最后是怎么解决这个问题的?干杯马可 @MarcoPolo 嗨,我遇到了和你一样的问题。你是如何解决这个错误的? 嗨 gii96,对不起,我从来没有设法解决这个问题。使用带有通道的 Celery 变得非常复杂。我最终放弃了它。 嗨 gii96,对不起,我从来没有设法解决这个问题。使用带有通道的 Celery 变得非常复杂。我最终放弃了它。

以上是关于从 Celery 任务向 Channels 发送消息的主要内容,如果未能解决你的问题,请参考以下文章

Django Channels 从 Celery 任务发送组消息。 Asyncio 事件循环在所有异步任务完成之前停止

不同服务器上的 Django 和 celery,一旦任务完成,celery 能够向 django 发送回调

Django Channels 2.0 channel_layers 不通信

理论python使用celery异步处理请求

RuntimeError:永远不要在任务 Celery 中调用 result.get()

不同 docker 容器之间的 Django 通道