Files
bluetooth-speaker-manager/scan.py
Tanner Collin 61eb680695 feat: Log new devices with '(new)' suffix
Co-authored-by: aider (gemini/gemini-2.5-pro) <aider@aider.chat>
2025-12-30 23:58:59 +00:00

78 lines
2.9 KiB
Python

import os, sys
import logging
# Any non-empty value of the DEBUG environment variable enables debug logging;
# unset or empty leaves the level at INFO.
DEBUG = os.environ.get('DEBUG')
# Configure the root logger once, at import time. Records are written to
# stdout (not the default stderr) with a timestamp and level prefix.
logging.basicConfig(stream=sys.stdout,
format='[%(asctime)s] %(levelname)s - %(message)s',
level=logging.DEBUG if DEBUG else logging.INFO)
import asyncio
from dbus_next.aio import MessageBus
from dbus_next.constants import BusType
# D-Bus service name used for every introspection / proxy lookup in this file.
BLUEZ_SERVICE = 'org.bluez'
# Interface name that identifies an adapter object among the managed objects.
ADAPTER_IFACE = 'org.bluez.Adapter1'
# Interface name that identifies a device object in InterfacesAdded signals.
DEVICE_IFACE = 'org.bluez.Device1'
# Addresses already reported by on_interfaces_added during this run; used to
# tag the first sighting of a device with "(new)".
seen_devices = set()
async def get_adapter(bus):
    """Return a proxy object for the first Bluetooth adapter, or ``None``.

    The root object of the BlueZ service is queried through
    ``org.freedesktop.DBus.ObjectManager.GetManagedObjects``; the first
    object path whose interface map contains ``ADAPTER_IFACE`` wins.

    Args:
        bus: A connected message bus providing ``introspect`` and
            ``get_proxy_object``.

    Returns:
        The proxy object for the adapter's path, or ``None`` when no managed
        object exposes the adapter interface.
    """
    root_introspection = await bus.introspect(BLUEZ_SERVICE, '/')
    root_obj = bus.get_proxy_object(BLUEZ_SERVICE, '/', root_introspection)
    object_manager = root_obj.get_interface('org.freedesktop.DBus.ObjectManager')
    objects = await object_manager.call_get_managed_objects()

    # Pick the first path (in the order the reply lists them) that carries
    # the adapter interface.
    adapter_path = next(
        (path for path, ifaces in objects.items() if ADAPTER_IFACE in ifaces),
        None,
    )
    if adapter_path is None:
        return None

    # Only the chosen path is introspected; other objects are never touched.
    adapter_introspection = await bus.introspect(BLUEZ_SERVICE, adapter_path)
    return bus.get_proxy_object(BLUEZ_SERVICE, adapter_path, adapter_introspection)
def on_interfaces_added(path, interfaces):
    """Log a device announced by an ``InterfacesAdded`` signal.

    Entries that do not carry ``DEVICE_IFACE``, or whose properties lack an
    ``Address`` or a non-empty ``Alias``, are ignored. The first time an
    address is seen it is remembered in ``seen_devices`` and the log line is
    suffixed with ``(new)``; later sightings are logged without the suffix.

    Args:
        path: Object path from the signal (unused here).
        interfaces: Mapping of interface name to that interface's property
            map. Property values are read through ``.value`` (presumably
            dbus_next ``Variant`` objects -- confirm against the library).
    """
    if DEVICE_IFACE not in interfaces:
        return

    props = interfaces[DEVICE_IFACE]
    address = props.get('Address')
    alias = props.get('Alias')

    # Skip devices that cannot be labelled: both properties must be present
    # and the alias must be non-empty.
    if not address or not alias or not alias.value:
        return

    addr_str = address.value
    if addr_str in seen_devices:
        logging.info(f"Found: {alias.value} ({addr_str})")
        return

    seen_devices.add(addr_str)
    logging.info(f"Found: {alias.value} ({addr_str}) (new)")
async def main():
    """Run the scanner: start discovery and log devices until interrupted.

    Connects to the system bus, looks up the first adapter via
    ``get_adapter``, subscribes ``on_interfaces_added`` to the root
    ObjectManager's ``InterfacesAdded`` signal, starts discovery and then
    idles so signals keep being delivered.

    Cleanup is symmetrical with setup:
      * discovery is only stopped if starting it actually succeeded, so a
        failed start is not followed by a second, misleading error;
      * a failure while stopping discovery is logged rather than raised;
      * the bus connection is closed on every exit path, including the
        "adapter not found" early return.

    Returns:
        None. Errors during scanning are logged, not raised; cancellation
        (e.g. from Ctrl+C via ``asyncio.run``) still propagates after
        cleanup has run.
    """
    bus = await MessageBus(bus_type=BusType.SYSTEM).connect()
    try:
        adapter = await get_adapter(bus)
        if not adapter:
            logging.error("Bluetooth adapter not found.")
            return

        adapter_iface = adapter.get_interface(ADAPTER_IFACE)

        # Subscribe to InterfacesAdded signal to discover new devices
        introspection = await bus.introspect(BLUEZ_SERVICE, '/')
        obj_manager = bus.get_proxy_object(BLUEZ_SERVICE, '/', introspection)
        obj_manager_iface = obj_manager.get_interface('org.freedesktop.DBus.ObjectManager')
        obj_manager_iface.on_interfaces_added(on_interfaces_added)

        logging.info("Starting Bluetooth scan... Press Ctrl+C to stop.")

        # Set only after StartDiscovery returns, so cleanup knows whether
        # there is a discovery session of ours to stop.
        discovery_started = False
        try:
            await adapter_iface.call_start_discovery()
            discovery_started = True

            # Keep the script running to listen for signals
            while True:
                await asyncio.sleep(3600)
        except Exception as e:
            logging.error(f"An error occurred during scanning: {e}")
        finally:
            if discovery_started:
                logging.info("Stopping Bluetooth scan.")
                try:
                    await adapter_iface.call_stop_discovery()
                except Exception as e:
                    # Best-effort cleanup: report the failure but do not let
                    # it replace whatever ended the scan.
                    logging.warning(f"Failed to stop discovery cleanly: {e}")
    finally:
        # Always release the D-Bus connection, whichever path got us here.
        bus.disconnect()
if __name__ == '__main__':
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl+C is the normal way to end a scan; report it as an
        # informational message instead of letting a traceback print.
        logging.info("Scan stopped by user.")