Task exception with aiohttp async request


Posted: 2019-09-21 19:20:50

I am trying to speed up multiple GET requests to a web service using asyncio and aiohttp.

To do that, inside a function I fetch my data from a PostgreSQL database with psycopg2's .fetchmany(), building batches of 100 records at a time, and send each batch as a list of URLs to an async function named batch(), which processes them batch by batch.

The problem I face in the batch() function is that some requests log the messages below. The script keeps running and does not fail, but I cannot catch and log these exceptions in order to reprocess them later.

Task exception was never retrieved
future: <Task finished coro=<batch.<locals>.fetch() done, defined at C:/PythonProjects/bindings/batch_fetch.py:34> exception=ClientOSError(10054, 'An existing connection was forcibly closed by the remote host', None, 10054, None)>
Traceback (most recent call last):
  File "C:/PythonProjects/bindings/batch_fetch.py", line 36, in fetch
    async with session.get(url) as resp:
  File "C:\Miniconda3\lib\site-packages\aiohttp\client.py", line 1005, in __aenter__
    self._resp = await self._coro
  File "C:\Miniconda3\lib\site-packages\aiohttp\client.py", line 497, in _request
    await resp.start(conn)
  File "C:\Miniconda3\lib\site-packages\aiohttp\client_reqrep.py", line 844, in start
    message, payload = await self._protocol.read()  # type: ignore  # noqa
  File "C:\Miniconda3\lib\site-packages\aiohttp\streams.py", line 588, in read
    await self._waiter
aiohttp.client_exceptions.ClientOSError: [WinError 10054] An existing connection was forcibly closed by the remote host
Task exception was never retrieved
future: <Task finished coro=<batch.<locals>.fetch() done, defined at C:/PythonProjects/bindings/batch_fetch.py:34> exception=ClientConnectorError(10060, "Connect call failed ('xx.xxx.xx.xxx', 80)")>
Traceback (most recent call last):
  File "C:\Miniconda3\lib\site-packages\aiohttp\connector.py", line 924, in _wrap_create_connection
    await self._loop.create_connection(*args, **kwargs))
  File "C:\Miniconda3\lib\asyncio\base_events.py", line 778, in create_connection
    raise exceptions[0]
  File "C:\Miniconda3\lib\asyncio\base_events.py", line 765, in create_connection
    yield from self.sock_connect(sock, address)
  File "C:\Miniconda3\lib\asyncio\selector_events.py", line 450, in sock_connect
    return (yield from fut)
  File "C:\Miniconda3\lib\asyncio\selector_events.py", line 480, in _sock_connect_cb
    raise OSError(err, 'Connect call failed %s' % (address,))
TimeoutError: [Errno 10060] Connect call failed ('xx.xxx.xx.xxx', 80)

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "C:/PythonProjects/bindings/batch_fetch.py", line 36, in fetch
    async with session.get(url) as resp:
  File "C:\Miniconda3\lib\site-packages\aiohttp\client.py", line 1005, in __aenter__
    self._resp = await self._coro
  File "C:\Miniconda3\lib\site-packages\aiohttp\client.py", line 476, in _request
    timeout=real_timeout
  File "C:\Miniconda3\lib\site-packages\aiohttp\connector.py", line 522, in connect
    proto = await self._create_connection(req, traces, timeout)
  File "C:\Miniconda3\lib\site-packages\aiohttp\connector.py", line 854, in _create_connection
    req, traces, timeout)
  File "C:\Miniconda3\lib\site-packages\aiohttp\connector.py", line 992, in _create_direct_connection
    raise last_exc
  File "C:\Miniconda3\lib\site-packages\aiohttp\connector.py", line 974, in _create_direct_connection
    req=req, client_error=client_error)
  File "C:\Miniconda3\lib\site-packages\aiohttp\connector.py", line 931, in _wrap_create_connection
    raise client_error(req.connection_key, exc) from exc
aiohttp.client_exceptions.ClientConnectorError: Cannot connect to host cms-uat.cme.in.here.com:80 ssl:None [Connect call failed ('xx.xxx.xx.xxx', 80)]
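For context, a stdlib-only sketch of how such per-request failures can be caught inside the coroutine and recorded for reprocessing. The do_request() here is a hypothetical stand-in for the aiohttp call; in the real code the try/except would wrap `async with session.get(url)` and catch aiohttp.ClientError, the common base class of the ClientOSError and ClientConnectorError seen in the logs above:

```python
import asyncio

failed_urls = []  # (url, error message) pairs kept for later reprocessing

async def do_request(url):
    # Hypothetical stand-in for `async with session.get(url)`.
    if "flaky" in url:
        raise OSError(10054, "An existing connection was forcibly closed")
    return 200

async def fetch(url):
    try:
        return await do_request(url)
    except OSError as exc:
        # Log the failure and keep the URL so it can be retried later.
        failed_urls.append((url, str(exc)))
        return None

async def main():
    urls = ["http://example/ok", "http://example/flaky"]
    return await asyncio.gather(*(fetch(u) for u in urls))

statuses = asyncio.run(main())
print(statuses)      # [200, None]
print(failed_urls)   # the flaky URL with its error message
```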

As you can tell from my code, I have just entered the asyncio world, so any advice on the overall approach to this scenario is very welcome.

Thanks.

The full code is below.

import psycopg2.extras
import asyncio
import json
from aiohttp import ClientSession
from aiohttp import TCPConnector

base_url = 'http://url-example/{}'  # placeholder filled in with each paid

def query_db():
    urls = []
    # connection to postgres table , fetch data.
    conn = psycopg2.connect("dbname='pac' user='user' host='db'")
    cursor = conn.cursor('psycopg2 request', cursor_factory=psycopg2.extras.NamedTupleCursor)
    sql = "select gid, paid from table"
    cursor.execute(sql)
    while True:
        rec = cursor.fetchmany(100)

        for item in rec:
            record = {"gid": item.gid, "url": base_url.format(item.paid)}
            urls.append(record.get('url'))
        if not rec:
            break
        # send batch for async batch request
        batch(urls)
        # empty list of urls for new async batch request
        urls = []


def batch(urls):
    async def fetch(url):
        async with ClientSession() as session:
            async with session.get(url) as resp:
                if resp.status == 200:
                    response = await resp.json()
                    # parse the url to fetch the point address id.
                    paid = str(resp.request_info.url).split('/')[4].split('?')[0]
                    # build the dictionary with pa id and full response.
                    resp_dict = {'paid': paid, 'response': response}
                    with open('sucessful.json', 'a') as json_file:
                        json.dump(resp_dict, json_file)
                        json_file.write("\n")
                elif resp.status is None:
                    print(resp.status)
                elif resp.status != 200:
                    print(resp.status)
                    response = await resp.json()
                    # parse the url to fetch the paid.
                    paid = str(resp.request_info.url).split('/')[4].split('?')[0]
                    # build the dictionary with paid and full response.
                    resp_dict = {'paid': paid, 'response': response}
                    with open('failed.json', 'a') as json_file:
                        json.dump(resp_dict, json_file)
                        json_file.write("\n")

    loop = asyncio.get_event_loop()

    tasks = []

    for url in urls:
        task = asyncio.ensure_future(fetch(url))
        tasks.append(task)
    try:
        loop.run_until_complete(asyncio.wait(tasks))
    except Exception:
        print("exception consumed")


if __name__ == "__main__":
    query_db()


Answer 1:

Task exception was never retrieved

You see this warning when some task you created finished with an exception, but you never explicitly retrieved (awaited) its result. Here's the related doc section.

I bet your problem lies in the line

loop.run_until_complete(asyncio.wait(tasks))

asyncio.wait() by default just waits for all tasks to complete. It doesn't distinguish tasks that finished normally from tasks that finished with an exception; it simply blocks until everything is done. In that case it is your job to retrieve the exceptions from the finished tasks, and the following part won't help you with that, because asyncio.wait() will never raise an error:

try:
    loop.run_until_complete(asyncio.wait(tasks))
except Exception:
    print('...')  # You will probably NEVER see this message
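As a minimal stdlib-only sketch of what "retrieving" means here (the boom() and ok() coroutines are made up for illustration), the exceptions can be pulled out of the tasks that asyncio.wait() returns:

```python
import asyncio

async def boom():
    raise ConnectionError("remote host closed the connection")

async def ok():
    return "done"

async def main():
    tasks = [asyncio.ensure_future(boom()), asyncio.ensure_future(ok())]
    done, pending = await asyncio.wait(tasks)
    # asyncio.wait() never raises; inspect each finished task yourself.
    errors = []
    for task in done:
        exc = task.exception()  # retrieving it also silences the warning
        if exc is not None:
            errors.append(exc)
    return errors

errors = asyncio.run(main())
print([type(e).__name__ for e in errors])  # ['ConnectionError']
```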

If you want to catch an error as soon as it happens in one of the tasks, I can suggest you use asyncio.gather(). By default it will raise the first exception that occurred. Note, however, that it is your job to cancel the pending tasks if you want them to be gracefully shut down.
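A sketch of that approach, with a hypothetical fetch() standing in for the real request: asyncio.gather() propagates the first exception, and the still-pending tasks are then cancelled explicitly:

```python
import asyncio

async def fetch(url):
    # Hypothetical stand-in for the real aiohttp request.
    if "bad" in url:
        raise ConnectionError(f"cannot connect to {url}")
    await asyncio.sleep(0)
    return f"ok: {url}"

async def main(urls):
    tasks = [asyncio.ensure_future(fetch(u)) for u in urls]
    try:
        return await asyncio.gather(*tasks)
    except ConnectionError as exc:
        # gather() raised the first exception that occurred; cancelling
        # the remaining tasks is our job if we want a clean shutdown.
        for task in tasks:
            task.cancel()
        return exc

result = asyncio.run(main(["http://good/1", "http://bad/2", "http://good/3"]))
print(result)  # the ConnectionError for http://bad/2
```

With `return_exceptions=True`, gather() instead returns the exceptions in the result list alongside the normal results, which is convenient for collecting failed URLs to reprocess.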

