Arduino - Raspberry Pi,使用 D-BUS API 的蓝牙连接
Posted
技术标签:
【中文标题】Arduino - Raspberry Pi,使用 D-BUS API 的蓝牙连接【英文标题】:Arduino - Raspberry Pi, Bluetooth Connection using D-BUS API 【发布时间】:2021-05-01 07:26:10 【问题描述】:任务是使用基于 python 脚本的 D-BUS API 通过蓝牙自动完成 Arduino 和 Raspberry Pi 之间的配对和连接过程。
与Arduino连接的蓝牙模块为:Grove - Serial Bluetooth v3.0.
我能够自动化配对过程。配对脚本按顺序执行以下操作:
-
它通过创建适配器对象并使用 StartDiscovery 方法查找名为 Slave 的蓝牙模块。(在 Arduino 中完成命名)。
注册蓝牙代理。
如果设备尚未配对,则通过Pair方法创建设备对象和配对。
执行上述步骤的代码部分如下:
register_agent()
start_discovery()
time.sleep(10)
for i in range(len(address_list)):
new_dbus_device = get_device(address_list[i])
dev_path = new_dbus_device.object_path
device_properties = dbus.Interface(new_dbus_device, "org.freedesktop.DBus.Properties")
pair_status = device_properties.Get("org.bluez.Device1", "Paired")
if not pair_status:
new_dbus_device.Pair(reply_handler=pair_reply, error_handler=pair_error, timeout=60000)
这是 register_agent() 按照要求执行的操作:
def register_agent():
agent = Agent(bus, path)
capability = "NoInputNoOutput"
obj = bus.get_object(BUS_NAME, "/org/bluez");
manager = dbus.Interface(obj, "org.bluez.AgentManager1")
manager.RegisterAgent(path, capability)
但是,当我尝试调用 Bluez 的 device-api 中记录的 Connect 方法时,它总是失败。我创建了一个自定义串行端口配置文件并尝试了 ConnectProfile 方法,但又失败了。
如果我使用已弃用的 rfcomm 工具,则通过蓝牙进行的通信可以正常工作,或者如果我使用 python 套接字模块,则它可以正常工作。 但是,由于已弃用,我想避免使用 rfcomm .关于使用python socket库,连接只在rfcomm通道1有效,其他通道产生Connection Refused错误。
添加MRE,更好地澄清问题:
import dbus
import dbus.service
import dbus.mainloop.glib
import sys
import time
import subprocess
from bluezutils import *
from bluetooth import *
from gi.repository import GObject, GLib
from dbus.mainloop.glib import DBusGMainLoop
DBusGMainLoop(set_as_default=True)
path = "/test/agent"
AGENT_INTERFACE = 'org.bluez.Agent1'
BUS_NAME = 'org.bluez'
bus = dbus.SystemBus()
device_obj = None
dev_path = None
def set_trusted(path2):
props = dbus.Interface(bus.get_object("org.bluez", path2),
"org.freedesktop.DBus.Properties")
props.Set("org.bluez.Device1", "Trusted", True)
class Rejected(dbus.DBusException):
_dbus_error_name = "org.bluez.Error.Rejected"
class Agent(dbus.service.Object):
exit_on_release = True
def set_exit_on_release(self, exit_on_release):
self.exit_on_release = exit_on_release
@dbus.service.method(AGENT_INTERFACE,
in_signature="", out_signature="")
def Release(self):
print("Release")
if self.exit_on_release:
mainloop.quit()
@dbus.service.method(AGENT_INTERFACE,
in_signature="os", out_signature="")
def AuthorizeService(self, device, uuid):
print("AuthorizeService (%s, %s)" % (device, uuid))
return
@dbus.service.method(AGENT_INTERFACE,
in_signature="o", out_signature="s")
def RequestPinCode(self, device):
set_trusted(device)
return "0000"
@dbus.service.method(AGENT_INTERFACE,
in_signature="o", out_signature="u")
def RequestPasskey(self, device):
set_trusted(device)
return dbus.UInt32("0000")
@dbus.service.method(AGENT_INTERFACE,
in_signature="ou", out_signature="")
def RequestConfirmation(self, device, passkey):
set_trusted(device)
return
@dbus.service.method(AGENT_INTERFACE,
in_signature="o", out_signature="")
def RequestAuthorization(self, device):
return
@dbus.service.method(AGENT_INTERFACE,
in_signature="", out_signature="")
def Cancel(self):
print("Cancel")
def pair_reply():
print("Device paired and trusted")
set_trusted(dev_path)
def pair_error(error):
err_name = error.get_dbus_name()
if err_name == "org.freedesktop.DBus.Error.NoReply" and device_obj:
print("Timed out. Cancelling pairing")
device_obj.CancelPairing()
else:
print("Creating device failed: %s" % (error))
mainloop.quit()
def register_agent():
agent = Agent(bus, path)
capability = "NoInputNoOutput"
obj = bus.get_object(BUS_NAME, "/org/bluez");
manager = dbus.Interface(obj, "org.bluez.AgentManager1")
manager.RegisterAgent(path, capability)
def start_discovery():
global pi_adapter
pi_adapter = find_adapter()
scan_filter = dict("DuplicateData": False)
pi_adapter.SetDiscoveryFilter(scan_filter)
pi_adapter.StartDiscovery()
def stop_discovery():
pi_adapter.StopDiscovery()
def get_device(dev_str):
# use [Service] and [Object path]:
device_proxy_object = bus.get_object("org.bluez","/org/bluez/hci0/dev_"+dev_str)
# use [Interface]:
device1 = dbus.Interface(device_proxy_object,"org.bluez.Device1")
return device1
def char_changer(text):
text = text.replace(':', r'_')
return text
def slave_finder(device_name):
global sublist_normal
sublist_normal = []
sublist= []
address = []
edited_address = None
sub = subprocess.run(["hcitool scan"], text = True, shell = True, capture_output=True)
print(sub.stdout) #string type
sublist = sub.stdout.split()
for i in range(len(sublist)):
if sublist[i] == device_name:
print(sublist[i-1])
sublist_normal.append(sublist[i-1])
edited_address = char_changer(sublist[i-1])
address.append(edited_address)
return address
def remove_all_paired():
for i in range(len(sublist_normal)):
sub = subprocess.run(["bluetoothctl remove " + sublist_normal[i]], text = True, shell = True, capture_output=True)
time.sleep(1)
if __name__ == '__main__':
pair_status = None
address_list = slave_finder('Slave') #Arduino bluetooth module named as "Slave", here we are finding it.
time.sleep(2)
remove_all_paired() #rfcomm requires repairing after release
print(sublist_normal)
if address_list:
register_agent()
start_discovery()
time.sleep(10)
for i in range(len(address_list)):
new_dbus_device = get_device(address_list[i])
dev_path = new_dbus_device.object_path
device_properties = dbus.Interface(new_dbus_device, "org.freedesktop.DBus.Properties")
pair_status = device_properties.Get("org.bluez.Device1", "Paired")
if not pair_status:
new_dbus_device.Pair(reply_handler=pair_reply, error_handler=pair_error, timeout=60000)
mainloop = GLib.MainLoop()
mainloop.run()
sudo btmon 输出:
Bluetooth monitor ver 5.50
= Note: Linux version 5.4.83-v7l+ (armv7l) 0.832473
= Note: Bluetooth subsystem version 2.22 0.832478
= New Index: DC:A6:32:58:FE:13 (Primary,UART,hci0) [hci0] 0.832481
= Open Index: DC:A6:32:58:FE:13 [hci0] 0.832484
= Index Info: DC:A6:32:5.. (Cypress Semiconductor Corporation) [hci0] 0.832487
@ MGMT Open: bluetoothd (privileged) version 1.14 0x0001 0.832490
@ MGMT Open: btmon (privileged) version 1.14 0x0002 0.832540
所以问题是为什么 Connect 和 ConnectProfile 方法总是失败,我需要做什么才能在 Arduino 和 Raspberry Pi 之间建立基于 D-BUS API 的蓝牙通信?
【问题讨论】:
您还没有详细说明register_agent()
在做什么。我假设您正在注册自己的代理;类似于simple-agent?
编辑了问题。是的,确实如你所说。
感谢您更新问题。 Agent
代码中的内容是什么? pair_reply
和 pair_error
是什么?你从终端和sudo btmon
得到什么输出?我可以提请您注意How to create a Minimal, Reproducible Example
我也添加了它们。 getdevice() 是来自bluezutils 模块的方法。因此,当我调用 new_dbus_device.Connect() 时,它会产生错误。
【参考方案1】:
我想你已经回答了你自己的问题。 Connect
不起作用的原因是您没有在 Raspberry Pi 上运行串行端口配置文件 (SPP)。如果您正在运行btmon
,您将看到客户端断开连接,因为没有与 Arduino 提供的匹配配置文件。
Python Socket 连接中使用的端口号需要与 Arduino 蓝牙模块上的端口号相匹配。这可能就是只有 1
起作用的原因。
作为参考,我用我放置的 Raspberry Pi 和 HC-06 进行了测试。我删除了所有扫描代码以尝试获得最小的可运行代码。这是我使用的代码:
import socket
from time import sleep
import dbus
import dbus.service
import dbus.mainloop.glib
from gi.repository import GLib
BUS_NAME = 'org.bluez'
AGENT_IFACE = 'org.bluez.Agent1'
AGNT_MNGR_IFACE = 'org.bluez.AgentManager1'
ADAPTER_IFACE = 'org.bluez.Adapter1'
AGENT_PATH = '/ukBaz/bluezero/agent'
AGNT_MNGR_PATH = '/org/bluez'
DEVICE_IFACE = 'org.bluez.Device1'
CAPABILITY = 'KeyboardDisplay'
my_adapter_address = '11:22:33:44:55:66'
my_device_path = '/org/bluez/hci0/dev_00_00_12_34_56_78'
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
bus = dbus.SystemBus()
class Agent(dbus.service.Object):
@dbus.service.method(AGENT_IFACE,
in_signature='o', out_signature='s')
def RequestPinCode(self, device):
print(f'RequestPinCode device')
return '0000'
class Device:
def __init__(self, device_path):
dev_obj = bus.get_object(BUS_NAME, device_path)
self.methods = dbus.Interface(dev_obj, DEVICE_IFACE)
self.props = dbus.Interface(dev_obj, dbus.PROPERTIES_IFACE)
self._port = 1
self._client_sock = socket.socket(socket.AF_BLUETOOTH,
socket.SOCK_STREAM,
socket.BTPROTO_RFCOMM)
def connect(self):
# self.methods.Connect()
self._client_sock.bind((my_adapter_address, self._port))
self._client_sock.connect((self.address, self._port))
def disconnect(self):
self.methods.Disconnect()
def pair(self):
self.methods.Pair(reply_handler=self._pair_reply,
error_handler=self._pair_error)
def _pair_reply(self):
print(f'Device trusted=self.trusted, connected=self.connected')
self.trusted = True
print(f'Device trusted=self.trusted, connected=self.connected')
while self.connected:
sleep(0.5)
self.connect()
print('Successfully paired and connected')
def _pair_error(self, error):
err_name = error.get_dbus_name()
print(f'Creating device failed: err_name')
@property
def trusted(self):
return bool(self.props.Get(DEVICE_IFACE, 'Trusted'))
@trusted.setter
def trusted(self, value):
self.props.Set(DEVICE_IFACE, 'Trusted', bool(value))
@property
def paired(self):
return bool(self.props.Get(DEVICE_IFACE, 'Paired'))
@property
def connected(self):
return bool(self.props.Get(DEVICE_IFACE, 'Connected'))
@property
def address(self):
return str(self.props.Get(DEVICE_IFACE, 'Address'))
if __name__ == '__main__':
agent = Agent(bus, AGENT_PATH)
agnt_mngr = dbus.Interface(bus.get_object(BUS_NAME, AGNT_MNGR_PATH),
AGNT_MNGR_IFACE)
agnt_mngr.RegisterAgent(AGENT_PATH, CAPABILITY)
device = Device(my_device_path)
if device.paired:
device.connect()
else:
device.pair()
mainloop = GLib.MainLoop()
try:
mainloop.run()
except KeyboardInterrupt:
agnt_mngr.UnregisterAgent(AGENT_PATH)
mainloop.quit()
【讨论】:
在测试代码中,您使用的是 python 套接字模块的 connect 和 bind 方法,我已经提到它们工作正常。抱歉我没有提到,但是我已经编辑了here提到的服务文件,所以已经有了SPP,我也可以通过sdptool browse local
的输出来确认。我很好奇为什么device-api 的 Connect 方法失败了。
另外,关于 rfcomm 端口,这里是我拥有的模块的spreadsheet。我没有看到任何可以让我配置 rfcomm 端口的 AT 命令。
-C
和 sdptool
将使用旧的弃用 API 绕过 D-Bus 守护程序。我已经让配置文件为服务器工作,但我从来没有让它为客户端工作。当套接字库似乎可以完成所需的一切时,似乎不值得付出努力。听起来我的回答没有添加任何你不知道的东西
肯定加了,现在我知道不值得,所以我也会去socket。你检查过电子表格吗?我仍在尝试通过发送所需的 AT 命令找到一种从 Arduino 配置 rfcomm 端口的方法。
我快速浏览了数据表,没有什么明显的。有一个指向另一个蓝牙命令列表的链接,但我的防火墙不允许我下载它。以上是关于Arduino - Raspberry Pi,使用 D-BUS API 的蓝牙连接的主要内容,如果未能解决你的问题,请参考以下文章
Raspberry Pi 上的 Tornado 使用 websockets 以及监控串行端口 Arduino 通信
Arduino - Raspberry Pi,使用 D-BUS API 的蓝牙连接
Arduino Uno Raspberry Pi 串行通信双读数