利用pytest hook函数实现自动化测试结果推送企业微信

Posted 大刚测试开发实战

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了利用pytest hook函数实现自动化测试结果推送企业微信相关的知识,希望对你有一定的参考价值。

前言


通常,自动化测试用例在执行完成后,都会发送一个结果,以通知测试人员或测试leader测试的结果。如有测试失败的情况,测试人员再去查看具体的测试报告,检查是哪个场景没有测试通过。当前较为流行的提醒方式有:

  • 邮件
  • 企业微信、钉钉等push消息

由于我们公司所使用的办公软件是企业微信,因此,在实现测试结果通知提醒的功能时,选用的是企业微信。当前较为流行的实现方式有两种形式:

  • 企业微信应用通知:需要在企业微信中创建一个应用,再获取Secret
  • 普通群消息推送:需要在群中添加一个群机器人(会自动生成webhook_url,以供后续接口调用)

由于方式一需要在企业微信中创建应用(需要管理员操作权限),总体实现起来较为繁琐,因此我选用的是第二种群机器人的实现方式


一、实现原理及实现效果

1.外部链路流程

利用pytest

2.内部调用原理及过程

利用pytest

1)各模块&方法功能:
  • RedisHandler基类:用于初始化redis连接、查询数据、写入数据
  • CaseCount基类:用于初始化用例统计、获取统计成功&失败&跳过&报错的用例数、计算用例通过率
  • EnterpriseWechatNotification基类:用于定义发送企业微信消息的内容模板、定义调用hook_url(群机器人)发送消息方法
  • hook方法pytest_runtest_makereport:用于获取pytest执行后的测试结果、将结果写入缓存、生成控制台测试报告
  • hook方法pytest_terminal_summary:用于获取执行结果、调用发送消息方法发送微信消息
2)具体调用原理、流程:

① 前提:

  • 已添加企业微信群机器人,并记住hook地址;
  • python+pytest已编写测试用例;

利用pytest

② pytest运行测试用例,RedisHandler连接redis                                     ,pytest_runtest_makereport获取用例执行结果,并:

  • 调用RedisHandler中的写入缓存方法,将结果写入缓存;
  • 调用CaseCount中的计算用例通过率方法获取用例通过率;
  • 将获取到的各条测试结果分输出到控制台进行展示:↓(Windows本地运行效果)

利用pytest

③ pytest_terminal_summary方法:

  • 分别调用CaseCount中的获取通过、失败、跳过、报错的用例条数的方法(此方法调用RedisHandler中的get_key方法),获取到各个(通过、失败、跳过、报错)执行结果统计;
  • 调用CaseCount中的计算用例通过率方法获取用例通过率;
  • 调用EnterpriseWechatNotification中的发送企业微信消息方法,将获取到的各个(通过、失败、跳过、报错)执行结果的数量统计与EnterpriseWechatNotification中预定义的模板进行拼接,发送到企业微信;
  • 将获取到的各个(通过、失败、跳过、报错)执行结果与用例通过率一起,输出到控制台展示:↓(Windows本地运行效果)

利用pytest

二、编码实现

1.各个基类

RedisHandler基类:

import redis


class RedisHandler:
def __init__(self, host, port=6379, db=0):
self.client = redis.StrictRedis(host=host, port=port, db=db) # 生成客户端连接,StrictRedis()默认使用连接池,不必再单独使用ConnectPool

def set_string(self, name: str, value, ex=None, px=None, nx=False, xx=False) -> None:
"""
缓存中写入str(单个)
:param name: 缓存名称
:param value: 缓存值
:param ex: 过期时间(秒)
:param px: 过期时间(毫秒)
:param nx: 如果设置为True,则只有name不存在时,当前set操作才执行(新增)
:param xx: 如果设置为True,则只有name不存在时,当前set操作才执行(修改)
:return:
"""
self.client.set(name, value=value, ex=ex, px=px, nx=nx, xx=xx)

def incr(self, key):
"""
使用incr方法,处理并发问题
当key不存在时,会先初始为0,每次调用,则会+1
:param key:
:return:
"""
self.client.incr(key)

