python_reques接口测试框架,Excel作为案例数据源

Posted 安琪儿

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python_reques接口测试框架,Excel作为案例数据源相关的知识,希望对你有一定的参考价值。

一、框架菜单

1.1 common模块

 1.2 其他

 二、Excel接口测试案例编写

 

三、读取Excel测试封装(核心封装)

excel_utils.py  读取Excel中的数据

import os
import xlrd   #内置模块、第三方模块pip install  自定义模块


class ExcelUtils():
    def __init__(self,file_path,sheet_name):
        self.file_path = file_path
        self.sheet_name = sheet_name
        self.sheet = self.get_sheet()  # 整个表格对象

    def get_sheet(self):
        wb = xlrd.open_workbook(self.file_path)
        sheet = wb.sheet_by_name(self.sheet_name)
        return sheet

    def get_row_count(self):
        row_count = self.sheet.nrows
        return row_count

    def get_col_count(self):
        col_count = self.sheet.ncols
        return col_count

    def __get_cell_value(self,row_index, col_index):
        cell_value = self.sheet.cell_value(row_index,col_index)
        return cell_value

    def get_merged_info(self):
        merged_info = self.sheet.merged_cells
        return merged_info

    def get_merged_cell_value(self,row_index, col_index):
        """既能获取普通单元格的数据又能获取合并单元格数据"""
        cell_value = None
        for (rlow, rhigh, clow, chigh) in self.get_merged_info():
            if (row_index >= rlow and row_index < rhigh):
                if (col_index >= clow and col_index < chigh):
                    cell_value = self.__get_cell_value(rlow, clow)
                    break;  # 防止循环去进行判断出现值覆盖的情况
                else:
                    cell_value = self.__get_cell_value(row_index, col_index)
            else:
                cell_value = self.__get_cell_value(row_index, col_index)
        return cell_value

    def get_sheet_data_by_dict(self):
        all_data_list = []
        first_row = self.sheet.row(0)  #获取首行数据
        for row in range(1, self.get_row_count()):
            row_dict = {}
            for col in range(0, self.get_col_count()):
                row_dict[first_row[col].value] = self.get_merged_cell_value(row, col)
            all_data_list.append(row_dict)
        return all_data_list

