结合 2 个基于 asyncio 的代码段
Posted
技术标签:
【中文标题】结合 2 个基于 asyncio 的代码段【英文标题】:Combining 2 asyncio based code segments 【发布时间】:2018-04-11 21:55:40 【问题描述】:我正在使用 Autobahn asyncio 系统(用于讨论 Websocket WAMP 协议),它工作正常,我可以处理传入的 RPC 调用和 pubsub。 我的问题是,一旦 RPC 调用通过 Autobahn 部分进入,我现在必须连接 TCP 套接字并通过这些套接字发送信息。
高速公路部分可以这样正常工作:
from autobahn.asyncio.component import Component, run
from asyncio import sleep
from autobahn.asyncio.wamp import ApplicationSession, ApplicationRunner
@comp.on_join
async def joined(session, details):
print("Connected to websocket")
def on_message(msg):
msg = json.loads(msg)
print(msg)
def some_rpc(with_data):
print("Doing something with the data")
return json.dumps('status': 'OK')
try:
session.subscribe(on_message, u'some_pubsub_topic')
session.register(some_rpc, u'some_rpc_call')
print("RPC and Pubsub initialized")
except Exception as e:
print("could not subscribe to topic: 0".format(e))
if __name__ == "__main__":
run([comp])
但是现在我需要能够连接到多个常规 TCP 套接字:
class SocketClient(asyncio.Protocol):
def __init__(self, loop):
self.data = b''
self.loop = loop
def connection_made(self, transport):
self.transport = transport
print('connected')
def data_received(self, data):
print('Data received: !r'.format(data.decode()))
def send(self, data):
self.transport.write(data)
def connection_lost(self, exc):
print('The server closed the connection')
print('Stop the event loop')
self.loop.stop()
loop = asyncio.get_event_loop()
c=loop.create_connection(lambda: SocketClient(loop),
'192.168.0.219', 6773)
loop.run_until_complete(c)
loop.run_forever()
loop.close()
问题是,当我将两者结合并执行此操作时:
def some_rpc(with_data):
c.send('test')
return json.dumps('status': 'OK')
它对我吠叫并告诉我:
停止迭代
在处理上述异常的过程中,又发生了一个异常:
Traceback(最近一次调用最后一次):文件 “/usr/lib/python3.5/site-packages/autobahn/wamp/websocket.py”,行 95、在onMessage中 self._session.onMessage(msg) 文件“/usr/lib/python3.5/site-packages/autobahn/wamp/protocol.py”,行 894,在 onMessage 中 on_reply = txaio.as_future(endpoint.fn, *invoke_args, **invoke_kwargs) 文件“/usr/lib/python3.5/site-packages/txaio/aio.py”,第 400 行,在 as_future 返回 create_future_error(create_failure()) 文件“/usr/lib/python3.5/site-packages/txaio/aio.py”,第 393 行,在 create_future_error 拒绝(f,错误)文件“/usr/lib/python3.5/site-packages/txaio/aio.py”,第 462 行,在拒绝中 future.set_exception(error.value) 文件“/usr/lib64/python3.5/asyncio/futures.py”,第 365 行,在 set_exception raise TypeError("StopIteration 与生成器交互不好" TypeError: StopIteration 与生成器交互不好,不能 成长为未来
有人知道如何从 RPC 调用函数中调用发送函数吗?
【问题讨论】:
【参考方案1】:在这段代码中:
c=loop.create_connection(lambda: SocketClient(loop),
'192.168.0.219', 6773)
# [...]
def some_rpc(with_data):
c.send('test')
return json.dumps('status': 'OK')
create_connection
is a coroutine function,所以c
包含一个协程对象。这样的对象确实有一个send
方法,但它与通过网络发送东西完全无关。调用create_connection
后,您可能希望通过以下方式获取生成的传输:
transport, ignore = loop.run_until_complete(c)
然后使用transport.write()
,而不是c.send()
。
【讨论】:
以上是关于结合 2 个基于 asyncio 的代码段的主要内容,如果未能解决你的问题,请参考以下文章