用Python实现ModbusTcp协议

Posted Silent starry sky

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了用Python实现ModbusTcp协议相关的知识,希望对你有一定的参考价值。

用Python实现ModbusTcp协议

去年2021年写了两篇用Python实现Modbus-RTU串行通信协议的文章,今年有个项目用现场上位计算机通过ModbusTcp网关来读写现场的Modbus-RTU协议的仪表设备和IO设备。我则写了一个Python程序来测试采购的这台ModbusTcp网关设备。

首先是需要设置一下这台ModbucTcp网关,其IP地址是192.168.16.253,ip地址暂时就不动了,将自己计算机的网卡IP地址改成同一网段下的192.168.16.10。打开浏览器进入网关的设置界面。这是一台4个RS485口,1个100M以太网口的ModbusTcp网关,我就测试第一个485口,在第一个485口上连接了一台Modbus-RTU协议仪表。打开网关设置页面,将此485口的工作模式设置为ModbusTcp服务器端,端口号是502,502端口是国际认可的ModbusTcp协议使用端口,当然也可以使用其它端口。将此网关485口的串行参数设定的和仪表一致即,9600,8,N,1。具体设置不同品牌的ModbusTcp网关不太一样请自行参考使用手册。
我们知道ModbusTcp协议是基于TCP/IP协议的现场总线协议,故要实现改协议则要使用socket编程。关于socket编程推荐这篇文章。更详细的手册则这篇翻译的手册很不错。在本文中,上位机作为TCP协议的客户端,ModbusTcp网关则作为TCP协议的服务器端(502端口)。由上位机客户端向网关发起连接请求,连接后使用ModbusTcp协议规定的数据帧格式进行通信。关于ModbucTcp协议的帧格式可以参考官方文档,其实就是在Modbus-RTU协议基础上增加了MBAP头,去掉了CRC校验(TCP协议自身已有校验)。下面我们用Python来实现ModbusTcp协议的客户端代码。

tcp客户端连接ModbusTcp网关函数

def connectserver(ip, port):
    try:
        mysocket = socket.socket()
        mysocket.settimeout(10)
        # mysocket.bind(("192.168.16.11",5000))
        ret = mysocket.connect((ip,port))
        if ret == socket.error:
            # print("Connect ModbusTcp server fail!")
            return None
        else:
            # print("Connect ModbusTcp server sucess!")
            return mysocket;
    except Exception as e:
        logging.debug(e)
        return None

改函数有两个参数,一个是网关的ip地址,一个是端口号,成功则返回tcp客户端的socket对象实例,失败则返回None。此函数用于测试开始时连接网关时使用。

ModbusTcp协议的03和04号功能的发送帧打包和接收帧解析函数

03和04功能的发送帧打包函数

# Modbus-RTU协议的03或04读取保存或输入寄存器功能主-》从命令帧
def modbus03or04s(add, startregadd, regnum, funcode=3):
    if add < 0 or add > 0xFF or startregadd < 0 or startregadd > 0xFFFF or regnum < 1 or regnum > 0x7D:
        print("Error: parameter error")
        return
    if funcode != 3 and funcode != 4:
        print("Error: parameter error")
        return
    # MBAP的实现
    ranvalue = random.randint(0, 0xFFFF)
    sendbytes = ranvalue.to_bytes(2, byteorder="big", signed=False)
    sendbytes = sendbytes + b"\\x00\\x00\\x00\\x06"
    sendbytes = sendbytes + add.to_bytes(1, byteorder="big", signed=False)
    # PDU实现
    sendbytes = sendbytes + funcode.to_bytes(1, byteorder="big", signed=False) + startregadd.to_bytes(2, byteorder="big", signed=False) + \\
                regnum.to_bytes(2, byteorder="big", signed=False)
    # for b in list(sendbytes):
    #     print(f"b:02x")
    return sendbytes

