如何使用 pySerial 从 serialException 中恢复
Posted
技术标签:
【中文标题】如何使用 pySerial 从 serialException 中恢复【英文标题】:How do I recover from a serialException using pySerial 【发布时间】:2013-01-25 16:24:21 【问题描述】:我有一个应用程序可以读取数据并将数据传输到通过 USB 连接的设备。我正在使用 pySerial 来促进这种交流。一切正常,直到从 PC 上拔下 USB 电缆并引发异常。重新插入电缆后,我似乎无法恢复并重新连接到我的设备。我恢复的唯一方法是关闭应用程序并拔下并重新插入电缆。任何有助于了解正在发生的事情将不胜感激。
这是我用来帮助我理解流程的基本测试代码。
# Class used to communicate with USB Dongle
import serial
import time
import sys
class LPort:
def __init__(self, port=0):
"initialize the LPort class"
self.error = ""
self.traffic = ""
self.dest = None
if port == None:
self.simulation = True
else:
self.simulation = False
self.port = port # serial port we should use
self.reset()
self.time = time.time()
def reInit(self):
self.close()
def reset(self):
"flush port, reset the LPort, initialize LPort"
if self.simulation:
r = "LPort simulator"
else:
self.port.flushInput()
self.port.flushOutput()
self.fail = False
self.command("/H1")
self.dest = None
r = "reset"
self.error = ""
self.traffic = ""
return r
def status(self):
"return accumulated status info, reset collection"
s = self.error
self.error = ""
return s
def data(self):
"return accumulated traffic data, reset collection"
s = self.traffic
self.traffic = ""
return s
def set_dest(self, addr):
"set the destination address (if necessary)"
if addr != self.dest:
self.dest = addr
self.command("/O")
r = self.command("/D%02X" % addr)
if r != "*":
self.dest = None
self.error += r
else:
r = True
return r
def checksum(self, bytes):
"calculate the CRC-8 checksum for the given packet"
crc_table = [
# this table is taken from the CP rectifier code
0x00,0x07,0x0E,0x09,0x1C,0x1B,0x12,0x15,0x38,0x3F,
0x36,0x31,0x24,0x23,0x2A,0x2D,0x70,0x77,0x7E,0x79,
0x6C,0x6B,0x62,0x65,0x48,0x4F,0x46,0x41,0x54,0x53,
0x5A,0x5D,0xE0,0xE7,0xEE,0xE9,0xFC,0xFB,0xF2,0xF5,
0xD8,0xDF,0xD6,0xD1,0xC4,0xC3,0xCA,0xCD,0x90,0x97,
0x9E,0x99,0x8C,0x8B,0x82,0x85,0xA8,0xAF,0xA6,0xA1,
0xB4,0xB3,0xBA,0xBD,0xC7,0xC0,0xC9,0xCE,0xDB,0xDC,
0xD5,0xD2,0xFF,0xF8,0xF1,0xF6,0xE3,0xE4,0xED,0xEA,
0xB7,0xB0,0xB9,0xBE,0xAB,0xAC,0xA5,0xA2,0x8F,0x88,
0x81,0x86,0x93,0x94,0x9D,0x9A,0x27,0x20,0x29,0x2E,
0x3B,0x3C,0x35,0x32,0x1F,0x18,0x11,0x16,0x03,0x04,
0x0D,0x0A,0x57,0x50,0x59,0x5E,0x4B,0x4C,0x45,0x42,
0x6F,0x68,0x61,0x66,0x73,0x74,0x7D,0x7A,0x89,0x8E,
0x87,0x80,0x95,0x92,0x9B,0x9C,0xB1,0xB6,0xBF,0xB8,
0xAD,0xAA,0xA3,0xA4,0xF9,0xFE,0xF7,0xF0,0xE5,0xE2,
0xEB,0xEC,0xC1,0xC6,0xCF,0xC8,0xDD,0xDA,0xD3,0xD4,
0x69,0x6E,0x67,0x60,0x75,0x72,0x7B,0x7C,0x51,0x56,
0x5F,0x58,0x4D,0x4A,0x43,0x44,0x19,0x1E,0x17,0x10,
0x05,0x02,0x0B,0x0C,0x21,0x26,0x2F,0x28,0x3D,0x3A,
0x33,0x34,0x4E,0x49,0x40,0x47,0x52,0x55,0x5C,0x5B,
0x76,0x71,0x78,0x7F,0x6A,0x6D,0x64,0x63,0x3E,0x39,
0x30,0x37,0x22,0x25,0x2C,0x2B,0x06,0x01,0x08,0x0F,
0x1A,0x1D,0x14,0x13,0xAE,0xA9,0xA0,0xA7,0xB2,0xB5,
0xBC,0xBB,0x96,0x91,0x98,0x9F,0x8A,0x8D,0x84,0x83,
0xDE,0xD9,0xD0,0xD7,0xC2,0xC5,0xCC,0xCB,0xE6,0xE1,
0xE8,0xEF,0xFA,0xFD,0xF4,0xF3]
for i in range(len(bytes)):
b = int(bytes[i])
if i == 0: chksum = crc_table[b]
else: chksum = crc_table[chksum ^ b]
return chksum
def command(self, cmd):
"transmit distinct commands to unit, and accept response"
if self.simulation:
r = "*"
else:
try:
self.port.write(cmd + chr(13))
except serial.serialutil.SerialTimeoutException:
r = "/TO"
return r
except:
print "Unexpected error:", sys.exc_info()[0]
r = "/Unknown"
return r
r = ""
eol = False
while True:
c = self.port.read(1)
if not c:
r = "/FAIL " + r + " " + cmd
self.error = r
break
else:
r += c
ordc = ord(c)
if ordc == 13 or ordc == 42:
break
return r
def checkRawDataForErrors(self, raw, errors = []):
errorCodes = '/SNA':'Slave Not Acknowledging',
'/I81':'Busy, Command Ignored',
'/I88':'Connection Not Open',
'/I89':'Invalid Command Argument',
'/I8A':'Transmit Not Active',
'/I8F':'Invalid Command',
'/I90':'Buffer Overflow',
'/DAT':'Data Error',
'/BADPEC':'Bad PEC Value',
'/NO_MRC':'No Master Read Complete Signal',
'/FAIL':'General Failure',
'/LEN':'Data Length Error'
for ekey, eval in errorCodes.items():
if ekey in raw:
errors.append(eval)
return errors
# self-testing module
if __name__ == "__main__":
com = serial.Serial(port=4, baudrate=115200, timeout=1, xonxoff=0)
if com:
port = LPort(com)
print port
time.sleep(5)
port = LPort(com)
print "/V =", port.command("/V")
print "/V", port.data(), port.status()
print "/O =", port.command("/O")
print "/O", port.data(), port.status()
print "/A =", port.command("/A")
print "/A", port.data(), port.status()
print "/L =", port.command("/L")
print "/L", port.data(), port.status()
com.close()
else:
print "cannot open com port"
更新: 以下是 serialwin32.py 中 creatfile() 周围的代码,它返回以下消息: serial.serialutil.SerialException: 无法打开端口 COM5: [错误 2] 系统找不到指定的文件。
self.hComPort = win32.CreateFile(port,
win32.GENERIC_READ | win32.GENERIC_WRITE,
0, # exclusive access
None, # no security
win32.OPEN_EXISTING,
win32.FILE_ATTRIBUTE_NORMAL | win32.FILE_FLAG_OVERLAPPED,
0)
if self.hComPort == win32.INVALID_HANDLE_VALUE:
self.hComPort = None # 'cause __del__ is called anyway
raise SerialException("could not open port %s: %s" % (self.portstr, ctypes.WinError()))
【问题讨论】:
这可能与操作系统有关。 “关闭,睡眠,重新打开”有帮助吗?如果是linux,拔出后插入是否会创建同名设备? (如果没有,你能用 udev 规则之类的东西让它一样吗?) 抱歉,我想我应该指定我使用的是 Windows。如果我 close() 然后在睡觉时拔掉插头并重新插入,它似乎会恢复。我希望完成的是无需拔下插头并重新插入即可恢复。我知道某处有一个文件包含端口信息以及它是否已打开和正在使用,但我似乎找不到方法无需人工干预即可重置。 PySerial 在命令“win32file.CreateFile()”上失败,有谁知道为什么会失败? 它没有说明原因吗? (对您来说唯一有趣的情况是它失败,因为端口已经在使用中(它可能会触发不明显的错误消息,如“访问被拒绝”或其他东西)。所有其他错误大多是无望的“这个事情拒绝工作”)。 我得到的错误信息是:serial.serialutil.SerialException: could not open port COM5: [Error 2] The system cannot find the file specified.我已更新我的问题以包含有关此的更多信息。 【参考方案1】:假设您的设备性能良好,您所要做的就是:
关闭你的串口(@987654321@实例) 再次找到您的端口的 COMX 名称 打开串口第二部分是有问题的,因为 Windows 试图变得聪明。在您的情况下,会发生以下情况:
USB 设备已连接并分配名称COM2
您的程序会打开设备
USB 断开连接
在您的程序发现设备死机之前,USB 会快速重新连接
Windows 发现 COM2
正忙并为此 USB 设备分配不同的名称
(可选)您的程序关闭设备
您的程序尝试再次打开 COM2
,但该名称没有硬件
巧妙地绕过 Windows 的方法 - 您可以在设备管理器、COM 端口、您的端口、高级选项中专门为该设备分配固定的 COMX
名称。
另一种选择是检测设备快速死亡并关闭文件句柄。如果幸运的话,当设备重新连接时,原来的COM2
又是免费的了。
另一种选择是使用来自其他制造商的 USB 串行转换器,该制造商使用其他驱动程序。不知何故,COMX
字母分配是特定于驱动程序的。更好的驱动程序可能会给你一个稳定的名字。
【讨论】:
【参考方案2】:我也遇到过这个问题。有时再次插入设备时我的程序已锁定。
注意。我已经修复了@qarma 提到的端口的COMx
名称
我重新安排了我的程序,以便一旦从Serial
的read()
或write()
方法引发异常,我就会停止调用这些方法。
然后我有一个功能,它会定期重试打开端口以尝试检测设备何时再次插入。
这个函数创建一个 Serial
的新实例,其参数与原始实例相同,并尝试打开它:
def try_to_open_new_port(self):
ret = False
test = serial.Serial(baudrate=9600, timeout=0, writeTimeout=0)
test.port = self.current_port_name
try:
test.open()
if test.isOpen():
test.close()
ret = True
except serial.serialutil.SerialException:
pass
return ret
返回True
表示该端口再次存在。
【讨论】:
以上是关于如何使用 pySerial 从 serialException 中恢复的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 PySerial 与 micro:bit 建立串行通信?
如何通过 pySerial 从 Python 发送 int 或 string 并在 C 中转换为 int 或 String