138 lines
5.4 KiB
Python
138 lines
5.4 KiB
Python
import os, sys
|
|
import logging
|
|
# Any non-empty value of the DEBUG environment variable enables debug-level
# logging; otherwise only INFO and above are shown. Output goes to stdout.
DEBUG = os.environ.get('DEBUG')

logging.basicConfig(
    stream=sys.stdout,
    format='[%(asctime)s] %(levelname)s - %(message)s',
    level=logging.DEBUG if DEBUG else logging.INFO,
)
|
|
|
|
import asyncio
|
|
from dbus_next.aio import MessageBus
|
|
from dbus_next.constants import BusType, MessageType
|
|
from dbus_next import Message
|
|
|
|
# D-Bus names used to talk to BlueZ.
BLUEZ_SERVICE = 'org.bluez'
ADAPTER_IFACE = 'org.bluez.Adapter1'
DEVICE_IFACE = 'org.bluez.Device1'

# Shared message bus connection; assigned in main() and read by the
# property-change handler.
bus = None

# Addresses of devices that have already been reported once.
seen_devices = set()

# Last known alias per device address.
name_cache = {}
|
|
|
|
async def get_adapter(bus):
    """Return a proxy object for the first Bluetooth adapter, or None.

    Asks the ObjectManager at the BlueZ root path for all managed objects and
    selects the first object path that exposes the Adapter1 interface.

    Args:
        bus: Connected message bus used for introspection and proxy creation.

    Returns:
        The proxy object for the first adapter found, or None when no managed
        object exposes the adapter interface.
    """
    root_node = await bus.introspect(BLUEZ_SERVICE, '/')
    root_proxy = bus.get_proxy_object(BLUEZ_SERVICE, '/', root_node)
    object_manager = root_proxy.get_interface('org.freedesktop.DBus.ObjectManager')
    objects = await object_manager.call_get_managed_objects()

    # First matching path in the order the ObjectManager returned them.
    adapter_path = next(
        (obj_path for obj_path, ifaces in objects.items() if ADAPTER_IFACE in ifaces),
        None,
    )
    if adapter_path is None:
        return None

    adapter_node = await bus.introspect(BLUEZ_SERVICE, adapter_path)
    return bus.get_proxy_object(BLUEZ_SERVICE, adapter_path, adapter_node)
|
|
|
|
def on_interfaces_added(path, interfaces):
    """Log a device announced through an InterfacesAdded signal.

    Only objects carrying the Device1 interface with an 'Address' property
    are considered. The alias comes from the signal's 'Alias' property when
    present, otherwise from the alias cache; devices without any alias are
    not logged.

    Args:
        path: Object path of the added object (unused here).
        interfaces: Mapping of interface name to a property mapping whose
            values expose a ``.value`` attribute.
    """
    if DEVICE_IFACE not in interfaces:
        return

    props = interfaces[DEVICE_IFACE]
    address_entry = props.get('Address')
    if not address_entry:
        return
    addr_str = address_entry.value

    alias_entry = props.get('Alias')
    if alias_entry:
        alias = alias_entry.value
    else:
        # No alias in this signal: fall back to a previously cached name.
        alias = name_cache.get(addr_str)
    if not alias:
        return

    # Remember the alias only when it came from the signal itself.
    if alias_entry and alias_entry.value:
        name_cache[addr_str] = alias_entry.value

    if addr_str in seen_devices:
        # Repeat discoveries reach this point only when they have a name.
        logging.info(f"Found: {alias} ({addr_str})")
    else:
        seen_devices.add(addr_str)
        logging.info(f"Found: {alias} ({addr_str}) (new)")
|
|
|
|
# Strong references to in-flight property-change tasks. The event loop keeps
# only weak references to tasks, so a task whose result is discarded may be
# garbage-collected before it finishes.
_pending_property_tasks = set()


def properties_changed_handler(message: Message):
    """Dispatch an async task for each PropertiesChanged signal.

    This is a synchronous bus message handler. It only checks that the
    message is a 'PropertiesChanged' signal on
    'org.freedesktop.DBus.Properties'; filtering by device interface happens
    in the scheduled coroutine.

    Args:
        message: Incoming bus message.

    Returns:
        None in every case, so the message is never marked as handled here.
    """
    is_properties_changed = (
        message.message_type == MessageType.SIGNAL
        and message.member == 'PropertiesChanged'
        and message.interface == 'org.freedesktop.DBus.Properties'
    )
    if not is_properties_changed:
        return

    task = asyncio.create_task(on_properties_changed(message))
    # Hold the task until it completes, then release it automatically.
    _pending_property_tasks.add(task)
    task.add_done_callback(_pending_property_tasks.discard)
