如何用python写个串口通信的程序

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何用python写个串口通信的程序相关的知识,希望对你有一定的参考价值。

使用 pyserial 就可以处理串口通信,这个包是跨平台的。

https://pyserial.readthedocs.io/ (项目已迁移,旧地址:http://pyserial.sourceforge.net/)


示例程序在这里:

https://pyserial.readthedocs.io/en/latest/examples.html#wxpython-examples


import serial

# Create and configure the serial port instance (COM1, 9600 baud, 8 data
# bits, no parity, 1 stop bit, 0.6 s read timeout).
serialport = serial.Serial()
serialport.port = 'COM1'
serialport.baudrate = 9600
serialport.parity = 'N'
serialport.bytesize = 8
serialport.stopbits = 1
serialport.timeout = 0.6

# Example payload only; replace it with the bytes your device expects.
raw_data = b'\x01\x02\x03'

try:
    serialport.open()
    serialport.setDTR(True)
    serialport.setRTS(True)
except Exception as ex:
    print(ex)
else:
    # Send data only when the port was opened successfully; writing to a
    # port that failed to open would just raise a second error.
    serialport.write(raw_data)

# Depending on the project's requirements, a thread can be started to poll
# the port for incoming data.

参考技术A

打开串口后,启动一个线程来监听串口;一旦有数据到达,就对数据进行处理。

用python写串口通信程序的示例:

#coding=gb18030 


import sys,threading,time; 


import serial; 


import binascii,encodings; 


import re; 


import socket; 



