aiohttp 异步请求的任务异常
Posted
技术标签:
【中文标题】aiohttp 异步请求的任务异常【英文标题】:Task exception with aiohttp async request 【发布时间】:2019-09-21 19:20:50 【问题描述】:我正在尝试使用 asyncio 和 aiohttp 加速对 Web 服务的多个 get 请求。
为此,我在函数内部使用 psycopg2 模块 .fetchmany() 从 postgresql 数据库中获取我的数据,并构造一个包含 100 条记录的字典,以作为字典 url 列表发送到名为 batch() 的异步函数。分批处理。
我在 batch() 函数中面临的问题是,一些请求正在记录下面的消息,尽管脚本继续并且不会失败,但我无法捕获并记录此异常以便以后重新处理它们。
Task exception was never retrieved
future: <Task finished coro=<batch.<locals>.fetch() done, defined at C:/PythonProjects/bindings/batch_fetch.py:34> exception=ClientOSError(10054, 'An existing connection was forcibly closed by the remote host', None, 10054, None)>
Traceback (most recent call last):
File "C:/PythonProjects/bindings/batch_fetch.py", line 36, in fetch
async with session.get(url) as resp:
File "C:\Miniconda3\lib\site-packages\aiohttp\client.py", line 1005, in __aenter__
self._resp = await self._coro
File "C:\Miniconda3\lib\site-packages\aiohttp\client.py", line 497, in _request
await resp.start(conn)
File "C:\Miniconda3\lib\site-packages\aiohttp\client_reqrep.py", line 844, in start
message, payload = await self._protocol.read() # type: ignore # noqa
File "C:\Miniconda3\lib\site-packages\aiohttp\streams.py", line 588, in read
await self._waiter
aiohttp.client_exceptions.ClientOSError: [WinError 10054] An existing connection was forcibly closed by the remote host
Task exception was never retrieved
future: <Task finished coro=<batch.<locals>.fetch() done, defined at C:/PythonProjects/bindings/batch_fetch.py:34> exception=ClientConnectorError(10060, "Connect call failed ('xx.xxx.xx.xxx', 80)")>
Traceback (most recent call last):
File "C:\Miniconda3\lib\site-packages\aiohttp\connector.py", line 924, in _wrap_create_connection
await self._loop.create_connection(*args, **kwargs))
File "C:\Miniconda3\lib\asyncio\base_events.py", line 778, in create_connection
raise exceptions[0]
File "C:\Miniconda3\lib\asyncio\base_events.py", line 765, in create_connection
yield from self.sock_connect(sock, address)
File "C:\Miniconda3\lib\asyncio\selector_events.py", line 450, in sock_connect
return (yield from fut)
File "C:\Miniconda3\lib\asyncio\selector_events.py", line 480, in _sock_connect_cb
raise OSError(err, 'Connect call failed %s' % (address,))
TimeoutError: [Errno 10060] Connect call failed ('xx.xxx.xx.xxx', 80)
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "C:/PythonProjects/bindings/batch_fetch.py", line 36, in fetch
async with session.get(url) as resp:
File "C:\Miniconda3\lib\site-packages\aiohttp\client.py", line 1005, in __aenter__
self._resp = await self._coro
File "C:\Miniconda3\lib\site-packages\aiohttp\client.py", line 476, in _request
timeout=real_timeout
File "C:\Miniconda3\lib\site-packages\aiohttp\connector.py", line 522, in connect
proto = await self._create_connection(req, traces, timeout)
File "C:\Miniconda3\lib\site-packages\aiohttp\connector.py", line 854, in _create_connection
req, traces, timeout)
File "C:\Miniconda3\lib\site-packages\aiohttp\connector.py", line 992, in _create_direct_connection
raise last_exc
File "C:\Miniconda3\lib\site-packages\aiohttp\connector.py", line 974, in _create_direct_connection
req=req, client_error=client_error)
File "C:\Miniconda3\lib\site-packages\aiohttp\connector.py", line 931, in _wrap_create_connection
raise client_error(req.connection_key, exc) from exc
aiohttp.client_exceptions.ClientConnectorError: Cannot connect to host cms-uat.cme.in.here.com:80 ssl:None [Connect call failed ('xx.xxx.xx.xxx', 80)]
正如您从我的代码中所描述的那样,我刚刚进入 asyncio 世界,因此非常欢迎有关此场景的完整代码方法的所有建议。
谢谢
完整代码如下。
import psycopg2.extras
import asyncio
import json
from aiohttp import ClientSession
from aiohttp import TCPConnector
base_url = 'http://url-example/'
def query_db():
urls = []
# connection to postgres table , fetch data.
conn = psycopg2.connect("dbname='pac' user='user' host='db'")
cursor = conn.cursor('psycopg2 request', cursor_factory=psycopg2.extras.NamedTupleCursor)
sql = "select gid, paid from table"
cursor.execute(sql)
while True:
rec = cursor.fetchmany(100)
for item in rec:
record = "gid": item.gid, "url": base_url.format(item.paid)
urls.append(record.get('url'))
if not rec:
break
# send batch for async batch request
batch(urls)
# empty list of urls for new async batch request
urls = []
def batch(urls):
async def fetch(url):
async with ClientSession() as session:
async with session.get(url) as resp:
if resp.status == 200:
response = await resp.json()
# parse the url to fetch the point address id.
paid = str(resp.request_info.url).split('/')[4].split('?')[0]
# build the dictionary with pa id and full response.
resp_dict = 'paid': paid, 'response': response
with open('sucessful.json', 'a') as json_file:
json.dump(resp_dict, json_file)
json_file.write("\n")
elif resp.status is None:
print(resp.status)
elif resp.status != 200:
print(resp.status)
response = await resp.json()
# parse the url to fetch the paid.
paid = str(resp.request_info.url).split('/')[4].split('?')[0]
# build the dictionary with paid and full response.
resp_dict = 'paid': paid, 'response': response
with open('failed.json', 'a') as json_file:
json.dump(resp_dict, json_file)
json_file.write("\n")
loop = asyncio.get_event_loop()
tasks = []
for url in urls:
task = asyncio.ensure_future(fetch(url))
tasks.append(task)
try:
loop.run_until_complete(asyncio.wait(tasks))
except Exception:
print("exception consumed")
if __name__ == "__main__":
query_db()
【问题讨论】:
【参考方案1】:从未检索到任务异常
当您创建某个任务时,您会看到此警告,它以异常完成,但您从未明确检索(等待)其结果。这是相关的doc section。
我敢打赌,你的问题出在线路上
loop.run_until_complete(asyncio.wait(tasks))
asyncio.wait()
by default 只是等待所有tasks
完成。它不区分正常完成或异常完成的任务,它只是阻塞直到一切完成。在这种情况下,您的工作是从已完成的任务中检索异常,以下部分将无法帮助您解决此问题,因为 asyncio.wait()
永远不会引发错误:
try:
loop.run_until_complete(asyncio.wait(tasks))
except Exception:
print('...') # You will probably NEVER see this message
如果您想在其中一项任务发生错误时立即捕获错误,我可以建议您使用 asyncio.gather()
。 By default 它将引发第一次发生的异常。但是请注意,如果您希望它们正常关闭,那么您的工作就是 cancel 挂起的任务。
【讨论】:
以上是关于aiohttp 异步请求的任务异常的主要内容,如果未能解决你的问题,请参考以下文章
aiohttp 异步http请求-12.aiohttp 请求生命周期(和requests库有什么不一样?)
aiohttp 异步http请求-7.https请求报SSL问题
python asyncio 异步 I/O - 实现并发http请求(asyncio + aiohttp)