Linux:在 PyQt5 GUI 上显示所有当前连接的 USB 记忆棒的名称

Posted

技术标签:

【中文标题】Linux:在 PyQt5 GUI 上显示所有当前连接的 USB 记忆棒的名称【英文标题】:Linux: Display names of all currently attached USB sticks on PyQt5 GUI 【发布时间】:2019-03-26 20:55:33 【问题描述】:

以下代码在 Linux 上的控制台(作为 PyQt5 GUI 的替代品)中显示了新插入的 USB 记忆棒的名称。

不幸的是,当您在未正确弹出 U 盘的情况下拔出 U 盘时,控制台中会立即出现 pyudev.device._errors.DeviceNotFoundAtPathError

需要改变什么来解决这个错误?

ma​​in.py

from functools import partial
import os
import sys

import pyudev

from PyQt5.QtWidgets import QApplication
from PyQt5.QtCore import QSocketNotifier, QObject, pyqtSignal


class MainWindow():

    def __init__(self, parent=None):
        super().__init__()
        # GUI code
        pass

    def print_name(self, name):
        print(name)


class LinuxDeviceMonitor(QObject):
    devices_changed = pyqtSignal(list)

    def __init__(self):
        super().__init__()
        self._context = pyudev.Context()

        self._monitor = pyudev.Monitor.from_netlink(self._context)
        self._monitor.start()

        self._devices = set()

        self._process_devices(self._context.list_devices(), action="add")

    def fileno(self):
        return self._monitor.fileno()

    @property
    def device_names(self):
        return [pyudev.Devices.from_path(self._context, device).get("DEVNAME") for device in self._devices]

    def process_incoming(self):
        read_device = partial(pyudev._util.eintr_retry_call, self._monitor.poll, timeout=0)
        self._process_devices(iter(read_device, None))
        self.devices_changed.emit(self.device_names)

    def _process_devices(self, devices, action=None):
        for device in devices:
            action = device.action if action is None else action

            if action in ("add", "change") and self._is_usb_mass_storage_device(device):
                self._devices.add(device.sys_path)
            elif action == "remove" and device.sys_path in self._devices:
                self._devices.remove(device.sys_path)

    @classmethod
    def _read_device_flag(self, device, name):
        path = os.path.join(device.sys_path, name)
        try:
            with open(path) as data:
                return bool(int(data.read()))
        except (IOError, ValueError):
            return False

    def _is_usb_mass_storage_device(self, device):
        is_removable = self._read_device_flag(device, "removable")
        has_size = self._read_device_flag(device, "size")
        has_usb = device.get("ID_BUS") == "usb"
        has_no_disc = device.get("ID_CDROM") is None
        return is_removable and has_size and has_usb and has_no_disc


def main():
    app = QApplication(sys.argv)

    main_window = MainWindow()

    linux_device_monitor = LinuxDeviceMonitor()

    notifier = QSocketNotifier(linux_device_monitor.fileno(), QSocketNotifier.Read)
    notifier.activated.connect(linux_device_monitor.process_incoming)

    linux_device_monitor.devices_changed.connect(main_window.print_name)

    sys.exit(app.exec_())


if __name__ == '__main__':
    main()

【问题讨论】:

【参考方案1】:

我的第一个答案没有任何问题,但是您可以用自己的函数覆盖一个函数。以下是针对您的问题的示例。 主要变化有:

添加:

from pyudev._util import ensure_byte_string

并覆盖from_sys_path函数:

        pyudev.Devices.from_sys_path = self.from_sys_path

    def from_sys_path(self, context, sys_path):
        device = context._libudev.udev_device_new_from_syspath(
            context, ensure_byte_string(sys_path))
        if not device:
            return None
        return pyudev.Device(context, device)

改变:

    @property
    def device_names(self):
        return [pyudev.Devices.from_path(self._context, device).get("DEVNAME") for device in self._devices]

到:

    def device_names(self):
        devices = []
        for device in self._devices:
            dev = pyudev.Devices.from_path(self._context, device)
            if dev is not None:
                devices.append(dev.get("DEVNAME"))
        return devices

        self.devices_changed.emit(self.device_names)

        self.devices_changed.emit(self.device_names())

整个代码如下所示:

from functools import partial
import os
import sys

import pyudev

from PyQt5.QtWidgets import QApplication
from PyQt5.QtCore import QSocketNotifier, QObject, pyqtSignal
from pyudev._util import ensure_byte_string


class MainWindow():

    def __init__(self, parent=None):
        super().__init__()
        # GUI code
        pass

    def print_name(self, name):
        print(name)