def get_key(self, name):
"""读取缓存"""
result = self.client.get(name)
return result


CaseCount基类:

from api_test.common.redis_handler import RedisHandler
from api_test.config.db_config import DBConfig


class CaseCountName:
ERROR: str = "error_count"
FAILED: str = "failed_count"
PASSED: str = "passed_count"
SKIP: str = "skip_count"
TOTAL: str = "total_count"


class CaseCount:
"""
redis 缓存统计用例执行情况
"""

def __init__(self):
self.redis = RedisHandler(host=DBConfig.redis_config.get(host)) # redis主机地址可以写死在这里,也可以从配置类中获取

def init_process(self):
"""
初始化进度、总数、成功数、失败数
"""
self.redis.set_string(CaseCountName.TOTAL, 0)
self.redis.set_string(CaseCountName.SKIP, 0)
self.redis.set_string(CaseCountName.PASSED, 0)
self.redis.set_string(CaseCountName.FAILED, 0)
self.redis.set_string(CaseCountName.ERROR, 0)

def failed_count(self):
"""失败用例数"""
return int(self.redis.get_key(CaseCountName.FAILED))

def passed_count(self):
"""通过用例总数"""
return int(self.redis.get_key(CaseCountName.PASSED))

def skip_count(self):
"""跳过用例数"""
return int(self.redis.get_key(CaseCountName.SKIP))

def error_count(self):
"""报错用例数"""
return int(self.redis.get_key(CaseCountName.ERROR))

def total_count(self):
"""用例总数"""
return int(self.redis.get_key(CaseCountName.TOTAL))

def pass_rate(self):
"""用例成功率"""
try:
rate = round((self.passed_count() + self.skip_count()) / self.total_count() * 100, 2)
return rate
except ZeroDivisionError:
raise Exception("执行失败,未检测到用例执行数量")


EnterpriseWechatNotification基类:

import os
import json
import requests
import platform


def get_env_from_jenkins(name, base=):
"""从Jenkins中获取全局环境变量"""
return os.getenv(name) and os.getenv(name).strip() or base


ProjectName = get_env_from_jenkins("JOB_NAME") # Jenkins构建项目名称
BUILD_URL = get_env_from_jenkins("BUILD_URL") # Jenkins构建项目URL
BUILD_NUMBER = get_env_from_jenkins("BUILD_NUMBER") # Jenkins构建编号


class EnterpriseWechatNotification:
def __init__(self, hook: list):
# 企业微信群机器人的hook地址,一个机器人就一个,多个就定义多个,可以写死,也可以写在配置类中
self.hook_url_list = [f"https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=i" for i in hook]
# allure生成报告的地址,Jenkins执行时会用到,Windows暂未配置allure地址
self.allure_url = f"http://192.168.1.122:8088/jenkins/job/ProjectName/BUILD_NUMBER/allure/"
self.header = Content-Type: application/json

def send_msg(self, result=):
"""发送企业微信消息通知"""
global payload
linux_content = f"""** 【ProjectName】**
> 项目名称:ProjectName
> 构件编号:#BUILD_NUMBER
> 测试环境:platform.system()
> [报告链接](self.allure_url)
> [控制台链接](BUILD_URL)
result"""

windows_content = f"""** 【auto_test_project】**
> 测试环境:platform.system()
result"""
if platform.system() == "Linux":
payload =
"msgtype": "markdown",
"markdown":
"content": linux_content


elif platform.system() == "Windows":
payload =
"msgtype": "markdown",
"markdown":
"content": windows_content


for hook_url in self.hook_url_list:
requests.post(url=hook_url, headers=self.header, data=json.dumps(payload))

注意事项: get_env_from_jenkins方法为从Jenkins获取全局变量,查看全局变量的路径为:Jenkins流水线语法-全局变量-env,见下图:

利用pytest

2.pytest的hook方法,定义在conftest.py中

