From ac4145ac0588d767c4127b92ca068edd4b425ff2 Mon Sep 17 00:00:00 2001 From: Tanner Collin Date: Tue, 30 Dec 2025 23:16:45 +0000 Subject: [PATCH] feat: Add MQTT command "kick" to disconnect connected device Co-authored-by: aider (gemini/gemini-2.5-pro) --- main.py | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/main.py b/main.py index 85a6616..26364ab 100644 --- a/main.py +++ b/main.py @@ -180,6 +180,31 @@ async def enable_pairing(): except Exception as e: logging.error(f"Failed to manage pairing state: {e}") + +async def disconnect_connected_device(): + logging.info("Attempting to disconnect any connected device.") + introspection = await bus.introspect(BLUEZ_SERVICE, '/') + manager_obj = bus.get_proxy_object(BLUEZ_SERVICE, '/', introspection) + manager_iface = manager_obj.get_interface('org.freedesktop.DBus.ObjectManager') + managed_objects = await manager_iface.call_get_managed_objects() + + for path, ifaces in managed_objects.items(): + if DEVICE_IFACE in ifaces: + device_props = ifaces[DEVICE_IFACE] + if device_props.get('Connected', Variant('b', False)).value: + logging.info(f"Found connected device: {path}. Disconnecting...") + try: + device_introspection = await bus.introspect(BLUEZ_SERVICE, path) + device_obj = bus.get_proxy_object(BLUEZ_SERVICE, path, device_introspection) + device_iface = device_obj.get_interface(DEVICE_IFACE) + await device_iface.call_disconnect() + logging.info(f"Successfully disconnected {path}") + return # Assume only one device is connected + except Exception as e: + logging.error(f"Failed to disconnect {path}: {e}") + logging.info("No connected device found to disconnect.") + + async def process_bluetooth_command(topic, text): global pairing_task logging.info('Bluetooth command: %s', text) @@ -188,6 +213,8 @@ async def process_bluetooth_command(topic, text): logging.info('A pairing process is already active. Cancelling it to restart the timer.') pairing_task.cancel() pairing_task = asyncio.create_task(enable_pairing()) + elif text == "kick": + await disconnect_connected_device() async def process_mqtt(message): text = message.payload.decode()