aiohttp:限制并行请求的速率

Posted

技术标签:

【中文标题】aiohttp:限制并行请求的速率【英文标题】:aiohttp: rate limiting parallel requests 【发布时间】:2018-07-18 19:49:18 【问题描述】:

API 通常有用户必须遵守的速率限制。例如,让我们以 50 个请求/秒为例。顺序请求需要 0.5-1 秒,因此太慢而无法接近该限制。但是,使用 aiohttp 的并行请求超过了速率限制。

要尽可能快地轮询 API,需要限制并行调用的速率。

目前我发现的示例装饰session.get,大致如下:

session.get = rate_limited(max_calls_per_second)(session.get)

这适用于顺序调用。尝试在并行调用中实现这一点无法按预期工作。

下面是一些代码示例:

async with aiohttp.ClientSession() as session:
    session.get = rate_limited(max_calls_per_second)(session.get)
    tasks = (asyncio.ensure_future(download_coroutine(  
          timeout, session, url)) for url in urls)
    process_responses_function(await asyncio.gather(*tasks))

这样做的问题是它会限制任务的排队gather 的执行仍将或多或少同时发生。两全其美;-)。

是的,我在这里aiohttp: set maximum number of requests per second 发现了一个类似的问题,但两个回复都没有回答限制请求率的实际问题。 the blog post from Quentin Pradet 也仅适用于限制排队的速率。

总结一下:对于并行aiohttp 请求,如何限制每秒的请求数

【问题讨论】:

相关问题可见***.com/questions/35196974/… @user1929959 是的,正如我在帖子中提到的那样。不幸的是,这个问题没有得到正确回答。回复谈到限制同时请求的数量。询问的是每秒的请求数。 【参考方案1】:

如果我理解你的话,你想限制同时请求的数量吗?

asyncio 内部有一个名为Semaphore 的对象,它的工作方式类似于异步RLock

semaphore = asyncio.Semaphore(50)
#...
async def limit_wrap(url):
    async with semaphore:
        # do what you want
#...
results = asyncio.gather([limit_wrap(url) for url in urls])

更新

假设我发出 50 个并发请求,它们都在 2 秒内完成。所以,它没有触及限制(每秒只有 25 个请求)。

这意味着我应该发出 100 个并发请求,它们也都在 2 秒内完成(每秒 50 个请求)。但是在你真正提出这些请求之前,你怎么能确定它们会在多长时间内完成呢?

或者,如果您不介意每秒完成的请求数,而是每秒发出的请求数。你可以:

async def loop_wrap(urls):
    for url in urls:
        asyncio.ensure_future(download(url))
        await asyncio.sleep(1/50)

asyncio.ensure_future(loop_wrap(urls))
loop.run_forever()

上面的代码将每隔1/50 秒创建一个Future 实例。

【讨论】:

不,这是关于限制每秒请求数,即每秒发送请求的次数。同时请求的数量取决于这些请求需要多长时间,但我们要使用的 API 并没有限制。 @Boffin 这是不可能的。首先,您需要提出这些请求,然后您可以获得这些成本的时间。你想要的是预见。比如我一开始并发了50个请求,怎么判断一秒能不能完成呢? 请求需要多长时间并不相关。我想在前一个请求之后 1/50 秒启动另一个请求。请求需要(假设)1 秒,因此必须同时进行多个请求。然而,有多少同时运行并不重要。也许我在这里误解了什么? @Boffin 已更新,希望对您有所帮助。 更新后的代码似乎等同于for url in urls: await asyncio.sleep(1/50); await download_coroutine(url)【参考方案2】:

我通过使用基于漏桶算法的速率限制器创建 aiohttp.ClientSession() 的子类来解决该问题。我使用asyncio.Queue() 来限制速率,而不是Semaphores。我只覆盖了_request() 方法。我发现这种方法更简洁,因为您只需将 session = aiohttp.ClientSession() 替换为 session = ThrottledClientSession(rate_limit=15)

