IOTOS驱动modbus_rtu从0到1对接详解

Posted 爱投斯

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了IOTOS驱动modbus_rtu从0到1对接详解相关的知识,希望对你有一定的参考价值。

本文章为原创,转载请注明出处!

登录平台:IOTOS®爱投斯物联中台

账号:iotos_test    密码:iotos123

代码地址:IOTOSDK-Python: IOTOS Python版本SDK,自带原生接口和采集引擎 (gitee.com)

  • 前言

        Modbus协议是应用于电子控制器上的一种通用语言。通过此协议,控制器相互之间、控制器经由网络例如以太网)和其它设备之间可以通信。它已经成为一通用工业标准。有了它,不同厂商生产的控制设备可以连成工业网络,进行集中监控此协议定义了一个控制器能认识使用的消息结构,而不管它们是经过何种网络进行通信的。它描述了一控制器请求访问其它设备的过程,如果回应来自其它设备的请求,以及怎样侦测错误并记录。它制定了消息域格局和内容的公共格式。

        Modbus具有两种串行传输模式:分别为ASCII和RTU。Modbus是一种单主站的主从通信模式,Modbus网络上只能有一个主站存在,主站在Modbus网络上没有地址,每个从站必须有唯一的地址,从站的地址范围为0 - 247,其中0为广播地址,从站的实际地址范围为1 - 247。

        Modbus RTU通信以主从的方式进行数据传输,在传输的过程中Modbus RTU主站是主动方,即主站发送数据请求报文到从站,Modbus RTU从站返回响应报文。

  • 驱动目的

        modbus RTU 该驱动是将中台(爱投斯物联中台)作为服务端(上位机)向客户端(下位机)/modbus 485通讯的前端设备发送询问帧,客户端(下位机)/modbus 485通讯的前端设备接收到询问帧并返回应答帧给到服务端(上位机)进行解析工作并展示数据

  • 适用范围

凡是走标准的modbus_rtu协议的设备,例如烟感、光感、PLC等,需要注意的是,若客户端(下位机)/modbus 485通讯的前端设备只有485/232 等的串口通讯不具备上网功能,需增加一个外接模块(如485 转4g /485 转wifi 模块)与服务器(上位机进行网络通讯)。使用者或用户可在生产生活中使用串口通讯测试软件测试设备是否通讯正常。

  • 使用示例

  • 以光感传感器(威海晶合数字矿山技术有限公司的光照传感器)为示例进行演示

  •  首先,连接光感和485模块,光感的485线的A,B级分别连接模块的485接口AB级;其次光感的电源线连接符合光感正常运转的电源(可用变压器控制电源大小),然后模块网口连接交换机,最后分别给模块和光感通上电,具体连接方式如下图:

  •  进入模块的IP,进行配置,将模式改为TCP Client格式,地址改为爱投斯中台的IP地址(121.36.152.36),端口改为为被分配的端口,注册包方式改为云转发

 

  •  进入爱投斯中台,账号为iotos_test,密码为iotos123,创建网关

 

  • 填好网关名称后点击确认 

  • 创建设备示例点击【我的设备】 -> 【通信网关】 -> 【设备实例】

  • 填写【设备名称】、选择刚刚创建的【模板驱动】和【通信网关】。参数tcp为中台需要开发的端口,用来与模块进行通讯,与模块配置里面的端口保持一致

  • 创建数据点,点击【我的设备】 -> 【通信网关】 -> 【设备实例】 -> 【数据点】,并在【设备实例】下拉列表选择刚刚创建的设备实例

点击右边的创建数据点,填写名称

 并在高级配置中配置需要给光感发送的指令,以下为光感的询问帧和中台的配置:

 中台配置:

  • 在【我的设备】 -> 【通信网关】中找到刚才创建的网关,点击【高级】

  •  开启云网关,密码为账号密码

  • 点击 【我的设备】 -> 【通信网关】 -> 【设备实例】->【数据点】,选择刚才创建的设备实例

  •  即可查看数据已经上报成功,light即为此时的光照强度

  • 驱动代码

