python---atp自动化测试框架

Posted 冒泡泡de可乐

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python---atp自动化测试框架相关的知识,希望对你有一定的参考价值。

代码结构:

bin---启动文件

cases---存放测试用例

conf ---配置文件

core --- 核心代码

logs --- 日志

report --- 存放测试结果

readme.txt --- 使用说明

requirements.txt-----保存所有安装的第三方模块,方便代码移植到其他电脑

写代码思路:

1、读excel取出所有用例

2、解析用例

      1) 解析入参: a =1,b=1,phone=<phone>  把入参变成字典 {\'a\'=1,\'b\':2,\'phone\':15326687945}
          url
         请求方式:post

      2) 发请求,获取到返回结果 MyRequest

      3) 解析返回   a、解析检查点   \'k=1,age=18,name!=abc\'     [[\'k\',\'=\',\'1\'],[\'age\',\'=\',\'18\'],[\'name\',\'!=\',\'abc\']]
                   b、获取实际结果做对比,检查用例通过还是失败

3、返回结果写入到excel中

4、生成报告,发邮件

atp测试缺点:不能测试有业务流程(即接口间有依赖)的情况

readme.txt

写用例的支持参数化,支持以下参数化:
<phone>   自动产生手机号
<id_card>  身份证号
<email>    邮箱
<cur_time>  当前时间戳

requirements.txt

xlrd
requests
jsonpath
xlutils
nnlog
yagmail

case_operation.py

import xlrd
from core.my_requests import MyRequest

\'\'\'
读exce获取用例
\'\'\'
def get_case(path):
    all_case = []  #定义一个二维数组,保存所有的测试用例
    book = xlrd.open_workbook(path)
    sheet = book.sheet_by_index(0)
    for i in range(1,sheet.nrows):    #第一行表头不要
        row_data = sheet.row_values(i)[4:8]  #切片(顾头不顾尾),取4-7列:请求url、请求方式、请求数据、预期结果
        all_case.append(row_data)
    return all_case   #[[url,get,data,check],[url,get,data,check]]

