Arduino - Raspberry Pi,使用 D-BUS API 的蓝牙连接

Posted

技术标签:

【中文标题】Arduino - Raspberry Pi,使用 D-BUS API 的蓝牙连接【英文标题】:Arduino - Raspberry Pi, Bluetooth Connection using D-BUS API 【发布时间】:2021-05-01 07:26:10 【问题描述】:

任务是使用基于 python 脚本的 D-BUS API 通过蓝牙自动完成 Arduino 和 Raspberry Pi 之间的配对和连接过程。

与Arduino连接的蓝牙模块为:Grove - Serial Bluetooth v3.0.

我能够自动化配对过程。配对脚本按顺序执行以下操作:

    它通过创建适配器对象并使用 StartDiscovery 方法查找名为 Slave 的蓝牙模块。(在 Arduino 中完成命名)。 注册蓝牙代理。 如果设备尚未配对,则通过Pair方法创建设备对象和配对。

执行上述步骤的代码部分如下:

register_agent()
start_discovery() 
time.sleep(10) 
for i in range(len(address_list)):
    new_dbus_device = get_device(address_list[i])
    dev_path = new_dbus_device.object_path
    device_properties = dbus.Interface(new_dbus_device, "org.freedesktop.DBus.Properties")
    pair_status = device_properties.Get("org.bluez.Device1", "Paired")
    if not pair_status:
        new_dbus_device.Pair(reply_handler=pair_reply, error_handler=pair_error, timeout=60000)

这是 register_agent() 按照要求执行的操作:

def register_agent():    
    agent = Agent(bus, path)
    capability = "NoInputNoOutput"
    obj = bus.get_object(BUS_NAME, "/org/bluez");
    manager = dbus.Interface(obj, "org.bluez.AgentManager1")
    manager.RegisterAgent(path, capability)

但是,当我尝试调用 Bluez 的 device-api 中记录的 Connect 方法时,它总是失败。我创建了一个自定义串行端口配置文件并尝试了 ConnectProfile 方法,但又失败了。

如果我使用已弃用的 rfcomm 工具,则通过蓝牙进行的通信可以正常工作,或者如果我使用 python 套接字模块,则它可以正常工作。 但是,由于已弃用,我想避免使用 rfcomm .关于使用python socket库,连接只在rfcomm通道1有效,其他通道产生Connection Refused错误。

添加MRE,更好地澄清问题:

import dbus
import dbus.service
import dbus.mainloop.glib
import sys
import time
import subprocess

from bluezutils import *
from bluetooth import *
from gi.repository import GObject, GLib
from dbus.mainloop.glib import DBusGMainLoop

DBusGMainLoop(set_as_default=True) 

path = "/test/agent"
AGENT_INTERFACE = 'org.bluez.Agent1'
BUS_NAME = 'org.bluez'
bus = dbus.SystemBus() 

device_obj = None
dev_path = None

def set_trusted(path2):
    props = dbus.Interface(bus.get_object("org.bluez", path2),
                    "org.freedesktop.DBus.Properties")
    props.Set("org.bluez.Device1", "Trusted", True)

class Rejected(dbus.DBusException):
    _dbus_error_name = "org.bluez.Error.Rejected"
    
class Agent(dbus.service.Object):
    exit_on_release = True

    def set_exit_on_release(self, exit_on_release):
        self.exit_on_release = exit_on_release

    @dbus.service.method(AGENT_INTERFACE,
                    in_signature="", out_signature="")
    def Release(self):
        print("Release")
        if self.exit_on_release:
            mainloop.quit()

    @dbus.service.method(AGENT_INTERFACE,
                    in_signature="os", out_signature="")
    def AuthorizeService(self, device, uuid):
        print("AuthorizeService (%s, %s)" % (device, uuid))
        return 

    @dbus.service.method(AGENT_INTERFACE,
                    in_signature="o", out_signature="s")
    def RequestPinCode(self, device):
        set_trusted(device)
        return "0000" 

    @dbus.service.method(AGENT_INTERFACE,
                    in_signature="o", out_signature="u")
    def RequestPasskey(self, device): 
        set_trusted(device)
        return dbus.UInt32("0000") 

    @dbus.service.method(AGENT_INTERFACE,
                    in_signature="ou", out_signature="")
    def RequestConfirmation(self, device, passkey):
        set_trusted(device)
        return 
        
    @dbus.service.method(AGENT_INTERFACE,
                    in_signature="o", out_signature="")
    def RequestAuthorization(self, device):
        return 

    @dbus.service.method(AGENT_INTERFACE,
                    in_signature="", out_signature="")
    def Cancel(self):
        print("Cancel")

def pair_reply():
    print("Device paired and trusted")
    set_trusted(dev_path) 
    
def pair_error(error):
    err_name = error.get_dbus_name()
    if err_name == "org.freedesktop.DBus.Error.NoReply" and device_obj:
        print("Timed out. Cancelling pairing")
        device_obj.CancelPairing()
    else:
        print("Creating device failed: %s" % (error))
    mainloop.quit() 
    
