Locust如何测试物联网MQTT

Posted SummerStone

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Locust如何测试物联网MQTT相关的知识,希望对你有一定的参考价值。

MQTT是干什么的

简单来说,它是物联网的通信协议,是消息通道建立,消息发送和消息订阅的标准。如果大家想了解更多概念上的详细可以网上搜索。

Locust测试MQTT的步骤

测试步骤,可以用以下图形表示:

Locust如何测试物联网MQTT_python

准备环境,安装Locust测试环境

这个比较简单,主要是准备好Python的虚拟开发环境,并安装好locust的python软件包。可以在Locust官方网站找到相关步骤,这里不再赘述。

安装MQTT客户端库 paho-mqtt

可以在Python虚拟环境,执行如下安装命令:

pip install paho-mqtt


locustfile中首先实现消息发送

这里为了简单起见,使用公共的MQTT broker: EMQX, 它的地址为:"

broker.emqx.io"

具体实现的代码如下:


broker_add = broker.emqx.io
port = 1883
topic = "/python/mqtt_topic_for_python"

client_id = f"python-mqtt-random.randint(0,100)"

def connect_mqtt():
def on_connect(client,userdata,flags,rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\\n",rc)

client = mqtt_c.Client(client_id)
client.on_connect= on_connect
client.connect(broker_add,port)

return client

def publish(client):
msg_count = 0
while True:
time.sleep(3)
msg = f"message: msg_count"
result = client.publish(topic,msg)
status = result[0]
if status == 0:
print(f"send `msg` to topic `topic` ")
else:
print(f"Failed to send message to topic topic")
msg_count += 1

def run():
client = connect_mqtt()
client.loop_start()
publish(client)


完善locustfile中关于Taskset和User的相关配置

具体如下代码,所以会发现,用Python-Locust去测试非HTTP协议的应用系统还是比较方便的,代码即测试。


class TheTaskSet(TaskSet):
@task
def task_1(self):
run()


class TheUser(User):
tasks = [TheTaskSet]
wait_time = constant_pacing(1)


完整的locustfile如下:

import random,time

from paho.mqtt import client as mqtt_c
from locust import TaskSet,task,User,constant_pacing

broker_add = broker.emqx.io
port = 1883
topic = "/python/mqtt_topic_for_python"

client_id = f"python-mqtt-random.randint(0,100)"

def connect_mqtt():
def on_connect(client,userdata,flags,rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\\n",rc)

client = mqtt_c.Client(client_id)
client.on_connect= on_connect
client.connect(broker_add,port)

return client

def publish(client):
msg_count = 0
while True:
time.sleep(3)
msg = f"message: msg_count"
result = client.publish(topic,msg)
status = result[0]
if status == 0:
print(f"send `msg` to topic `topic` ")
else:
print(f"Failed to send message to topic topic")
msg_count += 1

def run():
client = connect_mqtt()
client.loop_start()
publish(client)


class TheTaskSet(TaskSet):
@task
def task_1(self):
run()


class TheUser(User):
tasks = [TheTaskSet]
wait_time = constant_pacing(1)


具体的执行结果如下:

Connected to MQTT Broker!
send `message: 0` to topic `/python/mqtt_topic_for_python`
send `message: 1` to topic `/python/mqtt_topic_for_python`
send `message: 2` to topic `/python/mqtt_topic_for_python`
send `message: 3` to topic `/python/mqtt_topic_for_python`
send `message: 4` to topic `/python/mqtt_topic_for_python`
send `message: 5` to topic `/python/mqtt_topic_for_python`
...


参考文档:EMQ官方文档,https://www.emqx.com/zh/blog/how-to-use-mqtt-in-python

以上是关于Locust如何测试物联网MQTT的主要内容,如果未能解决你的问题,请参考以下文章

如何发起 MQTT 亿级连接和千万消息吞吐性能测试

如何在 JMeter 中使用 MQTT 插件

如何在 JMeter 中使用 MQTT 插件

ESA2GJK1DH1K基础篇: 阿里云物联网平台: 测试MQTT连接阿里云物联网平台

MQTT压力测试工具之JMeter插件教程

物联网服务NodeJs-5天学习第四天存储篇④ ——基于MQTT的环境温度检测,升级存储为mysql