Python3+PCAN-USB基于PCAN-Basic二次开发实现上位机功能

Posted 疯狂的机器人

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python3+PCAN-USB基于PCAN-Basic二次开发实现上位机功能相关的知识,希望对你有一定的参考价值。

一、环境搭建

1.概述

本文主要是通过Python3与CAN总线工具PCAN-USB基于官方PCAN-Basic库实现CAN总线消息读写功能。

 

2.PCANBasic.py和PCANBasic.dll下载地址

https://www.peak-system.com/fileadmin/media/files/pcan-basic.zip

 

3.Python安装

下载地址:https://www.python.org/ftp/python/3.7.9/python-3.7.9-amd64.exe

 

二、项目展示

1.文件目录

2.xmt文件内容

 

3.CAN消息读取截图

 

4.CAN消息发送截图

 

三、完整代码

#!/usr/bin/python
# _*_ coding:utf-8 _*_
from PCANBasic import *
from queue import *
import threading, time, os


class PcanOperate(PCANBasic, threading.Thread):
    def __init__(self):
        super().__init__()  # 继承父类的init方法
        result = self.Initialize(PCAN_USBBUS1, PCAN_BAUD_500K)  # 总线初始化
        if result != PCAN_ERROR_OK:
            print(self.GetErrorText(result))  # 发生错误,获取错误信息
        else:
            print("PCAN-USB 已初始化")

    def ProcessMessage(self, msg, timestamp):
        """CAN消息处理方法"""
        msg_dict = {}
        msg_id = hex(msg.ID)
        if len(msg_id[2:]) == 7:
            msg_dict["CANID"] = \'0\' + msg_id[2:].upper()
        else:
            msg_dict["CANID"] = msg_id[2:].upper()
        msg_dict["MSGTYPE"] = msg.MSGTYPE
        msg_dict["LEN"] = msg.LEN
        data = \'\'
        for i in range(8):
            if len(hex(msg.DATA[i])[2:]) == 1:
                d = \' \' + \'0\' + hex(msg.DATA[i])[2:].upper()
            else:
                d = \' \' + hex(msg.DATA[i])[2:].upper()
            data += d
        msg_dict["DATA"] = data[1:]

        timestamp_dict = {}
        timestamp_dict[\'millis\'] = timestamp.millis
        timestamp_dict[\'millis_overflow\'] = timestamp.millis_overflow
        timestamp_dict[\'micros\'] = timestamp.micros
        time_sum = timestamp.micros + 1000 * timestamp.millis + 0x100000000 * 1000 * timestamp.millis_overflow
        return msg_dict

    def PutQueue(self):
        """从总线中读取CAN消息及其时间戳,并放入队列。"""
        while True:
            """检查接收队列是否有新消息"""
            readResult = self.GetStatus(PCAN_USBBUS1)
            if readResult != PCAN_ERROR_OK:
                result = self.GetErrorText(readResult)
                print(result[1])
                event2.set()
            else:
                readResult = self.Read(PCAN_USBBUS1)  # 接收总线报文
                time.sleep(0.01)
                if readResult[0] != PCAN_ERROR_QRCVEMPTY:
                    print("消息入队")
                    q.put(self.ProcessMessage(readResult[1], readResult[2]))  # 消息入队
                    event1.set()
                else:
                    result = self.GetErrorText(readResult[0])
                    print(result[1])
                    event2.set()
                    break

    def GetQueue(self):
        """从队列中获取信息"""
        while True:
            if event2.is_set():
                break
            if not q.empty():
                event1.wait()
                print(q.get())  # 消息出队

    def GetXmtMsg(self):
        """获取xmt文件中需要发送的消息"""
        with open(os.path.join(os.getcwd(), \'zuidazhi.xmt\'), \'r+\') as f:
            fo = f.readlines()
        allcandata = []
        for i, j in enumerate(fo, start=1):
            onecandata = {}
            if i > 14:
                f1 = j.strip().split()
                if len(f1) == 14:  # 获取长度为8的data
                    onecandata[\'cycle\'] = (int(f1[3]))
                    msg = TPCANMsg()
                    msg.ID = int(f1[1][:-1], 16)
                    msg.MSGTYPE = PCAN_MESSAGE_EXTENDED
                    msg.LEN = int(f1[4])
                    msg.DATA[0] = int(f1[6][:-1], 16)
                    msg.DATA[1] = int(f1[7][:-1], 16)
                    msg.DATA[2] = int(f1[8][:-1], 16)
                    msg.DATA[3] = int(f1[9][:-1], 16)
                    msg.DATA[4] = int(f1[10][:-1], 16)
                    msg.DATA[5] = int(f1[11][:-1], 16)
                    msg.DATA[6] = int(f1[12][:-1], 16)
                    msg.DATA[7] = int(f1[13][:-1], 16)
                    onecandata[\'msg\'] = msg
                    allcandata.append(onecandata)
        return allcandata

    def CanWrite(self):
        """发送CAN消息"""
        allcandata = self.GetXmtMsg()
        print(allcandata)
        if len(allcandata) > 0:
            for i in allcandata:
                result = self.Write(PCAN_USBBUS1, i[\'msg\'])
                currenttime = int(round(time.time() * 1000))
                i[\'currenttime\'] = currenttime
                if result != PCAN_ERROR_OK:
                    result = self.GetErrorText(result)
                    print(result)
                else:
                    print("时间戳记录成功")
        time.sleep(1)
        while True:
            currenttime = int(round(time.time() * 1000))
            for i in allcandata:
                interval = currenttime - i[\'currenttime\']
                if interval >= i[\'cycle\']:
                    result = self.Write(PCAN_USBBUS1, i[\'msg\'])  #
                    if result != PCAN_ERROR_OK:
                        result = self.GetErrorText(result)
                        print(result)
                    else:
                        print("消息发送成功")
                        i[\'currenttime\'] = currenttime

    def __del__(self):
        result = self.Uninitialize(PCAN_USBBUS1)  # 总线释放
        if result != PCAN_ERROR_OK:
            result = self.GetErrorText(result)
            print(result[1])
        else:
            print("PCAN-USB 已释放")


if __name__ == \'__main__\':
    q = Queue()
    event1 = threading.Event()
    event2 = threading.Event()
    pcan = PcanOperate()
    # s1 = threading.Thread(target=pcan.CanWrite, name="发送CAN消息线程")
    # s1.start()
    s2 = threading.Thread(target=pcan.PutQueue, name="读取CAN消息并放入队列线程")
    s2.start()
    s3 = threading.Thread(target=pcan.GetQueue, name="从队列中读取CAN消息线程")
    s3.start()

 

—————————————————————————————— 选择正确的事、再把事做正确 ——————————————————————————————

python-can库基于PCAN-USB使用方法

一、概述

1.介绍

python-can库为Python提供了控制器局域网的支持,为不同的硬件设备提供了通用的抽象,并提供了一套实用程序,用于在CAN总线上发送和接收消息。

支持硬件接口:

Name

Documentation

"socketcan"

SocketCAN

"kvaser"

Kvaser’s CANLIB

"serial"

CAN over Serial

"slcan"

CAN over Serial / SLCAN

"ixxat"

IXXAT Virtual CAN Interface

"pcan"

PCAN Basic API

"usb2can"

USB2CAN Interface

"nican"

NI-CAN

"iscan"

isCAN

"neovi"

neoVI

"vector"

Vector

"virtual"

Virtual

"canalystii"

CANalyst-II

"systec"

SYSTEC interface

 

2.环境搭建

Python安装:https://www.python.org/ftp/python/3.7.9/python-3.7.9-amd64.exe

PCAN-USB驱动:https://www.peak-system.com/fileadmin/media/files/pcan-basic.zip

库:pip install python-can

 

3.参考文档

https://python-can.readthedocs.io/en/master/#

 

二、常用方法

1.接收报文

from can.interfaces.pcan.pcan import PcanBus


def bus_recv():
    """轮询接收消息"""
    try:
        while True:
            msg = bus.recv(timeout=100)
            print(msg)
    except KeyboardInterrupt:
        pass

if __name__ == \'__main__\':
    bus = PcanBus(channel=\'PCAN_USBBUS1\', bitrate=500000)
    bus_recv()

 

2.发送报文

from can.interfaces.pcan.pcan import PcanBus


def bus_send():
    """can消息发送"""
    while True:
        time.sleep(0.02)
        try:
            bus.send(msg)
            print("消息发送 ".format(bus.channel_info))
        except can.CanError:
            print("消息未发送")


if __name__ == \'__main__\':
    msg = can.Message(arbitration_id=0x181DFF00, data=[0xEE, 0xFE, 0xFE, 0xFF, 0xFE, 0xFF, 0xFF, 0xFE],
                      is_extended_id=True)  # 报文
    bus = PcanBus(channel=\'PCAN_USBBUS1\', bitrate=500000)
    bus_send()

 

3.定期发送报文

def bus_send_periodic():
    """周期发送报文"""
    print("开始每200毫秒发送一条消息。持续时间10s")
    task = bus.send_periodic(msg, 1.5)  # 定期发送
    if not isinstance(task, can.ModifiableCyclicTaskABC):  # 断言task类型
        print("此接口似乎不支持")
        task.stop()
        return
    time.sleep(5)  # 持续时间
    print("发送完成")
    print("更改运行任务的数据以99开头")
    msg.data[0] = 0x99
    task.modify_data(msg)  # 修改data首字节
    time.sleep(10)

    task.stop()
    print("停止循环发送")
    print("将停止任务的数据更改为单个 ff 字节")
    msg.data = bytearray([0xff])  # 重新定向data
    msg.dlc = 1  # 定义data长度
    task.modify_data(msg)  # 修改data
    time.sleep(10)
    print("重新开始")
    task.start()  # 重新启动已停止的周期性任务
    time.sleep(10)
    task.stop()
    print("完毕")

if __name__ == \'__main__\':
    msg = can.Message(arbitration_id=0x181DFF00, data=[0xEE, 0xFE, 0xFE, 0xFF, 0xFE, 0xFF, 0xFF, 0xFE],
                      is_extended_id=True)  # 报文
    bus = PcanBus(channel=\'PCAN_USBBUS1\', bitrate=500000)
    bus_send_periodic()

 

4.循环收发消息

from can.interfaces.pcan.pcan import PcanBus


def send_cyclic(stop_event):
    """循环发送消息"""
    print("开始每1秒发送1条消息")
    start_time = time.time()
    while not stop_event.is_set():
        msg.timestamp = time.time() - start_time
        bus.send(msg)
        print(f"tx: msg")
        time.sleep(1)
    print("停止发送消息")


def receive(stop_event):
    """循环接收消息"""
    print("开始接收消息")
    while not stop_event.is_set():
        rx_msg = bus.recv(1)
        if rx_msg is not None:
            print(f"rx: rx_msg")
    print("停止接收消息")


def send_and_recv_msg():
    """发送一个消息并接收一个消息,需要双通道CAN"""
    stop_event = threading.Event()
    t_send_cyclic = threading.Thread(target=send_cyclic, args=(stop_event,))
    t_receive = threading.Thread(target=receive, args=(stop_event,))
    t_receive.start()
    t_send_cyclic.start()
    try:
        while True:
            time.sleep(0)  # yield
    except KeyboardInterrupt:
        pass  # 正常退出
    stop_event.set()
    time.sleep(0.5)


if __name__ == \'__main__\':
    msg = can.Message(arbitration_id=0x181DFF00, data=[0xEE, 0xFE, 0xFE, 0xFF, 0xFE, 0xFF, 0xFF, 0xFE],
                      is_extended_id=True)  # 报文
    bus = PcanBus(channel=\'PCAN_USBBUS1\', bitrate=500000)
    send_and_recv_msg()

 

—————————————————————————————— 选择正确的事、再把事做正确 ——————————————————————————————

以上是关于Python3+PCAN-USB基于PCAN-Basic二次开发实现上位机功能的主要内容,如果未能解决你的问题,请参考以下文章

[硬拆解]拆解一个USB转CAN-FD总线设备-PCAN-USB FD

基于Python3的漏洞检测工具 ( Python3 插件式框架 )

基于沙盒环境,安装python3.6

python3基于email模块发送邮件

python3 基于zabbix 自动抓取监控图片

基于deepin搭建Python3编程环境