使用 Web 套接字的 Tornado 单元测试 - 堆栈上下文呢?
Posted
技术标签:
【中文标题】使用 Web 套接字的 Tornado 单元测试 - 堆栈上下文呢?【英文标题】:Tornado unit test with web sockets - what about stack context? 【发布时间】:2013-01-08 13:13:26 【问题描述】:我使用 Tornado 服务器已经有一段时间了,我不得不说我喜欢它。 我有一个处理 Web 套接字和 http 请求的龙卷风服务器(在 python3.2 上运行)。 我想做的是用 ws2py(它实现了一个 ws 客户端以与 tornado IOLoop 一起使用)编写一些单元测试(使用 web 套接字)。我看到 tornado 有一个 AsyncTestCase 类,它看起来很有趣,尤其是在与 doc 中所述的 AsyncHTTPClient 一起使用时:
class MyTestCase2(AsyncTestCase):
def test_http_fetch(self):
client = AsyncHTTPClient(self.io_loop)
client.fetch("http://www.tornadoweb.org/", self.stop)
response = self.wait()
# Test contents of response
self.assertIn("FriendFeed", response.body)
我想将 AsyncTestCase 与 Web 套接字一起使用, 客户端没问题,我可以毫无问题地收发消息。
我想我要做的是将self.stop
作为回调传递给客户端,以便通过调用wait()
来检索收到的消息,就像上面的示例一样。但不知何故,这不起作用,这就是我所拥有的:
class SQLRequests(AsyncTestCase):
"""Tests sql requests"""
def test_sql_req_1(self):
"""first test function"""
client = AsyncWSClient(httpBaseUrl, self.io_loop)
client.sendMessage('some_message', self.stop)
response = self.wait()
if response.data:
print('got %s' % str(response.data))
# some test
self.assertTrue(True)
if __name__ == '__main__':
unittest.main()
这是我的网络套接字客户端:
class AsyncWSClient(TornadoWebSocketClient):
"""
Asynchronous web socket client based on ws4py's tornadoclient
Sends a message and calls callback with received message
"""
def __init__(self, url, ioLoop=None, **kwargs):
TornadoWebSocketClient.__init__(self, url, io_loop=ioLoop, **kwargs)
self._callback = None
self._message = None
def opened(self):
"""called when web socket opened"""
self.send(self._message, binary=False)
def received_message(self, message):
"""Received a message"""
self.close()
if self._callback:
self._callback(message)
def sendMessage(self, message, callback):
"""Connects and sends message when connected"""
self._message = message
self._callback = callback
self.connect()
我确实收到了一个ws2py.messaging.TextMessage
对象作为响应,但它的数据字段是None
,尽管客户端已经收到了一些数据。如果我查看 AsyncTestCase,在它调用回调之前,该对象中有一些数据,当它作为 wait() 的返回值传递时,这些数据会以某种方式消失。
我看到龙卷风中有一个神秘的东西叫做 stack_context,这和我的问题有关吗?
【问题讨论】:
【参考方案1】:问题是消息“数据”包含在message
的data
属性中。调用 received_message
后,message.data
将重置为 None (https://github.com/Lawouach/WebSocket-for-Python/blob/master/ws4py/websocket.py#L369)。
所以,只需通过 message.data
而不是完整的 message
调用您的回调。像这样:
def received_message(self, message):
"""Received a message"""
self.close()
if self._callback:
self._callback(message.data)
【讨论】:
它有效,非常感谢!!我不记得我到底在做什么,但它会打印出消息。以上是关于使用 Web 套接字的 Tornado 单元测试 - 堆栈上下文呢?的主要内容,如果未能解决你的问题,请参考以下文章
Tornado websockets:在进程之间共享打开的 web sockets
web 部署专题:压力测试压力测试实例 flask 四种wsgi方式对比(tornado,Gunicorn,Twisted,Gevent)