python 将websockets与龙卷风结合起来,龙卷风用于控制
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python 将websockets与龙卷风结合起来,龙卷风用于控制相关的知识,希望对你有一定的参考价值。
#!/usr/bin/env python3
import asyncio
import collections
import time
import json
import pendulum
import redis
import tornado.web
import websockets
class Handler(tornado.web.RequestHandler):
websockets = collections.defaultdict(list)
redis_cli = redis.StrictRedis()
def set_default_headers(self):
self.set_header("Content-Type", "text/plain; charset=UTF-8")
def write_json(self, obj, default=str):
self.set_header("Content-Type", "application/json; charset=UTF-8")
self.write(json.dumps(obj, default=default, ensure_ascii=False,
separators=(",", ": "), indent=4))
@classmethod
async def run_ws(cls, uri):
async with websockets.connect(uri) as ws:
print(uri, ws, pendulum.now())
cls.websockets[uri].append(ws)
try:
async for o in ws:
cls.redis_cli.publish(uri, o)
finally:
cls.websockets[uri].remove(ws)
print(uri, "closed", pendulum.now())
class Main(Handler):
def get(self):
self.write_json(self.websockets, default=str)
class Control(Handler):
def get(self, uri):
self.write_json(self.websockets[uri], default=str)
def post(self, uri):
asyncio.get_event_loop().create_task(self.run_ws(uri))
async def put(self, uri):
msg = self.get_argument("msg")
idx = int(self.get_argument("idx", 0))
if self.get_argument("binary", False):
msg = msg.encode()
await self.websockets[uri][idx].send(msg)
async def delete(self, uri):
idx = int(self.get_argument("idx", 0))
await self.websockets[uri][idx].close()
app = tornado.web.Application([
(r"/", Main),
(r"/(.+)", Control),
])
def listen(addr, port):
from tornado.log import enable_pretty_logging
enable_pretty_logging()
app.listen(port, addr, xheaders=True)
if __name__ == '__main__':
#import uvloop
#uvloop.install()
from tornado.options import define, parse_command_line, options
define("port", default=1111)
define("addr", default="")
parse_command_line()
listen(options.addr, options.port)
asyncio.get_event_loop().run_forever()
以上是关于python 将websockets与龙卷风结合起来,龙卷风用于控制的主要内容,如果未能解决你的问题,请参考以下文章
Tornado WebSocket 与 Django ORM 与共享会话
如何从本地机器公开龙卷风 websocket
python的Websocket库
龙卷风 websocket 服务器 - 连接队列
如何响应龙卷风websocket中的服务器ping
HTTP 服务器外 Python3 的 Websocket 服务器