#!coding:utf8
import json
import sys

sys.path.append("..")
from driver import *

import time
import modbus_tk
import modbus_tk.defines as cst
import modbus_tk.modbus as modbus
import modbus_tk.modbus_rtu as modbus_rtu
from modbus_tk.exceptions import ModbusInvalidResponseError
import serial
import signal
import traceback
from jcomm import *
import re
import struct
import math

#硬件心跳线程
class RunHardwareHeartbeatThread(threading.Thread,JLib):
    def __init__(self, driver):
        threading.Thread.__init__(self)
        JLib.__init__(self)
        self.driver = driver
    def run(self):
        statetmp = False
        dataIdTmp = ''
        recycletmp = 0
        for dataId,attrs in self.driver.data2attrs.items():
            if 'param' not in attrs['config']:
                self.error(attrs['config'])
                break
            if 'hbt' in attrs['config']['param']:
                dataIdTmp = dataId
                recycletmp = attrs['config']['param']['hbt']
                break
        while True:
            try:
                if not self.driver.startHeartbeat:
                    return

                #状态反转及延时
                if statetmp == False:
                    statetmp = True
                else:
                    statetmp = False
                time.sleep(recycletmp)

                # self.warn('HARDWARE HEATBEAT ' + dataIdTmp + u'硬件心跳:' + str(statetmp))
                #控制执行
                rettmp = ''
                if statetmp:
                    rettmp = self.driver.Event_setData(dataIdTmp,'true')
                else:
                    rettmp = self.driver.Event_setData(dataIdTmp,'false')
                if json.loads(rettmp)["code"] == 0:
                    self.driver.setValue(self.driver.name(dataIdTmp), statetmp)
            except Exception,e:
                traceback.print_exc(e.message)
                continue

