结合 2 个基于 asyncio 的代码段

Posted

技术标签:

【中文标题】结合 2 个基于 asyncio 的代码段【英文标题】:Combining 2 asyncio based code segments 【发布时间】:2018-04-11 21:55:40 【问题描述】:

我正在使用 Autobahn asyncio 系统(用于讨论 Websocket WAMP 协议),它工作正常,我可以处理传入的 RPC 调用和 pubsub。 我的问题是,一旦 RPC 调用通过 Autobahn 部分进入,我现在必须连接 TCP 套接字并通过这些套接字发送信息。

高速公路部分可以这样正常工作:

from autobahn.asyncio.component import Component, run
from asyncio import sleep
from autobahn.asyncio.wamp import ApplicationSession, ApplicationRunner

@comp.on_join
async def joined(session, details):
    print("Connected to websocket")

    def on_message(msg):
        msg = json.loads(msg)
        print(msg)

    def some_rpc(with_data):
        print("Doing something with the data")
        return json.dumps('status': 'OK')

    try:
        session.subscribe(on_message, u'some_pubsub_topic')
        session.register(some_rpc, u'some_rpc_call')
        print("RPC and Pubsub initialized")

    except Exception as e:
        print("could not subscribe to topic: 0".format(e))

if __name__ == "__main__":
     run([comp])

但是现在我需要能够连接到多个常规 TCP 套接字:

class SocketClient(asyncio.Protocol):
    def __init__(self, loop):
        self.data = b''
        self.loop = loop

    def connection_made(self, transport):
        self.transport = transport
        print('connected')

    def data_received(self, data):
        print('Data received: !r'.format(data.decode()))

    def send(self, data):
        self.transport.write(data)

    def connection_lost(self, exc):
        print('The server closed the connection')
        print('Stop the event loop')
        self.loop.stop()

loop = asyncio.get_event_loop()

c=loop.create_connection(lambda: SocketClient(loop),
                              '192.168.0.219', 6773)
loop.run_until_complete(c)
loop.run_forever()
loop.close()

问题是,当我将两者结合并执行此操作时:

def some_rpc(with_data):
    c.send('test')
    return json.dumps('status': 'OK')

它对我吠叫并告诉我:

停止迭代

在处理上述异常的过程中,又发生了一个异常:

Traceback(最近一次调用最后一次):文件 “/usr/lib/python3.5/site-packages/autobahn/wamp/websocket.py”,行 95、在onMessage中 self._session.onMessage(msg) 文件“/usr/lib/python3.5/site-packages/autobahn/wamp/protocol.py”,行 894,在 onMessage 中 on_reply = txaio.as_future(endpoint.fn, *invoke_args, **invoke_kwargs) 文件“/usr/lib/python3.5/site-packages/txaio/aio.py”,第 400 行,在 as_future 返回 create_future_error(create_failure()) 文件“/usr/lib/python3.5/site-packages/txaio/aio.py”,第 393 行,在 create_future_error 拒绝(f,错误)文件“/usr/lib/python3.5/site-packages/txaio/aio.py”,第 462 行,在拒绝中 future.set_exception(error.value) 文件“/usr/lib64/python3.5/asyncio/futures.py”,第 365 行,在 set_exception raise TypeError("StopIteration 与生成器交互不好" TypeError: StopIteration 与生成器交互不好,不能 成长为未来

有人知道如何从 RPC 调用函数中调用发送函数吗?

【问题讨论】:

【参考方案1】:

在这段代码中:

c=loop.create_connection(lambda: SocketClient(loop),
                              '192.168.0.219', 6773)
# [...]
def some_rpc(with_data):
    c.send('test')
    return json.dumps('status': 'OK')

create_connectionis a coroutine function,所以c包含一个协程对象。这样的对象确实有一个send 方法,但它与通过网络发送东西完全无关。调用create_connection 后,您可能希望通过以下方式获取生成的传输:

transport, ignore = loop.run_until_complete(c)

然后使用transport.write(),而不是c.send()

【讨论】:

以上是关于结合 2 个基于 asyncio 的代码段的主要内容,如果未能解决你的问题,请参考以下文章

基于51单片机的电子秒表

基于51单片机的电子秒表

一段时间后,Discord Client 和 Asyncio 多个任务变得疯狂

前端福利!10个短小却超实用的JavaScript 代码段

asyncio.create_task() 做啥?

使用 asyncio 协程并行运行函数?