locust扩展支持其它协议压测

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了locust扩展支持其它协议压测相关的知识,希望对你有一定的参考价值。

参考技术A 语言版本:python2.7 

一:dnsClient.py

# -*- coding: utf-8 -*-

# @Time    : 2020-01-08 15:32

# @Author  : xiaobin

# @File    : dnsClient.py

# @Software : PyCharm

import time

from locustimport (TaskSet,task,events,Locust)

from gevent._semaphoreimport Semaphore

import socket

all_locusts_spawned = Semaphore()

all_locusts_spawned.acquire()

def on_hatch_complete(**kwargs):

all_locusts_spawned.release()

events.hatch_complete += on_hatch_complete

class dnsClient(object):

def look_host(self, hostname):

start_time = time.time()

time.sleep(0.195)

resp =self.get_host(hostname)

total_time =int((time.time() - start_time) *1000)

print total_time

if resp:

events.request_success.fire(

request_type='dns-test',

                name=r'dns',

                response_time=total_time,

                response_length=0)

else:

events.request_success.fire(

request_type ='dns-test',

            name =r'dns-error',

            response_time = total_time,

            exception ='error')

def get_host(self, hostname):

falg =False

        try:

result = socket.gethostbyaddr(hostname)

falg =True

        except socket.herror, e:

print e

return falg

if __name__ =='__main__':

d = dnsClient()

d.look_host('dnstest.com')

二:locustDns.py

# -*- coding: utf-8 -*-

# @Time    : 2020-01-08 15:47

# @Author  : xiaobin

# @File    : locustDns.py

# @Software : PyCharm

from locustimport (TaskSet,task,events,Locust)

import core.dns_test.dnsClient

class locustDns(Locust):

def __init__(self):

super(locustDns, self).__init__()

self.client = core.dns_test.dnsClient.dnsClient()

三:LocustRun.py

# -*- coding: utf-8 -*-

# @Time    : 2020-01-08 15:52

# @Author  : xiaobin

# @File    : TestDns.py

# @Software : PyCharm

import sys

import os

curPath = os.path.abspath(os.path.dirname(__file__))

rootPath = os.path.split(curPath)[0]

sys.path.append(rootPath)

from locustimport *

import core.dns_test.locustDns

class LocustRun(TaskSet):

def setup(self):

print('task setup')

def teardown(self):

print('task teardown')

def on_start(self):

print('start')

def on_stop(self):

# 虚拟用户结束Task时运行

        print('end')

@task(weight=1)

def test_dns(self):

self.client.look_host('dnstest.com')

class myrun(core.dns_test.locustDns.locustDns):

task_set = LocustRun

host ='http://127.0.0.1:8888'

    min_wait =0

    max_wait =0

四:运行locust -f LocustRun.py 就可以了

     记得本地/etc/hosts 增加测试域名

性能压测工具之 Locust

  Locust 是一款基于python语言开发的一款新的性能压测工具,它是以协程(比线程还小)的方式运行。

先看一张 locust 运行图:
技术图片

源码如下:

#!/usr/bin/env python
#-- coding: utf-8 --

from locust import HttpLocust, TaskSet, task
import subprocess
import json
import time
import random
import hashlib
import os

class UserBer(TaskSet):
def on_start(self):
pass

@task
def pubsend(self):
    url = "/pub/send"
    f = open("F:/python/data.json", ‘r‘)
    data = f.read()
    data = json.loads(data)
    data["from"]["time"] = int(time.time())
    data["from"]["nonce"] = random.randint(100000, 999999)

    pubToken = [data["from"]["no"], data["from"]["pub"], data["from"]["pubtoken"]
    pubToken.sort()
    pubToken = "".join(pubToken)
    sha1 = hashlib.sha1(pubToken.encode(‘utf-8‘))
    sha1 = sha1.hexdigest()
    data["from"]["pubtoken"] = sha1
    dataCop = json.dumps(data)
    head = {"Content-Type": "application/json"}

    req = self.client.post(url=url,data=dataCop,headers=head)

    if req.status_code == 200 and req.[‘success‘]==True:
        print("响应正常!")          
    else:
        print("服务端返回异常!")

class MobileUserLocust(HttpLocust):
task_set = UserBer
host = "http://hlep.his.com"
min_wait = 1000
max_wait = 3000

if name == "main":

os.system("locust -f locust-script.py")

说明:wins 下 pip install locustio ;运行脚本后 在浏览器中输入: localhost:8089

以上是关于locust扩展支持其它协议压测的主要内容,如果未能解决你的问题,请参考以下文章

locust压测非http协议

locust压测

Locust负载测试框架

接口性能测试工具之Locust

Locust学习总结分享

Locust性能测试3 no-web运行