DRF权限和频率限制
Posted zhuzhizheng
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了DRF权限和频率限制相关的知识,希望对你有一定的参考价值。
权限
views
from rest_framework.permissions import BasePermission
from rest_framework import exceptions
# 权限类操作
# Permission class example
class MyPermission(BasePermission):
    """Grant access only when ``request.user`` is truthy."""

    # Error payload returned to the client when has_permission() returns
    # False; APIView.check_permissions() passes it on to
    # PermissionDenied(detail=message).
    message = {'code': 10001, 'error': '你没权限'}

    def has_permission(self, request, view):
        """
        Return `True` if permission is granted, `False` otherwise.
        """
        # Alternative to returning False: raise the error directly, e.g.
        #   raise exceptions.PermissionDenied({'code': 10001, 'error': '你没权限'})
        # which also sends the payload back to the client.
        #
        # NOTE(review): with DRF's default UNAUTHENTICATED_USER setting an
        # anonymous request carries an AnonymousUser object, which is truthy,
        # so this check would let it through -- confirm the project's
        # authenticator / settings produce a falsy user for anonymous requests.
        return bool(request.user)

    def has_object_permission(self, request, view, obj):
        """
        Return `True` if permission is granted, `False` otherwise.
        """
        # This example denies every object-level check unconditionally.
        return False
class OrderView(APIView):
    """Order endpoint guarded by ``MyPermission``."""

    # Per-view configuration: permission classes
    permission_classes = [MyPermission]

    def get(self, request, *args, **kwargs):
        """Handle GET and return the literal payload 'order'."""
        return Response('order')
class UserView(APIView):
    """User endpoint guarded by ``MyPermission``."""

    # Per-view configuration: permission classes
    permission_classes = [MyPermission]

    def get(self, request, *args, **kwargs):
        """Handle GET and return the literal payload 'user'."""
        return Response('user')
settings
REST_FRAMEWORK = {
    # Pagination
    "PAGE_SIZE": 2,
    "DEFAULT_PAGINATION_CLASS": "rest_framework.pagination.PageNumberPagination",
    # Versioning
    "DEFAULT_VERSIONING_CLASS": "rest_framework.versioning.URLPathVersioning",
    "ALLOWED_VERSIONS": ['v1', 'v2'],
    'VERSION_PARAM': 'version',
    # Authentication (trailing comma kept so the entry below can be enabled
    # without causing a syntax error)
    "DEFAULT_AUTHENTICATION_CLASSES": ["kka.auth.TokenAuthentication"],
    # Global configuration: permissions (uncomment to apply to every view)
    # "DEFAULT_PERMISSION_CLASSES": ["api.extension.permission.LuffyPermission"],
}
权限源码分析
class APIView(View):
    """Abridged sketch of DRF's ``APIView`` showing where permissions are checked.

    Only the parts relevant to the permission flow are reproduced; elided
    steps are marked with comments. ``check_throttles`` is called but not
    shown in this excerpt.
    """

    permission_classes = api_settings.DEFAULT_PERMISSION_CLASSES

    def dispatch(self, request, *args, **kwargs):
        # (elided) wrap the incoming request object
        self.initial(request, *args, **kwargs)
        # (elided) look up the view's handler method via reflection and call it

    def initial(self, request, *args, **kwargs):
        # (elided) version handling
        # Authentication
        self.perform_authentication(request)
        # Permission check
        self.check_permissions(request)
        # Throttle check runs last, after authentication and permissions
        self.check_throttles(request)

    def perform_authentication(self, request):
        # The bare attribute access is the whole body of this method.
        request.user

    def check_permissions(self, request):
        # get_permissions() returns a list of permission instances
        for permission in self.get_permissions():
            if not permission.has_permission(request, self):
                self.permission_denied(request, message=getattr(permission, 'message', None))

    def permission_denied(self, request, message=None):
        if request.authenticators and not request.successful_authenticator:
            raise exceptions.NotAuthenticated()
        # Raise, returning the `message` defined on the permission class
        # to the client
        raise exceptions.PermissionDenied(detail=message)

    def get_permissions(self):
        # Instantiate every class listed in permission_classes
        return [permission() for permission in self.permission_classes]
class UserView(APIView):
    """User endpoint that overrides the default permission classes."""

    # Per-view configuration: permissions (replaces the
    # api_settings.DEFAULT_PERMISSION_CLASSES default set on APIView)
    permission_classes = [MyPermission]

    def get(self, request, *args, **kwargs):
        """Handle GET and return the literal payload 'user'."""
        return Response('user')
