测试平台系列如何停止测试任务执行

Posted 自动化软件测试

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了测试平台系列如何停止测试任务执行相关的知识,希望对你有一定的参考价值。

今天我们就玩点好玩的,和大家一起探讨:怎么停止一段python代码。之后我们将会运用到pity之中。

知识要点

本文需要大家对asyncio相关知识有一些了解,至于原理方面,大家可以自行查阅,因为我也没细看。

为什么要做这个?

针对测试任务执行非常久的时候,看起来会是阻塞的情况,举个例子:

某个python脚本里面写了time.sleep(1000),导致你的case一直好像没有执行完成,那我们想结束它,该咋整呢?

今天博主要聊的就是这方面的内容。

先看看"难题"

我们来写一个简单的demo。

我们先定义一个异步无限循环的方法,让它每隔一秒就打印一行内容

import asyncio


async def run():
    while True:
        print("still alive")
        await asyncio.sleep(1)

试想一下,如果这个方法开启了,不结束它,它是不是会一直打印下去?没错,我们先试试直接运行之:


很显然,它是绝对不会停止的,除非你关闭这个py程序。

我们把它想象成同步方法,是不是也会遇到这个困难:

import time


def run():
    time.sleep(10000)
    
if __name__ == "__main__":
    run()
    print("done")

可以发现run一旦开始,它就阻塞了整个程序,下面的done必须要等run结束了才能打印出来。

怎么停止run

我们都知道,在python3.4以后新增了异步编程相关的概念,最初是由@coroutine这样的装饰器放到方法上,把方法标注为异步方法,后面直接从语言层面支持了异步方法定义(async),那其实里面还有很多我们不太常用的部分,比如今天要说的create_task。

我们改写下方法

import asyncio


async def run():
    while True:
        print("still alive")
        await asyncio.sleep(1)
        
        
async def main():
    await asyncio.create_task(run())


if __name__ == "__main__":
    asyncio.run(main())

这次我们包了一层方法,利用asyncio.create_task来创建异步任务,create_task接收一个coroutine并执行。

执行之后,可以发现这个和上面的情况,任务也会一直进行下去。

我们继续下一步改造:


我们去掉await,可以看到run方法确实执行了,但是still alive只打印了一次就结束了。

我对它的理解是,虽然create_task创建了一个异步任务,但没说要await,也就是说没有说要等它结束。

想象一下这些异步任务都由一个事件循环控制,当你执行main(main本身也是一个异步任务)的时候,asyncio.run默认是要执行到main方法执行完毕的,也就是说,现在事件循环等待main方法执行完毕,main方法里面又创建了一个异步任务,但没有强调需要该任务完成,创建完毕后,由于main任务已经完成了,就导致整个事件结束了。

梳理一下:

  • 有await

线程开启 -> 执行main -> main里面创建异步任务run -> 等待异步任务run(一直等一直等)

由于主线程没有结束,所以整个python程序一直在等待异步任务执行完毕,毕竟它是死循环,所以会一直等下去。

  • 无await

线程开启 -> 执行main -> main里面创建异步任务run -> 不等待异步任务run -> main方法结束 -> 线程结束 -> 程序退出

以上都是个人结合go的goroutine给出的理解。肯定会有一些差别的地方。

  • 再次改造

其实create_task会返回一个task对象,里面有done和cancel方法,也就是说咱们可以取消他也可以完成他。

import asyncio


async def run():
    while True:
        print("still alive")
        await asyncio.sleep(1)


async def main():
    task = asyncio.create_task(run())
    await asyncio.sleep(2)
    # 2秒后,就停掉这个任务
    task.cancel()
    print("task任务结束了,main也即将完成,整个程序即将退出")


if __name__ == "__main__":
    asyncio.run(main())

一旦调用了create_task,那么任务就已经开始了,接着我们用await让main等待2秒,再调用task.cancel方法就可以取消这个task,这样所有事件都结束,程序也会退出了。

看看gif:

这时候有的同学可能会问了,await2秒以后,不管你是否调用cancel,因为没有await task,所以程序照样会退出啊,没法证明task真的被cancel了。

仔细想想,确实说的有道理。那我们再来改造下:

可以看到,这个task死之前还做了垂死挣扎!!!

加上await

可以看到加上await之后,它还是不会停止。

那我们加上cancel试试:

很遗憾,报错了~~~不过没关系,我们继续改下。只需要加上异常处理,就可以完美实现2秒后自动取消任务了。

但这个例子依然不是很帅,如果只是想控制异步任务的执行时间,那我们可以用wait_for:

import asyncio


async def run():
    while True:
        print("still alive")
        await asyncio.sleep(1)


async def main():
    try:
        await asyncio.wait_for(run(), 2)
    except asyncio.TimeoutError:
        print("run方法超过2秒仍未执行完成")


if __name__ == "__main__":
    asyncio.run(main())

这样2秒后,任务还没结束也不会继续执行了。


去掉死循环以后,任务会在2秒内完成(因为只等待1秒),这时候就可以看到timeout异常不会被触发。

今天的内容就介绍到这了,异步还有挺多玩法的,我也不是很清楚,大家可以互相交流交流~~~

最后:下方这份完整的软件测试视频学习教程已经整理上传完成,朋友们如果需要可以自行免费领取 【保证100%免费】

以上是关于测试平台系列如何停止测试任务执行的主要内容,如果未能解决你的问题,请参考以下文章

软件测试工具都有哪些?

压力测试工具有哪些

常用性能测试工具有哪些?

接口自动化测试平台系列:场景化执行

《上海悠悠接口自动化平台》-5.测试计划与定时任务

Android 手机自动化测试工具有哪几种