\'\'\'
发request请求
\'\'\'
def send_request(url,method,data,headers=None):
    req = MyRequest(url,data,headers=headers)   #实例化一个MyRequest实例
    if method.upper()=="POST":    #测试用例中的请求方式可能是大写或小写
        res = req.post()
    elif method.upper() ==\'GET\':
        res = req.get()
    else:
        res = {"data":"暂时不支持该方法!"}
    return res[\'data\']
View Code

my_requests.py    封装请求模块类  处理了异常,打印了日志

import requests
import nnlog
import os
from conf.setting import LOG_PATH
class MyRequest:
    log_file_name  = os.path.join(LOG_PATH,\'MyRequest.log\') #日志存放在cases文件夹下
    time_out = 10 #请求超时时间
    def __init__(self,url,data=None,headers=None,file=None):
        self.url = url
        self.data = data
        self.headers = headers
        self.file = file
    def post(self):
        try:
            req = requests.post(self.url,data=self.data,headers=self.headers,
                                files=self.file,timeout=self.time_out)
        except Exception as e:
            res = {"status":0,"data":e.args}  #0代表请求失败
        else:
            try:
               res = {"status":1,"data":req.json()} #1代表返回的json
            except Exception as e:
                res = {"staus":2,"data":req.text} #2代表返回不是json
        log_str = \'url: %s 请求方式:post  data:%s ,返回数据:%s\'%(self.url,self.data,res)
        self.write_log(log_str)
        return res

    def get(self):
        try:
            req = requests.get(self.url,params=self.data,headers=self.headers,timeout=self.time_out)
        except Exception as e:
            res = {"status":0,"data":e.args}  #0代表请求失败
        else:
            try:
               res = {"status":1,"data":req.json()} #1代表返回的json

            except Exception as e:
                res = {"staus":2,"data":req.text} #2代表返回不是json
        log_str = \'url: %s get请求 data:%s ,返回数据:%s\'%(self.url,self.data,res)
        self.write_log(log_str)
        return res

    @classmethod   #类方法
    def write_log(cls,content):
        log = nnlog.Logger(cls.log_file_name)
        log.debug(content)
View Code

parse_param.py

import random
import string
import time

#这个类是用来解析请求参数的
class ParseParam:

    func_map = [\'phone\',\'email\',\'id_card\',\'cur_time\']  #映射函数的

    def __init__(self,param):
        self.param = param
        self.parse()   #实例化时就做映射
    def phone(self):
        phone_starts = [\'134\',\'181\',\'138\',\'177\',\'150\',\'132\',\'188\',\'186\',\'189\',\'130\',\'170\',\'153\',\'155\']  #手机号开头3位固定
        start = random.choice(phone_starts)
        end = str(random.randint(0,99999999))  #产生手机号后8位
        res = start+ end.zfill(8)
        return res
    def email(self):
        email_end=[\'163.com\',\'qq.com\',\'126.com\',\'sina.com\']
        end = random.choice(email_end)
        start_str=\'ATP_test_\'
        email_start = \'\'.join(random.sample(string.ascii_letters+string.digits,6)) #从所有的大写字母、小写字母、数字中取6位 sample取到的是list,用join转成字典
        return start_str+email_start+\'@\'+end
    def id_card(self):
        \'\'\'这个产生身份证号的\'\'\'
        return 410881199011212121
    def cur_time(self):
        return int(time.time())  #time.time()精确的毫秒 int()取整
    def order_id(self):
        \'\'\'从数据库里面获取\'\'\'
        pass
    def session_id(self):
        \'\'\'从redis里面获取的\'\'\'
        pass
    def parse(self):
        for func in self.func_map:
            temp = str(getattr(self,func)()) #手机号
            self.param = self.param.replace(\'<%s>\'%func,temp)
    def strToDict(self):
        #这个函数是把请求参数转成字典的
        data ={}
        pl = self.param.split(\',\')
        for p in pl:
            temp = p.split(\'=\')
            if len(temp)>1:   #用例中不一定key value都写完整了,最好先判断是否都存在,否则会报错
                key,value = temp
                data[key] = value
        return data



if __name__ == \'__main__\':
    param = \'username=niuhanyang\' \\
            \',phone=<phone>,email=<email>\' \\
            \',id_card=<id_card>,start_time=\' \\
            \'<cur_time>\'
    p = ParseParam(param)
    data = p.strToDict()
    print(data)

    print(p.phone())
    res = getattr(p,\'phone\') #getattr()是一个内置函数,第一个参数是一个对象,第二个参数是一个字符串(方法的名字) 获取一个对象里面的属性(方法、变量)
    print(res())  #返回的res是一个函数名,加()就可以调用了

    import os,requests
    res = hasattr(requests,\'get\')#第一个参数-模块名 第二个参数-字符串(方法的名字)  判断某个模块、类下面有没有某个方法或者变量
    print(res)  #True
View Code

parse_response.py

import jsonpath

class ResponseParse:
    seqs = [\'!=\', \'>=\', \'<=\', \'=\', \'<\', \'>\', \'in\', \'notin\']
    #定义支持的运算符
    def __init__(self,response,check):   #response是请求返回实际值  check是用例中的期望值
        self.response = response
        self.check = check

    def format_check(self):
        #格式化检查信息,分别列出key 运算符 实际结果
        #会返回 [[\'error_code\',\'=\',\'0\'],[\'name\',\'!=\',\'xxx\']]
        format_list = []
        check_list = self.check.split(\',\')
        for s in check_list:
            for seq in self.seqs:
                if seq in s:
                    if len(s.split(seq))>1:
                        key, value = s.split(seq)
                        temp = [key, seq, value]
                        format_list.append(temp)
                        break
        return format_list

    def get_real_value(self,key):
        #从字典里面获取key对应的value
        res = jsonpath.jsonpath(self.response,\'$..%s\'%key) #$..%s这个是jsonpath这个模块的用法
        if res:
            return res[0]
        return \'找不到该key【%s】\'%key

    def operation_check(self,real,seq,hope):
        #根据运算符判断结果
        msg = "判断信息:%s %s %s "%(real,seq,hope)
        real = str(real)#注意:为了保持类型一致  返回值从字典中取出可能是int类型,先转成str  hope从字符串中取出来的,一定是字符串
        if seq == \'=\':
            status = real == hope
        elif seq == \'!=\':
            status = real != hope
        elif seq ==\'in\':
            status = real in hope
        elif seq == \'notin\':
            status = real not in hope
        else:
            status,msg = self.num_check(real,seq,hope)
        return status,msg

    def num_check(self,real,seq,hope):
        #判断数值类型的  > <  >=  <=
        msg = "判断信息:%s %s %s "%(real,seq,hope)
        try:
            real = float(real)
            hope = float(hope)
        except Exception as e:
            msg = "比较时出错,大小比较只能是数字类型!" \\
                  "%s %s %s"%(real,seq,hope)
            status = False
        else:
            if seq == \'>\':
                status = real > hope
            elif seq == \'<\':
                status = real < hope
            elif seq == \'<=\':
                status = real <= hope
            else:
                status = real >= hope
        return status,msg

    def check_res(self):
        #校验所有的检查点
        check_list = self.format_check()   #format_check()返回的是一个二维数组,循环取出每条出来判断
        # [[\'error_code\', \'=\', \'0\'], [\'name\', \'!=\', \'xxx\']]
        all_msg=\'\'
        for check in check_list:#循环所有的检查点
            key,seq,hope = check
            real = self.get_real_value(key)  #在response里找到key对应的值
            status,msg = self.operation_check(real,seq,hope)
            all_msg = all_msg+msg+\'\\n\' #累加提示信息
            if status:
                pass
            else:
                return \'失败\',all_msg
        return \'通过\',all_msg    #所有的check点都pass才返回pass,有一个失败就返回fail
View Code

tool.py

import xlrd
from xlutils.copy import copy
import os
import datetime
from conf import setting
import yagmail

def make_today_dir():
    #创建当天的文件夹,返回绝对路径
    today = str(datetime.date.today())
    #c:/xxx/xxx/atp/report/2018-11-24/测试用例.xls
    abs_path = os.path.join(setting.REPORT_PATH,today)
    #拼成当天的绝对路径
    if os.path.exists(abs_path):
        pass
    else:
        os.mkdir(abs_path)
    return abs_path

def write_res(case_path,case_res):
    #c:/xxx/xxx/atp/cases/测试用例.xls
    #[ [\'{"xdfsdf}\',\'通过\'],[\'{"xdfsdf}\',\'失败\'] ]
    book = xlrd.open_workbook(case_path)
    new_book = copy(book)
    sheet = new_book.get_sheet(0)   #xlutils 里不能用 sheet_by_index()方法
    for row,res in enumerate(case_res,1):    #1 表示row要从1开始取
        response,status = res
        # 写第8列和第9列
        sheet.write(row,8,response)
        sheet.write(row,9,status)

    cur_date_dir = make_today_dir()#创建当前文件夹,并且返回绝对路径
    file_name = os.path.split(case_path)[-1] #只获取到filename
    cur_time = datetime.datetime.today().strftime(\'%H%M%S\') #获取到当天时分秒
    new_file_name = cur_time+\'_\'+file_name #165530_测试用例.xls
    real_path = os.path.join(cur_date_dir,new_file_name)#拼路径 测试报告放在report目录下
    new_book.save(real_path)
    return real_path

def send_mail(content,file_path=None):
    #发邮件,传入邮件正文和附件
    m = yagmail.SMTP(**setting.MAIL_INFO,)
    subject = \'接口测试报告_%s\'%str(datetime.datetime.today())
    m.send(subject=subject,to=setting.TO,contents=content,attachments=file_path)
View Code

setting.py

import os

#常量定义时大写
BAE_PATH  = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) #atp的目录
LOG_PATH = os.path.join(BAE_PATH,\'logs\') #log目录
CASE_PATH = os.path.join(BAE_PATH,\'cases\') #case目录
REPORT_PATH = os.path.join(BAE_PATH,\'report\') #report目录

#发邮件相关信息
MAIL_INFO = {                 #定义为字典,调用时用**MAIL_INFO即可
    \'user\':\'15xxx3@qq.com\',
    \'password\':\'xxxmqrdgjcd\',
    \'host\':\'smtp.qq.com\',  #163邮箱  smtp.163.com
    \'smtp_ssl\':True,    #发件箱是qq邮箱的话,为True
}

TO = [\'xxxx@qq.com\',\'4xxx7026@qq.com\']


HOST = {
    \'QA\':\'http://api.nnzhp.cn\',  #测试环境
    \'DEV\':\'http://dev.nnzhp.cn\', #开发环境
    \'PRE\':\'http://dev.nnzhp.cn\'  #预生产环境
}

default_host = HOST.get(\'QA\')  #默认用测试环境
View Code

start.py---单线程 

#首先将该project目录加入到python环境变量
import os,sys
BAE_PATH  = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) #atp的目录
sys.path.insert(0,BAE_PATH)

from conf.setting import CASE_PATH,default_host
from core import case_operation,parse_param,parse_response
from core import tools
import glob
class RunCase:
    content = \'\'\'
    各位好!
        本次测试结果:总共运行%s条用例,通过%s条,失败%s条。详细信息见附件。
    \'\'\'
    def get_excel(self):
        all_excel_case_count = 0  #存放所有测试用例的次数
        all_excel_success_count = 0  #存放所有测试用例成功的次数
        report_path_list = []  #存放所有附件地址
        #s=\'/Users/nhy/test*.xls\'
        for excel in glob.glob(os.path.join(CASE_PATH,\'test*.xls\')):   #glob模块过滤--只要以test开头,.xls结尾的文件
            cases = case_operation.get_case(excel)#调用读取excel的函数
            results = self.send_requests(cases) #发送请求,并校验结果  返回一个二维数组[[real_res,status],[real_res,status],.....]
            report_file_path = tools.write_res(excel,results)#写入结果,返回的是测试结果的路径
            report_path_list.append(report_file_path)
            all_count = len(cases) #本次循环的excel中总共多条用例
            all_excel_case_count += all_count
            all_excel_success_count += self.success_count
        all_excel_fail_count = all_excel_case_count - all_excel_success_count
        content = self.content % (all_excel_case_count, all_excel_success_count, all_excel_fail_count)
        tools.send_mail(content,report_path_list)

    def send_requests(self,cases):   #发送多个请求
           #[[url,get,data,check],[url,get,data,check]]
        self.success_count = 0  #类变量,保存成功用例条数
        results = []
        for case in cases:
            url,method,param,check = case #获取到每条用例的参数
            p = parse_param.ParseParam(param) #解析请求参数  实例化一个ParseParam实例 ,实例化的过程中就将里面的<phone>,<email>等替换了
            data = p.strToDict()#请求参数转成字典
            url = default_host + url   #用例中的url只有接口名,没有IP,IP可以在setting文件中根据需要配置测试/开发/预生产环境
            response = case_operation.send_request(url,method,data)#发请求 send_request()发送单个请求,返回结果是一个string
            #下面这2行代码是判断用例执行是否通过的
            p2 = parse_response.ResponseParse(response,check)  #实例化一个ResponseParse实例
            status, msg = p2.check_res()#调用写好的校验结果方法,
            real_res = str(response)+\'\\n\'+msg #是把校验的信息和返回的json拼到一起
            results.append([real_res,status]) #这里面的小list是每一个用例运行的结果
            if status == \'通过\':
                self.success_count += 1 #统计成功的次数
        return results #返回运行的结果

    def main(self):
        print(\'开始测试\'.center(50,\'*\'))
        self.get_excel()
        print(\'测试结束\'.center(50,\'*\'))

if __name__ == \'__main__\':
    run = RunCase()
    run.main()
View Code

 start.py---多线程