import time
import pytest
from api_test.config.config import HookUrlConf
from api_test.common.send_enterprise_wechat import EnterpriseWechatNotification
from api_test.common.redis_handler import RedisHandler
from api_test.config.db_config import DBConfig
from api_test.common.case_count_control import CaseCountName, CaseCount

redis = RedisHandler(host=DBConfig.redis_config.get(host))
case_count = CaseCount()
case_count.init_process() # 初始化Redis中的用例统计缓存数据


@pytest.hookimpl(hookwrapper=True, tryfirst=True)
def pytest_runtest_makereport(item, call):
"""获取测试结果、生成测试报告"""
print(------------------------------------)
out = yield
report = out.get_result()
if report.when == call:
# print(f"测试报告:report")
# print(f"步骤:report.when")
print(f"用例id:report.nodeid")
print(f"用例描述:str(item.function.__doc__)")
print(f"运行结果:report.outcome")
"""将用例执行结果写入缓存"""
if report.outcome == passed:
redis.incr(CaseCountName.PASSED)
if report.outcome == failed:
redis.incr(CaseCountName.FAILED)

if report.when == setup:
if report.outcome == skipped:
redis.incr(CaseCountName.SKIP)

CaseCount().total_run_count()


def pytest_terminal_summary(terminalreporter, exitstatus, config):
"""收集测试结果,从Redis缓存数据中获取"""
total_case = case_count.total_count()
pass_case = case_count.passed_count()
fail_case = case_count.failed_count()
skip_case = case_count.skip_count()
error_case = case_count.error_count()
pass_rate = case_count.pass_rate()
run_time = round((time.time() - terminalreporter._sessionstarttime), 2)
print("******用例执行结果统计******")
print(f"总用例数:total_case条")
print(f"通过:pass_case条")
print(f"失败:fail_case条")
print(f"跳过:skip_case条")
print(f"报错:error_case条")
print(f"用例通过率:pass_rate%")
print(f"用时:run_times")
desc = """
本次执行情况如下:
总用例数为:
通过用例数:<font color=\\"info\\">条</font>
失败用例数:<font color=\\"warning\\">条</font>
错误用例数:
跳过用例数:
通过率为: %
用时:s
""".format(total_case, pass_case, fail_case, error_case, skip_case, pass_rate, run_time)
EnterpriseWechatNotification(hook=HookUrlConf.HOOK_URL.value).send_msg(desc) # 执行结果发送企业微信


三、运行过程与运行效果

1.运行过程

  • Windows本地运行
  • Jenkins触发运行


2.企业微信接收到测试结果通知:

  • 通过Jenkins触发运行的通知效果:↓

利用pytest

  • Windows本地手动触发运行的通知效果:↓

利用pytest

小结


以上就是利用pytest的hook函数(pytest_runtest_makereport、pytest_terminal_summary‍)+redis,实现发送测试结果到企业微信的原理及过程,当然还有一些不足之处,如:

  • 不管是接口自动化测试还是UI自动化测试都可以通过这种方式来实现消息通知;
  • 除了在代码中调用pytest hook函数实现消息通知外,Jenkins也可以通过安装插件达到邮件通知、执行Python脚本达到企微消息通知的目的;
  • 测试结果的存储不一定要用到redis,也可以写在本地文件等,因为多一层调用,就多一层处理和可能面临的报错,另外redis所在服务器连接出错也会影响用例的正常运行;
  • 发送消息的内容样式支持Markdown,发送内容还可以继续优化,比如:通知哪条用例报错等等;

更多实用干货,同步首发于微信公众号【测试开发实战】,欢迎关注!

利用pytest

以上是关于利用pytest hook函数实现自动化测试结果推送企业微信的主要内容,如果未能解决你的问题,请参考以下文章

pytest文档35-Hooks函数之统计测试结果(pytest_terminal_summary)

pytest文档54-Hooks函数terminal打印测试结果(pytest_report_teststatus)

pytest常用hook函数

pytest文档33-Hooks函数获取用例执行结果(pytest_runtest_makereport)

pytest接口自动化测试框架 | 插件二次开发实战

pytest文档33-Hooks函数获取用例执行结果(pytest_runtest_makereport)