python 之 session鉴权的处理

Posted 奈非天

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python 之 session鉴权的处理相关的知识,希望对你有一定的参考价值。

一、session鉴权的处理

1. requests的会话对象

就像一个浏览器一样,它会在同一个会话中自动处理cookie信息,不需要写任何额外的代码。

import requests
 
 
session = requests.Session()  # 理解为就是一个浏览器
 
 
type(session)
 
 
requests.sessions.Session
 
 
session.post()# 登录
session.get() # 获取某个数据,会自动携带上一步收到的cookie
 
 
# 课堂派案例
headers = \'cookie\': \'FZ_STROAGE.ketangpai.com=eyJTRUVTSU9OSUQiOiIzMTI5MjRiNTU2MzNmMDUxIiwiU0VFU0lPTkRBVEUiOjE2Mzk1NzA0NDQ3Njd9; ARK_ID=undefined; ketangpai_home_slb=3fbda3fc94d5d1be63720d9c156288d0; PHPSESSID=kmugv5id4lcecie33asikt3p96; ketangpai_home_remember=think%3A%7B%22username%22%3A%22MDAwMDAwMDAwMLV2x5eHz7dthN523LWtftmC0IDak4NubQ%22%2C%22expire%22%3A%22MDAwMDAwMDAwMLOGvd6IubtrhKiGl7G2dZ4%22%2C%22token%22%3A%22MDAwMDAwMDAwMMurrpWavLehhs1-lbO5hZWEzYfcepuomcWmmqaMiHtnr5ypzYWosKKZq6HQxtOK0ZCme5p-q6iZu2yrn4uNhJ3KedDYk7ivboS4jt6zuY2Ug6eDl36KYW0%22%2C%22sign%22%3A%2207f1bd0c97817e6d7ebe92bfe8e30fe9%22%7D\',
          \'user-agent\': \'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/94.0.4606.61 Safari/537.36\'
