feat: Add scan.py for continuous Bluetooth device scanning
Co-authored-by: aider (gemini/gemini-2.5-pro) <aider@aider.chat>
This commit is contained in:
70
scan.py
70
scan.py
@@ -1,5 +1,73 @@
|
||||
|
||||
|
||||
|
||||
import os, sys
|
||||
import logging
|
||||
DEBUG = os.environ.get('DEBUG')
|
||||
logging.basicConfig(stream=sys.stdout,
|
||||
format='[%(asctime)s] %(levelname)s - %(message)s',
|
||||
level=logging.DEBUG if DEBUG else logging.INFO)
|
||||
|
||||
import asyncio
|
||||
from dbus_next.aio import MessageBus
|
||||
from dbus_next.constants import BusType
|
||||
|
||||
BLUEZ_SERVICE = 'org.bluez'
|
||||
ADAPTER_IFACE = 'org.bluez.Adapter1'
|
||||
DEVICE_IFACE = 'org.bluez.Device1'
|
||||
|
||||
async def get_adapter(bus):
|
||||
"""Gets the first Bluetooth adapter found."""
|
||||
introspection = await bus.introspect(BLUEZ_SERVICE, '/')
|
||||
manager_obj = bus.get_proxy_object(BLUEZ_SERVICE, '/', introspection)
|
||||
manager_iface = manager_obj.get_interface('org.freedesktop.DBus.ObjectManager')
|
||||
managed_objects = await manager_iface.call_get_managed_objects()
|
||||
|
||||
for path, ifaces in managed_objects.items():
|
||||
if ADAPTER_IFACE in ifaces:
|
||||
adapter_introspection = await bus.introspect(BLUEZ_SERVICE, path)
|
||||
return bus.get_proxy_object(BLUEZ_SERVICE, path, adapter_introspection)
|
||||
return None
|
||||
|
||||
def on_interfaces_added(path, interfaces):
|
||||
"""Callback for when a new D-Bus interface is added."""
|
||||
if DEVICE_IFACE in interfaces:
|
||||
device_properties = interfaces[DEVICE_IFACE]
|
||||
address = device_properties.get('Address')
|
||||
alias = device_properties.get('Alias')
|
||||
if address and alias and alias.value:
|
||||
logging.info(f"Found: {alias.value} ({address.value})")
|
||||
|
||||
async def main():
|
||||
"""Main function to run the scanner."""
|
||||
bus = await MessageBus(bus_type=BusType.SYSTEM).connect()
|
||||
adapter = await get_adapter(bus)
|
||||
if not adapter:
|
||||
logging.error("Bluetooth adapter not found.")
|
||||
return
|
||||
|
||||
adapter_iface = adapter.get_interface(ADAPTER_IFACE)
|
||||
|
||||
# Subscribe to InterfacesAdded signal to discover new devices
|
||||
introspection = await bus.introspect(BLUEZ_SERVICE, '/')
|
||||
obj_manager = bus.get_proxy_object(BLUEZ_SERVICE, '/', introspection)
|
||||
obj_manager_iface = obj_manager.get_interface('org.freedesktop.DBus.ObjectManager')
|
||||
obj_manager_iface.on_interfaces_added(on_interfaces_added)
|
||||
|
||||
logging.info("Starting Bluetooth scan... Press Ctrl+C to stop.")
|
||||
try:
|
||||
await adapter_iface.call_start_discovery()
|
||||
# Keep the script running to listen for signals
|
||||
while True:
|
||||
await asyncio.sleep(3600)
|
||||
except Exception as e:
|
||||
logging.error(f"An error occurred during scanning: {e}")
|
||||
finally:
|
||||
logging.info("Stopping Bluetooth scan.")
|
||||
await adapter_iface.call_stop_discovery()
|
||||
|
||||
if __name__ == '__main__':
|
||||
pass
|
||||
try:
|
||||
asyncio.run(main())
|
||||
except KeyboardInterrupt:
|
||||
logging.info("Scan stopped by user.")
|
||||
|
||||
Reference in New Issue
Block a user