Django Channels 从 Celery 任务发送组消息。 Asyncio 事件循环在所有异步任务完成之前停止

Posted

技术标签:

【中文标题】Django Channels 从 Celery 任务发送组消息。 Asyncio 事件循环在所有异步任务完成之前停止【英文标题】:Django Channels send group message from Celery task. Asyncio event loop stopping before all async tasks finished 【发布时间】:2020-02-01 14:15:23 【问题描述】:

我目前遇到一个特别棘手的问题,我会尽力解释它。

我有一个 Django 项目,它的主要目的是快速执行数据库中的排队任务。我使用 Celery 和 Celerybeat 通过 Django 通道来实现这一点,以实时更新我的​​模板和响应。

Celery worker 是一个 gevent 工作池,具有相当数量的线程。

我的任务(简化版):

@shared_task
def exec_task(action_id):
  # execute the action
  action = Action.objects.get(pk=action_id)
  response = post_request(action)

  # update action status
  if response.status_code == 200:
    action.status = 'completed'

  else:
    action.status = 'failed'

  # save the action to the DB
  action.save()

  channel_layer = get_channel_layer()
  status_data = 'id': action.id, 'status': action.status
  status_data = json.dumps(status_data)
  try:
    async_to_sync(channel_layer.group_send)('channel_group', 'type': 'propergate_status', 'data': status_data)
  except:
    event_loop = asyncio.get_running_loop()
    future = asyncio.run_coroutine_threadsafe(channel_layer.group_send('channel_group', 'type': 'propergate_status', 'data': status_data), event_loop)
    result = future.result()

我的错误:

[2019-10-03 18:47:59,990: WARNING/MainProcess] 动作排队:25

[2019-10-03 18:48:02,206:警告/MainProcess] c:\users\jack\documents\github\mcr-admin\venv\lib\site-packages\gevent_socket3.py:123: RuntimeWarning:从未等待协程“AsyncToSync.main_wrap” self._read_event = io_class(fileno, 1)

RuntimeWarning: E​​nable tracemalloc 获取对象分配回溯

[2019-10-03 18:48:02,212:警告/MainProcess] c:\users\jack\documents\github\mcr-admin\venv\lib\site-packages\gevent_socket3.py:123: RuntimeWarning:协程“BaseEventLoop.shutdown_asyncgens”从来没有 等待 self._read_event = io_class(fileno, 1) RuntimeWarning:

最初在我将操作保存到我刚刚调用的数据库之后:

async_to_sync(channel_layer.group_send)('channel_group', 'type': 'propergate_status', 'data': status_data)

但我一直收到运行时错误,因为如果已经有一个 asyncio 事件循环已经在运行,您就不能使用 async_to_sync,as shown here at line 61。所以我有多个 gevent 线程试图 async_to_sync 非常靠近,不断地在链接中抛出错误。

这让我找到了this wonderful answer 和当前版本的 exec_task,它在向 Django Channels 组发送消息时的成功率为 98%,但我真的需要它是 100%。

这里的问题是,在我添加的协程有机会完成之前,偶尔会停止 asyncio 事件循环,并且我一直在调整我的代码,使用 asyncio 和事件循环 api,但我要么破坏我的代码,要么得到更差的结果。我感觉这可能与 Asgiref async_to_sync 函数提前关闭循环有关,但这很复杂,我几天前才开始使用 python async。

欢迎任何反馈、cmets、提示或修复!

干杯。

【问题讨论】:

您应该使用except RunetimeError:,这样您就不会意外忽略可能遇到的其他错误。 感谢我在except RuntimeError: 中添加,遗憾的是日志没有变化。 我不认为它会解决你的问题,但它至少可以让你免于未来的调试噩梦。 【参考方案1】:

最后我无法解决问题并选择使用 Channels AsyncHttpConsumer 发送群组消息的替代解决方案。它不是最佳的,但它可以工作并将工作流保留在 Channels 库中。

消费者:

class celeryMessageConsumer(AsyncHttpConsumer):