频率
settings
REST_FRAMEWORK = {
    "DEFAULT_VERSIONING_CLASS": "rest_framework.versioning.URLPathVersioning",
    "ALLOWED_VERSIONS": ["v1"],
    "DEFAULT_THROTTLE_RATES": {
        # "anon" is the default scope (the one AnonRateThrottle declares).
        # Rate format is "<count>/<period>": here 3 requests per minute (60 s).
        "anon": "3/m",
        # How parse_rate() in the DRF source splits the rate string:
        #   num, period = rate.split('/')
        #   num_requests = int(num)
        #   duration = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}[period[0]]
    },
}
views
from rest_framework.throttling import AnonRateThrottle
class ArticleDetailView(APIView):
    """Article endpoint rate-limited for anonymous clients."""

    # Per-view configuration: throttle classes
    throttle_classes = [AnonRateThrottle]

    def get(self, request, *args, **kwargs):
        """Handle GET and return a fixed placeholder payload."""
        return Response('文章列表')
频率限制在认证、权限之后
知识点
{
throttle_anon_1.1.1.1:[100121340,],
throttle_anon_1.1.1.2:[100121251,100120450,]
}
限制:60s能访问3次
来访问时:
1.获取当前时间 100121280
2.100121280-60 = 100121220,小于等于100121220的所有记录删除
3.判断1分钟以内已经访问多少次了? 3(已达到上限)
4.无法访问
停一会
来访问时:
1.获取当前时间 100121340
2.100121340-60 = 100121280,小于等于100121280的所有记录删除
3.判断1分钟以内已经访问多少次了? 0
4.可以访问
源码
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework.throttling import AnonRateThrottle,BaseThrottle
class ArticleView(APIView):
    """Article list endpoint rate-limited for anonymous clients."""

    # Per-view configuration: throttle classes
    throttle_classes = [AnonRateThrottle]

    def get(self, request, *args, **kwargs):
        """Handle GET and return a fixed placeholder payload."""
        return Response('文章列表')
class ArticleDetailView(APIView):
    """Article endpoint that declares no throttle classes of its own."""

    def get(self, request, *args, **kwargs):
        """Handle GET and return a fixed placeholder payload."""
        return Response('文章列表')
class BaseThrottle:
    """
    Rate throttling of requests.
    """

    def allow_request(self, request, view):
        """
        Return `True` if the request should be allowed, `False` otherwise.
        """
        raise NotImplementedError('.allow_request() must be overridden')

    def get_ident(self, request):
        """
        Identify the machine making the request by parsing HTTP_X_FORWARDED_FOR
        if present and number of proxies is > 0. If not use all of
        HTTP_X_FORWARDED_FOR if it is available, if not use REMOTE_ADDR.
        """
        xff = request.META.get('HTTP_X_FORWARDED_FOR')
        remote_addr = request.META.get('REMOTE_ADDR')
        num_proxies = api_settings.NUM_PROXIES

        if num_proxies is not None:
            if num_proxies == 0 or xff is None:
                return remote_addr
            addrs = xff.split(',')
            # Count back `num_proxies` entries from the end of the header,
            # clamped to the number of addresses actually present.
            client_addr = addrs[-min(num_proxies, len(addrs))]
            return client_addr.strip()

        # NUM_PROXIES unset: use the whole header with whitespace removed,
        # falling back to REMOTE_ADDR when the header is absent or empty.
        return ''.join(xff.split()) if xff else remote_addr

    def wait(self):
        """
        Optionally, return a recommended number of seconds to wait before
        the next request.
        """
        return None
