使用 Python3 在 Pyramid 中使用 Websocket

Posted

技术标签:

【中文标题】使用 Python3 在 Pyramid 中使用 Websocket【英文标题】:Using Websocket in Pyramid using Python3 【发布时间】:2013-05-18 15:55:28 【问题描述】:

有没有办法使用 Python 3 在 Pyramid 中使用 Websockets。 当服务器上发生数据更改时,我想将其用于实时更新表。

我已经考虑过使用长轮询,但我认为这不是最好的方法。

有什么想法或想法吗?

【问题讨论】:

不确定 Python 3 中的 websocket(gevent-socketio 依赖于 gevent,我不确定 Python 3 是否支持)。但是您是否考虑过服务器发送事件?示例:github.com/antoineleclair/zmq-sse-chat/blob/master/sse/views.py 【参考方案1】:

https://github.com/housleyjk/aiopyramid 为我工作。请参阅 websocket 的文档http://aiopyramid.readthedocs.io/features.html#websockets

UPD:

具有金字塔环境的 WebSocket 服务器。

import aiohttp
import asyncio
from aiohttp import web
from webtest import TestApp
from pyramid.config import Configurator
from pyramid.response import Response

async def websocket_handler(request):

    ws = web.WebSocketResponse()
    await ws.prepare(request)

    while not ws.closed:
        msg = await ws.receive()

        if msg.tp == aiohttp.MsgType.text:
            if msg.data == 'close':
                await ws.close()
            else:
                hello = TestApp(request.app.pyramid).get('/')
                ws.send_str(hello.text)
        elif msg.tp == aiohttp.MsgType.close:
            print('websocket connection closed')
        elif msg.tp == aiohttp.MsgType.error:
            print('ws connection closed with exception %s' %
                  ws.exception())
        else:
            ws.send_str('Hi')

    return ws


def hello(request):
    return Response('Hello world!')

async def init(loop):
    app = web.Application(loop=loop)
    app.router.add_route('GET', '/name', websocket_handler)
    config = Configurator()
    config.add_route('hello_world', '/')
    config.add_view(hello, route_name='hello_world')
    app.pyramid = config.make_wsgi_app()

    srv = await loop.create_server(app.make_handler(),
                                   '127.0.0.1', 8080)
    print("Server started at http://127.0.0.1:8080")
    return srv

loop = asyncio.get_event_loop()
loop.run_until_complete(init(loop))
try:
    loop.run_forever()
except KeyboardInterrupt:
    pass

WebSocket 客户端:

import asyncio
import aiohttp

session = aiohttp.ClientSession()


async def client():
    ws = await session.ws_connect('http://0.0.0.0:8080/foo')

    while True:
        ws.send_str('Hi')
        await asyncio.sleep(2)
        msg = await ws.receive()

        if msg.tp == aiohttp.MsgType.text:
            print('MSG: ', msg)
            if msg.data == 'close':
                await ws.close()
                break
            else:
                ws.send_str(msg.data + '/client')
        elif msg.tp == aiohttp.MsgType.closed:
            break
        elif msg.tp == aiohttp.MsgType.error:
            break

loop = asyncio.get_event_loop()
loop.run_until_complete(client())
loop.close()

【讨论】:

你现在使用 aiopyramid 吗?稳定吗? @Infernion 工作正常,但是在一个新项目中,我使用 aiohttp 进行 websocket 你是否只使用裸 aiohttp 而没有金字塔?

以上是关于使用 Python3 在 Pyramid 中使用 Websocket的主要内容,如果未能解决你的问题,请参考以下文章

使用 React 和 Pyramid(微服务架构)进行身份验证,如何存储“会话数据”?

使用 Pyramid 压缩所有 HTTP 流量

使用 Pyramid 模拟渲染以响应

偶尔禁用 Pyramid 中间件

在 Pyramid 中存储和验证用于登录的加密密码

Pyramid AuthTktAuthenticationPolicy 回调从未调用过