Django Channels 2.0 channel_layers 不通信
Posted
技术标签:
【中文标题】Django Channels 2.0 channel_layers 不通信【英文标题】:Django Channels 2.0 channel_layers not communicating 【发布时间】:2018-02-02 20:16:46 【问题描述】:所以我一直在迁移使用 Django Channels 1.x -> 2.x+ 的服务器
最初的设计会使用getAFTreeTask.delay(message.reply_channel.name)
向 celery 发送一个任务,并且通过访问channel_name
它可以很可能异步回复
from celery import task
from channels import Channel
@task
def getAFTreeTask(channel_name):
tree = Request().cache_af_tree()
Channel(channel_name).send(
"text": json.dumps(
"channel": "AF_INIT",
"payload": tree
)
)
现在,出于各种原因,我已将服务器迁移到 Channels 2.x+。根据文档
class Consumer(JsonWebsocketConsumer):
def connect(self):
print("Client Connected: ", self.channel_name)
self.accept()
def receive_json(self, content, **kwargs):
print(content)
parse_request(self.channel_name, content)
def disconnect(self, content):
print(content)
def chat_message(self, event):
print("Entered reply channel")
print(event)
如果我使用正确的channel_name,这样的消费者应该通过通道层接收请求,现在如果响应可以访问self.send_json()
或self.send()
,则消费者可以正确地作为发送-接收websocket通用消费者,所以我假设我的所有设置都是正确的,我的问题是当我尝试使用通道层发送一些东西时,像这样(根据https://channels.readthedocs.io/en/latest/topics/channel_layers.html#single-channels)
from channels.layers import get_channel_layer
from asgiref.sync import AsyncToSync
def parse_request(channel_name, content):
print("parsed ", channel_name, content)
channel_layer = get_channel_layer()
AsyncToSync(channel_layer.send)(channel_name,
"type": "chat.message",
"text": "Hello there!",
)
我明白了
编辑(完整堆栈跟踪):
2018-02-02 18:28:35,984 ERROR Exception inside application: There is no current event loop in thread 'Thread-3'.
File "/usr/lib/python3.5/asyncio/tasks.py", line 241, in _step
result = coro.throw(exc)
File "/home/chris/Env/myapp/lib/python3.5/site-packages/channels/consumer.py", line 51, in __call__
await await_many_dispatch([receive, self.channel_receive], self.dispatch)
File "/home/chris/Env/myapp/lib/python3.5/site-packages/channels/utils.py", line 48, in await_many_dispatch
await dispatch(result)
File "/home/chris/Env/myapp/lib/python3.5/site-packages/asgiref/sync.py", line 81, in inner
return await async_func(*args, **kwargs)
File "/home/chris/Env/myapp/lib/python3.5/site-packages/asgiref/sync.py", line 65, in __call__
return await asyncio.wait_for(future, timeout=None)
File "/usr/lib/python3.5/asyncio/tasks.py", line 373, in wait_for
return (yield from fut)
File "/usr/lib/python3.5/asyncio/futures.py", line 361, in __iter__
yield self # This tells Task to wait for completion.
File "/usr/lib/python3.5/asyncio/tasks.py", line 296, in _wakeup
future.result()
File "/usr/lib/python3.5/asyncio/futures.py", line 274, in result
raise self._exception
File "/usr/lib/python3.5/concurrent/futures/thread.py", line 55, in run
result = self.fn(*self.args, **self.kwargs)
File "/home/chris/Env/myapp/lib/python3.5/site-packages/asgiref/sync.py", line 74, in thread_handler
raise e
File "/home/chris/Env/myapp/lib/python3.5/site-packages/asgiref/sync.py", line 72, in thread_handler
self.func(*args, **kwargs)
File "/home/chris/Env/myapp/lib/python3.5/site-packages/channels/consumer.py", line 93, in dispatch
handler(message)
File "/home/chris/Env/myapp/lib/python3.5/site-packages/channels/generic/websocket.py", line 40, in websocket_receive
self.receive(text_data=message["text"])
File "/home/chris/Env/myapp/lib/python3.5/site-packages/channels/generic/websocket.py", line 104, in receive
self.receive_json(self.decode_json(text_data), **kwargs)
File "./MYAPP/API/consumers.py", line 13, in receive_json
parse_api_request(self.channel_name, content)
File "./MYAPP/API/api_request.py", line 16, in parse_api_request
AsyncToSync(channel_layer.send)(channel_name,
File "/home/chris/Env/myapp/lib/python3.5/site-packages/asgiref/sync.py", line 17, in __init__
self.main_event_loop = asyncio.get_event_loop()
File "/usr/lib/python3.5/asyncio/events.py", line 632, in get_event_loop
return get_event_loop_policy().get_event_loop()
File "/usr/lib/python3.5/asyncio/events.py", line 578, in get_event_loop
% threading.current_thread().name)
There is no current event loop in thread 'Thread-3'.
如果我不使用 AsyncToSync
我会得到(根据文档我不应该这样做,只是为了检查)
2018-02-02 18:34:27,965 WARNING ./MYAPP/API/api_request.py:18: builtins.RuntimeWarning: coroutine 'RedisChannelLayer.send' was never awaited
我不明白,因为我完全按照指南进行操作,我还尝试从 celery 任务(一个单独的线程)回复,但没有收到相同的错误,但没有任何反应,celery 日志只是说任务完成,但我没有得到回复。
另外,尝试直接通过发送响应
AsyncToSync(channel_layer.send)(channel_name,
"type": "websocket.send",
"text": "Hello there!",
)
从线程内部和线程外得到相同的非结果......
有没有人能够通过Consumers
对象之外的channel_layers 进行发送。
仅供参考我的 settings.py
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'channels',
'myapp',
]
ASGI_APPLICATION = "myapp.routing.application"
CHANNEL_LAYERS =
"default":
"BACKEND": "channels_redis.core.RedisChannelLayer",
"CONFIG":
"hosts": [("localhost", 6379)],
,
,
【问题讨论】:
您能否发布完整的回溯? @jpic 我添加了完整的堆栈跟踪 【参考方案1】:在来自 Andres Godwin 的 reply 之后:
我发现这是asgiref
所以我的工作实现,任何有兴趣的人:
consumers.py
from channels.consumer import AsyncConsumer
class My_Consumer(AsyncConsumer):
async def websocket_connect(self, event):
print("Connected")
print(event)
print(self.channel_name)
await self.send(
"type": "websocket.accept",
)
async def websocket_receive(self, event):
print("Received")
print(event)
parse_api_request(self.channel_name, json.loads(event['text']))
async def celery_message(self, event):
print("Service Received")
print(event)
await self.send(
"type": "websocket.send",
"text": event["text"],
)
task.py
from channels.layers import get_channel_layer
from asgiref.sync import AsyncToSync
def async_send(channel_name, text):
channel_layer = get_channel_layer()
AsyncToSync(channel_layer.send)(
channel_name,
"type": "celery.message",
"text": json.dumps(text)
)
def getAFTree(channel_name, message):
getAFTreeTask.delay(channel_name, message)
@task
def getAFTreeTask(channel_name, message):
tree = Request().cache_af_tree()
async_send(channel_name,
"channel": "AF_INIT",
"payload": tree
)
【讨论】:
以上是关于Django Channels 2.0 channel_layers 不通信的主要内容,如果未能解决你的问题,请参考以下文章
列出连接到 django-channels 组的用户(频道 1.x)