async def handle(self, body):
    # send response
    await self.send_response(200, b"Recieved Loud and Clear", headers=[
        (b"Content-Type", b"text/plain"),
    ])
    # formating url encoded string into json
    body_data = urllib.parse.unquote_plus(body.decode("utf-8"))
    body_data = json.loads(body_data)
    id = body_data['data']['id']

    await self.channel_layer.group_send(
        f"group_id",
        
            'type': 'propergate.data',
            'data': body_data['data']
        
    )

路由:

application = ProtocolTypeRouter(
    'websocket': AuthMiddlewareStack(
        URLRouter(
            myApp.routing.websocket_urlpatterns
        )
    ),
    'http': URLRouter([
        path("celeryToTemplate/", consumers.celeryMessageConsumer),
        re_path('genericMyAppPath/.*', AsgiHandler),
    ]),
)

Http 请求:

data = json.dumps('id': id, 'status': status)
response = internal_post_request('http://genericAddress/celeryToTemplate/', data)
if response.status_code == 200:
    # phew
    pass
else:
    # whoops
    pass

请求:

def internal_post_request(request_url, payload):
    headers=
        'Content-Type': 'application/json'
    
    response = requests.post(request_url, data=payload, headers=headers)
    return response

【讨论】:

【参考方案2】:

您好,我目前遇到了您的确切问题,即能够从已完成的 celery 任务向客户端发送消息至关重要。

我之前可以通过使用信号到模型方法来分组发送,例如:

def SyncLogger(**kwargs):
    """ a syncronous function to instigate the websocket layer
    to send messages to all clients in the project """

instance = kwargs.get('instance')
# print('instance '.format(instance))

args = eval(instance.args)
channel_layer = channels.layers.get_channel_layer()
async_to_sync(channel_layer.group_send)(
    args ['room'],
    
        "type": "chat.message",
        "operation": args['operation'],
        "state": instance.state,
        "task": instance.task
    )

和信号

post_save.connect(SyncLogger, TaskProgress)

更新 只要有event_loop,我就可以发送消息 无论消费者是否异步,这都有效

@shared_task()
def test_message():
   channel_layer = get_channel_layer()

   loop = asyncio.new_event_loop()
   asyncio.set_event_loop(loop)

   loop.run_until_complete(channel_layer.group_send('sync_chat', 
       'type': 'chat.message',
       'operation': 'operation',
       'state': 'state',
       'task': 'task'
   ))

【讨论】:

嗨,最后我无法解决问题并使用了替代解决方案。我使用 Channels AsyncHttpConsumer 发送群组消息。我将发布我的替代解决方案。 可以同时使用同步和异步消费者,只要有一个事件循环并调用它的run_until_complete方法 为更新干杯我很高兴你得到它的工作。可悲的是,我已经尝试过创建/操作 asyncio 循环。我确实尝试了您更新的解决方案,但遇到了预期的问题,主要是我的工作池 Gevent。尽管希望您的解决方案对其他人有所帮助。 抱歉,为了更好地了解您的问题是什么?你能分享错误信息吗?这是我第一次使用 asyncio 我不会重新创建错误,因为这需要时间并且意味着修改我的代码。我正在运行一个具有 10 个绿色线程并发的 Celery gevent 工作者。我有 10 个 greenlets 在一个线程中运行。当我在一个greenlet 中创建一个Asyncio 事件循环时,它是为整个线程创建的,所有10 个greenlet 都可以访问同一个Asyncio 事件循环。通常我有大量的任务计划同时处理。当多个 greenlet 同时调用 AsyncToSync 时,会发生错误here at line 61。

以上是关于Django Channels 从 Celery 任务发送组消息。 Asyncio 事件循环在所有异步任务完成之前停止的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Heroku 上使用 Channels 和 Celery 部署 Django?

通过 celery 向 django-channels 发送消息

不同 docker 容器之间的 Django 通道

Django使用Channels实现WebSocket--下篇

Django - Celery Worker - 频道

Django Channels 2.0 channel_layers 不通信