python_reques接口测试框架,Excel作为案例数据源
Posted 安琪儿
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python_reques接口测试框架,Excel作为案例数据源相关的知识,希望对你有一定的参考价值。
一、框架菜单
1.1 common模块
1.2 其他
二、Excel接口测试案例编写
三、读取Excel测试封装(核心封装)
excel_utils.py 读取Excel中的数据
import os import xlrd #内置模块、第三方模块pip install 自定义模块 class ExcelUtils(): def __init__(self,file_path,sheet_name): self.file_path = file_path self.sheet_name = sheet_name self.sheet = self.get_sheet() # 整个表格对象 def get_sheet(self): wb = xlrd.open_workbook(self.file_path) sheet = wb.sheet_by_name(self.sheet_name) return sheet def get_row_count(self): row_count = self.sheet.nrows return row_count def get_col_count(self): col_count = self.sheet.ncols return col_count def __get_cell_value(self,row_index, col_index): cell_value = self.sheet.cell_value(row_index,col_index) return cell_value def get_merged_info(self): merged_info = self.sheet.merged_cells return merged_info def get_merged_cell_value(self,row_index, col_index): """既能获取普通单元格的数据又能获取合并单元格数据""" cell_value = None for (rlow, rhigh, clow, chigh) in self.get_merged_info(): if (row_index >= rlow and row_index < rhigh): if (col_index >= clow and col_index < chigh): cell_value = self.__get_cell_value(rlow, clow) break; # 防止循环去进行判断出现值覆盖的情况 else: cell_value = self.__get_cell_value(row_index, col_index) else: cell_value = self.__get_cell_value(row_index, col_index) return cell_value def get_sheet_data_by_dict(self): all_data_list = [] first_row = self.sheet.row(0) #获取首行数据 for row in range(1, self.get_row_count()): row_dict = {} for col in range(0, self.get_col_count()): row_dict[first_row[col].value] = self.get_merged_cell_value(row, col) all_data_list.append(row_dict) return all_data_list if __name__==\'__main__\': current_path = os.path.dirname(__file__) excel_path = os.path.join( current_path,\'..\',\'samples/data/test_case.xlsx\' ) excelUtils = ExcelUtils(excel_path,"Sheet1") for row in excelUtils.get_sheet_data_by_dict(): print( row )
testdata_utils.py 读取Excel中的数据后处理成需要的数据
import os from common.excel_utils import ExcelUtils from common.config_utils import config current_path = os.path.dirname(__file__) test_data_path = os.path.join( current_path,\'..\', config.CASE_DATA_PATH ) class TestdataUtils(): def __init__(self,test_data_path = test_data_path): self.test_data_path = test_data_path self.test_data = ExcelUtils(test_data_path,"Sheet1").get_sheet_data_by_dict() self.test_data_by_mysql = SqlUtils().get_mysql_test_case_info() def __get_testcase_data_dict(self): testcase_dict = {} for row_data in self.test_data: testcase_dict.setdefault( row_data[\'测试用例编号\'],[] ).append( row_data ) return testcase_dict def def_testcase_data_list(self): testcase_list = [] for k,v in self.__get_testcase_data_dict().items(): one_case_dict = {} one_case_dict["case_id"] = k one_case_dict["case_info"] = v testcase_list.append( one_case_dict ) return testcase_list if __name__=="__main__": testdataUtils = TestdataUtils() for i in testdataUtils.def_testcase_data_list(): print( i )
四、request封装(核心封装)
requests_utils.py 包含post请求,get请求,异常,调用断言
import ast import re import requests import jsonpath from requests.exceptions import RequestException from requests.exceptions import ProxyError from requests.exceptions import ConnectionError from common.config_utils import config from common.check_utils import CheckUtils class RequestsUtils(): def __init__(self): self.hosts = config.hosts self.headers = {"ContentType":"application/json;charset=utf-8"} self.session = requests.session() self.temp_variables = {} def __get(self,get_info): try: url = self.hosts + get_info["请求地址"] response = self.session.get( url = url, params = ast.literal_eval(get_info["请求参数(get)"]) ) response.encoding = response.apparent_encoding if get_info["取值方式"] == "json取值": value = jsonpath.jsonpath( response.json(),get_info["取值代码"] )[0] self.temp_variables[ get_info["传值变量"] ] = value elif get_info["取值方式"] == "正则取值": value = re.findall(get_info["取值代码"],response.text)[0] self.temp_variables[get_info["传值变量"]] = value result = CheckUtils(response).run_check(get_info[\'期望结果类型\'], get_info[\'期望结果\']) except ProxyError as e: result = {\'code\': 4, \'result\': \'[%s]请求:代理错误异常\' % (get_info["接口名称"])} except ConnectionError as e: result = {\'code\': 4, \'result\': \'[%s]请求:连接超时异常\' % (get_info["接口名称"])} except RequestException as e: result = {\'code\': 4, \'result\': \'[%s]请求:Request异常,原因:%s\' % (get_info["接口名称"], e.__str__())} except Exception as e: result = {\'code\':4,\'result\':\'[%s]请求:系统异常,原因:%s\'%(get_info["接口名称"],e.__str__())} return result def __post(self,post_info): try: url = self.hosts + post_info["请求地址"] response = self.session.post( url = url, headers = self.headers, params = ast.literal_eval(post_info["请求参数(get)"]), # data = post_infos["提交数据(post)"], json=ast.literal_eval(post_info["提交数据(post)"]) ) response.encoding = response.apparent_encoding if post_info["取值方式"] == "json取值": value = jsonpath.jsonpath( response.json(),post_info["取值代码"] )[0] self.temp_variables[ post_info["传值变量"] ] = value elif post_info["取值方式"] == "正则取值": value = re.findall(post_info["取值代码"],response.text)[0] self.temp_variables[post_info["传值变量"]] = value #调用CheckUtils() result = CheckUtils(response).run_check(post_info[\'期望结果类型\'],post_info[\'期望结果\']) except ProxyError as e: result = {\'code\': 4, \'result\': \'[%s]请求:代理错误异常\' % (post_info["接口名称"])} except ConnectionError as e: result = {\'code\': 4, \'result\': \'[%s]请求:连接超时异常\' % (post_info["接口名称"])} except RequestException as e: result = {\'code\': 4, \'result\': \'[%s]请求:Request异常,原因:%s\' % (post_info["接口名称"], e.__str__())} except Exception as e: result = {\'code\':4,\'result\':\'[%s]请求:系统异常,原因:%s\'%(post_info["接口名称"],e.__str__())} return result def request(self,step_info): try: request_type = step_info["请求方式"] param_variable_list = re.findall(\'\\\\${\\w+}\', step_info["请求参数(get)"]) if param_variable_list: for param_variable in param_variable_list: step_info["请求参数(get)"] = step_info["请求参数(get)"]\\ .replace(param_variable,\'"%s"\' % self.temp_variables.get(param_variable[2:-1])) if request_type == "get": result = self.__get( step_info ) elif request_type == "post": data_variable_list = re.findall(\'\\\\${\\w+}\', step_info["提交数据(post)"]) if data_variable_list: for param_variable in data_variable_list: step_info["提交数据(post)"] = step_info["提交数据(post)"] \\ .replace(param_variable, \'"%s"\' % self.temp_variables.get(param_variable[2:-1])) result = self.__post( step_info ) else: result = {\'code\':1,\'result\':\'请求方式不支持\'} except Exception as e: result = {\'code\':4,\'result\':\'用例编号[%s]的[%s]步骤出现系统异常,原因:%s\'%(step_info[\'测试用例编号\'],step_info["测试用例步骤"],e.__str__())} return result def request_by_step(self,step_infos): self.temp_variables = {} for step_info in step_infos: temp_result = self.request( step_info ) # print( temp_result ) if temp_result[\'code\']!=0: break return temp_result if __name__=="__main__": case_info = [ {\'请求方式\': \'get\', \'请求地址\': \'/cgi-bin/token\', \'请求参数(get)\': \'{"grant_type":"client_credential","appid":"wxXXXXXxc16","secret":"XXXXXXXX"}\', \'提交数据(post)\': \'\', \'取值方式\': \'json取值\', \'传值变量\': \'token\', \'取值代码\': \'$.access_token\', \'期望结果类型\': \'正则匹配\', \'期望结果\': \'{"access_token":"(.+?)","expires_in":(.+?)}\'}, {\'请求方式\': \'post\', \'请求地址\': \'/cgi-bin/tags/create\', \'请求参数(get)\': \'{"access_token":${token}}\', \'提交数据(post)\': \'{"tag" : {"name" : "衡东"}}\',\'取值方式\': \'无\', \'传值变量\': \'\', \'取值代码\': \'\', \'期望结果类型\': \'正则匹配\', \'期望结果\': \'{"tag":{"id":(.+?),"name":"衡东"}}\'} ] RequestsUtils().request_by_step(case_info)
五、断言封装(核心封装)
check_utils.py 断言封装,与实际结果核对
import re import ast class CheckUtils(): def __init__(self,check_response=None): self.ck_response=check_response self.ck_rules = { \'无\': self.no_check, \'json键是否存在\': self.check_key, \'json键值对\': self.check_keyvalue, \'正则匹配\': self.check_regexp } self.pass_result = { \'code\': 0, \'response_reason\': self.ck_response.reason, \'response_code\': self.ck_response.status_code, \'response_headers\': self.ck_response.headers, \'response_body\': self.ck_response.text, \'check_result\': True, \'message\': \'\' # 扩招作为日志输出等 } self.fail_result = { \'code\': 2, \'response_reason\': self.ck_response.reason, \'response_code\': self.ck_response.status_code, \'response_headers\': self.ck_response.headers, \'response_body\': self.ck_response.text, \'check_result\': False, \'message\': \'\' # 扩招作为日志输出等 } def no_check(self): return self.pass_result def check_key(self,check_data=None): check_data_list = check_data.split(\',\') #把需要判断的值做切割,取出键值 res_list = [] #存放每次比较的结果 wrong_key = [] #存放比较失败key for check_data in check_data_list: #把切割的键值和取出响应结果中的所有的键一个一个对比 if check_data in self.ck_response.json().keys(): res_list.append(self.pass_result ) else: res_list.append( self.fail_result ) wrong_key.append(check_data) #把失败的键放进来,便于后续日志输出 # print(res_list) # print(wrong_key) if self.fail_result in res_list: return self.fail_result else: return self.pass_result def check_keyvalue(self,check_data=None): res_list = [] # 存放每次比较的结果 wrong_items = [] # 存放比较失败 items for check_item in ast.literal_eval(check_data).items(): #literal_eval()安全性的把字符串转成字典,items()取出键值对 if check_item in self.ck_response.json().items(): res_list.append( self.pass_result ) else: res_list.append( self.fail_result ) wrong_items.append(check_item) # print( res_list ) # print( wrong_items ) if self.fail_result in res_list: return self.fail_result else: return self.pass_result def check_regexp(self,check_data=None): pattern = re.compile(check_data) if re.findall(pattern=pattern,string=self.ck_response.text): #匹配到了,不为空,为true return self.pass_result else: return self.fail_result def run_check(self,check_type=None,check_data=None): code = self.ck_response.status_code if code == 200: if check_type in self.ck_rules.keys(): result=self.ck_rules[check_type](check_data) return result else: self.fail_result[\'message\'] = \'不支持%s判断方法\'%check_type return self.fail_result else: self.fail_result[\'message\'] = \'请求的响应状态码非%s\'%str(code) return self.fail_result if __name__=="__main__": # 检查键是否存在,{"access_token":"hello","expires_":7200} 设为响应结果,"access_token,expires_in" 为检查对象值 CheckUtils({"access_token":"hello","expires_":7200}).check_key("access_token,expires_in") #检查键值对是否存在 CheckUtils({"access_token":"hello","expires_i":7200}).check_keyvalue(\'{"expires_in": 7200}\') #正则对比 #TURE print(CheckUtils(\'{"access_token":"hello","expires_in":7200}\').check_regexp(\'"expires_in":(.+?)\')) #False print(CheckUtils(\'{"access_token":"hello","expires":7200}\').check_regexp(\'"expires_in":(.+?)\'))
六、api_testcase下的api_test.py 封装
import warnings import unittest import paramunittest from common.testdata_utils import TestdataUtils from common.requests_utils import RequestsUtils #如果是mysql数据源的话切换成 def_testcase_data_list_by_mysql() exccel数据源:def_testcase_data_list() case_infos = TestdataUtils().def_testcase_data_list_by_mysql() @paramunittest.parametrized( *case_infos ) class APITest(paramunittest.ParametrizedTestCase): def setUp(self) -> None: warnings.simplefilter(\'ignore\', ResourceWarning) #不会弹出警告提示 def setParameters(self, case_id, case_info): self.case_id = case_id self.case_info = case_info def test_api_common_function(self): \'\'\'测试描述\'\'\' self._testMethodName = self.case_info[0].get("测试用例编号") self._testMethodDoc = self.case_info[0].get("测试用例名称") actual_result = RequestsUtils().request_by_step(self.case_info) self.assertTrue( actual_result.get(以上是关于python_reques接口测试框架,Excel作为案例数据源的主要内容,如果未能解决你的问题,请参考以下文章