Files
2025-12-31 00:08:24 +00:00

138 lines
5.4 KiB
Python

import os, sys
import logging
# Any non-empty value of the DEBUG environment variable (even "0") is truthy
# here and switches the root logger to DEBUG level; otherwise INFO is used.
DEBUG = os.environ.get('DEBUG')
# Log to stdout using a "[timestamp] LEVEL - message" line format.
logging.basicConfig(stream=sys.stdout,
format='[%(asctime)s] %(levelname)s - %(message)s',
level=logging.DEBUG if DEBUG else logging.INFO)
import asyncio
from dbus_next.aio import MessageBus
from dbus_next.constants import BusType, MessageType
from dbus_next import Message
# D-Bus service name passed to the BlueZ introspect/proxy-object calls.
BLUEZ_SERVICE = 'org.bluez'
# Interface whose presence marks a managed object as an adapter (get_adapter).
ADAPTER_IFACE = 'org.bluez.Adapter1'
# Interface whose presence marks an object as a device in the signal handlers.
DEVICE_IFACE = 'org.bluez.Device1'
# Shared message bus; assigned in main() and read by on_properties_changed().
bus = None
# Device addresses that have already been logged as "(new)".
seen_devices = set()
# Last known alias for each device address.
name_cache = {}
async def get_adapter(bus):
    """Return a proxy object for the first Bluetooth adapter, or None.

    The ObjectManager at the BlueZ root path is asked for all managed objects;
    the first one (in the order the reply lists them) that exposes the adapter
    interface is introspected and wrapped in a proxy object.

    Args:
        bus: Connected message bus used for introspection and proxy creation.

    Returns:
        The adapter proxy object, or None when no managed object exposes the
        adapter interface.
    """
    root_node = await bus.introspect(BLUEZ_SERVICE, '/')
    root_proxy = bus.get_proxy_object(BLUEZ_SERVICE, '/', root_node)
    object_manager = root_proxy.get_interface('org.freedesktop.DBus.ObjectManager')
    objects = await object_manager.call_get_managed_objects()

    # Pick the first object path that carries the adapter interface.
    adapter_path = next(
        (
            obj_path
            for obj_path, obj_ifaces in objects.items()
            if ADAPTER_IFACE in obj_ifaces
        ),
        None,
    )
    if adapter_path is None:
        return None

    adapter_node = await bus.introspect(BLUEZ_SERVICE, adapter_path)
    return bus.get_proxy_object(BLUEZ_SERVICE, adapter_path, adapter_node)
def on_interfaces_added(path, interfaces):
    """Log a device announced through an ``InterfacesAdded`` signal.

    Args:
        path: Object path of the announced object. Unused, but part of the
            signal-handler signature.
        interfaces: Mapping of interface name to a mapping of property name
            to a variant-like object exposing ``.value``.

    Objects without the device interface, without an ``Address``, or without
    any known name are ignored. A non-empty ``Alias`` is stored in
    ``name_cache``. An absent *or empty* ``Alias`` falls back to the cached
    name for that address, so a previously named device is still reported.
    """
    if DEVICE_IFACE not in interfaces:
        return
    device_properties = interfaces[DEVICE_IFACE]

    address_variant = device_properties.get('Address')
    if not address_variant:
        return
    addr_str = address_variant.value

    alias_variant = device_properties.get('Alias')
    announced_alias = alias_variant.value if alias_variant else None
    if announced_alias:
        # Remember the freshest name for later signals that omit it.
        name_cache[addr_str] = announced_alias

    # `or` (rather than a presence check) so that an empty Alias string also
    # falls back to the cached name instead of discarding the device.
    alias = announced_alias or name_cache.get(addr_str)
    if not alias:
        # Unnamed devices are intentionally neither logged nor marked as seen.
        return

    if addr_str not in seen_devices:
        seen_devices.add(addr_str)
        logging.info(f"Found: {alias} ({addr_str}) (new)")
    else:
        # Repeat discoveries are logged only because they have a name.
        logging.info(f"Found: {alias} ({addr_str})")
# Strong references to in-flight property-change tasks. The event loop only
# keeps weak references to tasks, so a task nobody else holds can be
# garbage-collected before it finishes.
_pending_property_tasks = set()


def properties_changed_handler(message: Message):
    """Dispatch ``PropertiesChanged`` signals to the async handler.

    This is a synchronous message handler: it only filters on message type,
    member and interface, then schedules ``on_properties_changed`` as a task.
    It must be called while an event loop is running.

    Args:
        message: The incoming bus message.

    Returns:
        None, for every message.
    """
    is_properties_changed = (
        message.message_type == MessageType.SIGNAL
        and message.member == 'PropertiesChanged'
        and message.interface == 'org.freedesktop.DBus.Properties'
    )
    if not is_properties_changed:
        return

    # Further filtering (interface name, Alias presence) is done in the task.
    task = asyncio.create_task(on_properties_changed(message))
    _pending_property_tasks.add(task)
    # Drop the reference as soon as the task completes so the set stays small.
    task.add_done_callback(_pending_property_tasks.discard)