class ModbusDriver(IOTOSDriverI):
    def __init__(self):
        IOTOSDriverI.__init__(self)
        self.master = None
        # 心跳开关
        self.startHeartbeat = False
        self.bitsState = [0,0,0,0,0,0,0,0]
        self.sourceDataIn = []

    # 1、通信初始化
    def InitComm(self, attrs = None):
        try:
            #一、tcp端口监听
            self.__port = self.sysAttrs['config']['param']['tcp']
            self.__tcpServer = TcpServerThread(self,self.__port)
            self.__tcpServer.setDaemon(True)
            self.__tcpServer.start()
            self.debug(self.sysAttrs['name'] + u' TCP端口' + str(self.__port) + u"已启动监听!")

            #二、创建串口1 <=> 串口2
            serialtmp = self.sysAttrs['config']['param']['serial']
            self.__serial = SerialDtu(serialtmp)
            self.__serial.setCallback(self.serialCallback)
            self.__serial.open()

            #三、串口1 <=> modbus_tk
            self.master = modbus_rtu.RtuMaster(self.__serial.serial)
            self.master.set_timeout(5)
            self.master.set_verbose(False)
            self.debug(self.sysAttrs['name'] + u' 串口' + self.__serial.portName() + u'已打开!')

            self.zm.pauseCollect = True
            # 实例化硬件心跳线程
            RunHardwareHeartbeatThread(self).start()

        except Exception,e:
            self.online(False)
            traceback.print_exc(u'通信初始化失败' + e.message)

    #四、串口2 <=> tcp
    #tcp => 串口2
    def tcpCallback(self,data):
        datastr = self.str2hex(data)
        self.sourceDataIn = data
        self.info("Master < < < < < < Device: " + datastr)
        self.__serial.send(data)

    #tcp <= 串口2
    def serialCallback(self,data):
        self.info("Master > > > > > > Device: " + self.str2hex(data))
        self.__tcpServer.send(data)

    #连接状态回调
    def connectEvent(self,state):
        self.online(state)
        try:
            if state == True:
                self.warn('连接成功,启动采集、心跳')
                self.pauseCollect = False
                #启动软件看门狗
                self.startHeartbeat = True
            else:
                self.warn('连接断开,将关闭采集和心跳!')
                self.startHeartbeat = False
                self.pauseCollect = True
        except Exception,e:
            self.error(u'硬件心跳错误, ' + e.message)

    # 2、采集
    def Collecting(self, dataId):
        try:
            rtu_ret = ()
            cfgtmp = self.data2attrs[dataId]['config']

            #added by lrq,过滤非modbus rtu配置的点
            if not cfgtmp.has_key('param') or not cfgtmp.has_key('proxy'):
                return ()

            #当是新一组功能号时;当没有proxy.pointer,或者有,但是值为null时,就进行采集!否则(有pointer且值不为null,表明设置了采集代理,那么自己自然就被略过了,因为被代理了)当前数据点遍历轮询会被略过!
            if 'pointer' not in cfgtmp['proxy'] or cfgtmp['proxy']['pointer'] == None or cfgtmp['proxy']['pointer'] == '':

                #added by lrq,某些过滤掉不采集,因为有的地址的设备不在线,只要在proxy下面配置disabled:true,这样就不会轮训到它!
                if 'disabled' in cfgtmp['proxy'] and cfgtmp['proxy']['disabled'] == True:
                    return ()
                else:
                    self.warn(self.name(dataId))

                # added by lrq,过滤非modbus rtu配置的点
                if not cfgtmp['param'].has_key('funid'):
                    return ()

                funid = cfgtmp['param']['funid']
                devid = cfgtmp['param']['devid']
                regad = cfgtmp['param']['regad']
                format = cfgtmp['param']['format']
                quantity = re.findall(r"\\d+\\.?\\d*", format)
                if len(quantity):
                    quantity = int(quantity[0])
                else:
                    quantity = 1
                if format.lower().find('i') != -1:       #I、i类型数据为4个字节,所以n个数据,就是4n字节,除一般应对modbus标准协议的2字节一个数据的个数单位!
                    quantity *= 4/2
                elif format.lower().find('h') != -1:
                    quantity *= 2/2
                elif format.lower().find('b') != -1:
                    quantity *= 1/2
                elif format.find('d') != -1:
                    quantity *= 8/2
                elif format.find('f') != -1:
                    quantity *= 4/2
                elif format.find('?') != -1:           #对于功能号1、2的开关量读,开关个数,对于这种bool开关型,个数就不是返回字节数的两倍了!返回的字节个数是动态的,要字节数对应的位数总和,能覆盖传入的个数数值!
                    quantity *= 1
                    format = ''                        #实践发现,对于bool开关型,传入开关量个数就行,format保留为空!如果format设置为 "?"或"8?"、">?"等,都会解析不正确!!
                self.debug('>>>>>>' + '(PORT-' + str(self.__port) + ')' + str(devid) + ' ' + str(funid) + ' ' + str(regad) + ' ' + str(quantity) + ' ' + str(format))
                rtu_ret = self.master.execute(devid, funid, regad, quantity,data_format=format)

                
                if funid == 3:
                    retlist = []
                    for i in range(len(rtu_ret)):
                        retlist.append(rtu_ret[i])
                    rtu_ret = tuple(retlist)

                #周期查询的开关量输出状态进行备份,用来给控制用
                if funid == 1:
                    self.bitsState = list(rtu_ret)
                self.debug(rtu_ret)
                return rtu_ret
            # 一组功能号内的数据点,不进行遍历采集!跳过!
            else:
                return ()   #注意,这种情况下不是采集错误,如果返回None,那么会当作采集错误处理,进行采集错误计数了!!
        except ModbusInvalidResponseError, e:
            self.error(u'MODBUS响应超时, ' + e.message)
            return None
        except Exception, e:
            traceback.print_exc(e.message)
            self.error(u'采集解析参数错误:' + e.message)
            return None

    # 3、控制 数据点配置
    # 事件回调接口,监测点操作访问
    def Event_getData(self, dataId, condition=''):

        return json.dumps({'code': 0, 'msg': '', 'data': new_val})

    # 事件回调接口,监测点操作访问
    def Event_setData(self, dataId, value):
        self.warn(value)
        try:
            if self.master == None:
                self.InitComm()
            data_config = self.data2attrs[dataId]['config']
            bit = 0
            if 'proxy' in data_config.keys() and 'pointer' in data_config['proxy'] and data_config['proxy']['pointer'] != None:
                bit = data_config['proxy']['index']
            if self.valueTyped(dataId,value) == True:
                self.bitsState[bit] = 1
            else:
                self.bitsState[bit] = 0
            self.warn(self.bitsState)

            #注意,这里地址是1,但是再huaihua等用了3合一设备的,地址是2,接下来需要这里也做个区分,按照当前操作的数据点对应的实际数据点来!
            ret = self.master.execute(1, cst.WRITE_MULTIPLE_COILS, 0, output_value=self.bitsState)
            self.warn(ret)
            return json.dumps({'code': 0, 'msg': u'操作成功!', 'data': list(ret)})
        except Exception,e:
            return json.dumps({'code': 501, 'msg': u'操作失败,错误码501,' + e.message, 'data': None})
  • 驱动解析

  • 编写环境为python2,首先需要导入modbus、数据解析和爱投斯中台驱动文件(driver)的相关包