res = requests.get(url=\'https://v4.ketangpai.com/UserApi/getUserInfo\')
 
 
res.status_code
 
 
200
 
 
res.cookies
 
 
<RequestsCookieJar[Cookie(version=0, name=\'PHPSESSID\', value=\'krm5vua2f6f07m5rjipa0uti16\', port=None, port_specified=False, domain=\'v4.ketangpai.com\', domain_specified=False, domain_initial_dot=False, path=\'/\', path_specified=True, secure=True, expires=None, discard=True, comment=None, comment_url=None, rest=\'HttpOnly\': None, rfc2109=False), Cookie(version=0, name=\'ketangpai_home_slb\', value=\'3fbda3fc94d5d1be63720d9c156288d0\', port=None, port_specified=False, domain=\'v4.ketangpai.com\', domain_specified=False, domain_initial_dot=False, path=\'/\', path_specified=True, secure=True, expires=None, discard=True, comment=None, comment_url=None, rest=\'httponly\': None, rfc2109=False)]>
 
 
res.json()
 
 
\'status\': 0, \'info\': \'您还未登陆!\'
 
 
session = requests.Session() # 1. 创建一个session对象
 
 
# 2. 登录
login_url = \'https://v4.ketangpai.com/UserApi/login\'
data = \'email\': \'877***9301@qq.com\',
        \'password\': \'Pyt***inlan\',
        \'remember\': 0
# json data params
response = session.post(url=login_url, data=data)
 
 
session.cookies
 
 
<RequestsCookieJar[Cookie(version=0, name=\'PHPSESSID\', value=\'5if12vo96vtulhfhr9bvu1nnr2\', port=None, port_specified=False, domain=\'v4.ketangpai.com\', domain_specified=False, domain_initial_dot=False, path=\'/\', path_specified=True, secure=True, expires=None, discard=True, comment=None, comment_url=None, rest=\'HttpOnly\': None, rfc2109=False), Cookie(version=0, name=\'ketangpai_home_remember\', value=\'think%3A%7B%22username%22%3A%22MDAwMDAwMDAwMLV2x5eHz7dthN523LWtftmC0IDak4NubQ%22%2C%22expire%22%3A%22MDAwMDAwMDAwMLOGvd6IubtrhKigl7O2dZ4%22%2C%22token%22%3A%22MDAwMDAwMDAwMMurrpWavLehhs1-lbO5hZWEzYfcepuomcWmmqaMiHtnr5ypzYWosKKZq6HQxtOK0ZCme5p-q6iZu2yrn4uNhJ3KedDYk7ivboS4jt6zuY2Ug6edl4CKYW0%22%2C%22sign%22%3A%2298880a4b0ee67193316c6c40dd40441f%22%7D\', port=None, port_specified=False, domain=\'v4.ketangpai.com\', domain_specified=False, domain_initial_dot=False, path=\'/\', path_specified=True, secure=True, expires=1639581779, discard=False, comment=None, comment_url=None, rest=\'httponly\': None, rfc2109=False), Cookie(version=0, name=\'ketangpai_home_slb\', value=\'3fbda3fc94d5d1be63720d9c156288d0\', port=None, port_specified=False, domain=\'v4.ketangpai.com\', domain_specified=False, domain_initial_dot=False, path=\'/\', path_specified=True, secure=True, expires=None, discard=True, comment=None, comment_url=None, rest=\'httponly\': None, rfc2109=False)]>
 
 
res = session.get(url=\'https://v4.ketangpai.com/UserApi/getUserInfo\')
 
 
res.json()
 
 
\'status\': 1,
 \'data\': \'id\': \'MDAwMDAwMDAwMLSGz96Iqb9phLVyoQ\',
  \'username\': \'****\',
  \'avatar\': \'http://v4.ketangpai.com/Public/Common/img/40/26.png\',
  \'usertype\': \'1\',
  \'email\': \'877***01@qq.com\',
  \'stno\': \'\',
  \'atteststate\': 0,
  \'attestInfo\': []
 
 
# 如果不用session对象,每一步都需要自己处理cookie
 
 
login_url = \'https://v4.ketangpai.com/UserApi/login\'
data = \'email\': 877***9301@qq.com\',
        \'password\': \'Pyt***inlan\',
        \'remember\': 0
# 1.登录
response = requests.post(url=login_url, data=data)
 
 
response.status_code
 
 
200
 
 
response.json()
 
 
\'status\': 1,
 \'url\': \'/Main/index.html\',
 \'token\': \'MDAwMDAwMDAwMMurrpWavLehhs1-lbO5hZWEzYfcepuomcWmmqaMiHtnr5ypzYWosKKZq6HQxtOK0ZCme5p-haiZu2yrn4uNhJ3KedDYk7ivboS4jt6zuY2Ug7d33n96YW0\',
 \'isenterprise\': 0,
 \'uid\': \'MDAwMDAwMDAwMLSGz96Iqb9phLVyoQ\'
 
 
# 2.获取数据
res = requests.get(url=\'https://v4.ketangpai.com/UserApi/getUserInfo\', cookies=response.cookies)
 
 
res.json()
 
 
\'status\': 1,
 \'data\': \'id\': \'MDAwMDAwMDAwMLSGz96Iqb9phLVyoQ\',
  \'username\': \'****\',
  \'avatar\': \'http://v4.ketangpai.com/Public/Common/img/40/26.png\',
  \'usertype\': \'1\',
  \'email\': \'877***01@qq.com\',
  \'stno\': \'\',
  \'atteststate\': 0,
  \'attestInfo\': []
 
 

requests库的session对象仅仅只是自动帮我们处理了cookie的携带问题。

2. 封装处理session鉴权的http请求函数

思路:

  • 保证在一个会话中使用同一个会话对象即可
  • 给每一个用例类创建一个会话对象即可。

import json
import unittest
from jsonpath import jsonpath
import setting
from common import logger, db
from common.data_handler import (
    replace_args_by_re,
    generate_no_usr_phone)
from common.encrypt_handler import generate_sign
import requests


class BaseCase(unittest.TestCase):
    """
    用例基类
    """
    db = db
    logger = logger
    setting = setting
    name = \'base_case\'
    session = requests.session()  # 创建一个session对象用来处理session鉴权

    @classmethod
    def setUpClass(cls) -> None:
        cls.logger.info(\'===========接口开始测试===========\'.format(cls.name))

    @classmethod
    def tearDownClass(cls) -> None:
        cls.logger.info(\'===========接口结束测试===========\'.format(cls.name))

    def flow(self, item):
        """
        测试流程
        """
        self.logger.info(\'>>>>>>>用例开始执行>>>>>>>>\'.format(item[\'title\']))
        # 把测试数据绑定到方法属性case上,其他也要把一些变量绑定到对象的属性上
        self._case = item
        # 1. 处理测试数据
        self.process_test()
        # 2. 发送请求
        self.send_request()
        # 3. 断言
        self.assert_all()

    def process_test(self):
        """
        测试数据的处理
        """
        # 1.1 生成测试数据
        self.generate_test_data()
        # 1.2 替换依赖参数
        self.replace_args()
        # 1.3 处理url
        self.process_url()
        # 1.4 鉴权处理
        self.auth_process()

    def auth_process(self):
        """
        v3版本鉴权处理
        :return:
        """
        request_data = self._case.get(\'request_data\')
        if request_data:
            headers = request_data.get(\'headers\')
            if headers:
                if headers.get(\'X-Lemonban-Media-Type\') == \'lemonban.v3\':
                    # 获取token
                    token = self._case[\'request_data\'][\'headers\'][\'Authorization\'].split(\' \')[-1]
                    # 生成签名
                    sign, timestamp = generate_sign(token, self.setting.SERVER_RSA_PUB_KEY)
                    # 添加到请求数据中
                    self._case[\'request_data\'][\'json\'][\'sign\'] = sign
                    self._case[\'request_data\'][\'json\'][\'timestamp\'] = timestamp

        # if self._case[\'request_data\'][\'headers\'][\'X-Lemonban-Media-Type\'] == \'lemonban.v3\':
        #     # 获取token
        #     token = self._case[\'request_data\'][\'headers\'][\'Authorization\'].split(\' \')[-1]
        #     # 生成签名
        #     sign, timestamp = generate_sign(token, self.setting.SERVER_RSA_PUB_KEY)
        #     # 添加到请求数据中
        #     self._case[\'request_data\'][\'json\'][\'sign\'] = sign
        #     self._case[\'request_data\'][\'json\'][\'timestamp\'] = timestamp

    def generate_test_data(self):
        """
        生成测试数据
        """
        """
           生成测试数据,不是固定流程,有不同可以复写
           :return:
           """
        self._case = json.dumps(self._case)
        if \'$phone_number$\' in self._case:
            phone = generate_no_usr_phone()
            self._case = self._case.replace(\'$phone_number$\', phone)
        self._case = json.loads(self._case)

    def replace_args(self):
        """
        替换参数
        """
        self._case = json.dumps(self._case)  # 把用例数据dumps成字符串,一次替换
        self._case = replace_args_by_re(self._case, self)
        self._case = json.loads(self._case)
        # 再将request_data, expect_data loads为字典
        try:
            self._case[\'request_data\'] = json.loads(self._case[\'request_data\'])
            self._case[\'expect_data\'] = json.loads(self._case[\'expect_data\'])
        except Exception as e:
            self.logger.error(\'用例的测试数据格式不正确\'.format(self._case[\'title\']))
            raise e

    def process_url(self):
        """
        处理url
        """

        if self._case[\'url\'].startswith(\'http\'):
            # 是否是全地址
            pass
        elif self._case[\'url\'].startswith(\'/\'):
            # 是否是短地址
            self._case[\'url\'] = self.setting.PROJECT_HOST + self._case[\'url\']
        else:
            # 接口名称
            try:
                self._case[\'url\'] = self.setting.INTERFACES[self._case[\'url\']]
            except Exception as e:
                self.logger.error(\'接口名称:不存在\'.format(self._case[\'url\']))
                raise e

    def send_request(self):
        """
        发送请求
        @return:
        """
        self._response = self.session.request(
            url=self._case[\'url\'], method=self._case[\'method\'], **self._case[\'request_data\'])
        # self._response = send_http_request(url=self._case[\'url\'], method=self._case[\'method\'],
        #                                    **self._case[\'request_data\'])

    def assert_all(self):
        try:
            # 3.1 断言响应状态码
            self.assert_status_code()
            # 3.2 断言响应数据
            self.assert_response()
            # 响应结果断言成功后就提取依赖数据
            self.extract_data()
            # 3.3 数据库断言后面的任务
            self.assert_sql()
        except  Exception as e:
            self.logger.error(\'++++++用例测试失败\'.format(self._case[\'title\']))
            raise e
        else:
            self.logger.info(\'<<<<<<<<<用例测试成功<<<<<<<\'.format(self._case[\'title\']))

    def assert_status_code(self):
        """
        断言响应状态码
        """
        try:
            self.assertEqual(self._case[\'status_code\'], self._response.status_code)
        except AssertionError as e:
            self.logger.warning(\'用例【】响应状态码断言异常\'.format(self._case[\'title\']))
            raise e
        else:
            self.logger.info(\'用例【】响应状态码断言成功\'.format(self._case[\'title\']))

    def assert_response(self):
        """
        断言响应数据
        """
        if self._case[\'res_type\'].lower() == \'json\':
            res = self._response.json()
        elif self._case[\'res_type\'].lower() == \'html\':
            # 扩展思路
            res = self._response.text
        try:
            self.assertEqual(self._case[\'expect_data\'], \'code\': res[\'code\'], \'msg\': res[\'msg\'])
        except AssertionError as e:
            self.logger.warning(\'用例【】响应数据断言异常\'.format(self._case[\'title\']))
            self.logger.warning(\'用例【】期望结果为:\'.format(self._case[\'title\'], self._case[\'expect_data\']))
            self.logger.warning(\'用例【】的响应结果:\'.format(self._case[\'title\'], res))
            raise e
        else:
            self.logger.info(\'用例【】响应数据断言成功\'.format(self._case[\'title\']))

    def assert_sql(self):
        """
        断言数据库
        """
        if self._case.get(\'sql\'):  # 返回指定键的值,如果键不在字典中返回默认值 None 或者设置的默认值。
            # 只有sql字段有sql的才需要校验数据库
            # 只有sql字段有sql的才需要校验数据库
            sqls = self._case[\'sql\'].split(\',\')
            for sql in sqls:
                try:
                    self.assertTrue(self.db.exist(sql))
                except AssertionError as e:
                    self.logger.warning(\'用例【】数据库断言异常,执行的sql为:\'.format(self._case[\'title\'], sql))
                    raise e
                except Exception as e:
                    self.logger.warning(\'用例【】数据库断言异常,执行的sql为:\'.format(self._case[\'title\'], sql))
                    raise e

    def extract_data(self):
        """
        根据提取表达式提取对应的数据
        :return:
        """
        if self._case.get(\'extract\'):
            if self._case[\'res_type\'].lower() == \'json\':
                self.extract_data_from_json()
            elif self._case[\'res_type\'].lower() == \'html\':
                self.extract_data_from_html()
            elif self._case[\'res_type\'].lower() == \'xml\':
                self.extract_data_from_xml()
            else:
                raise ValueError(\'res_type类型不正确,只支持json,html,xml\')

    def extract_data_from_json(self):
        """
        从json数据中提取数据并绑定到类属性中
        :return:
        """
        try:
            rules = json.loads(self._case.get(\'extract\'))
        except Exception as e:
            raise ValueError(\'用例【】的extract字段数据:格式不正确\'.format(self._case[\'title\'], self._case[\'extract\']))
        for rule in rules:
            # 类属性名
            name = rule[0]
            # 提取表达式
            exp = rule[1]
            # 根据jsonpath去响应中提取值
            value = jsonpath(self._response.json(), exp)
            # 如果能提取到值
            if value:
                # 把值绑定到对应的类属性上
                setattr(self.__class__, name, value[0])  # 注意value是个列表
            else:
                # 提取不到值,说明jsonpath写错了,或者是响应又问题
                raise ValueError(\'用例【】的提取表达式提取不到数据\'.format(self._case[\'title\'], self._case[\'extract\']))

    def extract_data_from_html(self):
        """
        从html数据中提取数据并绑定到类属性中
        :return:
        """
        raise ValueError(\'请实现此方法\')

    def extract_data_from_xml(self):
        """
        从xml数据中提取数据并绑定到类属性中
        :return:
        """
        raise ValueError(\'请实现此方法\')
 
 

from unittestreport import ddt, list_data
from common.base_case import BaseCase

cases = [
    \'title\': \'课堂派登录\',
     \'method\': \'post\',
     \'url\': \'https://v4.ketangpai.com/UserApi/login\',
     \'request_data\': \'"data": "email": "877***01@qq.com", "password": "Pyt***inlan", "remember": 0\',
     \'status_code\': 200,
     \'res_type\': \'json\',
     \'expect_data\': \'"status": 1\'
     ,
    
        \'title\': \'获取用户信息\',
        \'method\': \'get\',
        \'url\': \'https://v4.ketangpai.com/UserApi/getUserInfo\',
        \'request_data\': \'\',
        \'status_code\': 200,
        \'res_type\': \'json\',
        \'expect_data\': \'"status": 1\'
    
]


@ddt
class TestCourseFlow(BaseCase):
    name = "课堂派测试"

    @list_data(cases)
    def test_course(self, item):
        self.flow(item)

    def assert_response(self):
        """
        复写assert_response方法实现课堂派的断言
        :return:
        """
        if self._case[\'res_type\'].lower() == \'json\':
            res = self._response.json()
        elif self._case[\'res_type\'].lower() == \'html\':
            # 扩展思路
            res = self._response.text
        try:
            self.assertEqual(self._case[\'expect_data\'], \'status\': res[\'status\'])
        except AssertionError as e:
            self.logger.warning(\'用例【】响应数据断言异常\'.format(self._case[\'title\']))
            self.logger.warning(\'用例【】期望结果为:\'.format(self._case[\'title\'], self._case[\'expect_data\']))
            self.logger.warning(\'用例【】的响应结果:\'.format(self._case[\'title\'], res))
            raise e
        else:
            self.logger.info(\'用例【】响应数据断言成功\'.format(self._case[\'title\']))


if __name__ == \'__main__\':
    BaseCase.unittest.main()

 

WebSocket连接鉴权的过程

参考技术A 根据WebSocket文档上的说明,鉴权授权是需要自己实现。我们自己实现的流程大概是,在每次连接前,访问接口取得鉴权必须的参数,在连接WebSocket的时候拼到url后面,聊天服务器在连接的时候取url后面的参数,用于判断是否连接是否合法

WebSocket协议一个应用层协议,它基于HTTP协议上的一个补充。它利用了HTTP协议的握手过程,通过HTTP请求头中的某些字段表示是否是WebSocket协议。

由于HTTP协议是一个无状态协议,基于HTTP协议实现长链接必须通过ajax轮询或者long pull实现。
通过第一个HTTP请求建立TCP连接后,之后的数据交换就不需要发送HTTP请求头了,所以WebSocket协议时一个真正的长链接协议,它与HTTP协议还是有区别的。
在此基础上WebSocket实现了一个双通道的连接,也就是在同一个TCP上既可以收消息也可以发消息。

以上是关于python 之 session鉴权的处理的主要内容,如果未能解决你的问题,请参考以下文章

前端鉴权的兄弟们:cookiesessiontokenjwt单点登录

shiro原理之过滤器

视频+源码登录鉴权的三种方式:tokenjwtsession实战分享

快速搭建一个网关服务,动态路由鉴权的流程,看完秒会(含流程图)

快速搭建一个网关服务,动态路由鉴权的流程,看完秒会(含流程图)

WebSocket连接鉴权的过程