带有 Python 请求的异步请求
Posted
技术标签:
【中文标题】带有 Python 请求的异步请求【英文标题】:Asynchronous Requests with Python requests 【发布时间】:2012-02-25 00:07:12 【问题描述】:我尝试了requests library for python 文档中提供的示例。
使用async.map(rs)
,我获得了响应代码,但我想获得请求的每个页面的内容。例如,这不起作用:
out = async.map(rs)
print out[0].content
【问题讨论】:
也许你得到的回复是空的? 为我工作。请发布您遇到的完整错误。 没有错误。它只是通过提供的测试 url 永远运行。 当我在 https 上使用 url 时,它显然会出现。 http 工作正常 大多数答案都已过时。在 2021 年,当前的潮流赢家是:docs.aiohttp.org/en/stable 【参考方案1】:不幸的是,据我所知,请求库不具备执行异步请求的功能。您可以将async/await
语法包装在requests
周围,但这将使底层请求的同步性不减。如果您想要真正的异步请求,则必须使用提供它的其他工具。一种这样的解决方案是aiohttp
(Python 3.5.3+)。根据我使用 Python 3.7 async/await
语法的经验,它运行良好。下面我写了三个使用来执行 n 个 web 请求的实现
-
使用 Python
requests
库的纯同步请求 (sync_requests_get_all
)
使用 Python 3.7 async/await
语法和 asyncio
包装的 Python requests
库的同步请求 (async_requests_get_all
)
真正的异步实现 (async_aiohttp_get_all
),使用 Python 3.7 async/await
语法和 asyncio
包装的 Python aiohttp
库
"""
Tested in Python 3.5.10
"""
import time
import asyncio
import requests
import aiohttp
from asgiref import sync
def timed(func):
"""
records approximate durations of function calls
"""
def wrapper(*args, **kwargs):
start = time.time()
print('name:<30 started'.format(name=func.__name__))
result = func(*args, **kwargs)
duration = "name:<30 finished in elapsed:.2f seconds".format(
name=func.__name__, elapsed=time.time() - start
)
print(duration)
timed.durations.append(duration)
return result
return wrapper
timed.durations = []
@timed
def sync_requests_get_all(urls):
"""
performs synchronous get requests
"""
# use session to reduce network overhead
session = requests.Session()
return [session.get(url).json() for url in urls]
@timed
def async_requests_get_all(urls):
"""
asynchronous wrapper around synchronous requests
"""
session = requests.Session()
# wrap requests.get into an async function
def get(url):
return session.get(url).json()
async_get = sync.sync_to_async(get)
async def get_all(urls):
return await asyncio.gather(*[
async_get(url) for url in urls
])
# call get_all as a sync function to be used in a sync context
return sync.async_to_sync(get_all)(urls)
@timed
def async_aiohttp_get_all(urls):
"""
performs asynchronous get requests
"""
async def get_all(urls):
async with aiohttp.ClientSession() as session:
async def fetch(url):
async with session.get(url) as response:
return await response.json()
return await asyncio.gather(*[
fetch(url) for url in urls
])
# call get_all as a sync function to be used in a sync context
return sync.async_to_sync(get_all)(urls)
if __name__ == '__main__':
# this endpoint takes ~3 seconds to respond,
# so a purely synchronous implementation should take
# little more than 30 seconds and a purely asynchronous
# implementation should take little more than 3 seconds.
urls = ['https://postman-echo.com/delay/3']*10
async_aiohttp_get_all(urls)
async_requests_get_all(urls)
sync_requests_get_all(urls)
print('----------------------')
[print(duration) for duration in timed.durations]
在我的机器上,这是输出:
async_aiohttp_get_all started
async_aiohttp_get_all finished in 3.20 seconds
async_requests_get_all started
async_requests_get_all finished in 30.61 seconds
sync_requests_get_all started
sync_requests_get_all finished in 30.59 seconds
----------------------
async_aiohttp_get_all finished in 3.20 seconds
async_requests_get_all finished in 30.61 seconds
sync_requests_get_all finished in 30.59 seconds
【讨论】:
"asnyc" 这是一个错字还是故意的? 绝对是错字 您的async_aiohttp_get_all()
是一个不错的解决方案。我想出了类似的东西,但在它之外有一个额外的async def fetch_all(urls): return await asyncio.gather(*[fetch(url) for url in urls])
,它让我的解决方案为每个 URL 创建单独的aiohttp.ClientSession()
实例,而通过嵌入本地函数,您可以重用相同的会话......更多 Pythonic IMO。你能提醒我使用sync.async_to_sync()
与get_all()
与asyncio.run()
没有 get_all()
存在的好处吗?
太棒了,绝对 async_aiohttp 比所有工作都好!【参考方案2】:
我支持suggestion above 使用HTTPX,但我经常以不同的方式使用它,所以添加我的答案。
我个人使用asyncio.run
(introduced in Python 3.7)而不是asyncio.gather
,也更喜欢aiostream
的方式,可以结合asyncio和httpx使用。
就像我刚刚发布的this example 一样,这种样式有助于异步处理一组 URL,即使(常见)错误发生也是如此。我特别喜欢这种风格如何阐明响应处理发生的位置以及便于错误处理(我发现异步调用往往会提供更多)。
发布一个异步触发一堆请求的简单示例更容易,但通常您还想处理响应内容(用它计算一些东西,也许参考您请求的 URL 的原始对象做)。
该方法的核心如下:
async with httpx.AsyncClient(timeout=timeout) as session:
ws = stream.repeat(session)
xs = stream.zip(ws, stream.iterate(urls))
ys = stream.starmap(xs, fetch, ordered=False, task_limit=20)
process = partial(process_thing, things=things, pbar=pbar, verbose=verbose)
zs = stream.map(ys, process)
return await zs
地点:
process_thing
是一个异步响应内容处理函数
things
是输入列表(urls
的 URL 字符串生成器来自),例如对象/字典列表
pbar
是一个进度条(例如tqdm.tqdm
)[可选但有用]
所有这些都在一个异步函数async_fetch_urlset
中,然后通过调用名为例如的同步“***”函数运行。 fetch_things
运行协程 [这是异步函数返回的内容] 并管理事件循环:
def fetch_things(urls, things, pbar=None, verbose=False):
return asyncio.run(async_fetch_urlset(urls, things, pbar, verbose))
由于可以就地修改作为输入传递的列表(这里是 things
),因此您可以有效地取回输出(就像我们习惯于从同步函数调用中一样)
【讨论】:
【参考方案3】:免责声明:Following code creates different threads for each function
.
这可能对某些情况有用,因为它更易于使用。但是要知道它不是异步的,而是使用多个线程产生异步的错觉,即使装饰器建议这样做。
你可以使用下面的装饰器在函数执行完成后给出回调,回调必须处理函数返回的数据。
请注意,函数被修饰后会返回一个Future
对象。
import asyncio
## Decorator implementation of async runner !!
def run_async(callback, loop=None):
if loop is None:
loop = asyncio.get_event_loop()
def inner(func):
def wrapper(*args, **kwargs):
def __exec():
out = func(*args, **kwargs)
callback(out)
return out
return loop.run_in_executor(None, __exec)
return wrapper
return inner
实现示例:
urls = ["https://google.com", "https://facebook.com", "https://apple.com", "https://netflix.com"]
loaded_urls = [] # OPTIONAL, used for showing realtime, which urls are loaded !!
def _callback(resp):
print(resp.url)
print(resp)
loaded_urls.append((resp.url, resp)) # OPTIONAL, used for showing realtime, which urls are loaded !!
# Must provide a callback function, callback func will be executed after the func completes execution
# Callback function will accept the value returned by the function.
@run_async(_callback)
def get(url):
return requests.get(url)
for url in urls:
get(url)
如果你想实时查看加载了哪些url,你也可以在最后添加以下代码:
while True:
print(loaded_urls)
if len(loaded_urls) == len(urls):
break
【讨论】:
这可行,但它会为每个请求生成一个新线程,这似乎违背了使用 asyncio 的目的。 @rtaft 谢谢你的建议,我已经纠正了我的话。【参考方案4】:您可以为此使用httpx
。
import httpx
async def get_async(url):
async with httpx.AsyncClient() as client:
return await client.get(url)
urls = ["http://google.com", "http://wikipedia.org"]
# Note that you need an async context to use `await`.
await asyncio.gather(*map(get_async, urls))
如果您需要函数式语法,gamla 库会将其包装到 get_async
中。
那你就可以了
await gamla.map(gamla.get_async(10))(["http://google.com", "http://wikipedia.org"])
10
是以秒为单位的超时时间。
(免责声明:我是它的作者)
【讨论】:
和respx
用于模拟/测试:)
嗨@Uri,我在尝试你在这个答案中提到的代码时遇到了错误。 await asyncio.gather(*map(get_async, urls)) ^ SyntaxError: invalid syntax
请指导
请注意,您需要一个异步上下文才能使用await
。【参考方案5】:
from threading import Thread
threads=list()
for requestURI in requests:
t = Thread(target=self.openURL, args=(requestURI,))
t.start()
threads.append(t)
for thread in threads:
thread.join()
...
def openURL(self, requestURI):
o = urllib2.urlopen(requestURI, timeout = 600)
o...
【讨论】:
这是线程中的“正常”请求。不错的例子,购买是题外话。【参考方案6】:我测试了requests-futures 和grequests。 Grequests 更快,但会带来猴子补丁和其他依赖问题。 requests-futures 比 grequests 慢几倍。我决定编写自己的并将请求简单地包装到 ThreadPoolExecutor 中,它几乎与 grequests 一样快,但没有外部依赖。
import requests
import concurrent.futures
def get_urls():
return ["url1","url2"]
def load_url(url, timeout):
return requests.get(url, timeout = timeout)
with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
future_to_url = executor.submit(load_url, url, 10): url for url in get_urls()
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
resp_err = resp_err + 1
else:
resp_ok = resp_ok + 1
【讨论】:
这里可能出现什么类型的异常? requests.exceptions.Timeout 对不起,我不明白你的问题。在多个线程中仅使用单个 url?只有一种情况 DDoS 攻击)) 我不明白为什么这个答案会得到如此多的支持。 OP 问题是关于异步请求的。 ThreadPoolExecutor 运行线程。是的,你可以在多个线程中发出请求,但这永远不会是一个异步程序,所以我怎么能回答原来的问题? 实际上,问题是关于如何并行加载 URL。是的,线程池执行器不是最好的选择,最好使用异步 io,但它在 Python 中运行良好。而且我不明白为什么线程不能用于异步?如果需要异步运行 CPU 密集型任务怎么办?【参考方案7】:也许requests-futures 是另一种选择。
from requests_futures.sessions import FuturesSession
session = FuturesSession()
# first request is started in background
future_one = session.get('http://httpbin.org/get')
# second requests is started immediately
future_two = session.get('http://httpbin.org/get?foo=bar')
# wait for the first request to complete, if it hasn't already
response_one = future_one.result()
print('response one status: 0'.format(response_one.status_code))
print(response_one.content)
# wait for the second request to complete, if it hasn't already
response_two = future_two.result()
print('response two status: 0'.format(response_two.status_code))
print(response_two.content)
the office document也推荐。如果您不想涉及 gevent,这是一个很好的选择。
【讨论】:
最简单的解决方案之一。可以通过定义max_workers参数增加并发请求数 很高兴看到这个缩放的例子,所以我们不会为每个项目使用一个变量名来循环。 每个请求只有一个线程是对资源的极大浪费!例如,不可能同时执行 500 个请求,它会杀死你的 CPU。这绝不应该被认为是一个好的解决方案。 @CorneliuMaftuleac 好点。关于线程使用,您肯定需要关心它,并且库提供了启用线程池或处理池的选项。ThreadPoolExecutor(max_workers=10)
@Dreampuf 处理池我相信更糟糕?【参考方案8】:
我发布的大多数答案都有很多问题 - 他们要么使用已被移植但功能有限的已弃用库,要么提供了一个在执行请求时具有太多魔力的解决方案,因此很难出错处理。如果它们不属于上述类别之一,则它们是第 3 方库或已弃用。
有些解决方案纯粹在 http 请求中可以正常工作,但对于任何其他类型的请求,这些解决方案都达不到要求,这很可笑。这里不需要高度定制的解决方案。
只需使用 python 内置库asyncio
就足以执行任何类型的异步请求,并为复杂和特定于用例的错误处理提供足够的流动性。
import asyncio
loop = asyncio.get_event_loop()
def do_thing(params):
async def get_rpc_info_and_do_chores(id):
# do things
response = perform_grpc_call(id)
do_chores(response)
async def get_httpapi_info_and_do_chores(id):
# do things
response = requests.get(URL)
do_chores(response)
async_tasks = []
for element in list(params.list_of_things):
async_tasks.append(loop.create_task(get_chan_info_and_do_chores(id)))
async_tasks.append(loop.create_task(get_httpapi_info_and_do_chores(ch_id)))
loop.run_until_complete(asyncio.gather(*async_tasks))
它的工作原理很简单。您正在创建一系列您希望异步执行的任务,然后要求循环执行这些任务并在完成后退出。没有因缺乏维护而导致的额外库,也没有缺乏所需的功能。
【讨论】:
如果我理解正确,这会在进行 GRPC 和 HTTP 调用时阻塞事件循环?那么如果这些调用需要几秒钟才能完成,你的整个事件循环会阻塞几秒钟吗?为避免这种情况,您需要使用async
的 GRPC 或 HTTP 库。然后你可以例如做await response = requests.get(URL)
。没有?
不幸的是,在尝试此操作时,我发现围绕requests
进行包装几乎比同步调用 URL 列表快(在某些情况下更慢)。例如,使用上述策略请求一个需要 3 秒响应 10 次的端点大约需要 30 秒。如果你想要真正的async
性能,你需要使用aiohttp
之类的东西。
@DragonBobZ 就我而言,我看到时间减少了约 40%。主要好处是能够在等待下一个呼叫时执行必要的家务。在我的数据集中,我进行了数百次调用,因此规模也可能是一个因素。
@arshbot 是的,如果您的杂务是异步的,那么尽管您正在等待对 requests.get
的同步调用,但您会看到速度提升。但问题是如何使用 python requests
库执行异步请求。这个答案没有做到这一点,所以我的批评是成立的。
@iedmrc 遗憾的是,事实并非如此。对于非阻塞任务,它必须使用 Python 中较新的异步工具来实现,而 requests 库并非如此。如果您只是在异步事件循环中使用棒请求任务,那么这些任务仍然会阻塞。话虽如此,您可以(如其他响应中所建议的那样)使用 gevent 或带有请求的线程之类的东西,但肯定不能使用 asyncio。【参考方案9】:
如果你想使用 asyncio,那么 requests-async
为 requests
- https://github.com/encode/requests-async 提供 async/await 功能
【讨论】:
已确认,效果很好。在项目页面上,它说这项工作已被以下项目 github.com/encode/httpx 取代【参考方案10】:我知道这已经关闭了一段时间,但我认为推广另一个基于 requests 库的异步解决方案可能会很有用。
list_of_requests = ['http://moop.com', 'http://doop.com', ...]
from simple_requests import Requests
for response in Requests().swarm(list_of_requests):
print response.content
文档在这里:http://pythonhosted.org/simple-requests/
【讨论】:
@YSY 随时发布问题:github.com/ctheiss/simple-requests/issues;我确实每天使用这个库数千次。 波士顿,你如何处理 404/500 错误? https 网址呢?将欣赏支持数千个网址的剪辑。你能贴一个例子吗?谢谢 @YSY 默认情况下,404/500 错误会引发异常。可以覆盖此行为(请参阅pythonhosted.org/simple-requests/…)。由于对 gevent 的依赖,HTTPS url 很棘手,它目前对此有一个突出的错误 (github.com/gevent/gevent/issues/477)。您可以运行票证中的 shim,但它仍会向 SNI 服务器发出警告(但它会工作)。至于剪辑,恐怕我所有的用法都在我的公司并且关闭了。但我向您保证,我们在数十个作业中执行数千个请求。 库在交互方面看起来很时尚。 Python3+ 可以用吗?抱歉,没有看到任何提及。 @Jethro 绝对正确,该库需要完全重写,因为 Python 3 中的底层技术完全不同。目前,该库是“完整的”,但仅适用于 Python 2。 【参考方案11】:注意
以下答案不适用于请求 v0.13.0+。编写此问题后,异步功能已移至grequests。但是,您可以将requests
替换为下面的grequests
,它应该可以工作。
我保留这个答案是为了反映关于使用请求
要使用async.map
异步执行多个任务,您必须:
-
为每个对象(您的任务)定义一个函数
将该函数作为事件挂钩添加到您的请求中
致电
async.map
查看所有请求/操作的列表
例子:
from requests import async
# If using requests > v0.13.0, use
# from grequests import async
urls = [
'http://python-requests.org',
'http://httpbin.org',
'http://python-guide.org',
'http://kennethreitz.com'
]
# A simple task to do to each response object
def do_something(response):
print response.url
# A list to hold our things to do via async
async_list = []
for u in urls:
# The "hooks = ..." part is where you define what you want to do
#
# Note the lack of parentheses following do_something, this is
# because the response will be used as the first argument automatically
action_item = async.get(u, hooks = 'response' : do_something)
# Add the task to our list of things to do via async
async_list.append(action_item)
# Do our list of things to do via async
async.map(async_list)
【讨论】:
留下您的评论的好主意:由于最新请求和 grequests 之间的兼容性问题(请求 1.1.0 中缺少 max_retries 选项),我不得不降级请求以检索异步,我发现异步功能随版本 0.13+ (pypi.python.org/pypi/requests) 一起移动from grequests import async
不起作用.. 这个定义对我有用 def do_something(response, **kwargs):
,我从 ***.com/questions/15594015/… 找到它
如果 async.map 调用仍然阻塞,那么这是如何异步的?除了请求本身是异步发送的,检索仍然是同步的?
将from requests import async
替换为import grequests as async
对我有用。
grequests
现在推荐requests-threads
或requests-futures
【参考方案12】:
async
现在是一个独立模块:grequests
。
请看这里:https://github.com/kennethreitz/grequests
还有:Ideal method for sending multiple HTTP requests over Python?
安装:
$ pip install grequests
用法:
建立一个堆栈:
import grequests
urls = [
'http://www.heroku.com',
'http://tablib.org',
'http://httpbin.org',
'http://python-requests.org',
'http://kennethreitz.com'
]
rs = (grequests.get(u) for u in urls)
发送堆栈
grequests.map(rs)
结果看起来像
[<Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>]
grequests 似乎没有为并发请求设置限制,即当多个请求发送到同一个服务器时。
【讨论】:
关于并发请求的限制 - 您可以在运行 map()/imap() 时指定池大小。即 grequests.map(rs, size=20) 有 20 个并发抓取。 到目前为止,这不是 python3 支持的(gevent 无法在 py3.4 上构建 v2.6)。 我不太了解异步部分。如果我让results = grequests.map(rs)
这行后面的代码被阻塞,我能看到异步效果吗?
在github,repo上,grequests的作者推荐改用requests-threads或者requests-futures。【参考方案13】:
一段时间以来,我一直在使用 python 请求对 github 的 gist API 进行异步调用。
例如,请看这里的代码:
https://github.com/davidthewatson/flasgist/blob/master/views.py#L60-72
这种风格的 python 可能不是最清楚的例子,但我可以向你保证,代码是有效的。如果这让您感到困惑,请告诉我,我会记录下来。
【讨论】:
【参考方案14】:我还尝试了一些在 python 中使用异步方法的东西,但是我在使用 twisted 进行异步编程时运气好得多。它的问题更少,并且有据可查。这是与您正在尝试扭曲的内容类似的链接。
http://pythonquirks.blogspot.com/2011/04/twisted-asynchronous-http-request.html
【讨论】:
Twisted 是老式的。请改用 HTTPX。以上是关于带有 Python 请求的异步请求的主要内容,如果未能解决你的问题,请参考以下文章