"""Bridge UniFi Protect doorbell ring events to GPIO-driven relays.

The script subscribes to the UniFi Protect websocket feed and, whenever a
configured doorbell reports a ring, pulses the relay wired to that
doorbell's GPIO pin.  It also writes to ``/dev/watchdog`` once per second
while the event loop is alive.
"""
import logging
import os

DEBUG = os.environ.get('DEBUG')

# Logging is configured ahead of the remaining imports (as in the original
# ordering) so that any records emitted at import time use this format.
logging.basicConfig(
    format='[%(asctime)s] %(levelname)s %(module)s/%(funcName)s - %(message)s',
    level=logging.DEBUG if DEBUG else logging.INFO)
logging.getLogger('aiohttp').setLevel(logging.DEBUG if DEBUG else logging.WARNING)

import asyncio
import sys
import time
from signal import SIGABRT, SIGILL, SIGINT, SIGSEGV, SIGTERM, signal

import aiohttp
import RPi.GPIO as GPIO
from aiohttp import ClientSession, CookieJar
from pyunifiprotect.unifi_protect_server import UpvServer

import settings

# Output levels written to the relay pins: the relay is energised by
# driving the pin to False and released by driving it to True.
RELAY_ON = False
RELAY_OFF = True

# How long the relay stays energised for a single ring, in seconds.
PULSE_SECONDS = 0.5

# How long ws_listener() stays subscribed before returning, in seconds.
# NOTE(review): presumably the process is restarted externally once this
# elapses - confirm against the deployment (service manager / watchdog).
LISTEN_SECONDS = 15000


def ring_bell(mac):
    """Pulse the relay belonging to the doorbell with MAC address *mac*.

    An unknown MAC (or a doorbell entry without a ``'gpio'`` key) is
    logged as an error and otherwise ignored.
    """
    try:
        pin = settings.DOORBELLS[mac]['gpio']
    except KeyError:
        logging.error('Doorbell %s not found!', mac)
        return

    # NOTE(review): this sleep blocks the calling thread; if the websocket
    # callback runs on the event loop thread, the loop stalls for the
    # duration of the pulse - confirm this is acceptable.
    try:
        GPIO.output(pin, RELAY_ON)
        time.sleep(PULSE_SECONDS)
    finally:
        # Always release the relay, even if the pulse was interrupted.
        GPIO.output(pin, RELAY_OFF)


def subscriber(updated):
    """Websocket callback: ring the bell for every active ring event.

    *updated* is a mapping whose values are event dicts; only the keys
    ``event_type``, ``event_ring_on``, ``mac`` and ``name`` are read.
    """
    logging.debug('Subscription: updated=%s', updated)
    for data in updated.values():
        if data['event_type'] == 'ring' and data['event_ring_on']:
            logging.info('%s: %s is ringing!', data['mac'], data['name'])
            ring_bell(data['mac'])


async def ws_listener():
    """Subscribe to UniFi Protect events for LISTEN_SECONDS, then clean up.

    The websocket subscription is removed and the HTTP session is closed
    on every exit path, including errors during login or update.
    """
    async with ClientSession(cookie_jar=CookieJar(unsafe=True)) as session:
        unifiprotect = UpvServer(
            session,
            settings.UFP_ADDRESS,
            settings.UFP_PORT,
            settings.UFP_USERNAME,
            settings.UFP_PASSWORD,
        )

        await unifiprotect.check_unifi_os()
        await unifiprotect.update()

        unsub = unifiprotect.subscribe_websocket(subscriber)
        try:
            await asyncio.sleep(LISTEN_SECONDS)
        finally:
            # Unsubscribe before the session is closed by the context manager.
            unsub()


def disable_relays_on_exit(*args):
    """Signal handler: release every relay, then terminate the process.

    Accepts (and ignores) the ``signum``/``frame`` arguments passed to
    signal handlers.
    """
    logging.info('Exiting, disabling relays...')
    for doorbell in settings.DOORBELLS.values():
        GPIO.output(doorbell['gpio'], RELAY_OFF)
    logging.info('Goodbye.')
    # os._exit() ends the process immediately, without running cleanup
    # handlers or unwinding the event loop.
    os._exit(0)


def init():
    """Configure the relay GPIO pins and install the exit signal handlers."""
    GPIO.setmode(GPIO.BCM)
    GPIO.setwarnings(False)
    for doorbell in settings.DOORBELLS.values():
        GPIO.setup(doorbell['gpio'], GPIO.OUT)
        # Start with every relay released.
        GPIO.output(doorbell['gpio'], RELAY_OFF)
    logging.info('GPIO initialized')

    for sig in (SIGABRT, SIGILL, SIGINT, SIGSEGV, SIGTERM):
        signal(sig, disable_relays_on_exit)
    logging.info('Signals initialized')


async def feed_watchdog():
    """Write to ``/dev/watchdog`` once per second, forever."""
    while True:
        with open('/dev/watchdog', 'w') as wdt:
            wdt.write('1')
        await asyncio.sleep(1)


def _log_watchdog_failure(task):
    """Done-callback: report an unexpected end of the watchdog feeder task."""
    if task.cancelled():
        return
    exc = task.exception()
    if exc is not None:
        logging.error('Watchdog feeder stopped: %s', exc)


def main():
    """Initialise the hardware and run the listener alongside the watchdog."""
    init()

    # Create the loop explicitly; asyncio.get_event_loop() without a
    # running loop is deprecated in recent Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    watchdog = loop.create_task(feed_watchdog())
    watchdog.add_done_callback(_log_watchdog_failure)
    try:
        loop.run_until_complete(ws_listener())
    finally:
        # Stop the feeder and let the cancellation settle so the loop is
        # not closed with a pending task.
        watchdog.cancel()
        loop.run_until_complete(
            asyncio.gather(watchdog, return_exceptions=True))
        loop.close()


if __name__ == '__main__':
    main()