此函数有4个参数,分别是Modbus-RTU设备的从站地址,开始寄存器地址,寄存器个数,功能号(03或04)默认03。函数根据几个参数构成ModbusTcp协议的发送帧,以字节串类型返回给调用者。发送帧由MBAP头和PDU两部分构成,MBAP头由7个字节构成,第1,2两个字节为一个帧标识,是一个两字节的随机整数用来标识此发送帧(字节顺序是高字节在前低字节在后),返回帧的MBAP头的第1,2两个字节和此标识一致则说明返回帧是此发送帧的响应帧。MBAP头的第3,4字节里是协议类型标识,都是0表示是Modbus协议。MBAP头的第5,6字节是第6字节后所有数据的字节个数,对于03和04功能号应该是6个字节。MBAP头的第7字节是如果发送对象是Modbus-RTU设备,这个字节是Modbus-RTU设备的地址。从第7字节开始其实就是ModBus-RTU协议的去掉CRC校验的部分,即:从站地址+PDU。

03或04功能的接收帧解析函数

# Modbus协议的03或04读取保持或输入寄存器功能从-》主的数据帧解析(浮点数2,1,4,3格式,16位短整形(定义正负数))
def modbus03or04p(recvdata, valueformat=0, intsigned=False):
    if not recvdata:
        print("Error: data error")
        return
    datalist = list(recvdata)
    if datalist[7] != 0x3 and datalist[7] != 0x4:
        print("Error: recv data funcode error")
        return
    bytenums = datalist[8]
    if bytenums % 2 != 0:
        print("Error: recv data reg data error")
        return
    retdata = []
    if valueformat == 0:
        floatnums = bytenums / 4
        # print("float nums: ", str(floatnums))
        floatlist = [0, 0, 0, 0]
        for i in range(int(floatnums)):
            floatlist[1] = datalist[9+i*4]
            floatlist[0] = datalist[10+i*4]
            floatlist[3] = datalist[11+i*4]
            floatlist[2] = datalist[12+i*4]
            bfloatdata = bytes(floatlist)
            [fvalue] = struct.unpack('f', bfloatdata)
            retdata.append(fvalue)
            # print(f'Datai+1: fvalue:.3f')
    elif valueformat == 1:
        shortintnums = bytenums / 2
        # print("short int nums: ", str(shortintnums))
        for i in range(int(shortintnums)):
            btemp = recvdata[9+i*2:11+i*2]
            shortvalue = int.from_bytes(btemp, byteorder="big", signed=intsigned)
            retdata.append(shortvalue)
            # print(f"Datai+1: shortvalue")
    return retdata

此函数将03或04功能号的返回帧解析为单精度浮点数或短整型数。解析单精度数时,是按照2,1,4,3则字节顺序解析,这是现场设备用的浮点数字节顺序,一般仪表常用的是4,3,2,1顺序。此函数有3个参数,第1个是socket接收到完整帧数据。第2个参数是解析出数据的格式,0代表单精度数,1代表短整型数。第3个参数是当第2个参数为1时,短整型数是有符号还是无符号。此函数的返回一个列表,里面是读取的寄存器数据值。

ModbusTcp协议的01或02功能发送帧打包和接收帧解析函数

和03或04功能的打包和解析函数差不多就是Modbus协议的功能号之间的区别,01功能是读取线圈寄存器值,02功能是读取数字量输入寄存器值,读回的数据是用位bit表示一个寄存器值是0或1,具体参考Modbus文档吧,直接上代码。

# modbus的01或02功能号命令打包函数
def modbus01or02s(add, startregadd, regnum, funcode=2):
    if add < 0 or add > 0xFF or startregadd < 0 or startregadd > 0xFFFF or regnum < 1 or regnum > 0x7D0:
        print("Error: parameter error")
        return
    if funcode != 1 and funcode != 2:
        print("Error: parameter error")
        return
    # MBAP实现
    ranvalue = random.randint(0, 0xFFFF)
    sendbytes = ranvalue.to_bytes(2, byteorder="big", signed=False)
    sendbytes = sendbytes + b"\\x00\\x00\\x00\\x06"
    sendbytes = sendbytes + add.to_bytes(1, byteorder="big", signed=False)
    # PDU实现
    sendbytes = sendbytes + funcode.to_bytes(1, byteorder="big", signed=False) + startregadd.to_bytes(2, byteorder="big", signed=False) + \\
                regnum.to_bytes(2, byteorder="big", signed=False)
    # for b in list(sendbytes):
    #     print(f"b:02x")
    return sendbytes

