线程只能在 Django Channels 中启动一次
Posted
技术标签:
【中文标题】线程只能在 Django Channels 中启动一次【英文标题】:Threads can only be started once in Django Channels 【发布时间】:2020-04-04 16:32:22 【问题描述】:我创建了一个简单的 Django Channels 消费者,它应该连接到外部源、检索数据并将其发送到客户端。所以,用户打开页面 > 消费者连接到外部服务并获取数据 > 数据被发送到 websocket。
这是我的代码:
import json
from channels.generic.websocket import WebsocketConsumer, AsyncConsumer, AsyncJsonWebsocketConsumer
from binance.client import Client
import json
from binance.websockets import BinanceSocketManager
import time
import asyncio
client = Client('', '')
trades = client.get_recent_trades(symbol='BNBBTC')
bm = BinanceSocketManager(client)
class EchoConsumer(AsyncJsonWebsocketConsumer):
async def connect(self):
await self.accept()
await self.send_json('test')
bm.start_trade_socket('BNBBTC', self.process_message)
bm.start()
def process_message(self, message):
JSON1 = json.dumps(message)
JSON2 = json.loads(JSON1)
#define variables
Rate = JSON2['p']
Quantity = JSON2['q']
Symbol = JSON2['s']
Order = JSON2['m']
asyncio.create_task(self.send_json(Rate))
print(Rate)
当我打开一页时,此代码有效;但是,如果我尝试使用新帐户打开新窗口,则会引发以下错误:
File "C:\Users\User\Desktop\Heroku\github\master\myapp\consumers.py", line 54, in connect
bm.start()
File "C:\Users\User\lib\threading.py", line 843, in start
raise RuntimeError("threads can only be started once")
threads can only be started once
我是 Channels 新手,所以这是一个菜鸟问题,但我该如何解决这个问题?我想做的是:用户打开页面并获取数据,另一个用户打开页面并获取数据;有没有办法做到这一点?还是我只是误解了 Django Channels 和 websockets 的工作原理?
【问题讨论】:
【参考方案1】:你真的需要辅助线程吗?
class EchoConsumer(AsyncJsonWebsocketConsumer):
symbol = ''
async def connect(self):
self.symbol = 'BNBBTC'
# or, more probably, retrieve the value for "symbol" from query_string
# so the client can specify which symbol he's interested into:
# socket = new WebSocket("ws://.../?symbol=BNBBTC");
await self.accept()
def process_message(self, message):
# PSEUDO-CODE BELOW !
if self.symbol == message['symbol']:
await self.send(
'type': 'websocket.send',
'text': json.dumps(message),
)
为了获得额外的灵活性,您也可以接受来自客户端的所有符号列表,而不是:
//html
socket = new WebSocket("ws://.../?symbols=XXX,YYY,ZZZ");
那么(在消费者中):
class EchoConsumer(AsyncJsonWebsocketConsumer):
symbols = []
async def connect(self):
# here we need to parse "?symbols=XXX,YYY,ZZZ" ...
# the code below has been stolen from another project of mine and should be suitably adapted
params = urllib.parse.parse_qs(self.scope.get('query_string', b'').decode('utf-8'))
try:
self.symbols = json.loads(params.get('symbols', ['[]'])[0])
except:
self.symbols = []
def process_message(self, message):
if message['symbol'] in self.symbols:
...
【讨论】:
评论不用于扩展讨论;这个对话是moved to chat。 @Mario Orlandi 我可能解决了这个问题!这是我所做的:我在我的 Django Channels 应用程序上创建了两个消费者,一个用于在打开页面时订阅用户,因此如果用户打开市场“BTCUSD”,则用户订阅了“BTCUSD”组。另一个消费者只从 Django 收集器接收数据,对于它从 Django 收集器接收到的每一笔交易,它都会读取交易的名称并将其广播到正确的组! 不错。但是,我不明白需要第二个消费者。Django 收集器可能会自己直接广播到正确的组 @MarioOrlandi 需要第二个消费者是因为数据收集器托管在另一台服务器上,因此它无法将数据直接发送到我的 Django 频道组,因为 Django 托管在不同的服务器。我打算将数据收集器移至 Django,以便我可以将其用作管理命令,在这种情况下,我会按照你说的做!【参考方案2】:我不是 Django 开发人员,但如果我理解正确,函数 connect
被多次调用 - 并且 bm.start
引用最有可能在 bm.start_trade_socket
中创建的同一线程(或连接中的其他地方)。总之,当调用bm.start
时,会启动一个线程,当再次执行时,您会收到该错误。
【讨论】:
是的,正是你所说的;不幸的是,我对异步代码和 websockets 的了解还很少,所以我认为消费者会将数据发送给每个用户【参考方案3】:这里 start() 启动线程的活动。
每个线程对象最多应该调用一次。您已将 BinanceSocketManager 的全局对象设为“bm”。
如果在同一个线程对象上多次调用,它总是会引发 RuntimeError。
请参考下面提到的代码,它可能会帮助你
from channels.generic.websocket import WebsocketConsumer, AsyncConsumer, AsyncJsonWebsocketConsumer
from binance.client import Client
import json
from binance.websockets import BinanceSocketManager
import time
import asyncio
class EchoConsumer(AsyncJsonWebsocketConsumer):
client = Client('', '')
trades = client.get_recent_trades(symbol='BNBBTC')
bm = BinanceSocketManager(client)
async def connect(self):
await self.accept()
await self.send_json('test')
self.bm.start_trade_socket('BNBBTC', self.process_message)
self.bm.start()
def process_message(self, message):
JSON1 = json.dumps(message)
JSON2 = json.loads(JSON1)
#define variables
Rate = JSON2['p']
Quantity = JSON2['q']
Symbol = JSON2['s']
Order = JSON2['m']
asyncio.create_task(self.send_json(Rate))
print(Rate)
【讨论】:
您好,谢谢您的回答!我试过这个,但我得到了同样的错误!以上是关于线程只能在 Django Channels 中启动一次的主要内容,如果未能解决你的问题,请参考以下文章
Ubuntu + Django(DRF) + channels(websocket)+NGINX + uwsgi 环境部署
无法使用 django-channels 连接到 websocket,docker 上的 nginx 作为服务
在真实服务器中部署 django-channels 的最佳方式
Django Channels 是不是使用 ws:// 协议前缀在 Django 视图或 Channels 应用程序之间进行路由?
Django 通道 websocket 连接和断开连接(Nginx + Daphne + Django + Channels)