class ThrottledClientSession(aiohttp.ClientSession):
        """Rate-throttled client session class inherited from aiohttp.ClientSession)""" 
    MIN_SLEEP = 0.1

    def __init__(self, rate_limit: float =None, *args,**kwargs) -> None: 
        super().__init__(*args,**kwargs)
        self.rate_limit = rate_limit
        self._fillerTask = None
        self._queue = None
        self._start_time = time.time()
        if rate_limit != None:
            if rate_limit <= 0:
                raise ValueError('rate_limit must be positive')
            self._queue = asyncio.Queue(min(2, int(rate_limit)+1))
            self._fillerTask = asyncio.create_task(self._filler(rate_limit))

     
    def _get_sleep(self) -> list:
        if self.rate_limit != None:
            return max(1/self.rate_limit, self.MIN_SLEEP)
        return None
        
    async def close(self) -> None:
        """Close rate-limiter's "bucket filler" task"""
        if self._fillerTask != None:
            self._fillerTask.cancel()
        try:
            await asyncio.wait_for(self._fillerTask, timeout= 0.5)
        except asyncio.TimeoutError as err:
            print(str(err))
        await super().close()


    async def _filler(self, rate_limit: float = 1):
        """Filler task to fill the leaky bucket algo"""
        try:
            if self._queue == None:
                return 
            self.rate_limit = rate_limit
            sleep = self._get_sleep()
            updated_at = time.monotonic()
            fraction = 0
            extra_increment = 0
            for i in range(0,self._queue.maxsize):
                self._queue.put_nowait(i)
            while True:
                if not self._queue.full():
                    now = time.monotonic()
                    increment = rate_limit * (now - updated_at)
                    fraction += increment % 1
                    extra_increment = fraction // 1
                    items_2_add = int(min(self._queue.maxsize - self._queue.qsize(), int(increment) + extra_increment))
                    fraction = fraction % 1
                    for i in range(0,items_2_add):
                        self._queue.put_nowait(i)
                    updated_at = now
                await asyncio.sleep(sleep)
        except asyncio.CancelledError:
            print('Cancelled')
        except Exception as err:
            print(str(err))


    async def _allow(self) -> None:
        if self._queue != None:
            # debug 
            #if self._start_time == None:
            #    self._start_time = time.time()
            await self._queue.get()
            self._queue.task_done()
        return None


    async def _request(self, *args,**kwargs):
        """Throttled _request()"""
        await self._allow()
        return await super()._request(*args,**kwargs)
    ```

【讨论】:

如何使这个子类可访问?当我把它放到我的主脚本中时,我得到了错误:AttributeError: module 'aiohttp' has no attribute 'ThrottledClientSession' 嗨,新的子类不会成为 aiohttp 包的一部分。要么将类添加到同一个源文件中,要么将其导入:'import ThrottledClientSession from filename_of_the_class_source_file' 您可以从这里找到最新版本:github.com/Jylpah/blitz-tools/blob/master/blitzutils.py 在工作目录中保存为 blitzutils.py 并将import ThrottledClientSession from blitzutils 放在脚本中,但出现错误。我想你的意思是from blitzutils import ThrottledClientSession 其他人同意这个解决方案并写了一个包aiolimiter.readthedocs.io/en/latest【参考方案3】:

我喜欢@sraw 用 asyncio 来解决这个问题,但他们的回答对我来说并不完全正确。因为我不知道我的下载调用是否会比速率限制更快或更慢,所以我希望可以选择在请求很慢时并行运行许多,并且在请求非常快时一次运行一个,所以我总是在速率限制正确。

我通过使用一个队列与一个生产者以速率限制生成新任务,然后许多消费者要么都等待下一个工作,如果他们很快,要么在队列中备份工作如果它们很慢,并且将在处理器/网络允许的范围内以最快的速度运行:

import asyncio
from datetime import datetime 

async def download(url):
  # download or whatever
  task_time = 1/10
  await asyncio.sleep(task_time)
  result = datetime.now()
  return result, url

async def producer_fn(queue, urls, max_per_second):
  for url in urls:
    await queue.put(url)
    await asyncio.sleep(1/max_per_second)
 
async def consumer(work_queue, result_queue):
  while True:
    url = await work_queue.get()
    result = await download(url)
    work_queue.task_done()
    await result_queue.put(result)

urls = range(20)
async def main():
  work_queue = asyncio.Queue()
  result_queue = asyncio.Queue()

  num_consumer_tasks = 10
  max_per_second = 5
  consumers = [asyncio.create_task(consumer(work_queue, result_queue))
               for _ in range(num_consumer_tasks)]    
  producer = asyncio.create_task(producer_fn(work_queue, urls, max_per_second))
  await producer

  # wait for the remaining tasks to be processed
  await work_queue.join()
  # cancel the consumers, which are now idle
  for c in consumers:
    c.cancel()

  while not result_queue.empty():
    result, url = await result_queue.get()
    print(f'url finished at result')
 
asyncio.run(main())

【讨论】:

【参考方案4】:

就这里关于在调用gather() 时同时发送n 个请求的问题而言,关键是在每次调用之前使用带有await asyncio.sleep(1.1) 的create_task()。使用 create_task 创建的任何任务都会立即运行:

    for i in range(THREADS):
        await asyncio.sleep(1.1)
        tasks.append(
            asyncio.create_task(getData(session, q, ''.join(random.choice(string.ascii_lowercase) for i in range(10))))
        )
    await asyncio.gather(*tasks) 

在下面的示例中,通过在 async_payload_wrapper 中使用 ClientSession() 上下文并使用限制设置连接器,也解决了限制同时连接数的另一个问题。

通过此设置,我可以运行 25 个协程 (THREADS=25),每个协程遍历 URL 队列,并且不违反 25 个并发连接规则:

async def send_request(session, url, routine):
    start_time = time.time()
    print(f"routine, sending request: datetime.now()")
    params = 
                'api_key': 'nunya',
                'url': '%s' % url, 
                'render_js': 'false',
                'premium_proxy': 'false', 
                'country_code':'us'
            
    try:
        async with session.get(url='http://yourAPI.com',params=params,) as response:              
            data = await response.content.read()                     
            print(f"routine, done request: time.time() - start_time seconds")                    
        return data

    except asyncio.TimeoutError as e:    
        print('timeout---------------------')  
        errors.append(url)
    except aiohttp.ClientResponseError as e:
        print('request failed - Server Error')
        errors.append(url)
    except Exception as e:
        errors.append(url)

async def getData(session, q, test):
    while True:
        if not q.empty():
            url = q.get_nowait()
            resp = await send_request(session, url ,test)                      
            if resp is not None:
                processData(resp, test, url)
        else:
            print(f'test queue empty')
            break

async def async_payload_wrapper():
    tasks = []
    q = asyncio.Queue()
    for url in urls:
        await q.put(url)  


    async with ClientSession(connector=aiohttp.TCPConnector(limit=THREADS), timeout=ClientTimeout(total=61), raise_for_status=True) as session:    

        for i in range(THREADS):
            await asyncio.sleep(1.1)
            tasks.append(
                asyncio.create_task(getData(session, q, ''.join(random.choice(string.ascii_lowercase) for i in range(10))))
            )
        await asyncio.gather(*tasks)

if __name__ == '__main__':
    start_time = time.time()
    asyncio.run(async_payload_wrapper())

【讨论】:

【参考方案5】:

我开发了一个名为 octopus-api (https://pypi.org/project/octopus-api/) 的库,它使您能够在后台使用 aiohttp 来限制和设置对端点的连接(并行)调用数。它的目标是简化所有需要的 aiohttp 设置。

这是一个如何使用它的示例,其中 get_ethereum 是用户定义的请求函数:

from octopus_api import TentacleSession, OctopusApi
from typing import Dict, List

if __name__ == '__main__':
    async def get_ethereum(session: TentacleSession, request: Dict):
        async with session.get(url=request["url"], params=request["params"]) as response:
            body = await response.json()
            return body

    client = OctopusApi(rate=50, resolution="sec", connections=6)
    result: List = client.execute(requests_list=[
        "url": "https://api.pro.coinbase.com/products/ETH-EUR/candles?granularity=900&start=2021-12-04T00:00:00Z&end=2021-12-04T00:00:00Z",
        "params": ] * 1000, func=get_ethereum)
    print(result)

TentacleSession 的工作方式与您为 aiohttp.ClientSession 编写 POST、GET、PUT 和 PATCH 的方式相同。

如果它有助于解决与速率限制和并行调用相关的问题,请告诉我。

【讨论】:

以上是关于aiohttp:限制并行请求的速率的主要内容,如果未能解决你的问题,请参考以下文章

限制并发请求的数量 aiohttp

并行请求在使用 asyncio 恰好 100 个请求后无限阻塞

aiohttp:设置每秒最大请求数

aiohttp 异步http请求-8.TCPConnector限制连接池的大小

asyncio/aiohttp 不返回响应

aiohttp 异步http请求-12.aiohttp 请求生命周期(和requests库有什么不一样?)