python接口测试-项目实践

Posted dinghanhua

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python接口测试-项目实践相关的知识,希望对你有一定的参考价值。

脱敏后脚本

 

projectapi.py: 项目接口类

# -*- coding:utf-8 -*-
"""
xx项目接口类
2018-11
dinghanhua
"""

import requests
import re
import pymssql


#region 工具类函数
def findinfo_from_apiquerystockdetailinfo(str1,str2):
    """
    从str1中找第一个"str2":...后面的值
    :param str1:
    :param str2:
    :return: str2对应的值
    """
    pattern1 = "+str2 + ":(.*?)," #左右边界
    result = re.search(pattern1, str1) #正则匹配
    if result:
        result = result.group(1).replace(",‘‘)
    return result


def get_last_value_of_key(resultlist,key):
    ‘‘‘
    从二维数组取第一行的元素对应的最后一行的值
    :param resultlist:
    :param key:
    :return: value
    ‘‘‘
    for i in range(0,len(resultlist[0])):
        if key == resultlist[0][i]:   #第一行中找到对应字段名的索引
            result = resultlist[-1][i]
            return result #返回数组最后一行对应的值


def round_test(data,i=0):
    ‘‘‘
    四舍五入,解决round(7.35)=7.3的问题
    :param data:
    :param i: 保留的位数,默认保留一位小数
    :return:
    ‘‘‘
    if isinstance(i,int): #i是整数
        raise Exception(the second param must be int)
    else:
        mi = 10**i
        f = data*mi - int(data*mi)
        if f >=0.5:
            res = (int(data*mi)+1)/mi
        elif f <=-0.5:
            res = (int(data*mi-1))/mi
        else:
            res = int(data*mi)/mi
        if i<=0:
            res = int(res)
    return res
# endregion

class ProjectApi:
    def api_querystockdetailinfo(self,stockcode):
        """
        请求并提取股票基本信息接口数据
        :param stockcode:
        :return: 截取信息dict
        """
        api = http://testdomain/querystockdetailinfo?stockcode={stockcode}.format(stockcode = stockcode)
        response = requests.get(api)
        result = response.text.replace(r
,‘‘).replace(\, ‘‘)  # 去掉特殊字符
,
        result_dict = {stockcode: stockcode}

        #股票名称
        result_dict[StockName] = findinfo_from_apiquerystockdetailinfo(result, StockName)

        if result_dict[StockName]: #股票名称存在继续处理其他字段,否则报错并返回

            # 公司概要 #剔除公司概要中“公司”“公司是”、“公司是一家”高度重复的内容
            overviewvalue = result_dict[OverviewValue] = findinfo_from_apiquerystockdetailinfo(result, OverviewValue)

            if overviewvalue.startswith(公司是一家):
                result_dict[OverviewValue] = overviewvalue[5:]
            elif overviewvalue.startswith(公司是):
                result_dict[OverviewValue] = overviewvalue[3:]
            elif overviewvalue.startswith(公司):
                result_dict[OverviewValue] = overviewvalue[2:]

            if not overviewvalue.endswith(): #判断最后是否有句号,没有加一个
                result_dict[OverviewValue] += 

            # 市值
            typecap = findinfo_from_apiquerystockdetailinfo(result, TypeCap)
            dictcap = {1: 巨盘, 2: 大盘, 3: 中盘, 4: 小盘, 5: 微盘}
            result_dict[TypeCap] = dictcap[typecap]

            # 风格
            TypeStyle = result_dict[TypeStyle] = findinfo_from_apiquerystockdetailinfo(result, TypeStyle)
            dictstyle = {1: 成长, 2: 价值, 3: 周期, 4: 题材, 5: 高价值}
            if len(TypeStyle) == 1:
                result_dict[TypeStyle] = dictstyle[TypeStyle]
            elif len(TypeStyle) >1:
                typestylelist = TypeStyle.split(,)  #风格可能有多个
                for t in range(len(typestylelist)):
                        typestylelist[t] = dictstyle[typestylelist[t]]
                result_dict[TypeStyle] = .join(typestylelist)

            # 生命周期 LifecycleValue 生命周期(单选,例:1);(1初创期、2成长期、3成熟期、4衰退期)")
            LifecycleValue = findinfo_from_apiquerystockdetailinfo(result, LifecycleValue)
            dictlifecycle = {1: 初创期, 2: 成长期, 3: 成熟期, 4: 衰退期, 5: 周期底部, 6: 周期顶部, 7: 周期向下, 8: 周期向上}
            if LifecycleValue:
                result_dict[LifecycleValue] = dictlifecycle[LifecycleValue]

            # 估值 ScoreTTM 估值(分值1~5,>=4 偏低, <=2 偏高,其他适中)")
            ScoreTTM = findinfo_from_apiquerystockdetailinfo(result, ScoreTTM)
            if ScoreTTM:
                if float(ScoreTTM) >= 4:
                    result_dict[ScoreTTM] = 偏低
                elif ScoreTTM and float(ScoreTTM) <= 2:
                    result_dict[ScoreTTM] = 偏高
                else:
                    result_dict[ScoreTTM] = 适中

            # 成长指数 ScoreGrowing  成长指数(分值1~5,>=4 高, <=2 低,其他中)‘)
            ScoreGrowing = findinfo_from_apiquerystockdetailinfo(result, ScoreGrowing)
            if ScoreGrowing:
                if float(ScoreGrowing) >= 4:
                    result_dict[ScoreGrowing] = 
                elif float(ScoreGrowing) <= 2:
                    result_dict[ScoreGrowing] = 
                else:
                    result_dict[ScoreGrowing] = 
            else:
                result_dict[ScoreGrowing]=‘‘

            # 盈利能力
            ScoreProfit = findinfo_from_apiquerystockdetailinfo(result, ScoreProfit)  # ‘   ScoreProfit  盈利能力(分值1~5,>=4 高, <=2 低,其他中)‘ )
            if ScoreProfit:
                if float(ScoreProfit) >= 4:
                    result_dict[ScoreProfit] = 
                elif float(ScoreProfit) <= 2:
                    result_dict[ScoreProfit] = 
                else:
                    result_dict[ScoreProfit] = 
            else:
                result_dict[ScoreProfit]=‘‘

        return result_dict


    def api_finance(self,stockcode):
        """
        请求并提取财务数据
        :param stockcode:
        :return: dict
        """
        api = http://testdomain/finance?stockcode={stockcode}.format(stockcode = stockcode)
        response = requests.get(api)
        response.encoding = utf-8-sig
        result = response.json()[data][0][result]  # 转化为二位数组
        result_dict = {stockcode: stockcode} #存储返回结果

        if len(result) <3: #说明股票没数据
            return result_dict

        # 当期报告期
        result_dict[EndDate] = get_last_value_of_key(result, EndDate)
        # 预测收益报告期
        ReportPeriod = get_last_value_of_key(result, ReportPeriod)
        dictreportperoid = {03/31: 一季度, 06/30: 上半年, 09/30: 前三季度, 12/31: 本年度}
        if ReportPeriod and ReportPeriod != result_dict[EndDate]: #预测收益报告期不等于当期报告期
            ReportPeriod = get_last_value_of_key(result, ReportPeriod)[5:10]
            result_dict[ReportPeriod] = dictreportperoid[ReportPeriod]
        else:
            result_dict[ReportPeriod] = ‘‘
        # 预测业绩情况
        PerformanceType = get_last_value_of_key(result, PerformanceType)
        result_dict[PerformanceType] = PerformanceType
        # 预测业绩比例
        result_dict[PerformanceTypeRange] = get_last_value_of_key(result, PerformanceTypeRange)
        # 营业收入增长率
        result_dict[OperatingRevenueYoY] = get_last_value_of_key(result, OperatingRevenueYoY)
        # 营业利润增长率
        result_dict[NetProfitYoY] = get_last_value_of_key(result, NetProfitYoY)
        # 净资产收益率
        result_dict[ROE] = get_last_value_of_key(result, ROE)
        # 毛利率
        result_dict[SalesGrossMargin] =  get_last_value_of_key(result, SalesGrossMargin)

        return result_dict


    def api_quote(self,stockcode):
        """
        请求并提取PETTM
        :param stockcode:
        :return: dict
        """

        api = http://testdomain/quote?stockcode={stockcode}.format(stockcode=stockcode)
        response = requests.get(api)
        response.encoding = utf-8-sig
        result = response.json()[data][0][result]  # 转化为二位数组
        result_dict = {stockcode:stockcode}
        if len(result) <3: #说明股票没数据
            return result_dict
        result_dict[PETTM] = get_last_value_of_key(result, PE1)
        return result_dict

    def result_of_3sourceapi(self,stockcode):
        """
        拼接3个接口取出的字串
        :param stockcode:
        :return:
        """
        result_stockdetailinfo = self.api_querystockdetailinfo(stockcode)
        if result_stockdetailinfo[StockName]: #如果股票名称存在,执行后续步骤
            result_finance = self.api_finance(stockcode)
            result_quote = self.api_quote(stockcode)

            #显示三个接口结果
            #print(result_stockdetailinfo)
            #print(result_finance)
            #print(result_quote)

            #空值、精度处理
            OperatingRevenueYoY = round_test(float(result_finance[OperatingRevenueYoY]),1)
            NetProfitYoY = round_test(float(result_finance[NetProfitYoY]),1)

            if result_finance[ReportPeriod] ==‘‘                    or result_finance[PerformanceType] == ‘‘                    or result_finance[PerformanceTypeRange] == ‘‘:
                ReportPeriod  = PerformanceType = PerformanceTypeRange = ‘‘
            else:
                ReportPeriod = ,预计 + result_finance[ReportPeriod]
                PerformanceType = 业绩 + result_finance[PerformanceType]
                PerformanceTypeRange = result_finance[PerformanceTypeRange]

            if result_finance[ROE]:
                ROE = ,净资产收益率:{0}%.format(round_test(float(result_finance[ROE]),1))
            else:
                ROE = ‘‘

            if result_finance[SalesGrossMargin]:
                SalesGrossMargin = ,毛利率:{0}%.format(round_test(float(result_finance[SalesGrossMargin]),1))
            else:
                SalesGrossMargin = ‘‘


            result = {OverviewValue} {TypeCap}{TypeStyle}股,处于{LifecycleValue}。                      估值{ScoreTTM},PE(TTM):{PETTM};                      成长性{ScoreGrowing},当期营收增长:{OperatingRevenueYoY}%,当期利润增长:{NetProfitYoY}%;                      盈利能力{ScoreProfit}{ROE}{SalesGrossMargin}{ReportPeriod}{PerformanceType}{PerformanceTypeRange}。                .format(OverviewValue = result_stockdetailinfo[OverviewValue],
                        TypeCap = result_stockdetailinfo[TypeCap],
                        TypeStyle = result_stockdetailinfo[TypeStyle],
                        LifecycleValue = result_stockdetailinfo[LifecycleValue],
                        ScoreTTM = result_stockdetailinfo[ScoreTTM],
                        PETTM = round_test(float(result_quote[PETTM])),
                        ScoreGrowing = result_stockdetailinfo[ScoreGrowing],
                        OperatingRevenueYoY = OperatingRevenueYoY,
                        NetProfitYoY = NetProfitYoY,
                        ScoreProfit = result_stockdetailinfo[ScoreProfit],
                        ROE = ROE,
                        SalesGrossMargin=SalesGrossMargin,
                        ReportPeriod = ReportPeriod,
                        PerformanceType = PerformanceType,
                        PerformanceTypeRange = PerformanceTypeRange)

            return result
        else:
            return  不存在该股票数据



    def api_of_dev(self,stockcodelist,cookie,page=0,domain=‘testdomain.cn):
        """
        获取开发接口数据
        :param 股票列表;cookie;domain默认线上
        :return: 股票代码及数据
        """
        headers = {Cookie:cookie}
        url = http://{domain}/getstockbypage?StockCodeList={stockcodelist}&PageIndex={page}&PageSize=10.format(stockcodelist = stockcodelist,domain = domain,page=page)

        response = requests.get(url, headers=headers)
        jsonstr = response.json()# 转成json,取出message
        message = jsonstr[Message]
        dict_result = {}

        if message:
            for ele in message:
                 stockcode = ele[StockCode]
                 content = ele[Content]  # 实际结果
                 nickname = ele[NickName] #发布者
                 if nickname == ‘project000:
                    dict_result[stockcode] = content

        return dict_result

    def compare_vs_devapi(self,stockcodelist,cookie,page=0,domain=‘testdomain.cn):
        """
        开发接口数据与接口拼接数据对比
        :return: bool
        """
        diff_list = []  # 存储不一致的股票代码

        resultofdev = self.api_of_dev(stockcodelist,cookie,page,domain) #请求开发接口
        if resultofdev: #如果开发结果不为空
            for stock,actual in resultofdev.items():
                expected = self.result_of_3sourceapi(stock) #数据源拼接结果

                ‘‘‘去掉pe(ttm)对比‘‘‘
                beginindex = actual.find(PE(TTM))
                endindex = actual.find(, beginindex)
                actual_result = actual[:beginindex] + actual[endindex:]

                beginindex = expected.find(PE(TTM))
                endindex = expected.find(, beginindex)
                expected_result = expected[:beginindex] + expected[endindex:]

                if actual_result != expected_result: #预期实际对比
                    print(stock)
                    print(开发:,actual_result)
                    print(预期:,expected_result)
                    diff_list.append(stock)
                else:
                    print(stock,一致(不含PETTM))

            if diff_list: #异常股票列表不为空则输出;空则提示全部一致
                print(不一致的股票列表:, diff_list)
                return False
            else:
                print(对比结果:数据一致)
                return True
        else:
            print(接口没有数据)
            return True

    def compare_vs_database(self,count=10):
        """
        比较数据库数据与数据源拼接字串
        :param count:对比股票数量,default=10
        :return:True 一致;False 不一致
        """

        diff_list = []  # 存储不一致的股票代码

        with pymssql.connect(server=192.168.1.1, user=sa, password=sa,
                             database=test_db) as myconnect:
            with  myconnect.cursor(as_dict=True) as cursor:
                cursor.execute("""SELECT top {count} StockCode,StockName,content 
                FROM [test_db].[dbo].[table] 
                where NickName =‘project000‘ and isvalid = 1 and IsDeleted =0 order by createtime desc""".format(count=count))
                for row in cursor:
                    stockcode = row[StockCode]
                    actual = row[content]
                    expected = self.result_of_3sourceapi(stockcode)  # 数据源拼接结果

                    ‘‘‘去掉pe(ttm)对比‘‘‘
                    beginindex = actual.find(PE(TTM))
                    endindex = actual.find(, beginindex)
                    actual_result = actual[:beginindex] + actual[endindex:]
                    beginindex = expected.find(PE(TTM))
                    endindex = expected.find(, beginindex)
                    expected_result = expected[:beginindex] + expected[endindex:]

                    if actual_result != expected_result:  # 预期实际对比
                        print(stockcode)
                        print(开发:, actual_result)
                        print(预期:, expected_result)
                        diff_list.append(stockcode)
                    else:
                        print(stockcode, 一致(不含PETTM))

        if diff_list:
            print(不一致的股票列表:, diff_list)
            return False
        else:
            print(对比结果:数据全部一致)
            return True

 

 

run_test.py 执行脚本:

# coding:utf-8
"""
接口测试执行脚本
"""

import projectapi
import unittest


class ApiTest(unittest.TestCase):

    def setUp(self):
        self.projectapi1 = projectapi.ProjectApi() # 创建接口对象

    def testcase1(self):
        """与开发接口比对"""
        stockcodelist = 600000%2C600128%2C600146%2C600165%2C600186
        #通过抓包获取cookie
        cookie = globalid=24A85DEC-AF25-36DD-C774-ED092F705767
    
        testresult = self.projectapi1.compare_vs_devapi(stockcodelist,cookie)
        self.assertTrue(testresult)

    def testcase2(self):
        # 与数据库对比
        testresult = self.projectapi1.compare_vs_database(count=10)
        self.assertTrue(testresult)

    def testcase3(self):
        """手工查询原数据和拼接字串"""
        while True:
            stockcode = input(输入股票代码: )
            if stockcode.isdigit() and len(stockcode)==6 and stockcode[:2] in (00,60,30):
                print(数据请求中.....)
                print(self.projectapi1.api_quote(stockcode))
                print(self.projectapi1.api_finance(stockcode))
                print(self.projectapi1.api_querystockdetailinfo(stockcode))
                print(self.projectapi1.result_of_3sourceapi(stockcode))
            else:
                print(股票代码有误)

 

以上是关于python接口测试-项目实践的主要内容,如果未能解决你的问题,请参考以下文章

接口测试实践回顾

web接口测试实践回顾

接口测试框架实践(Python)

使用python requests库写接口自动化测试--记录学习过程中遇到的坑

接口测试实践篇

Flask 编写http接口api及接口自动化测试