locust扩展支持其它协议压测
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了locust扩展支持其它协议压测相关的知识,希望对你有一定的参考价值。
参考技术A 语言版本:Python 2.7
一:dnsClient.py
# -*- coding: utf-8 -*-
# @Time : 2020-01-08 15:32
# @Author : xiaobin
# @File : dnsClient.py
# @Software : PyCharm
import time
import socket

# NOTE(review): gevent.lock exposes Semaphore as public API; the private
# module path is kept here to match the original script.
from gevent._semaphore import Semaphore
from locust import (TaskSet, task, events, Locust)

# Semaphore acquired at import time and released once Locust fires its
# hatch_complete event. Nothing in this file waits on it; a task set that
# wants to start only after hatching would call all_locusts_spawned.wait().
all_locusts_spawned = Semaphore()
all_locusts_spawned.acquire()


def on_hatch_complete(**kwargs):
    """Release ``all_locusts_spawned`` when the hatch_complete event fires.

    Any keyword arguments supplied by the event are accepted and ignored.
    """
    all_locusts_spawned.release()


events.hatch_complete += on_hatch_complete
class dnsClient(object):
    """Small DNS client that reports every lookup to Locust's request events."""

    def look_host(self, hostname):
        """Look up *hostname* once and fire the matching Locust event.

        The elapsed time in milliseconds is printed and passed to the event
        as ``response_time``. A successful lookup fires ``request_success``
        under the name ``dns``; a failed one fires ``request_failure`` under
        the name ``dns-error``.
        """
        start_time = time.time()
        # NOTE(review): this pause sits inside the timed region, so every
        # reported response time includes roughly 195 ms of sleep. Confirm
        # that this is intended pacing and not a measurement error.
        time.sleep(0.195)
        resp = self.get_host(hostname)
        total_time = int((time.time() - start_time) * 1000)
        print(total_time)
        if resp:
            events.request_success.fire(
                request_type='dns-test',
                name=r'dns',
                response_time=total_time,
                response_length=0)
        else:
            # Failures must go through request_failure; firing
            # request_success here would count failed lookups as successes.
            events.request_failure.fire(
                request_type='dns-test',
                name=r'dns-error',
                response_time=total_time,
                exception='error')

    def get_host(self, hostname):
        """Return True if ``socket.gethostbyaddr(hostname)`` succeeds.

        Resolver errors (``socket.herror`` and ``socket.gaierror``) are
        printed and reported as False instead of propagating.
        """
        try:
            socket.gethostbyaddr(hostname)
        except (socket.herror, socket.gaierror) as e:
            print(e)
            return False
        return True
if __name__ == '__main__':
    # Direct execution: perform a single lookup as a quick manual check.
    dnsClient().look_host('dnstest.com')
二:locustDns.py
# -*- coding: utf-8 -*-
# @Time : 2020-01-08 15:47
# @Author : xiaobin
# @File : locustDns.py
# @Software : PyCharm
from locust import (TaskSet, task, events, Locust)
import core.dns_test.dnsClient


class locustDns(Locust):
    """Locust user class whose ``client`` attribute is a ``dnsClient``."""

    def __init__(self, *args, **kwargs):
        """Initialise the base class and attach a fresh ``dnsClient``.

        Positional and keyword arguments are forwarded unchanged to the
        base ``Locust`` initialiser, so the class works whether or not the
        installed base class expects constructor arguments.
        """
        super(locustDns, self).__init__(*args, **kwargs)
        self.client = core.dns_test.dnsClient.dnsClient()
三:LocustRun.py
# -*- coding: utf-8 -*-
# @Time : 2020-01-08 15:52
# @Author : xiaobin
# @File : LocustRun.py
# @Software : PyCharm
import sys
import os

# Add the parent of this script's directory to sys.path so that the
# ``core`` package imported below can be found.
curPath = os.path.abspath(os.path.dirname(__file__))
rootPath = os.path.split(curPath)[0]
sys.path.append(rootPath)