def register_agent():    
    agent = Agent(bus, path)
    capability = "NoInputNoOutput"
    obj = bus.get_object(BUS_NAME, "/org/bluez");
    manager = dbus.Interface(obj, "org.bluez.AgentManager1")
    manager.RegisterAgent(path, capability)
    
def start_discovery():
    global pi_adapter
    pi_adapter = find_adapter() 
    scan_filter = dict("DuplicateData": False) 
    pi_adapter.SetDiscoveryFilter(scan_filter)
    pi_adapter.StartDiscovery()
    
def stop_discovery():
    pi_adapter.StopDiscovery()
    
def get_device(dev_str):
    # use [Service] and [Object path]:
    device_proxy_object = bus.get_object("org.bluez","/org/bluez/hci0/dev_"+dev_str)
    # use [Interface]:
    device1 = dbus.Interface(device_proxy_object,"org.bluez.Device1")
    return device1

def char_changer(text):
    text = text.replace(':', r'_')
    return text

def slave_finder(device_name):
    
    global sublist_normal
    sublist_normal = []
    sublist= []
    address = []
    edited_address = None
    sub = subprocess.run(["hcitool scan"], text = True, shell = True, capture_output=True)
    print(sub.stdout) #string type
    sublist = sub.stdout.split()
    for i in range(len(sublist)):
        if sublist[i] == device_name:
            print(sublist[i-1])
            sublist_normal.append(sublist[i-1])
            edited_address = char_changer(sublist[i-1])
            address.append(edited_address)
    return address
    
def remove_all_paired():
    for i in range(len(sublist_normal)):
        sub = subprocess.run(["bluetoothctl remove " + sublist_normal[i]], text = True, shell = True, capture_output=True)
        time.sleep(1)
    
    
if __name__ == '__main__':
    
    
    pair_status = None
    address_list = slave_finder('Slave') #Arduino bluetooth module named as "Slave", here we are finding it.
    time.sleep(2)
    remove_all_paired() #rfcomm requires repairing after release
    print(sublist_normal)
    if address_list: 
        register_agent()
        start_discovery() 
        time.sleep(10) 
        for i in range(len(address_list)):
            new_dbus_device = get_device(address_list[i])
            dev_path = new_dbus_device.object_path
            device_properties = dbus.Interface(new_dbus_device, "org.freedesktop.DBus.Properties")
            pair_status = device_properties.Get("org.bluez.Device1", "Paired")
            if not pair_status:
                new_dbus_device.Pair(reply_handler=pair_reply, error_handler=pair_error, timeout=60000)
    
    
    mainloop = GLib.MainLoop()
    mainloop.run()

sudo btmon 输出:

Bluetooth monitor ver 5.50
= Note: Linux version 5.4.83-v7l+ (armv7l)                             0.832473
= Note: Bluetooth subsystem version 2.22                               0.832478
= New Index: DC:A6:32:58:FE:13 (Primary,UART,hci0)              [hci0] 0.832481
= Open Index: DC:A6:32:58:FE:13                                 [hci0] 0.832484
= Index Info: DC:A6:32:5.. (Cypress Semiconductor Corporation)  [hci0] 0.832487
@ MGMT Open: bluetoothd (privileged) version 1.14             0x0001 0.832490
@ MGMT Open: btmon (privileged) version 1.14                  0x0002 0.832540

所以问题是为什么 ConnectConnectProfile 方法总是失败,我需要做什么才能在 Arduino 和 Raspberry Pi 之间建立基于 D-BUS API 的蓝牙通信?

【问题讨论】:

您还没有详细说明register_agent() 在做什么。我假设您正在注册自己的代理;类似于simple-agent? 编辑了问题。是的,确实如你所说。 感谢您更新问题。 Agent 代码中的内容是什么? pair_replypair_error 是什么?你从终端和sudo btmon 得到什么输出?我可以提请您注意How to create a Minimal, Reproducible Example 我也添加了它们。 getdevice() 是来自bluezutils 模块的方法。因此,当我调用 new_dbus_device.Connect() 时,它会产生错误。 【参考方案1】:

我想你已经回答了你自己的问题。 Connect 不起作用的原因是您没有在 Raspberry Pi 上运行串行端口配置文件 (SPP)。如果您正在运行btmon,您将看到客户端断开连接,因为没有与 Arduino 提供的匹配配置文件。

Python Socket 连接中使用的端口号需要与 Arduino 蓝牙模块上的端口号相匹配。这可能就是只有 1 起作用的原因。

作为参考,我用我放置的 Raspberry Pi 和 HC-06 进行了测试。我删除了所有扫描代码以尝试获得最小的可运行代码。这是我使用的代码:

import socket
from time import sleep
import dbus
import dbus.service
import dbus.mainloop.glib
from gi.repository import GLib