|
|
|
|
async def on_properties_changed(message: Message):
    """Record a device alias that arrives via a PropertiesChanged signal.

    Ignores signals whose first body element is not the Device1 interface or
    whose changed properties carry no non-empty 'Alias'. Otherwise the
    device's 'Address' is fetched over the shared bus, the alias is cached
    under that address, and the device is logged if it has not been seen
    before. Any failure while querying the device is logged at debug level
    and swallowed.

    Args:
        message: The PropertiesChanged signal message.
    """
    body = message.body
    if not body or body[0] != DEVICE_IFACE:
        return

    new_alias = body[1].get('Alias')
    if not (new_alias and new_alias.value):
        return

    alias = new_alias.value
    device_path = message.path

    try:
        node = await bus.introspect(BLUEZ_SERVICE, device_path)
        device_proxy = bus.get_proxy_object(BLUEZ_SERVICE, device_path, node)
        props_iface = device_proxy.get_interface('org.freedesktop.DBus.Properties')

        # The signal does not include the address, so ask the device for it.
        addr_str = (await props_iface.call_get(DEVICE_IFACE, 'Address')).value

        name_cache[addr_str] = alias

        if addr_str in seen_devices:
            return
        seen_devices.add(addr_str)
        logging.info(f"Found: {alias} ({addr_str}) (new)")
    except Exception as e:
        logging.debug(f"Could not process property change for {device_path}: {e}")
|
|
|
|
async def main():
    """Connect to the system bus, start discovery, and listen until stopped.

    Sets the module-level ``bus``, subscribes to InterfacesAdded on the BlueZ
    ObjectManager and to PropertiesChanged signals under '/org/bluez', then
    starts discovery on the first adapter and sleeps indefinitely while
    signal callbacks do the work.

    On exit, discovery is stopped only if it was actually started, and a
    failure to stop is logged instead of propagating. This keeps a failed
    start from being followed by a stop call that may itself raise and hide
    the already-logged error. The bus connection is closed on every exit
    path.
    """
    global bus
    bus = await MessageBus(bus_type=BusType.SYSTEM).connect()
    try:
        adapter = await get_adapter(bus)
        if not adapter:
            logging.error("Bluetooth adapter not found.")
            return

        adapter_iface = adapter.get_interface(ADAPTER_IFACE)

        # Subscribe to InterfacesAdded signal to discover new devices
        introspection = await bus.introspect(BLUEZ_SERVICE, '/')
        obj_manager = bus.get_proxy_object(BLUEZ_SERVICE, '/', introspection)
        obj_manager_iface = obj_manager.get_interface('org.freedesktop.DBus.ObjectManager')
        obj_manager_iface.on_interfaces_added(on_interfaces_added)

        # Subscribe to PropertiesChanged signal to catch late-arriving device names
        rule = "type='signal',interface='org.freedesktop.DBus.Properties',member='PropertiesChanged',path_namespace='/org/bluez'"
        introspection = await bus.introspect('org.freedesktop.DBus', '/org/freedesktop/DBus')
        proxy_obj = bus.get_proxy_object('org.freedesktop.DBus', '/org/freedesktop/DBus', introspection)
        dbus_interface = proxy_obj.get_interface('org.freedesktop.DBus')
        await dbus_interface.call_add_match(rule)
        bus.add_message_handler(properties_changed_handler)

        logging.info("Starting Bluetooth scan... Press Ctrl+C to stop.")
        discovery_started = False
        try:
            await adapter_iface.call_start_discovery()
            discovery_started = True
            # Keep the script running to listen for signals
            while True:
                await asyncio.sleep(3600)
        except Exception as e:
            logging.error(f"An error occurred during scanning: {e}")
        finally:
            logging.info("Stopping Bluetooth scan.")
            if discovery_started:
                try:
                    await adapter_iface.call_stop_discovery()
                except Exception as e:
                    # Best-effort cleanup: never let this mask the real error.
                    logging.warning(f"Could not stop discovery cleanly: {e}")
    finally:
        # Release the bus connection regardless of how we got here.
        bus.disconnect()
|
|
|
|
if __name__ == "__main__":
    # Script entry point: run the scanner until interrupted. Ctrl+C surfaces
    # here as KeyboardInterrupt and is reported as a normal stop.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logging.info("Scan stopped by user.")
|