# Explicit names instead of a star import: only these two are used here.
from locust import TaskSet, task
import core.dns_test.locustDns
class LocustRun(TaskSet):
    """Task set with one DNS lookup task and print-only lifecycle hooks."""

    def setup(self):
        """Print a marker when the task set's setup hook runs."""
        print('task setup')

    def teardown(self):
        """Print a marker when the task set's teardown hook runs."""
        print('task teardown')

    def on_start(self):
        """Print a marker when the on_start hook runs."""
        print('start')

    def on_stop(self):
        """Print a marker; runs when the virtual user ends the task."""
        print('end')

    @task(weight=1)
    def test_dns(self):
        """Look up the test domain through the user's client."""
        self.client.look_host('dnstest.com')
class myrun(core.dns_test.locustDns.locustDns):
    """Locust user class that runs the ``LocustRun`` task set."""

    # Both wait bounds are set to zero.
    min_wait = 0
    max_wait = 0
    host = 'http://127.0.0.1:8888'
    # Task set executed by this user class.
    task_set = LocustRun
四:运行locust -f LocustRun.py 就可以了
记得本地/etc/hosts 增加测试域名
性能压测工具之 Locust
Locust 是一款基于 Python 语言开发的新型性能压测工具,它以协程(比线程更轻量)的方式运行。
先看一张 locust 运行图:
源码如下:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from locust import HttpLocust, TaskSet, task
import subprocess
import json
import time
import random
import hashlib
import os
class UserBer(TaskSet):
    """Task set that posts a re-signed JSON payload to ``/pub/send``."""

    def on_start(self):
        """No per-user initialisation is needed."""
        pass

    @task
    def pubsend(self):
        """Load the payload template, refresh its signature and POST it.

        The template is read from ``F:/python/data.json``. The ``from``
        section gets a new ``time`` and ``nonce``, and ``pubtoken`` is
        replaced by the SHA-1 hex digest of the sorted, concatenated
        ``no``, ``pub`` and ``pubtoken`` values. The outcome is printed.
        """
        url = "/pub/send"
        # A context manager closes the file on every task iteration.
        with open("F:/python/data.json", 'r') as f:
            data = json.load(f)

        sender = data["from"]
        sender["time"] = int(time.time())
        sender["nonce"] = random.randint(100000, 999999)

        tokens = sorted([sender["no"], sender["pub"], sender["pubtoken"]])
        digest = hashlib.sha1("".join(tokens).encode('utf-8')).hexdigest()
        sender["pubtoken"] = digest

        dataCop = json.dumps(data)
        head = {"Content-Type": "application/json"}
        req = self.client.post(url=url, data=dataCop, headers=head)

        # NOTE(review): the original condition was garbled in extraction;
        # reading a boolean "success" field from the JSON body is the
        # assumed intent. Confirm against the service's response format.
        succeeded = False
        if req.status_code == 200:
            try:
                body = req.json()
            except ValueError:
                # A body that is not valid JSON is treated as a failure.
                body = None
            succeeded = isinstance(body, dict) and body.get('success') is True

        if succeeded:
            print("响应正常!")
        else:
            print("服务端返回异常!")
class MobileUserLocust(HttpLocust):
    """HTTP user class that runs the ``UserBer`` task set."""

    # Target host for the requests issued by the task set.
    host = "http://hlep.his.com"
    # Lower and upper wait bounds.
    min_wait = 1000
    max_wait = 3000
    # Task set executed by this user class.
    task_set = UserBer
if __name__ == "__main__":
    # Start Locust when the script is run directly.
    # NOTE(review): assumes this file is saved as locust-script.py in the
    # current working directory; confirm the file name.
    os.system("locust -f locust-script.py")
说明:Windows 下先执行 pip install locustio 安装 Locust;运行脚本后,在浏览器中访问 http://localhost:8089
以上是关于locust扩展支持其它协议压测的主要内容,如果未能解决你的问题,请参考以下文章