class LinuxDeviceMonitor(QObject):
    devices_changed = pyqtSignal(list)

    def __init__(self):
        super().__init__()
        self._context = pyudev.Context()

        self._monitor = pyudev.Monitor.from_netlink(self._context)
        self._monitor.start()

        self._devices = set()

        self._process_devices(self._context.list_devices(), action="add")
        pyudev.Devices.from_sys_path = self.from_sys_path

    def from_sys_path(self, context, sys_path):
        device = context._libudev.udev_device_new_from_syspath(
            context, ensure_byte_string(sys_path))
        if not device:
            return None
        return pyudev.Device(context, device)

    def fileno(self):
        return self._monitor.fileno()

    def device_names(self):
        devices = []
        for device in self._devices:
            dev = pyudev.Devices.from_path(self._context, device)
            if dev is not None:
                devices.append(dev.get("DEVNAME"))
        return devices

    def process_incoming(self):
        read_device = partial(pyudev._util.eintr_retry_call, self._monitor.poll, timeout=0)
        self._process_devices(iter(read_device, None))
        self.devices_changed.emit(self.device_names())

    def _process_devices(self, devices, action=None):
        for device in devices:
            action = device.action if action is None else action

            if action in ("add", "change") and self._is_usb_mass_storage_device(device):
                self._devices.add(device.sys_path)
            elif action == "remove" and device.sys_path in self._devices:
                self._devices.remove(device.sys_path)

    @classmethod
    def _read_device_flag(self, device, name):
        path = os.path.join(device.sys_path, name)
        try:
            with open(path) as data:
                return bool(int(data.read()))
        except (IOError, ValueError):
            return False

    def _is_usb_mass_storage_device(self, device):
        is_removable = self._read_device_flag(device, "removable")
        has_size = self._read_device_flag(device, "size")
        has_usb = device.get("ID_BUS") == "usb"
        has_no_disc = device.get("ID_CDROM") is None
        return is_removable and has_size and has_usb and has_no_disc


def main():
    app = QApplication(sys.argv)

    main_window = MainWindow()

    linux_device_monitor = LinuxDeviceMonitor()

    notifier = QSocketNotifier(linux_device_monitor.fileno(), QSocketNotifier.Read)
    notifier.activated.connect(linux_device_monitor.process_incoming)

    linux_device_monitor.devices_changed.connect(main_window.print_name)

    sys.exit(app.exec_())


if __name__ == '__main__':
    main()

您的 IDE 可能会抱怨您正在访问模块的受保护成员 (pyudev._util),但它仍然可以工作。

【讨论】:

【参考方案2】:

您可以简单地捕获异常:

改变

    @property
    def device_names(self):
        return [pyudev.Devices.from_path(self._context, device).get("DEVNAME") for device in self._devices]

    def device_names(self):
        devices = []
        for device in self._devices:
            try:
                dev_name = pyudev.Devices.from_path(self._context, device).get("DEVNAME")
                devices.append(dev_name)
            except pyudev.DeviceNotFoundAtPathError:
                pass
        return devices

        self.devices_changed.emit(self.device_names)

        self.devices_changed.emit(self.device_names())

整个代码:

from functools import partial
import os
import sys

import pyudev

from PyQt5.QtWidgets import QApplication
from PyQt5.QtCore import QSocketNotifier, QObject, pyqtSignal


class MainWindow():

    def __init__(self, parent=None):
        super().__init__()
        # GUI code
        pass

    def print_name(self, name):
        print(name)


class LinuxDeviceMonitor(QObject):
    devices_changed = pyqtSignal(list)

    def __init__(self):
        super().__init__()
        self._context = pyudev.Context()

        self._monitor = pyudev.Monitor.from_netlink(self._context)
        self._monitor.start()

        self._devices = set()

        self._process_devices(self._context.list_devices(), action="add")

    def fileno(self):
        return self._monitor.fileno()

    def device_names(self):
        devices = []
        for device in self._devices:
            try:
                dev_name = pyudev.Devices.from_path(self._context, device).get("DEVNAME")
                devices.append(dev_name)
            except pyudev.DeviceNotFoundAtPathError:
                pass
        return devices

    def process_incoming(self):
        read_device = partial(pyudev._util.eintr_retry_call, self._monitor.poll, timeout=0)
        self._process_devices(iter(read_device, None))
        self.devices_changed.emit(self.device_names())

    def _process_devices(self, devices, action=None):
        for device in devices:
            action = device.action if action is None else action

            if action in ("add", "change") and self._is_usb_mass_storage_device(device):
                self._devices.add(device.sys_path)
            elif action == "remove" and device.sys_path in self._devices:
                self._devices.remove(device.sys_path)

    @classmethod
    def _read_device_flag(self, device, name):
        path = os.path.join(device.sys_path, name)
        try:
            with open(path) as data:
                return bool(int(data.read()))
        except (IOError, ValueError):
            return False

    def _is_usb_mass_storage_device(self, device):
        is_removable = self._read_device_flag(device, "removable")
        has_size = self._read_device_flag(device, "size")
        has_usb = device.get("ID_BUS") == "usb"
        has_no_disc = device.get("ID_CDROM") is None
        return is_removable and has_size and has_usb and has_no_disc


def main():
    app = QApplication(sys.argv)

    main_window = MainWindow()

    linux_device_monitor = LinuxDeviceMonitor()

    notifier = QSocketNotifier(linux_device_monitor.fileno(), QSocketNotifier.Read)
    notifier.activated.connect(linux_device_monitor.process_incoming)

    linux_device_monitor.devices_changed.connect(main_window.print_name)

    sys.exit(app.exec_())


if __name__ == '__main__':
    main()

插入和拔出 U 盘时的输出:

[]
[]
[]
['/dev/sdc']
['/dev/sdc']
['/dev/sdc']
[]
[]
[]

【讨论】:

太棒了。捕获异常可以避免错误消息。只是出于兴趣,有没有办法在不使用异常的情况下修复这个错误?

以上是关于Linux:在 PyQt5 GUI 上显示所有当前连接的 USB 记忆棒的名称的主要内容,如果未能解决你的问题,请参考以下文章

PyQt5快速入门PyQt5 GUI界面设计

认识PyQt5

认识PyQt5

如何在 PyQt5 GUI 中显示 Folium 地图?

从 Matplotlib 图中提取信息并将其显示在 PyQt5 GUI 中

无法使用pyqt5显示秒表