#!coding:utf8
import json
import sys

sys.path.append("..")
from driver import *

import time
import modbus_tk
import modbus_tk.defines as cst
import modbus_tk.modbus as modbus
import modbus_tk.modbus_rtu as modbus_rtu
from modbus_tk.exceptions import ModbusInvalidResponseError
import serial
import signal
import traceback
from jcomm import *
import re
import struct
import math
  •  创建硬件心跳进程,判断中台的数据点是否含有必要的属性,如果没有则提示error,防止后续过程报错,开启心跳进程则启动中台的通讯
class RunHardwareHeartbeatThread(threading.Thread,JLib):
    def __init__(self, driver):
        threading.Thread.__init__(self)
        JLib.__init__(self)
        self.driver = driver
    def run(self):
        statetmp = False
        dataIdTmp = ''
        recycletmp = 0
        for dataId,attrs in self.driver.data2attrs.items():
            if 'param' not in attrs['config']:
                self.error(attrs['config'])
                break
            if 'hbt' in attrs['config']['param']:
                dataIdTmp = dataId
                recycletmp = attrs['config']['param']['hbt']
                break
        while True:
            try:
                if not self.driver.startHeartbeat:
                    return

                #状态反转及延时
                if statetmp == False:
                    statetmp = True
                else:
                    statetmp = False
                time.sleep(recycletmp)

                # self.warn('HARDWARE HEATBEAT ' + dataIdTmp + u'硬件心跳:' + str(statetmp))
                #控制执行
                rettmp = ''
                if statetmp:
                    rettmp = self.driver.Event_setData(dataIdTmp,'true')
                else:
                    rettmp = self.driver.Event_setData(dataIdTmp,'false')
                if json.loads(rettmp)["code"] == 0:
                    self.driver.setValue(self.driver.name(dataIdTmp), statetmp)
            except Exception,e:
                traceback.print_exc(e.message)
                continue
  •  继承IOTOSDriverI类,进行初始化,设置心跳开关
class ModbusDriver(IOTOSDriverI):
    def __init__(self):
        IOTOSDriverI.__init__(self)
        self.master = None
        # 心跳开关
        self.startHeartbeat = False
        self.bitsState = [0,0,0,0,0,0,0,0]
        self.sourceDataIn = []
  • 进行通讯初始化,获取爱投斯中台设备实例中配置的端口和serial属性并且启动tcp监听,实例化心跳进程
