diff --git a/scan.py b/scan.py index e96dceb..41687cd 100644 --- a/scan.py +++ b/scan.py @@ -1,5 +1,73 @@ +import os, sys +import logging +DEBUG = os.environ.get('DEBUG') +logging.basicConfig(stream=sys.stdout, + format='[%(asctime)s] %(levelname)s - %(message)s', + level=logging.DEBUG if DEBUG else logging.INFO) + +import asyncio +from dbus_next.aio import MessageBus +from dbus_next.constants import BusType + +BLUEZ_SERVICE = 'org.bluez' +ADAPTER_IFACE = 'org.bluez.Adapter1' +DEVICE_IFACE = 'org.bluez.Device1' + +async def get_adapter(bus): + """Gets the first Bluetooth adapter found.""" + introspection = await bus.introspect(BLUEZ_SERVICE, '/') + manager_obj = bus.get_proxy_object(BLUEZ_SERVICE, '/', introspection) + manager_iface = manager_obj.get_interface('org.freedesktop.DBus.ObjectManager') + managed_objects = await manager_iface.call_get_managed_objects() + + for path, ifaces in managed_objects.items(): + if ADAPTER_IFACE in ifaces: + adapter_introspection = await bus.introspect(BLUEZ_SERVICE, path) + return bus.get_proxy_object(BLUEZ_SERVICE, path, adapter_introspection) + return None + +def on_interfaces_added(path, interfaces): + """Callback for when a new D-Bus interface is added.""" + if DEVICE_IFACE in interfaces: + device_properties = interfaces[DEVICE_IFACE] + address = device_properties.get('Address') + alias = device_properties.get('Alias') + if address and alias and alias.value: + logging.info(f"Found: {alias.value} ({address.value})") + +async def main(): + """Main function to run the scanner.""" + bus = await MessageBus(bus_type=BusType.SYSTEM).connect() + adapter = await get_adapter(bus) + if not adapter: + logging.error("Bluetooth adapter not found.") + return + + adapter_iface = adapter.get_interface(ADAPTER_IFACE) + + # Subscribe to InterfacesAdded signal to discover new devices + introspection = await bus.introspect(BLUEZ_SERVICE, '/') + obj_manager = bus.get_proxy_object(BLUEZ_SERVICE, '/', introspection) + obj_manager_iface = obj_manager.get_interface('org.freedesktop.DBus.ObjectManager') + obj_manager_iface.on_interfaces_added(on_interfaces_added) + + logging.info("Starting Bluetooth scan... Press Ctrl+C to stop.") + try: + await adapter_iface.call_start_discovery() + # Keep the script running to listen for signals + while True: + await asyncio.sleep(3600) + except Exception as e: + logging.error(f"An error occurred during scanning: {e}") + finally: + logging.info("Stopping Bluetooth scan.") + await adapter_iface.call_stop_discovery() + if __name__ == '__main__': - pass + try: + asyncio.run(main()) + except KeyboardInterrupt: + logging.info("Scan stopped by user.")