Python3+PCAN-USB基于PCAN-Basic二次开发实现上位机功能
Posted 疯狂的机器人
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python3+PCAN-USB基于PCAN-Basic二次开发实现上位机功能相关的知识,希望对你有一定的参考价值。
一、环境搭建
1.概述
本文主要是通过Python3与CAN总线工具PCAN-USB基于官方PCAN-Basic库实现CAN总线消息读写功能。
2.PCANBasic.py和PCANBasic.dll下载地址
https://www.peak-system.com/fileadmin/media/files/pcan-basic.zip
3.Python安装
下载地址:https://www.python.org/ftp/python/3.7.9/python-3.7.9-amd64.exe
二、项目展示
1.文件目录
2.xmt文件内容
3.CAN消息读取截图
4.CAN消息发送截图
三、完整代码
#!/usr/bin/python # _*_ coding:utf-8 _*_ from PCANBasic import * from queue import * import threading, time, os class PcanOperate(PCANBasic, threading.Thread): def __init__(self): super().__init__() # 继承父类的init方法 result = self.Initialize(PCAN_USBBUS1, PCAN_BAUD_500K) # 总线初始化 if result != PCAN_ERROR_OK: print(self.GetErrorText(result)) # 发生错误,获取错误信息 else: print("PCAN-USB 已初始化") def ProcessMessage(self, msg, timestamp): """CAN消息处理方法""" msg_dict = {} msg_id = hex(msg.ID) if len(msg_id[2:]) == 7: msg_dict["CANID"] = \'0\' + msg_id[2:].upper() else: msg_dict["CANID"] = msg_id[2:].upper() msg_dict["MSGTYPE"] = msg.MSGTYPE msg_dict["LEN"] = msg.LEN data = \'\' for i in range(8): if len(hex(msg.DATA[i])[2:]) == 1: d = \' \' + \'0\' + hex(msg.DATA[i])[2:].upper() else: d = \' \' + hex(msg.DATA[i])[2:].upper() data += d msg_dict["DATA"] = data[1:] timestamp_dict = {} timestamp_dict[\'millis\'] = timestamp.millis timestamp_dict[\'millis_overflow\'] = timestamp.millis_overflow timestamp_dict[\'micros\'] = timestamp.micros time_sum = timestamp.micros + 1000 * timestamp.millis + 0x100000000 * 1000 * timestamp.millis_overflow return msg_dict def PutQueue(self): """从总线中读取CAN消息及其时间戳,并放入队列。""" while True: """检查接收队列是否有新消息""" readResult = self.GetStatus(PCAN_USBBUS1) if readResult != PCAN_ERROR_OK: result = self.GetErrorText(readResult) print(result[1]) event2.set() else: readResult = self.Read(PCAN_USBBUS1) # 接收总线报文 time.sleep(0.01) if readResult[0] != PCAN_ERROR_QRCVEMPTY: print("消息入队") q.put(self.ProcessMessage(readResult[1], readResult[2])) # 消息入队 event1.set() else: result = self.GetErrorText(readResult[0]) print(result[1]) event2.set() break def GetQueue(self): """从队列中获取信息""" while True: if event2.is_set(): break if not q.empty(): event1.wait() print(q.get()) # 消息出队 def GetXmtMsg(self): """获取xmt文件中需要发送的消息""" with open(os.path.join(os.getcwd(), \'zuidazhi.xmt\'), \'r+\') as f: fo = f.readlines() allcandata = [] for i, j in enumerate(fo, start=1): onecandata = {} if i > 14: f1 = j.strip().split() if len(f1) == 14: # 获取长度为8的data onecandata[\'cycle\'] = (int(f1[3])) msg = TPCANMsg() msg.ID = int(f1[1][:-1], 16) msg.MSGTYPE = PCAN_MESSAGE_EXTENDED msg.LEN = int(f1[4]) msg.DATA[0] = int(f1[6][:-1], 16) msg.DATA[1] = int(f1[7][:-1], 16) msg.DATA[2] = int(f1[8][:-1], 16) msg.DATA[3] = int(f1[9][:-1], 16) msg.DATA[4] = int(f1[10][:-1], 16) msg.DATA[5] = int(f1[11][:-1], 16) msg.DATA[6] = int(f1[12][:-1], 16) msg.DATA[7] = int(f1[13][:-1], 16) onecandata[\'msg\'] = msg allcandata.append(onecandata) return allcandata def CanWrite(self): """发送CAN消息""" allcandata = self.GetXmtMsg() print(allcandata) if len(allcandata) > 0: for i in allcandata: result = self.Write(PCAN_USBBUS1, i[\'msg\']) currenttime = int(round(time.time() * 1000)) i[\'currenttime\'] = currenttime if result != PCAN_ERROR_OK: result = self.GetErrorText(result) print(result) else: print("时间戳记录成功") time.sleep(1) while True: currenttime = int(round(time.time() * 1000)) for i in allcandata: interval = currenttime - i[\'currenttime\'] if interval >= i[\'cycle\']: result = self.Write(PCAN_USBBUS1, i[\'msg\']) # 写 if result != PCAN_ERROR_OK: result = self.GetErrorText(result) print(result) else: print("消息发送成功") i[\'currenttime\'] = currenttime def __del__(self): result = self.Uninitialize(PCAN_USBBUS1) # 总线释放 if result != PCAN_ERROR_OK: result = self.GetErrorText(result) print(result[1]) else: print("PCAN-USB 已释放") if __name__ == \'__main__\': q = Queue() event1 = threading.Event() event2 = threading.Event() pcan = PcanOperate() # s1 = threading.Thread(target=pcan.CanWrite, name="发送CAN消息线程") # s1.start() s2 = threading.Thread(target=pcan.PutQueue, name="读取CAN消息并放入队列线程") s2.start() s3 = threading.Thread(target=pcan.GetQueue, name="从队列中读取CAN消息线程") s3.start()
python-can库基于PCAN-USB使用方法
一、概述
1.介绍
python-can库为Python提供了控制器局域网的支持,为不同的硬件设备提供了通用的抽象,并提供了一套实用程序,用于在CAN总线上发送和接收消息。
支持硬件接口:
Name |
Documentation |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
2.环境搭建
Python安装:https://www.python.org/ftp/python/3.7.9/python-3.7.9-amd64.exe
PCAN-USB驱动:https://www.peak-system.com/fileadmin/media/files/pcan-basic.zip
库:pip install python-can
3.参考文档
https://python-can.readthedocs.io/en/master/#
二、常用方法
1.接收报文
from can.interfaces.pcan.pcan import PcanBus def bus_recv(): """轮询接收消息""" try: while True: msg = bus.recv(timeout=100) print(msg) except KeyboardInterrupt: pass if __name__ == \'__main__\': bus = PcanBus(channel=\'PCAN_USBBUS1\', bitrate=500000) bus_recv()
2.发送报文
from can.interfaces.pcan.pcan import PcanBus def bus_send(): """can消息发送""" while True: time.sleep(0.02) try: bus.send(msg) print("消息发送 ".format(bus.channel_info)) except can.CanError: print("消息未发送") if __name__ == \'__main__\': msg = can.Message(arbitration_id=0x181DFF00, data=[0xEE, 0xFE, 0xFE, 0xFF, 0xFE, 0xFF, 0xFF, 0xFE], is_extended_id=True) # 报文 bus = PcanBus(channel=\'PCAN_USBBUS1\', bitrate=500000) bus_send()
3.定期发送报文
def bus_send_periodic(): """周期发送报文""" print("开始每200毫秒发送一条消息。持续时间10s") task = bus.send_periodic(msg, 1.5) # 定期发送 if not isinstance(task, can.ModifiableCyclicTaskABC): # 断言task类型 print("此接口似乎不支持") task.stop() return time.sleep(5) # 持续时间 print("发送完成") print("更改运行任务的数据以99开头") msg.data[0] = 0x99 task.modify_data(msg) # 修改data首字节 time.sleep(10) task.stop() print("停止循环发送") print("将停止任务的数据更改为单个 ff 字节") msg.data = bytearray([0xff]) # 重新定向data msg.dlc = 1 # 定义data长度 task.modify_data(msg) # 修改data time.sleep(10) print("重新开始") task.start() # 重新启动已停止的周期性任务 time.sleep(10) task.stop() print("完毕") if __name__ == \'__main__\': msg = can.Message(arbitration_id=0x181DFF00, data=[0xEE, 0xFE, 0xFE, 0xFF, 0xFE, 0xFF, 0xFF, 0xFE], is_extended_id=True) # 报文 bus = PcanBus(channel=\'PCAN_USBBUS1\', bitrate=500000) bus_send_periodic()
4.循环收发消息
from can.interfaces.pcan.pcan import PcanBus def send_cyclic(stop_event): """循环发送消息""" print("开始每1秒发送1条消息") start_time = time.time() while not stop_event.is_set(): msg.timestamp = time.time() - start_time bus.send(msg) print(f"tx: msg") time.sleep(1) print("停止发送消息") def receive(stop_event): """循环接收消息""" print("开始接收消息") while not stop_event.is_set(): rx_msg = bus.recv(1) if rx_msg is not None: print(f"rx: rx_msg") print("停止接收消息") def send_and_recv_msg(): """发送一个消息并接收一个消息,需要双通道CAN""" stop_event = threading.Event() t_send_cyclic = threading.Thread(target=send_cyclic, args=(stop_event,)) t_receive = threading.Thread(target=receive, args=(stop_event,)) t_receive.start() t_send_cyclic.start() try: while True: time.sleep(0) # yield except KeyboardInterrupt: pass # 正常退出 stop_event.set() time.sleep(0.5) if __name__ == \'__main__\': msg = can.Message(arbitration_id=0x181DFF00, data=[0xEE, 0xFE, 0xFE, 0xFF, 0xFE, 0xFF, 0xFF, 0xFE], is_extended_id=True) # 报文 bus = PcanBus(channel=\'PCAN_USBBUS1\', bitrate=500000) send_and_recv_msg()
以上是关于Python3+PCAN-USB基于PCAN-Basic二次开发实现上位机功能的主要内容,如果未能解决你的问题,请参考以下文章
[硬拆解]拆解一个USB转CAN-FD总线设备-PCAN-USB FD