# 1、通信初始化
    def InitComm(self, attrs = None):
        try:
            #一、tcp端口监听
            self.__port = self.sysAttrs['config']['param']['tcp']
            self.__tcpServer = TcpServerThread(self,self.__port)
            self.__tcpServer.setDaemon(True)
            self.__tcpServer.start()
            self.debug(self.sysAttrs['name'] + u' TCP端口' + str(self.__port) + u"已启动监听!")

            #二、创建串口1 <=> 串口2
            serialtmp = self.sysAttrs['config']['param']['serial']
            self.__serial = SerialDtu(serialtmp)
            self.__serial.setCallback(self.serialCallback)
            self.__serial.open()

            #三、串口1 <=> modbus_tk
            self.master = modbus_rtu.RtuMaster(self.__serial.serial)
            self.master.set_timeout(5)
            self.master.set_verbose(False)
            self.debug(self.sysAttrs['name'] + u' 串口' + self.__serial.portName() + u'已打开!')

            self.zm.pauseCollect = True
            # 实例化硬件心跳线程
            RunHardwareHeartbeatThread(self).start()

        except Exception,e:
            self.online(False)
            traceback.print_exc(u'通信初始化失败' + e.message)
  • tcp回调,可以查看设备是否与中台以及连接成功
#四、串口2 <=> tcp
    #tcp => 串口2
    def tcpCallback(self,data):
        datastr = self.str2hex(data)
        self.sourceDataIn = data
        self.info("Master < < < < < < Device: " + datastr)
        self.__serial.send(data)

#tcp <= 串口2
    def serialCallback(self,data):
        self.info("Master > > > > > > Device: " + self.str2hex(data))
        self.__tcpServer.send(data)
  • 连接状态回调,连接成功则启动硬件心跳进程并且设置中台的网关状态
#连接状态回调
    def connectEvent(self,state):
        self.online(state)
        try:
            if state == True:
                self.warn('连接成功,启动采集、心跳')
                self.pauseCollect = False
                #启动软件看门狗
                self.startHeartbeat = True
            else:
                self.warn('连接断开,将关闭采集和心跳!')
                self.startHeartbeat = False
                self.pauseCollect = True
        except Exception,e:
            self.error(u'硬件心跳错误, ' + e.message)
  •  最后是采集函数,先过滤掉中台非modbus rtu配置的点,再拿到数据点属性中的参数,将参数进行处理后可以拿到需要给设备发送的指令,发送过去后对接收过来的数据进行进制转换和处理后再上传至中台即可将设备的数据上云