async def on_properties_changed(message: Message):
    """Handle a device whose ``Alias`` shows up in a ``PropertiesChanged`` signal.

    Signals for other interfaces, or without a non-empty ``Alias`` among the
    changed properties, are ignored. Otherwise the device's ``Address`` is
    fetched over the bus, the alias is stored in ``name_cache``, and the
    device is logged once if it has not been seen before.

    Any failure while querying the device is logged at debug level and
    otherwise swallowed, making this handler best-effort.

    Args:
        message: The ``PropertiesChanged`` signal message.
    """
    if not message.body or message.body[0] != DEVICE_IFACE:
        return

    alias_variant = message.body[1].get('Alias')
    if not (alias_variant and alias_variant.value):
        return
    alias = alias_variant.value

    device_path = message.path
    try:
        node = await bus.introspect(BLUEZ_SERVICE, device_path)
        device_proxy = bus.get_proxy_object(BLUEZ_SERVICE, device_path, node)
        props_iface = device_proxy.get_interface('org.freedesktop.DBus.Properties')
        addr_str = (await props_iface.call_get(DEVICE_IFACE, 'Address')).value

        # Keep the cache current even for devices that were already reported.
        name_cache[addr_str] = alias
        if addr_str in seen_devices:
            return
        seen_devices.add(addr_str)
        logging.info(f"Found: {alias} ({addr_str}) (new)")
    except Exception as e:
        logging.debug(f"Could not process property change for {device_path}: {e}")
async def main():
    """Run the Bluetooth scanner until it is cancelled or scanning fails.

    Connects to the system bus, locates the first adapter, subscribes to
    ``InterfacesAdded`` and to ``PropertiesChanged`` below ``/org/bluez``,
    then starts discovery and idles while the signal handlers do the logging.

    Cleanup: discovery is stopped only if it was actually started, a failure
    while stopping is logged rather than replacing the original error, and
    the bus connection is closed on every exit path.
    """
    global bus
    bus = await MessageBus(bus_type=BusType.SYSTEM).connect()
    try:
        adapter = await get_adapter(bus)
        if not adapter:
            logging.error("Bluetooth adapter not found.")
            return
        adapter_iface = adapter.get_interface(ADAPTER_IFACE)

        # Subscribe to InterfacesAdded signal to discover new devices
        introspection = await bus.introspect(BLUEZ_SERVICE, '/')
        obj_manager = bus.get_proxy_object(BLUEZ_SERVICE, '/', introspection)
        obj_manager_iface = obj_manager.get_interface('org.freedesktop.DBus.ObjectManager')
        obj_manager_iface.on_interfaces_added(on_interfaces_added)

        # Subscribe to PropertiesChanged signal to catch late-arriving device names
        rule = "type='signal',interface='org.freedesktop.DBus.Properties',member='PropertiesChanged',path_namespace='/org/bluez'"
        introspection = await bus.introspect('org.freedesktop.DBus', '/org/freedesktop/DBus')
        proxy_obj = bus.get_proxy_object('org.freedesktop.DBus', '/org/freedesktop/DBus', introspection)
        dbus_interface = proxy_obj.get_interface('org.freedesktop.DBus')
        await dbus_interface.call_add_match(rule)
        bus.add_message_handler(properties_changed_handler)

        logging.info("Starting Bluetooth scan... Press Ctrl+C to stop.")
        discovery_started = False
        try:
            await adapter_iface.call_start_discovery()
            discovery_started = True
            # Keep the script running to listen for signals
            while True:
                await asyncio.sleep(3600)
        except Exception as e:
            logging.error(f"An error occurred during scanning: {e}")
        finally:
            # Only stop what we started: asking to stop a discovery that never
            # began would raise a second error on top of the one logged above.
            if discovery_started:
                logging.info("Stopping Bluetooth scan.")
                try:
                    await adapter_iface.call_stop_discovery()
                except Exception as e:
                    logging.warning(f"Could not stop discovery cleanly: {e}")
    finally:
        # Release the bus connection on every exit path, including the early
        # "adapter not found" return.
        bus.disconnect()
if __name__ == '__main__':
    # Script entry point: run the scanner and treat Ctrl+C as a normal exit,
    # reporting it with an info-level log line.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logging.info("Scan stopped by user.")