如何使用 asyncio add_reader 从套接字读取

Posted

技术标签:

【中文标题】如何使用 asyncio add_reader 从套接字读取【英文标题】:How to read from socket using asyncio add_reader 【发布时间】:2019-08-20 11:25:23 【问题描述】:

我有这个代码:

import sys
import socket
import asyncio

async def main(dest_addr, max_hops=30, timeout=0.5):
    loop = asyncio.get_event_loop()
    queue = asyncio.Queue()

    port = 33434

    rx = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
    rx.settimeout(timeout)
    rx.bind(("", port))

    def reader():
        try:
            _, addr = rx.recvfrom(512)
            addr = addr[0]
        except socket.timeout:
            addr = None
        queue.put_nowait(addr)

    loop.add_reader(rx, reader)

    tx = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)

    for ttl in range(1, max_hops + 1):
        tx.setsockopt(socket.SOL_IP, socket.IP_TTL, ttl)
        tx.sendto(b"", (dest_addr, port))
        addr = await queue.get()
        print(ttl, addr)

    loop.remove_reader(rx)


if __name__ == "__main__":
    dest_name = sys.argv[1]
    dest_addr = socket.gethostbyname(dest_name)
    print(f"traceroute to dest_name (dest_addr)")
    asyncio.get_event_loop().run_until_complete(main(dest_addr))

我基本上是在尝试使用 asyncio 实现 traceroute。

我正在监视套接字文件描述符的读取可用性,并在使用socket.sendto 方法后从设备接收数据时调用reader,并等待队列被填满,然后再进行下一步。

但是,我的程序在第一次迭代后立即挂起,第二次 addr = await queue.get()

似乎reader 回调只被调用了一次,因此队列没有被填满,这很奇怪,因为我在rx 套接字上有 0.5 秒的超时。

【问题讨论】:

经过测试并获得了至少 7 个带有地址的跃点,在第一次迭代后无法重现您的 挂起 @RomanPerekhrest 你用的是哪个地址? (不知道为什么这很重要),我跑了:sudo python3.6 traceroute.py www.google.com 它确实挂了 @RomanPerekhrest 这怎么可能?无论发生什么,都应该触发套接字超时,因此队列应该在最多 0.5 秒后被填充 我跑sudo python3 ~/projects/drafts/networking/traceroute_emulator.py i.ua @Newlol,当你有一个可行的答案时,你可以回答你自己的问题,而不是在问题中提供解决方案。 【参考方案1】:

回答我自己的问题:

我认为发生的情况是,设备(例如我的前端路由器)没有响应任何内容,因此当文件描述符准备好读取时我永远不会收到通知,因此不会调用回调。

解决方法是将queue.get() 包裹在asyncio.wait_for 中,并设置一个超时时间,这样它就不会永远挂起:

for ttl in range(1, max_hops + 1):
    tx.setsockopt(socket.SOL_IP, socket.IP_TTL, ttl)
    tx.sendto(b"", (dest_addr, port))
    try:
        addr = await asyncio.wait_for(queue.get(), timeout)
    except asyncio.TimeoutError:
        addr = "timeout"
    print(ttl, addr)

【讨论】:

在 Ubuntu 18.04 上为我工作,但我必须以 root 身份运行脚本以防止 File "traceroute.py", line 11, in main rx = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP) File "/usr/lib/python3.6/socket.py", line 144, in __init__ _socket.socket.__init__(self, family, type, proto, fileno) PermissionError: [Errno 1] Operation not permitted @V-R 是,因为创建 ICMP 数据包需要原始套接字,这需要 root 权限。

以上是关于如何使用 asyncio add_reader 从套接字读取的主要内容,如果未能解决你的问题,请参考以下文章

如何在asyncio python中使用子进程模块限制并发进程数

使用 asyncio 运行协程后如何继续原始事件循环?

python asyncio,如何从另一个线程创建和取消任务

从 concurrent.futures 到 asyncio

Flask 作者写万字长文谈 asyncio(上)

使用 asyncio ProactorEventLoop 时如何分配线程池