BUS_NAME = 'org.bluez'
AGENT_IFACE = 'org.bluez.Agent1'
AGNT_MNGR_IFACE = 'org.bluez.AgentManager1'
ADAPTER_IFACE = 'org.bluez.Adapter1'
AGENT_PATH = '/ukBaz/bluezero/agent'
AGNT_MNGR_PATH = '/org/bluez'
DEVICE_IFACE = 'org.bluez.Device1'
CAPABILITY = 'KeyboardDisplay'
my_adapter_address = '11:22:33:44:55:66'
my_device_path = '/org/bluez/hci0/dev_00_00_12_34_56_78'
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
bus = dbus.SystemBus()


class Agent(dbus.service.Object):

    @dbus.service.method(AGENT_IFACE,
                         in_signature='o', out_signature='s')
    def RequestPinCode(self, device):
        print(f'RequestPinCode device')
        return '0000'


class Device:
    def __init__(self, device_path):
        dev_obj = bus.get_object(BUS_NAME, device_path)
        self.methods = dbus.Interface(dev_obj, DEVICE_IFACE)
        self.props = dbus.Interface(dev_obj, dbus.PROPERTIES_IFACE)
        self._port = 1
        self._client_sock = socket.socket(socket.AF_BLUETOOTH,
                                          socket.SOCK_STREAM,
                                          socket.BTPROTO_RFCOMM)

    def connect(self):
        # self.methods.Connect()
        self._client_sock.bind((my_adapter_address, self._port))
        self._client_sock.connect((self.address, self._port))

    def disconnect(self):
        self.methods.Disconnect()

    def pair(self):
        self.methods.Pair(reply_handler=self._pair_reply,
                          error_handler=self._pair_error)

    def _pair_reply(self):
        print(f'Device trusted=self.trusted, connected=self.connected')
        self.trusted = True
        print(f'Device trusted=self.trusted, connected=self.connected')
        while self.connected:
            sleep(0.5)
        self.connect()
        print('Successfully paired and connected')

    def _pair_error(self, error):
        err_name = error.get_dbus_name()
        print(f'Creating device failed: err_name')

    @property
    def trusted(self):
        return bool(self.props.Get(DEVICE_IFACE, 'Trusted'))

    @trusted.setter
    def trusted(self, value):
        self.props.Set(DEVICE_IFACE, 'Trusted', bool(value))

    @property
    def paired(self):
        return bool(self.props.Get(DEVICE_IFACE, 'Paired'))

    @property
    def connected(self):
        return bool(self.props.Get(DEVICE_IFACE, 'Connected'))

    @property
    def address(self):
        return str(self.props.Get(DEVICE_IFACE, 'Address'))


if __name__ == '__main__':
    agent = Agent(bus, AGENT_PATH)

    agnt_mngr = dbus.Interface(bus.get_object(BUS_NAME, AGNT_MNGR_PATH),
                               AGNT_MNGR_IFACE)
    agnt_mngr.RegisterAgent(AGENT_PATH, CAPABILITY)

    device = Device(my_device_path)
    if device.paired:
        device.connect()
    else:
        device.pair()


    mainloop = GLib.MainLoop()
    try:
        mainloop.run()
    except KeyboardInterrupt:
        agnt_mngr.UnregisterAgent(AGENT_PATH)
        mainloop.quit()

【讨论】:

在测试代码中,您使用的是 python 套接字模块的 connectbind 方法,我已经提到它们工作正常。抱歉我没有提到,但是我已经编辑了here提到的服务文件,所以已经有了SPP,我也可以通过sdptool browse local的输出来确认。我很好奇为什么device-api 的 Connect 方法失败了。 另外,关于 rfcomm 端口,这里是我拥有的模块的spreadsheet。我没有看到任何可以让我配置 rfcomm 端口的 AT 命令。 -Csdptool 将使用旧的弃用 API 绕过 D-Bus 守护程序。我已经让配置文件为服务器工作,但我从来没有让它为客户端工作。当套接字库似乎可以完成所需的一切时,似乎不值得付出努力。听起来我的回答没有添加任何你不知道的东西 肯定加了,现在我知道不值得,所以我也会去socket。你检查过电子表格吗?我仍在尝试通过发送所需的 AT 命令找到一种从 Arduino 配置 rfcomm 端口的方法。 我快速浏览了数据表,没有什么明显的。有一个指向另一个蓝牙命令列表的链接,但我的防火墙不允许我下载它。

以上是关于Arduino - Raspberry Pi,使用 D-BUS API 的蓝牙连接的主要内容,如果未能解决你的问题,请参考以下文章

Raspberry Pi通过蓝牙与Arduino连接

Raspberry Pi 上的 Tornado 使用 websockets 以及监控串行端口 Arduino 通信

Arduino - Raspberry Pi,使用 D-BUS API 的蓝牙连接

Arduino Uno Raspberry Pi 串行通信双读数

将Raspberry PI 3与Android Things连接到Arduino [已关闭]

Raspberry Pi,Arduino,Node.js和串口