# modbus的01或02功能号的返回包解析函数
def modbus01or02p(recvdata):
    if not recvdata:
        print("Error: data error")
        return
    datalist = list(recvdata)
    if datalist[7] != 0x1 and datalist[7] != 0x2:
        print("Error: recv data funcode error")
        return
    bytenums = datalist[8]
    ret_data = []
    for i in range(bytenums):
        intvalue = int(recvdata[9+i])
        for bit in range(8):
            nowvalue = intvalue & 0x01
            intvalue = intvalue >> 1
            ret_data.append(nowvalue)
    return ret_data

读取Modbus寄存器数值函数

使用socket来通过ModbusTcp网关读取Modbus-RTU从站设备的数据。有两个函数,一个是用03或04功能号读取保持寄存器或输入寄存器的函数,一个是用01或02功能号读取线圈寄存器和数字量寄存器的函数。

# 读取仪表数据并解析返回
def readmeterdata(mysocket, meter_add, start_reg, reg_num):
    try:
        send_data = modbus03or04s(meter_add, start_reg, reg_num)
        if not send_data:
            print("读取命令处理错误!")
            return
        starttime = time.time()
        mysocket.send(send_data)
        recv_data = mysocket.recv(1024) #(reg_num*2+9)
        endtime = time.time()
        # print(f"Used time is endtime-starttime:.3f")
        if recv_data and len(recv_data) > 0:
            retdata = modbus03or04p(recv_data)
            if retdata:
                return retdata
            else:
                return
        else:
            return
    except Exception as e:
        # print(f"Exception : e")
        endtime = time.time()
        print(f"读取超时时间: endtime-starttime:.3f")        
        return

# 读取仪表数据并解析返回
def readmeterdata2(mysocket, meter_add, start_reg, reg_num):
    try:
        send_data = modbus01or02s(meter_add, start_reg, reg_num)
        if not send_data:
            print("读取命令处理错误!")
            return
        starttime = time.time()
        mysocket.send(send_data)
        recv_data = mysocket.recv(1024) #(reg_num*2+9)
        endtime = time.time()
        # print(f"Used time is endtime-starttime:.3f")
        if recv_data and len(recv_data) > 0:
            retdata = modbus01or02p(recv_data)
            if retdata:
                return retdata
            else:
                return
        else:
            return
    except Exception as e:
        # print(f"Exception : e")
        endtime = time.time()
        print(f"读取超时时间: endtime-starttime:.3f")        
        return   

完整代码

这样基本构成函数都有了。然后就是在主程序中调用以上函数。下面是完整代码。
完整代码里使用了rich库,用于在终端构造一个实时数据表格进行显示,相应内容请参考我的另一篇文章:用Python实现Modbus-RTU协议及串口调试(二)

# ModbusTcp协议客户端模块

import socket
import random
import struct
from rich.console import Console
from rich.table import Column, Table
from rich.live import Live
from rich.panel import Panel
import time, sys
import logging

LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"
logging.basicConfig(filename='modbustcp.log', level=logging.DEBUG, format=LOG_FORMAT)

def connectserver(ip, port):
    try:
        mysocket = socket.socket()
        mysocket.settimeout(10)
        # mysocket.bind(("192.168.16.11",5000))
        ret = mysocket.connect((ip,port))
        if ret == socket.error:
            # print("Connect ModbusTcp server fail!")
            return None
        else:
            # print("Connect ModbusTcp server sucess!")
            return mysocket;
    except Exception as e:
        logging.debug(e)
        return None

# Modbus-RTU协议的03或04读取保存或输入寄存器功能主-》从命令帧
def modbus03or04s(add, startregadd, regnum, funcode=3):
    if add < 0 or add > 0xFF or startregadd < 0 or startregadd > 0xFFFF or regnum < 1 or regnum > 0x7D:
        print("Error: parameter error")
        return
    if funcode != 3 and funcode != 4:
        print("Error: parameter error")
        return
    # MBAP的实现
    ranvalue = random.randint(0, 0xFFFF)
    sendbytes = ranvalue.to_bytes(2, byteorder="big", signed=False)
    sendbytes = sendbytes + b"\\x00\\x00\\x00\\x06"
    sendbytes = sendbytes + add.to_bytes(1, byteorder="big", signed=False)
    # PDU实现
    sendbytes = sendbytes + funcode.to_bytes(1, byteorder="big", signed=False) + startregadd.to_bytes(2, byteorder="big", signed=False) + \\
                regnum.to_bytes(2, byteorder="big", signed=False)
    # for b in list(sendbytes):
    #     print(f"b:02x")
    return sendbytes