if __name__==\'__main__\':
    current_path = os.path.dirname(__file__)
    excel_path = os.path.join( current_path,\'..\',\'samples/data/test_case.xlsx\' )
    excelUtils = ExcelUtils(excel_path,"Sheet1")
    for row in excelUtils.get_sheet_data_by_dict():
        print(  row )
View Code

testdata_utils.py 读取Excel中的数据后处理成需要的数据

import os
from common.excel_utils import ExcelUtils
from common.config_utils import config

current_path = os.path.dirname(__file__)
test_data_path = os.path.join( current_path,\'..\', config.CASE_DATA_PATH )

class TestdataUtils():
    def __init__(self,test_data_path = test_data_path):
        self.test_data_path = test_data_path
        self.test_data = ExcelUtils(test_data_path,"Sheet1").get_sheet_data_by_dict()
        self.test_data_by_mysql = SqlUtils().get_mysql_test_case_info()


    def __get_testcase_data_dict(self):
        testcase_dict = {}
        for row_data in self.test_data:
            testcase_dict.setdefault( row_data[\'测试用例编号\'],[] ).append( row_data )
        return testcase_dict

    def def_testcase_data_list(self):
        testcase_list = []
        for k,v in self.__get_testcase_data_dict().items():
            one_case_dict = {}
            one_case_dict["case_id"] = k
            one_case_dict["case_info"] = v
            testcase_list.append( one_case_dict )
        return testcase_list


if __name__=="__main__":
    testdataUtils = TestdataUtils()
    for i in testdataUtils.def_testcase_data_list():
        print( i )
View Code

四、request封装(核心封装)

requests_utils.py   包含post请求,get请求,异常,调用断言
import ast
import re
import requests
import jsonpath
from requests.exceptions import RequestException
from requests.exceptions import ProxyError
from requests.exceptions import ConnectionError
from common.config_utils import config
from common.check_utils import CheckUtils

class RequestsUtils():
    def __init__(self):
        self.hosts =  config.hosts
        self.headers = {"ContentType":"application/json;charset=utf-8"}
        self.session = requests.session()
        self.temp_variables = {}

    def __get(self,get_info):
        try:
            url = self.hosts + get_info["请求地址"]
            response = self.session.get( url = url,
                                         params = ast.literal_eval(get_info["请求参数(get)"])
                                         )
            response.encoding = response.apparent_encoding
            if get_info["取值方式"] == "json取值":
                value = jsonpath.jsonpath( response.json(),get_info["取值代码"] )[0]
                self.temp_variables[ get_info["传值变量"] ] = value
            elif get_info["取值方式"] == "正则取值":
                value = re.findall(get_info["取值代码"],response.text)[0]
                self.temp_variables[get_info["传值变量"]] = value
            result = CheckUtils(response).run_check(get_info[\'期望结果类型\'], get_info[\'期望结果\'])
        except ProxyError as e:
            result = {\'code\': 4, \'result\': \'[%s]请求:代理错误异常\' % (get_info["接口名称"])}
        except ConnectionError as e:
            result = {\'code\': 4, \'result\': \'[%s]请求:连接超时异常\' % (get_info["接口名称"])}
        except RequestException as e:
            result = {\'code\': 4, \'result\': \'[%s]请求:Request异常,原因:%s\' % (get_info["接口名称"], e.__str__())}
        except Exception as e:
            result = {\'code\':4,\'result\':\'[%s]请求:系统异常,原因:%s\'%(get_info["接口名称"],e.__str__())}
        return result

    def __post(self,post_info):
        try:
            url = self.hosts + post_info["请求地址"]
            response = self.session.post( url = url,
                                         headers = self.headers,
                                         params = ast.literal_eval(post_info["请求参数(get)"]),
                                        # data = post_infos["提交数据(post)"],
                                         json=ast.literal_eval(post_info["提交数据(post)"])
                                        )
            response.encoding = response.apparent_encoding
            if post_info["取值方式"] == "json取值":
                value = jsonpath.jsonpath( response.json(),post_info["取值代码"] )[0]
                self.temp_variables[ post_info["传值变量"] ] = value
            elif post_info["取值方式"] == "正则取值":
                value = re.findall(post_info["取值代码"],response.text)[0]
                self.temp_variables[post_info["传值变量"]] = value
            #调用CheckUtils()
            result = CheckUtils(response).run_check(post_info[\'期望结果类型\'],post_info[\'期望结果\'])
        except ProxyError as e:
            result = {\'code\': 4, \'result\': \'[%s]请求:代理错误异常\' % (post_info["接口名称"])}
        except ConnectionError as e:
            result = {\'code\': 4, \'result\': \'[%s]请求:连接超时异常\' % (post_info["接口名称"])}
        except RequestException as e:
            result = {\'code\': 4, \'result\': \'[%s]请求:Request异常,原因:%s\' % (post_info["接口名称"], e.__str__())}
        except Exception as e:
            result = {\'code\':4,\'result\':\'[%s]请求:系统异常,原因:%s\'%(post_info["接口名称"],e.__str__())}
        return result

    def request(self,step_info):
        try:
            request_type = step_info["请求方式"]
            param_variable_list = re.findall(\'\\\\${\\w+}\', step_info["请求参数(get)"])
            if param_variable_list:
                for param_variable in param_variable_list:
                    step_info["请求参数(get)"] = step_info["请求参数(get)"]\\
                        .replace(param_variable,\'"%s"\' % self.temp_variables.get(param_variable[2:-1]))
            if request_type == "get":
                result = self.__get( step_info )
            elif request_type == "post":
                data_variable_list = re.findall(\'\\\\${\\w+}\', step_info["提交数据(post)"])
                if data_variable_list:
                    for param_variable in data_variable_list:
                        step_info["提交数据(post)"] = step_info["提交数据(post)"] \\
                            .replace(param_variable, \'"%s"\' % self.temp_variables.get(param_variable[2:-1]))
                result = self.__post( step_info )
            else:
                result = {\'code\':1,\'result\':\'请求方式不支持\'}
        except Exception as e:
            result = {\'code\':4,\'result\':\'用例编号[%s]的[%s]步骤出现系统异常,原因:%s\'%(step_info[\'测试用例编号\'],step_info["测试用例步骤"],e.__str__())}
        return result

    def request_by_step(self,step_infos):
        self.temp_variables = {}
        for step_info in step_infos:
            temp_result = self.request( step_info )
            # print( temp_result )
            if temp_result[\'code\']!=0:
                break
        return temp_result


if __name__=="__main__":

    case_info = [
        {\'请求方式\': \'get\', \'请求地址\': \'/cgi-bin/token\', \'请求参数(get)\': \'{"grant_type":"client_credential","appid":"wxXXXXXxc16","secret":"XXXXXXXX"}\', \'提交数据(post)\': \'\', \'取值方式\': \'json取值\', \'传值变量\': \'token\', \'取值代码\': \'$.access_token\', \'期望结果类型\': \'正则匹配\', \'期望结果\': \'{"access_token":"(.+?)","expires_in":(.+?)}\'},
        {\'请求方式\': \'post\', \'请求地址\': \'/cgi-bin/tags/create\', \'请求参数(get)\': \'{"access_token":${token}}\', \'提交数据(post)\': \'{"tag" : {"name" : "衡东"}}\',\'取值方式\': \'\', \'传值变量\': \'\', \'取值代码\': \'\', \'期望结果类型\': \'正则匹配\', \'期望结果\': \'{"tag":{"id":(.+?),"name":"衡东"}}\'}
    ]
    RequestsUtils().request_by_step(case_info)
View Code

五、断言封装(核心封装)

check_utils.py  断言封装,与实际结果核对
import re
import ast

class CheckUtils():
    def __init__(self,check_response=None):
        self.ck_response=check_response
        self.ck_rules = {
            \'\': self.no_check,
            \'json键是否存在\': self.check_key,
            \'json键值对\': self.check_keyvalue,
            \'正则匹配\': self.check_regexp
        }
        self.pass_result = {
            \'code\': 0,
            \'response_reason\': self.ck_response.reason,
            \'response_code\': self.ck_response.status_code,
            \'response_headers\': self.ck_response.headers,
            \'response_body\': self.ck_response.text,
            \'check_result\': True,
            \'message\': \'\'  # 扩招作为日志输出等
        }
        self.fail_result = {
            \'code\': 2,
            \'response_reason\': self.ck_response.reason,
            \'response_code\': self.ck_response.status_code,
            \'response_headers\': self.ck_response.headers,
            \'response_body\': self.ck_response.text,
            \'check_result\': False,
            \'message\': \'\'  # 扩招作为日志输出等
        }


    def no_check(self):
        return self.pass_result


    def check_key(self,check_data=None):
        check_data_list = check_data.split(\',\')   #把需要判断的值做切割,取出键值
        res_list = [] #存放每次比较的结果
        wrong_key = [] #存放比较失败key
        for check_data in check_data_list:   #把切割的键值和取出响应结果中的所有的键一个一个对比
            if check_data in self.ck_response.json().keys():
                res_list.append(self.pass_result )
            else:
                res_list.append( self.fail_result )
                wrong_key.append(check_data)     #把失败的键放进来,便于后续日志输出
        # print(res_list)
        # print(wrong_key)
        if self.fail_result in res_list:
            return self.fail_result
        else:
            return self.pass_result

    def check_keyvalue(self,check_data=None):
        res_list = []  # 存放每次比较的结果
        wrong_items = []  # 存放比较失败 items
        for check_item in ast.literal_eval(check_data).items():  #literal_eval()安全性的把字符串转成字典,items()取出键值对
            if check_item in self.ck_response.json().items():
                res_list.append( self.pass_result )
            else:
                res_list.append( self.fail_result )
                wrong_items.append(check_item)
        # print( res_list )
        # print( wrong_items )

        if self.fail_result in res_list:
            return self.fail_result
        else:
            return self.pass_result

    def check_regexp(self,check_data=None):
        pattern = re.compile(check_data)
        if re.findall(pattern=pattern,string=self.ck_response.text):  #匹配到了,不为空,为true
            return self.pass_result
        else:
            return self.fail_result

    def run_check(self,check_type=None,check_data=None):
        code = self.ck_response.status_code
        if code == 200:
            if check_type in self.ck_rules.keys():
                result=self.ck_rules[check_type](check_data)
                return result
            else:
                self.fail_result[\'message\'] = \'不支持%s判断方法\'%check_type
                return self.fail_result
        else:
            self.fail_result[\'message\'] = \'请求的响应状态码非%s\'%str(code)
            return self.fail_result




if __name__=="__main__":
   # 检查键是否存在,{"access_token":"hello","expires_":7200} 设为响应结果,"access_token,expires_in" 为检查对象值
    CheckUtils({"access_token":"hello","expires_":7200}).check_key("access_token,expires_in")
    #检查键值对是否存在
    CheckUtils({"access_token":"hello","expires_i":7200}).check_keyvalue(\'{"expires_in": 7200}\')
    #正则对比
   #TURE
    print(CheckUtils(\'{"access_token":"hello","expires_in":7200}\').check_regexp(\'"expires_in":(.+?)\'))
   #False
    print(CheckUtils(\'{"access_token":"hello","expires":7200}\').check_regexp(\'"expires_in":(.+?)\'))
View Code

六、api_testcase下的api_test.py 封装

import warnings
import unittest
import paramunittest
from common.testdata_utils import TestdataUtils
from common.requests_utils import RequestsUtils

#如果是mysql数据源的话切换成  def_testcase_data_list_by_mysql()   exccel数据源:def_testcase_data_list()

case_infos = TestdataUtils().def_testcase_data_list_by_mysql()

@paramunittest.parametrized(
    *case_infos
)

class APITest(paramunittest.ParametrizedTestCase):
    def setUp(self) -> None:
        warnings.simplefilter(\'ignore\', ResourceWarning)  #不会弹出警告提示

    def setParameters(self, case_id, case_info):
        self.case_id = case_id
        self.case_info = case_info

    def test_api_common_function(self):
        \'\'\'测试描述\'\'\'
        self._testMethodName = self.case_info[0].get("测试用例编号")
        self._testMethodDoc = self.case_info[0].get("测试用例名称")
        actual_result = RequestsUtils().request_by_step(self.case_info)
        self.assertTrue( actual_result.get(以上是关于python_reques接口测试框架,Excel作为案例数据源的主要内容,如果未能解决你的问题,请参考以下文章

Pytest+Yaml+Excel 接口自动化测试框架

Python接口测试实战4(上) - 接口测试框架实战

Python接口测试实战4(上) - 接口测试框架实战

接口测试框架——第六篇-读Excel封装方法

API接口自动化测试框架搭建-设计excel自动化用例数据case_excel.xlsx

接口自动化---简单的数据驱动框架ATP(基于excel)