高速公路 Python RPC 与 Pytest
Posted
技术标签:
【中文标题】高速公路 Python RPC 与 Pytest【英文标题】:Autobahn Python RPC with Pytest 【发布时间】:2017-05-10 20:13:55 【问题描述】:所以在应用程序中我必须测试 RPC 调用(有一个使用 Autobahn Python 制作的网络服务器)。我想对这些进行集成测试。所以我正在使用Pytest。我这样做的方式是这样的: 1)我有一个这样的测试类:
@pytest.mark.incremental
class TestCreateUser:
@pytest.fixture(scope="function", params="Json Test data")
def create_user_data(self, request):
return request.param
def test_valid_user_creation(self, create_user_data):
ws_realm = config["websocket"]["realm"]
runner = ApplicationRunner(
url="%s://%s:%s/ws" % (
config["websocket"]["protocol"], config["websocket"]["host"], config["websocket"]["port"]),
realm=ws_realm,
extra="method": u"appname.create_user", "email": create_user_data["username"], create_user_data["password"]
)
runner.run(MyComponent)
然后我有一个名为“MyComponent”的组件:
def __init__(self, config=None):
super().__init__(config)
@asyncio.coroutine
def onJoin(self, details):
try:
result = yield from self.call(self.config.extra["method"], self.config.extra["email"], self.config.extra["password"])
except Exception as e:
print("Error: ".format(e))
else:
print(result)
self.leave()
def onDisconnect(self):
asyncio.get_event_loop().stop()
所以我有一些问题,首先我无法在测试类中获得 RPC 调用的结果,其次当我多次运行“test_valid_user_creation”(使用夹具系统)时,我遇到了这样的错误:
./../.pyenv/versions/novalibrary/lib/python3.4/site-packages/autobahn/asyncio/wamp.py:167: in run
(transport, protocol) = loop.run_until_complete(coro)
../../.pyenv/versions/3.4.3/lib/python3.4/asyncio/base_events.py:293: in run_until_complete
self._check_closed()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <_UnixSelectorEventLoop running=False closed=True debug=False>
def _check_closed(self):
if self._closed:
> raise RuntimeError('Event loop is closed')
E RuntimeError: Event loop is closed
../../.pyenv/versions/3.4.3/lib/python3.4/asyncio/base_events.py:265: RuntimeError
所以我的问题是我如何检索 RPC 调用的值,以及当使用夹具多次调用测试时,我如何无法获得“事件循环已关闭”。
谢谢。
【问题讨论】:
您是否检查过您的def onDisconnect(self):
在每次运行时都被调用了?您必须关闭loop
并等到终止,然后再开始新的。关于您的result
,将其设为member
或class MyComponent
或复制到global var
。
onDisconnect() 每次都会被调用;但我如何等待关闭循环。因为当我调用 asyncio.get_event_loop().close() 时,我遇到了异常。当我调用 asyncio.get_event_loop().stop() 然后调用 asyncio.get_event_loop().is_running() 它返回 true。
我的错,我错了。我的意思是 loop.stop()
然后 wait 然后 loop.close()
。阅读有关 wait 的答案:***.com/a/43810272/7414759
【参考方案1】:
所以基本上,为了返回值,我创建了另一个模块“shared_result”,并在其中放置了一个全局变量,以将组件中的值返回到测试类中。至于我得到的“事件循环已关闭”异常,我在每次调用跑步者之前都会这样做:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
这是我找到解决方案的地方: https://github.com/aio-libs/aiohttp/issues/441
如果有人可以对此发表评论并解释为什么需要这样做并给我一个更好的解决方案,那就太好了。
【讨论】:
以上是关于高速公路 Python RPC 与 Pytest的主要内容,如果未能解决你的问题,请参考以下文章