如何在不中断流的情况下安全地从asyncio读取ReaderStream

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何在不中断流的情况下安全地从asyncio读取ReaderStream相关的知识,希望对你有一定的参考价值。

我正在尝试构建一个man-in-middle代理服务器,将客户端请求中继到我的应用程序中预定义的各种代理服务。因此,为了做到这一点,我需要区分客户端的服务请求。 只要我没有取消注释# data = await client_reader.read(2048)我可以解析标题,下面的代码就可以工作。即如果我执行该代码, 假如我使用未注释的代码行执行此操作:

r = requests.get('http://api.ipify.org/', headers = {'Proxy-Type':'custom'}, proxies={'http':'http://127.0.0.1:9911'}) 

我会从408 Request Time-out的ipify获得一个r.content

async def proxy_data(reader, writer, connection_string):

  try:
    while True:
      data = await reader.read(2048)
      if not data:
        break
      writer.write(data)
      await writer.drain()
  except Exception as e:
    raise
  finally:
    writer.close()

async def accept_client(client_reader, client_writer):

  try:
    # Get proxy service - [Proxy-Type] from header via client_reader
    # Set remote_address and remote_port based on it

    # data = await client_reader.read(2048)

    (remote_reader, remote_writer) = await asyncio.wait_for(
    asyncio.open_connection(host = remote_address, port = remote_port),
      timeout = 30)
  except asyncio.TimeoutError:
    client_writer.close()
  except Exception as e:
    client_writer.close()
  else:
    # Pipe the streams
    asyncio.ensure_future(proxy_data(client_reader, remote_writer))
    asyncio.ensure_future(proxy_data(remote_reader, client_writer))

def main():

  def handle_client(client_reader, client_writer):
    asyncio.ensure_future(
      accept_client(
        client_reader = client_reader,
        client_writer = client_writer
      )
    )


  loop = asyncio.get_event_loop()
  try:
    server = loop.run_until_complete(
      asyncio.start_server(
        handle_client, host = '127.0.0.1', port = 9911))
  except Exception as e:
    logger.error('Bind error: {}'.format(e))
    sys.exit(1)

  for s in server.sockets:
    logger.debug('Proxy broker listening on {}'.format(s.getsockname()))

  try:
    loop.run_forever()
  except KeyboardInterrupt:
    pass
if __name__ == '__main__':
  main()

任何人都可以指出这里的问题或如何有条件地打开连接?

答案

通过重新输入client_reader(ReaderStream)修复了问题

感谢Vincent的评论,指出请求篡改情况并将请求重建回其原生形式

async def accept_client(client_reader, client_writer):

  try:
    # Get proxy service - [Proxy-Type] from header via client_reader
    # Set remote_address and remote_port based on it

    data = await client_reader.read(2048)

    # -------- Edited --------

    # perform operations based on data and obtain remote_address, remote_port

    (remote_reader, remote_writer) = await asyncio.wait_for(
    asyncio.open_connection(host = remote_address, port = remote_port),
      timeout = 30)
  except asyncio.TimeoutError:
    client_writer.close()
  except Exception as e:
    client_writer.close()
  else:

    # Write the data to remote
    remote_writer.write(data)
    await remote_writer.drain()

    # Pipe the streams
    asyncio.ensure_future(proxy_data(client_reader, remote_writer))
    asyncio.ensure_future(proxy_data(remote_reader, client_writer))

以上是关于如何在不中断流的情况下安全地从asyncio读取ReaderStream的主要内容,如果未能解决你的问题,请参考以下文章

如何在 BizTalk 管道组件中有效地修改流中的文本?

Clojure 懒惰地从文件中读取随机行

在这种情况下如何正确使用 asyncio?

如何在不阻塞的情况下发出请求(使用 asyncio)?

设备通过国标GB28181接入EasyCVR平台,出现断流情况该如何解决?

如何安全地从多个线程并行访问和写入复杂容器?