From fb08301687ede2ec09ec9ca5463f4bb00cf7ac5d Mon Sep 17 00:00:00 2001 From: Tanner Collin Date: Tue, 30 Dec 2025 19:40:12 +0000 Subject: [PATCH] feat: Implement Bluetooth speaker mode with auto-pairing Co-authored-by: aider (gemini/gemini-2.5-pro) --- main.py | 143 +++++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 141 insertions(+), 2 deletions(-) diff --git a/main.py b/main.py index 12504fc..32256c0 100644 --- a/main.py +++ b/main.py @@ -10,15 +10,154 @@ import json import asyncio from aiomqtt import Client +from dbus_next.aio import MessageBus +from dbus_next.service import ServiceInterface, method, dbus_service +from dbus_next import Variant + + +# --- Bluetooth constants and agent --- +BLUEZ_SERVICE = 'org.bluez' +ADAPTER_IFACE = 'org.bluez.Adapter1' +DEVICE_IFACE = 'org.bluez.Device1' +AGENT_IFACE = 'org.bluez.Agent1' +AGENT_MANAGER_IFACE = 'org.bluez.AgentManager1' +AGENT_PATH = '/dev/what/agent' +CAPABILITY = 'NoInputNoOutput' + +@dbus_service(ifaces=[AGENT_IFACE]) +class Agent(ServiceInterface): + def __init__(self, interface_name): + super().__init__(interface_name) + + @method() + def Release(self): + logging.info('Agent Released') + + @method(in_signature='o', out_signature='s') + def RequestPinCode(self, device: 'o') -> 's': + logging.info(f"RequestPinCode for {device}, returning static PIN") + return "0000" + + @method(in_signature='o', out_signature='u') + def RequestPasskey(self, device: 'o') -> 'u': + logging.info(f"RequestPasskey for {device}") + return 0 + + @method(in_signature='ouq', out_signature='') + def DisplayPasskey(self, device: 'o', passkey: 'u', entered: 'q'): + logging.info(f"DisplayPasskey for {device}: {passkey}") + + @method(in_signature='os', out_signature='') + def DisplayPinCode(self, device: 'o', pincode: 's'): + logging.info(f"DisplayPinCode for {device}: {pincode}") + + @method(in_signature='ou', out_signature='') + def RequestConfirmation(self, device: 'o', passkey: 'u'): + logging.info(f"RequestConfirmation for {device} with passkey {passkey}") + # Automatically confirm and trust + loop = asyncio.get_event_loop() + loop.create_task(trust_and_connect_device(device)) + + @method(in_signature='o', out_signature='') + def RequestAuthorization(self, device: 'o'): + logging.info(f"RequestAuthorization for {device}") + pass + + @method(in_signature='os', out_signature='') + def AuthorizeService(self, device: 'o', uuid: 's'): + logging.info(f"AuthorizeService for {device} with uuid {uuid}") + pass + + @method() + def Cancel(self): + logging.info('Pairing Cancelled') + +async def trust_and_connect_device(device_path): + logging.info(f'Trusting and connecting to {device_path}') + try: + bus = await MessageBus().connect() + introspection = await bus.introspect(BLUEZ_SERVICE, device_path) + device_obj = bus.get_proxy_object(BLUEZ_SERVICE, device_path, introspection) + + device_props = device_obj.get_interface('org.freedesktop.DBus.Properties') + await device_props.call_set(DEVICE_IFACE, 'Trusted', Variant('b', True)) + logging.info(f'Trusted device {device_path}') + + device_iface = device_obj.get_interface(DEVICE_IFACE) + await device_iface.call_connect() + logging.info(f'Connected to device {device_path}') + except Exception as e: + logging.error(f'Failed to trust/connect to {device_path}: {e}') + +async def get_adapter(): + bus = await MessageBus().connect() + introspection = await bus.introspect(BLUEZ_SERVICE, '/org/bluez') + manager_obj = bus.get_proxy_object(BLUEZ_SERVICE, '/org/bluez', introspection) + manager_iface = manager_obj.get_interface('org.freedesktop.DBus.ObjectManager') + managed_objects = await manager_iface.call_get_managed_objects() + + for path, ifaces in managed_objects.items(): + if ADAPTER_IFACE in ifaces: + adapter_introspection = await bus.introspect(BLUEZ_SERVICE, path) + return bus.get_proxy_object(BLUEZ_SERVICE, path, adapter_introspection) + return None + +async def register_agent(): + bus = await MessageBus().connect() + agent = Agent(AGENT_IFACE) + bus.export(AGENT_PATH, agent) + + introspection = await bus.introspect(BLUEZ_SERVICE, '/org/bluez') + manager_obj = bus.get_proxy_object(BLUEZ_SERVICE, '/org/bluez', introspection) + agent_manager = manager_obj.get_interface(AGENT_MANAGER_IFACE) + + try: + await agent_manager.call_register_agent(AGENT_PATH, CAPABILITY) + logging.info(f"Agent registered at {AGENT_PATH} with capability {CAPABILITY}") + await agent_manager.call_request_default_agent(AGENT_PATH) + logging.info("Agent set as default") + except Exception as e: + logging.error(f'Failed to register agent: {e}') + logging.info('Trying to unregister and register again') + try: + await agent_manager.call_unregister_agent(AGENT_PATH) + await agent_manager.call_register_agent(AGENT_PATH, CAPABILITY) + await agent_manager.call_request_default_agent(AGENT_PATH) + logging.info("Agent registered after unregistering") + except Exception as e2: + logging.error(f'Failed to register agent again: {e2}') + +# --- End Bluetooth --- async def manage_bluetooth(): + await register_agent() + # The agent will handle things, this task can just sleep while True: - await asyncio.sleep(1) + await asyncio.sleep(3600) async def process_bluetooth_command(topic, text): logging.info('Bluetooth command: %s', text) - pass + if text == "pair": + logging.info('Starting pairing process by making adapter discoverable') + adapter_obj = await get_adapter() + if not adapter_obj: + logging.error('Bluetooth adapter not found') + return + + adapter_props = adapter_obj.get_interface('org.freedesktop.DBus.Properties') + + try: + await adapter_props.call_set(ADAPTER_IFACE, 'Discoverable', Variant('b', True)) + await adapter_props.call_set(ADAPTER_IFACE, 'Pairable', Variant('b', True)) + logging.info('Adapter is discoverable and pairable for 120 seconds') + + await asyncio.sleep(120) + + await adapter_props.call_set(ADAPTER_IFACE, 'Discoverable', Variant('b', False)) + logging.info('Adapter is no longer discoverable') + except Exception as e: + logging.error(f"Failed to set adapter properties: {e}") async def process_mqtt(message): text = message.payload.decode()