# 2、采集
    def Collecting(self, dataId):
        try:
            rtu_ret = ()
            cfgtmp = self.data2attrs[dataId]['config']

            #added by lrq,过滤非modbus rtu配置的点
            if not cfgtmp.has_key('param') or not cfgtmp.has_key('proxy'):
                return ()

            #当是新一组功能号时;当没有proxy.pointer,或者有,但是值为null时,就进行采集!否则(有pointer且值不为null,表明设置了采集代理,那么自己自然就被略过了,因为被代理了)当前数据点遍历轮询会被略过!
            if 'pointer' not in cfgtmp['proxy'] or cfgtmp['proxy']['pointer'] == None or cfgtmp['proxy']['pointer'] == '':

                #added by lrq,某些过滤掉不采集,因为有的地址的设备不在线,只要在proxy下面配置disabled:true,这样就不会轮训到它!
                if 'disabled' in cfgtmp['proxy'] and cfgtmp['proxy']['disabled'] == True:
                    return ()
                else:
                    self.warn(self.name(dataId))

                # added by lrq,过滤非modbus rtu配置的点
                if not cfgtmp['param'].has_key('funid'):
                    return ()

                funid = cfgtmp['param']['funid']
                devid = cfgtmp['param']['devid']
                regad = cfgtmp['param']['regad']
                format = cfgtmp['param']['format']
                quantity = re.findall(r"\\d+\\.?\\d*", format)
                if len(quantity):
                    quantity = int(quantity[0])
                else:
                    quantity = 1
                if format.lower().find('i') != -1:       #I、i类型数据为4个字节,所以n个数据,就是4n字节,除一般应对modbus标准协议的2字节一个数据的个数单位!
                    quantity *= 4/2
                elif format.lower().find('h') != -1:
                    quantity *= 2/2
                elif format.lower().find('b') != -1:
                    quantity *= 1/2
                elif format.find('d') != -1:
                    quantity *= 8/2
                elif format.find('f') != -1:
                    quantity *= 4/2
                elif format.find('?') != -1:           #对于功能号1、2的开关量读,开关个数,对于这种bool开关型,个数就不是返回字节数的两倍了!返回的字节个数是动态的,要字节数对应的位数总和,能覆盖传入的个数数值!
                    quantity *= 1
                    format = ''                        #实践发现,对于bool开关型,传入开关量个数就行,format保留为空!如果format设置为 "?"或"8?"、">?"等,都会解析不正确!!
                self.debug('>>>>>>' + '(PORT-' + str(self.__port) + ')' + str(devid) + ' ' + str(funid) + ' ' + str(regad) + ' ' + str(quantity) + ' ' + str(format))
                rtu_ret = self.master.execute(devid, funid, regad, quantity,data_format=format)

                
                if funid == 3:
                    retlist = []
                    for i in range(len(rtu_ret)):
                        retlist.append(rtu_ret[i])
                    rtu_ret = tuple(retlist)

                #周期查询的开关量输出状态进行备份,用来给控制用
                if funid == 1:
                    self.bitsState = list(rtu_ret)
                self.debug(rtu_ret)
                return rtu_ret
            # 一组功能号内的数据点,不进行遍历采集!跳过!
            else:
                return ()   #注意,这种情况下不是采集错误,如果返回None,那么会当作采集错误处理,进行采集错误计数了!!
        except ModbusInvalidResponseError, e:
            self.error(u'MODBUS响应超时, ' + e.message)
            return None
        except Exception, e:
            traceback.print_exc(e.message)
            self.error(u'采集解析参数错误:' + e.message)
            return None
  •  部分设备可以进行数据的下发来控制设备的状态或者配置设备的参数,可以利用如下的函数
# 事件回调接口,监测点操作访问
    def Event_setData(self, dataId, value):
        self.warn(value)
        try:
            if self.master == None:
                self.InitComm()
            data_config = self.data2attrs[dataId]['config']
            bit = 0
            if 'proxy' in data_config.keys() and 'pointer' in data_config['proxy'] and data_config['proxy']['pointer'] != None:
                bit = data_config['proxy']['index']
            if self.valueTyped(dataId,value) == True:
                self.bitsState[bit] = 1
            else:
                self.bitsState[bit] = 0
            self.warn(self.bitsState)

            #注意,这里地址是1,但是再huaihua等用了3合一设备的,地址是2,接下来需要这里也做个区分,按照当前操作的数据点对应的实际数据点来!
            ret = self.master.execute(1, cst.WRITE_MULTIPLE_COILS, 0, output_value=self.bitsState)
            self.warn(ret)
            return json.dumps({'code': 0, 'msg': u'操作成功!', 'data': list(ret)})
        except Exception,e:
            return json.dumps({'code': 501, 'msg': u'操作失败,错误码501,' + e.message, 'data': None})

以上是关于IOTOS驱动modbus_rtu从0到1对接详解的主要内容,如果未能解决你的问题,请参考以下文章

IOTOS驱动详解-参数的上传

IOTOS驱动详解-获取中台中设备驱动的配置信息

从0学Linux驱动-Makefile详解5

关于modbus_rtu

从0到1,苏宁API网关的演进之路

西门子1200实现MODBUS_RTU的轮询方式