# Modbus协议的03或04读取保持或输入寄存器功能从-》主的数据帧解析(浮点数2,1,4,3格式,16位短整形(定义正负数))
def modbus03or04p(recvdata, valueformat=0, intsigned=False):
    if not recvdata:
        print("Error: data error")
        return
    datalist = list(recvdata)
    if datalist[7] != 0x3 and datalist[7] != 0x4:
        print("Error: recv data funcode error")
        return
    bytenums = datalist[8]
    if bytenums % 2 != 0:
        print("Error: recv data reg data error")
        return
    retdata = []
    if valueformat == 0:
        floatnums = bytenums / 4
        # print("float nums: ", str(floatnums))
        floatlist = [0, 0, 0, 0]
        for i in range(int(floatnums)):
            floatlist[1] = datalist[9+i*4]
            floatlist[0] = datalist[10+i*4]
            floatlist[3] = datalist[11+i*4]
            floatlist[2] = datalist[12+i*4]
            bfloatdata = bytes(floatlist)
            [fvalue] = struct.unpack('f', bfloatdata)
            retdata.append(fvalue)
            # print(f'Datai+1: fvalue:.3f')
    elif valueformat == 1:
        shortintnums = bytenums / 2
        # print("short int nums: ", str(shortintnums))
        for i in range(int(shortintnums)):
            btemp = recvdata[9+i*2:11+i*2]
            shortvalue = int.from_bytes(btemp, byteorder="big", signed=intsigned)
            retdata.append(shortvalue)
            # print(f"Datai+1: shortvalue")
    return retdata    

# modbus的01或02功能号命令打包函数
def modbus01or02s(add, startregadd, regnum, funcode=2):
    if add < 0 or add > 0xFF or startregadd < 0 or startregadd > 0xFFFF or regnum < 1 or regnum > 0x7D0:
        print("Error: parameter error")
        return
    if funcode != 1 and funcode != 2:
        print("Error: parameter error")
        return
    # MBAP实现
    ranvalue = random.randint(0, 0xFFFF)
    sendbytes = ranvalue.to_bytes(2, byteorder="big", signed=False)
    sendbytes = sendbytes + b"\\x00\\x00\\x00\\x06"
    sendbytes = sendbytes + add.to_bytes(1, byteorder="big", signed=False)
    # PDU实现
    sendbytes = sendbytes + funcode.to_bytes(1, byteorder="big", signed=False) + startregadd.to_bytes(2, byteorder="big", signed=False) + \\
                regnum.to_bytes(2, byteorder="big", signed=False)
    # for b in list(sendbytes):
    #     print(f"b:02x")
    return sendbytes

# modbus的01或02功能号的返回包解析函数
def modbus01or02p(recvdata):
    if not recvdata:
        print("Error: data error")
        return
    datalist = list(recvdata)
    if datalist[7] != 0x1 and datalist[7] != 0x2:
        print("Error: recv data funcode error")
        return
    bytenums = datalist[8]
    ret_data = []
    for i in range(bytenums):
        intvalue = int(recvdata[9+i])
        for bit in range(8):
            nowvalue = intvalue & 0x01
            intvalue = intvalue >> 1
            ret_data.append以上是关于用Python实现ModbusTcp协议的主要内容,如果未能解决你的问题,请参考以下文章

请教高手关于西门子S7-300的ModbusTCP通讯问题,急!!

IoTClient开发3 - ModBusTcp协议客户端实现

众为兴SCARA四轴机械臂编程——基于ModbusTCP协议

modbustcp封装使用获取设备数据示例

Modbus库开发笔记之三:Modbus TCP Server开发

兴达易控ModbusTCP转Profinet网关