class SimpleRateThrottle(BaseThrottle):
    """
    A simple cache implementation, that only requires `.get_cache_key()`
    to be overridden.

    The rate (requests / seconds) is set by a `rate` attribute on the View
    class. The attribute is a string of the form 'number_of_requests/period'.

    Period should be one of: ('s', 'sec', 'm', 'min', 'h', 'hour', 'd', 'day')

    Previous request information used for throttling is stored in the cache.
    """
    cache = default_cache
    timer = time.time
    cache_format = 'throttle_%(scope)s_%(ident)s'
    scope = None
    THROTTLE_RATES = api_settings.DEFAULT_THROTTLE_RATES

    def __init__(self):
        # A `rate` set on the class wins; otherwise resolve it via get_rate().
        if not getattr(self, 'rate', None):
            self.rate = self.get_rate()
        self.num_requests, self.duration = self.parse_rate(self.rate)

    def get_cache_key(self, request, view):
        """
        Should return a unique cache-key which can be used for throttling.
        Must be overridden.

        May return `None` if the request should not be throttled.
        """
        raise NotImplementedError('.get_cache_key() must be overridden')

    def get_rate(self):
        """
        Determine the string representation of the allowed request rate.
        """
        if not getattr(self, 'scope', None):
            msg = ("You must set either `.scope` or `.rate` for '%s' throttle" %
                   self.__class__.__name__)
            raise ImproperlyConfigured(msg)

        try:
            # Use `scope` to look the rate up in THROTTLE_RATES, i.e. the
            # DEFAULT_THROTTLE_RATES setting, when the class defines no rate.
            return self.THROTTLE_RATES[self.scope]
        except KeyError:
            msg = "No default throttle rate set for '%s' scope" % self.scope
            raise ImproperlyConfigured(msg)

    def parse_rate(self, rate):
        """
        Given the request rate string, return a two tuple of:
        <allowed number of requests>, <period of time in seconds>
        """
        if rate is None:
            return (None, None)
        num, period = rate.split('/')
        num_requests = int(num)
        # Only the first letter of the period is significant.
        duration = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}[period[0]]
        return (num_requests, duration)

    def allow_request(self, request, view):
        """
        Implement the check to see if the request should be throttled.

        On success calls `throttle_success`.
        On failure calls `throttle_failure`.
        """
        if self.rate is None:
            return True

        # Build the cache key identifying the requester (AnonRateThrottle
        # derives it from the scope and the client's IP).
        self.key = self.get_cache_key(request, view)
        if self.key is None:
            return True

        # Fetch the requester's recorded request timestamps from the cache
        # (an empty list when nothing is stored).
        self.history = self.cache.get(self.key, [])
        self.now = self.timer()

        # Drop any requests from the history which have now passed the
        # throttle duration. New entries are inserted at index 0, so the
        # oldest timestamp is always the last element.
        while self.history and self.history[-1] <= self.now - self.duration:
            self.history.pop()
        if len(self.history) >= self.num_requests:
            return self.throttle_failure()
        return self.throttle_success()

    def throttle_success(self):
        """
        Inserts the current request's timestamp along with the key
        into the cache.
        """
        self.history.insert(0, self.now)
        self.cache.set(self.key, self.history, self.duration)
        return True

    def throttle_failure(self):
        """
        Called when a request to the API has failed due to throttling.
        """
        return False

    def wait(self):
        """
        Returns the recommended next request time in seconds.
        """
        if self.history:
            remaining_duration = self.duration - (self.now - self.history[-1])
        else:
            remaining_duration = self.duration

        available_requests = self.num_requests - len(self.history) + 1
        if available_requests <= 0:
            return None

        return remaining_duration / float(available_requests)
class AnonRateThrottle(SimpleRateThrottle):
    """
    Limits the rate of API calls that may be made by anonymous users.

    The IP address of the request will be used as the unique cache key.
    """
    scope = 'anon'

    def get_cache_key(self, request, view):
        """Return the throttle cache key, or `None` for authenticated users."""
        if request.user.is_authenticated:
            return None  # Only throttle unauthenticated requests.

        return self.cache_format % {
            'scope': self.scope,
            'ident': self.get_ident(request)
        }
总结
如何实现的频率限制
- 匿名用户,用IP作为用户唯一标记,但如果用户换代理IP,无法做到真正的限制。
- 登录用户,用用户名或用户ID做标识。
具体实现:
在django的缓存中 = {
throttle_anon_1.1.1.1:[100121340,],
throttle_anon_1.1.1.2:[100121251,100120450,]
}
限制:60s能访问3次
来访问时:
1.获取当前时间 100121280
2.100121280-60 = 100121220,小于等于100121220的所有记录删除
3.判断1分钟以内已经访问多少次了? 3(已达到上限)
4.无法访问
停一会
来访问时:
1.获取当前时间 100121340
2.100121340-60 = 100121280,小于等于100121280的所有记录删除
3.判断1分钟以内已经访问多少次了? 0
4.可以访问
以上是关于DRF权限和频率限制的主要内容,如果未能解决你的问题,请参考以下文章