class ReadThread:
    """Read records from a serial port on a background thread.

    ``start()`` opens the port and launches ``FirstReader`` in a daemon
    thread.  The reader splits the incoming byte stream into records
    terminated by 0x80 or 0x00 and passes every record that is longer than
    10 bytes and contains a digit to ``SendData``.  ``waiting()`` blocks
    until the reader has finished and ``stop()`` shuts everything down.

    ``Output`` is used through ``WriteText``, ``GetInsertionPoint``,
    ``GetLastPosition`` and ``Remove``; ``Log`` through ``info`` and
    ``error``.  Both are optional.
    """

    BAUDRATE = 9600      # line speed used by start()
    READ_TIMEOUT = 2     # serial read timeout in seconds
    HEAD_DELAY = 3       # seconds InitHead waits before talking to the device
    IDLE_DELAY = 0.01    # pause between polls when no data is pending

    def __init__(self, Output=None, Port=0, Log=None, i_FirstMethod=True):
        """Store the collaborators; no port is opened until ``start()``.

        Output: optional text widget-like object for status lines.
        Port: port passed to pySerial.  NOTE(review): pySerial 3.x expects a
            device name such as 'COM1'; with the numeric default ``start()``
            is expected to fail there and return False -- confirm.
        Log: optional logger-like object.
        i_FirstMethod: stored as ``bFirstMethod``; not read by this class.
        """
        self.l_serial = None
        self.alive = False
        self.waitEnd = None
        # Created here so that stop() is safe to call before start().
        self.thread_read = None
        self.bFirstMethod = i_FirstMethod
        self.sendport = ''
        self.log = Log
        self.output = Output
        self.port = Port
        # Bytes pattern: the data read from the port is bytes on Python 3.
        self.re_num = re.compile(b'\\d')

    def _report(self, message, error=False):
        """Send one status line to the output widget and the logger, if set."""
        if self.output is not None:
            self.output.WriteText(u'%s\r\n' % message)
        if self.log is not None:
            if error:
                self.log.error(u'%s' % message)
            else:
                self.log.info(u'%s' % message)

    def waiting(self):
        """Block until the reader thread signals that it has finished."""
        if self.waitEnd is not None:
            self.waitEnd.wait()

    def SetStopEvent(self):
        """Wake up ``waiting()`` callers and shut the reader down."""
        if self.waitEnd is not None:
            self.waitEnd.set()
        self.alive = False
        self.stop()

    def start(self):
        """Open the serial port and start the reader thread.

        Returns True when the port is open and the thread is running,
        False when the port could not be opened.
        """
        try:
            self._report(u'Opening serial port %s ...' % (self.port,))
            self.l_serial = serial.Serial()
            # Configuration happens inside the try block so that an invalid
            # port value is reported as a failure instead of propagating.
            self.l_serial.port = self.port
            self.l_serial.baudrate = self.BAUDRATE
            self.l_serial.timeout = self.READ_TIMEOUT
            self.l_serial.open()
        except Exception as ex:
            if self.l_serial is not None and self.l_serial.isOpen():
                self.l_serial.close()
            self.l_serial = None
            self._report(u'Failed to open serial port: %s' % ex, error=True)
            return False

        if not self.l_serial.isOpen():
            self._report(u'Serial port is not open.')
            return False

        self._report(u'Serial port opened.')
        self.waitEnd = threading.Event()
        self.alive = True
        self.thread_read = threading.Thread(target=self.FirstReader)
        self.thread_read.daemon = True
        self.thread_read.start()
        return True

    def InitHead(self):
        """Wait for the device, send the start byte and discard the header.

        On a serial error the failure is reported and the reader is stopped
        through ``SetStopEvent()``.
        """
        try:
            time.sleep(self.HEAD_DELAY)
            self._report(u'Reading header ...')
            self.l_serial.flushInput()
            # 0x11 is ASCII DC1 (XON).  The original literal was the four
            # characters backslash-x-1-1, which looks like an escaping
            # artifact -- NOTE(review): confirm against the device protocol.
            self.l_serial.write(b'\x11')
            # The header bytes are read only to get them out of the way.
            self.l_serial.read(1024)
        except (ValueError, serial.SerialException) as ex:
            self._report(u'Failed to read header: %s' % ex, error=True)
            self.SetStopEvent()
            return

        self._report(u'Header read.')
        if self.output is not None:
            self.output.WriteText(u'===================================\r\n')

    def SendData(self, i_msg):
        """Prepare one record for sending; returns True on success.

        Text is encoded as GB18030, byte strings are used unchanged.
        """
        isOK = False
        try:
            if isinstance(i_msg, type(u'')):
                payload = i_msg.encode('gb18030')
            else:
                payload = i_msg
            # TODO(review): the statement that transmits ``payload`` (and
            # sets isOK to True) is not present in the source this class was
            # recovered from, so nothing is sent and False is returned.
        except Exception as ex:
            if self.log is not None:
                self.log.error(u'%s' % ex)
        return isOK

    def FirstReader(self):
        """Thread body: poll the port and forward complete records.

        Runs until ``alive`` is cleared, then sets ``waitEnd``.
        """
        record = bytearray()
        rep_pos = 0

        # Read the header content first.
        self.InitHead()

        while self.alive:
            try:
                pending = self.l_serial.inWaiting()
                if not pending:
                    # Nothing to read: yield instead of spinning at 100% CPU.
                    time.sleep(self.IDLE_DELAY)
                    continue

                # bytearray() yields integers on both Python 2 and 3.
                for byte in bytearray(self.l_serial.read(pending)):
                    if byte in (0x8E, 0x8F):
                        # The original toggled a full-width flag on these
                        # markers but nothing visible ever read it; the
                        # markers are still kept out of the record.
                        continue
                    if byte not in (0x80, 0x00):
                        # NOTE(review): the accumulation step was missing
                        # from the source; raw bytes are collected here.
                        record.append(byte)
                        continue

                    # 0x80 / 0x00 terminates the current record.
                    payload = bytes(record)
                    record = bytearray()
                    if len(payload) <= 10:
                        continue
                    if self.re_num.search(payload, 1) is None:
                        continue
                    if self.output is not None:
                        if rep_pos == 0:
                            rep_pos = self.output.GetInsertionPoint()
                        # NOTE(review): nesting reconstructed; everything
                        # after rep_pos is cleared for each new record.
                        self.output.Remove(rep_pos,
                                           self.output.GetLastPosition())
                    self.SendData(payload)
            except Exception as ex:
                if self.log is not None:
                    self.log.error(u'%s' % ex)
                # Avoid a hot error loop if the port keeps failing.
                time.sleep(self.IDLE_DELAY)

        if self.waitEnd is not None:
            self.waitEnd.set()
        self.alive = False

    def stop(self):
        """Stop the reader thread and close the port.

        Safe to call before ``start()``, more than once, and from the reader
        thread itself.
        """
        self.alive = False
        reader = self.thread_read
        # Joining the current thread raises RuntimeError, which would happen
        # on the InitHead -> SetStopEvent -> stop path.
        if reader is not None and reader is not threading.current_thread():
            reader.join()
        if self.l_serial is not None and self.l_serial.isOpen():
            self.l_serial.close()
            self._report(u'Serial port closed.')

    def printHex(self, s):
        """Print the bytes in ``s`` as a hexadecimal string."""
        print(binascii.b2a_hex(s).decode('ascii'))




if __name__ == '__main__':
    # Script entry point: run one reader until it finishes, then clean up.
    rt = ReadThread()

    # The whole content of sendport.cfg is stored unmodified on the reader.
    # NOTE(review): presumably the destination SendData should use -- confirm.
    with open("sendport.cfg", "r") as f:
        rt.sendport = f.read()

    try:
        if rt.start():
            # Block until the reader thread signals completion.
            rt.waiting()
            rt.stop()
    except Exception as se:
        print(str(se))
    finally:
        # Also reached on Ctrl-C, so the port is released on every exit path.
        if rt.alive:
            rt.stop()

    print('End OK .')
    del rt

参考技术B 用split()分割即可a=input('inputaandb')lis

以上是关于如何用python写个串口通信的程序的主要内容,如果未能解决你的问题,请参考以下文章

vc 串口

python - serial communication(串口通信)

如何用VB通过485串口来读取电能表的数据?

Qt 中的 QSerialPort 串口通信功能如何在 Qt for Android 中使用?或者如何用 Qt 实现 Android 版本的串口通信功能?

有人会 用python的 pySerial 进行串口通信的吗

程序如何写串口控制继电器开关?求思路和源码?