线程只能在 Django Channels 中启动一次

Posted

技术标签:

【中文标题】线程只能在 Django Channels 中启动一次【英文标题】:Threads can only be started once in Django Channels 【发布时间】:2020-04-04 16:32:22 【问题描述】:

我创建了一个简单的 Django Channels 消费者,它应该连接到外部源、检索数据并将其发送到客户端。所以,用户打开页面 > 消费者连接到外部服务并获取数据 > 数据被发送到 websocket。

这是我的代码:

import json
from channels.generic.websocket import WebsocketConsumer, AsyncConsumer, AsyncJsonWebsocketConsumer

from binance.client import Client
import json
from binance.websockets import BinanceSocketManager
import time
import asyncio

client = Client('', '')

trades = client.get_recent_trades(symbol='BNBBTC')
bm = BinanceSocketManager(client)
class EchoConsumer(AsyncJsonWebsocketConsumer):


    async def connect(self):
        await self.accept()
        await self.send_json('test')


        bm.start_trade_socket('BNBBTC', self.process_message)
        bm.start()


    def process_message(self, message):
        JSON1 = json.dumps(message)
        JSON2 = json.loads(JSON1)

        #define variables
        Rate = JSON2['p']
        Quantity = JSON2['q']
        Symbol = JSON2['s']
        Order = JSON2['m']

        asyncio.create_task(self.send_json(Rate))
        print(Rate)

当我打开一页时,此代码有效;但是,如果我尝试使用新帐户打开新窗口,则会引发以下错误:

File "C:\Users\User\Desktop\Heroku\github\master\myapp\consumers.py", line 54, in connect
    bm.start()
  File "C:\Users\User\lib\threading.py", line 843, in start
    raise RuntimeError("threads can only be started once")
  threads can only be started once

我是 Channels 新手,所以这是一个菜鸟问题,但我该如何解决这个问题?我想做的是:用户打开页面并获取数据,另一个用户打开页面并获取数据;有没有办法做到这一点?还是我只是误解了 Django Channels 和 websockets 的工作原理?

【问题讨论】:

【参考方案1】:

你真的需要辅助线程吗?

class EchoConsumer(AsyncJsonWebsocketConsumer):

    symbol = ''

    async def connect(self):
        self.symbol = 'BNBBTC'
        # or, more probably, retrieve the value for "symbol" from query_string
        # so the client can specify which symbol he's interested into:
        #    socket = new WebSocket("ws://.../?symbol=BNBBTC");
        await self.accept()

    def process_message(self, message):
        # PSEUDO-CODE BELOW !
        if self.symbol == message['symbol']:
            await self.send(
                'type': 'websocket.send',
                'text': json.dumps(message),
            )

为了获得额外的灵活性,您也可以接受来自客户端的所有符号列表,而不是:

//html
socket = new WebSocket("ws://.../?symbols=XXX,YYY,ZZZ");

那么(在消费者中):

class EchoConsumer(AsyncJsonWebsocketConsumer):

    symbols = []

    async def connect(self):
        # here we need to parse "?symbols=XXX,YYY,ZZZ" ...
        # the code below has been stolen from another project of mine and should be suitably adapted
        params = urllib.parse.parse_qs(self.scope.get('query_string', b'').decode('utf-8'))
        try:
            self.symbols = json.loads(params.get('symbols', ['[]'])[0])
        except:
            self.symbols = []

    def process_message(self, message):
        if message['symbol'] in self.symbols:
            ...

【讨论】:

评论不用于扩展讨论;这个对话是moved to chat。 @Mario Orlandi 我可能解决了这个问题!这是我所做的:我在我的 Django Channels 应用程序上创建了两个消费者,一个用于在打开页面时订阅用户,因此如果用户打开市场“BTCUSD”,则用户订阅了“BTCUSD”组。另一个消费者只从 Django 收集器接收数据,对于它从 Django 收集器接收到的每一笔交易,它都会读取交易的名称并将其广播到正确的组! 不错。但是,我不明白需要第二个消费者。Django 收集器可能会自己直接广播到正确的组 @MarioOrlandi 需要第二个消费者是因为数据收集器托管在另一台服务器上,因此它无法将数据直接发送到我的 Django 频道组,因为 Django 托管在不同的服务器。我打算将数据收集器移至 Django,以便我可以将其用作管理命令,在这种情况下,我会按照你说的做!【参考方案2】:

我不是 Django 开发人员,但如果我理解正确,函数 connect 被多次调用 - 并且 bm.start 引用最有可能在 bm.start_trade_socket 中创建的同一线程(或连接中的其他地方)。总之,当调用bm.start 时,会启动一个线程,当再次执行时,您会收到该错误。

【讨论】:

是的,正是你所说的;不幸的是,我对异步代码和 websockets 的了解还很少,所以我认为消费者会将数据发送给每个用户【参考方案3】:

这里 start() 启动线程的活动。

每个线程对象最多应该调用一次。您已将 BinanceSocketManager 的全局对象设为“bm”。

如果在同一个线程对象上多次调用,它总是会引发 RuntimeError。

请参考下面提到的代码,它可能会帮助你

from channels.generic.websocket import WebsocketConsumer, AsyncConsumer, AsyncJsonWebsocketConsumer

from binance.client import Client
import json
from binance.websockets import BinanceSocketManager
import time
import asyncio


class EchoConsumer(AsyncJsonWebsocketConsumer):
    client = Client('', '')

    trades = client.get_recent_trades(symbol='BNBBTC')
    bm = BinanceSocketManager(client)

    async def connect(self):
        await self.accept()
        await self.send_json('test')


        self.bm.start_trade_socket('BNBBTC', self.process_message)
        self.bm.start()


    def process_message(self, message):
        JSON1 = json.dumps(message)
        JSON2 = json.loads(JSON1)

        #define variables
        Rate = JSON2['p']
        Quantity = JSON2['q']
        Symbol = JSON2['s']
        Order = JSON2['m']

        asyncio.create_task(self.send_json(Rate))
        print(Rate)

【讨论】:

您好,谢谢您的回答!我试过这个,但我得到了同样的错误!

以上是关于线程只能在 Django Channels 中启动一次的主要内容,如果未能解决你的问题,请参考以下文章

Django-Channels:在课堂上锁定关键部分

Ubuntu + Django(DRF) + channels(websocket)+NGINX + uwsgi 环境部署

无法使用 django-channels 连接到 websocket,docker 上的 nginx 作为服务

在真实服务器中部署 django-channels 的最佳方式

Django Channels 是不是使用 ws:// 协议前缀在 Django 视图或 Channels 应用程序之间进行路由?

Django 通道 websocket 连接和断开连接(Nginx + Daphne + Django + Channels)