feat: Add MQTT command "kick" to disconnect connected device

Co-authored-by: aider (gemini/gemini-2.5-pro) <aider@aider.chat>
This commit is contained in:
2025-12-30 23:16:45 +00:00
parent be180b073a
commit ac4145ac05

27
main.py
View File

@@ -180,6 +180,31 @@ async def enable_pairing():
except Exception as e: except Exception as e:
logging.error(f"Failed to manage pairing state: {e}") logging.error(f"Failed to manage pairing state: {e}")
async def disconnect_connected_device():
logging.info("Attempting to disconnect any connected device.")
introspection = await bus.introspect(BLUEZ_SERVICE, '/')
manager_obj = bus.get_proxy_object(BLUEZ_SERVICE, '/', introspection)
manager_iface = manager_obj.get_interface('org.freedesktop.DBus.ObjectManager')
managed_objects = await manager_iface.call_get_managed_objects()
for path, ifaces in managed_objects.items():
if DEVICE_IFACE in ifaces:
device_props = ifaces[DEVICE_IFACE]
if device_props.get('Connected', Variant('b', False)).value:
logging.info(f"Found connected device: {path}. Disconnecting...")
try:
device_introspection = await bus.introspect(BLUEZ_SERVICE, path)
device_obj = bus.get_proxy_object(BLUEZ_SERVICE, path, device_introspection)
device_iface = device_obj.get_interface(DEVICE_IFACE)
await device_iface.call_disconnect()
logging.info(f"Successfully disconnected {path}")
return # Assume only one device is connected
except Exception as e:
logging.error(f"Failed to disconnect {path}: {e}")
logging.info("No connected device found to disconnect.")
async def process_bluetooth_command(topic, text): async def process_bluetooth_command(topic, text):
global pairing_task global pairing_task
logging.info('Bluetooth command: %s', text) logging.info('Bluetooth command: %s', text)
@@ -188,6 +213,8 @@ async def process_bluetooth_command(topic, text):
logging.info('A pairing process is already active. Cancelling it to restart the timer.') logging.info('A pairing process is already active. Cancelling it to restart the timer.')
pairing_task.cancel() pairing_task.cancel()
pairing_task = asyncio.create_task(enable_pairing()) pairing_task = asyncio.create_task(enable_pairing())
elif text == "kick":
await disconnect_connected_device()
async def process_mqtt(message): async def process_mqtt(message):
text = message.payload.decode() text = message.payload.decode()