接口测试-并发处理
Posted cirr-zhou
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了接口测试-并发处理相关的知识,希望对你有一定的参考价值。
1 多线程执行
import threading
from datetime import *
def test():
print(datetime.now())
def thd():
Theaders = []
for i in range(10):
t = threading.Thread(target=test)
Theaders.append(t)
for t in Theaders:
t.start()
if __name__ == ‘__main__‘:
thd()
如果要并发执行N次,建议将并发数拆分成n次,每个线程循环执行n次函数,这样在启动下一个线程的时候,上一个线程已经在循环执行了。
import threading
from datetime import *
def test():
print(datetime.now())
def looptest():
for i in range(50):
test()
def thd():
Threads= []
for i in range(100):
t = threading.Thread(target=looptest)
Threads.append(t)
for t in Threads:
t.start()
if __name__ == ‘__main__‘:
thd()
2 守护线程
上面创建的线程是main()线程的子线程,即先启动主线程main(),然后执行thd自动子线程。
守护线程则是在主线程执行完后,所有的子线程都被关闭(无论子线程是否执行完成)。默认是没有守护线程,主线程执行完毕之后,会等待子线程全部执行完毕,才会结束程序。如果当子线程死循环,程序将不会关闭,因此在测试时可以设置守护线程解决这个问题。
import threading
from datetime import *
def test():
print(datetime.now())
def looptest():
for i in range(50):
test()
def thd():
Threads= []
for i in range(1):
t = threading.Thread(target=looptest)
Threads.append(t)
t.setDaemon(True)
for t in Threads:
t.start()
if __name__ == ‘__main__‘:
thd()
3 阻塞线程
还可以通过子线程join()方法阻塞线程,让主线程等待子线程完成之后再往下执行,等主线程执行完毕后在关闭所有子线程。
并且可以通过join()的timeout参数来控制,如果超过时间子线程未完成,则主线程继续执行,执行完成后关闭子线程。
import threading
from datetime import *
def test():
print(datetime.now())
def looptest():
for i in range(50):
test()
def thd():
Threads= []
for i in range(1):
t = threading.Thread(target=looptest)
Threads.append(t)
t.setDaemon(True)
for t in Threads:
t.start()
for t in Threads:
t.join(1)
if __name__ == ‘__main__‘:
thd()
超时机制:完成第一个线程的超时之后开始计算第二个线程的超时,如果执行10个线程超时时间就是10秒。一般不推荐用timeout参数,而是在函数内加上超时判断。
阻塞线程的主要意义在于控制子线程与主线程的执行顺序。
以上是关于接口测试-并发处理的主要内容,如果未能解决你的问题,请参考以下文章