python throttling.py
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python throttling.py相关的知识,希望对你有一定的参考价值。
import time
from django.core.cache import cache as default_cache
from rest_framework.exceptions import Throttled
class RateLimitThrottle:
    """
    An alternative implementation of Rest Framework's throttling feature that is
    driven explicitly by the caller and is incompatible with a throttling_classes
    attribute. The example below gives 10 login attempts in an hour to user "john".

        throttle = RateLimitThrottle(scope='login', rate='10/hour', ident='john')
        throttle.is_limit_reached(raise_exception=True)
        throttle.register_attempt()

    The attempt history is a list of timestamps stored in the cache under
    ``cache_key_format``, newest first (index 0) and oldest last (index -1).
    """

    cache_key_format = 'throttle_{scope}_{ident}'
    exception_class = Throttled
    cache = default_cache
    timer = time.time

    def __init__(self, scope, rate, ident):
        """
        :param scope: name used as the first component of the cache key.
        :param rate: string such as '10/hour', or None for "no limit".
        :param ident: identifier of the throttled subject; a falsy ident
            disables throttling because no cache key can be built.
        """
        self.scope = scope
        self.now = self.timer()
        self.ident = self.parse_ident(ident)
        self.num_requests, self.duration = self.parse_rate(rate)
        # Filled lazily by _load_history(); None means "not loaded yet".
        self.cache_key = None
        self.history = None

    def parse_ident(self, ident):
        """
        This method could be overridden for using any data structure as an ident.
        """
        return ident

    def get_cache_key(self):
        """
        Returns cache key based on scope and ident, or None when ident is falsy.
        """
        if not self.ident:
            return None
        return self.cache_key_format.format(scope=self.scope, ident=self.ident)

    def _load_history(self):
        """
        Resolves the cache key and loads the attempt history from the cache,
        dropping timestamps that have fallen out of the current window.
        """
        self.cache_key = self.get_cache_key()
        if self.cache_key is None:
            self.history = []
            return
        self.history = self.cache.get(self.cache_key, [])
        # The oldest attempts sit at the end of the list.
        while self.history and self.history[-1] <= self.now - self.duration:
            self.history.pop()

    def is_limit_reached(self, raise_exception=False):
        """
        Returns True when the number of registered attempts inside the current
        window has reached the allowed number, False otherwise.

        Returns False when no rate was configured or when no cache key can be
        built (falsy ident). If ``raise_exception`` is true, raises
        ``exception_class(wait=self.wait())`` instead of returning True.
        """
        if self.num_requests is None:
            # No rate configured: nothing to enforce.
            return False
        self._load_history()
        if self.cache_key is None:
            return False
        if len(self.history) < self.num_requests:
            return False
        if raise_exception:
            raise self.exception_class(wait=self.wait())
        return True

    def register_attempt(self):
        """
        Records the current attempt in the history and stores it in the cache
        with the rate duration as timeout. Does nothing when no rate was
        configured or when no cache key can be built.
        """
        if self.num_requests is None:
            return
        if self.history is None:
            # Allow calling register_attempt() without a prior is_limit_reached().
            self._load_history()
        if self.cache_key is None:
            return
        self.history.insert(0, self.now)
        self.cache.set(self.cache_key, self.history, self.duration)

    def parse_rate(self, rate):
        """
        Given the request rate string, return a two tuple of:
        <allowed number of requests>, <period of time in seconds>

        Only the first letter of the period is significant ('s', 'm', 'h', 'd').
        Returns (None, None) when rate is None.
        """
        if rate is None:
            return (None, None)
        num, period = rate.split('/')
        num_requests = int(num)
        duration = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}[period[0]]
        return (num_requests, duration)

    def wait(self):
        """
        Returns the recommended next request time in seconds, or None when it
        cannot be computed (no rate configured, or the history holds more
        attempts than the rate allows).
        """
        if self.num_requests is None:
            return None
        if self.history is None:
            self._load_history()
        if self.history:
            # Time left until the oldest attempt leaves the window.
            remaining_duration = self.duration - (self.now - self.history[-1])
        else:
            remaining_duration = self.duration
        available_requests = self.num_requests - len(self.history) + 1
        if available_requests <= 0:
            return None
        return remaining_duration / float(available_requests)

    def reset(self):
        """
        Deletes the stored history from the cache and forgets the in-memory
        copy so that later calls reload it instead of writing stale data back.
        """
        key = self.get_cache_key()
        if key is not None:
            self.cache.delete(key)
        self.history = None
class CompoundThrottle:
    """
    Combines several throttles behind the interface of a single one. The example
    below gives 10 login attempts in an hour to user "john" but not more than
    50 attempts a day.

        throttle = CompoundThrottle(
            RateLimitThrottle(scope='login', rate='10/hour', ident='john'),
            RateLimitThrottle(scope='login', rate='50/day', ident='john'),
        )
        throttle.is_limit_reached(raise_exception=True)
        throttle.register_attempt()
    """

    def __init__(self, *throttles):
        # Kept as the tuple of positional arguments, in the order given.
        self.throttles = throttles

    def is_limit_reached(self, *args, **kwargs):
        """
        Forwards the arguments to each wrapped throttle in order and returns
        True as soon as one of them gives a truthy answer; throttles after
        that one are not consulted. Returns False if none does.
        """
        return any(
            member.is_limit_reached(*args, **kwargs)
            for member in self.throttles
        )

    def register_attempt(self):
        """
        Registers the attempt on the first throttle of every distinct scope;
        further throttles with an already handled scope are skipped.
        """
        handled = set()
        for member in self.throttles:
            scope = member.scope
            if scope in handled:
                continue
            member.register_attempt()
            handled.add(scope)
以上是关于python throttling.py的主要内容,如果未能解决你的问题,请参考以下文章