"""Pulse GPIO relays when a UniFi Protect doorbell reports a ring event.

The script connects to a UniFi Protect server, subscribes to its websocket
updates and, for every ``ring`` event, pulses the relay wired to the GPIO pin
configured for that doorbell in ``settings.DOORBELLS``.
"""
import os
import logging

# Any non-empty value of the DEBUG environment variable enables debug mode
# (the raw string is used for truthiness, so e.g. "0" also counts as enabled).
DEBUG = os.environ.get('DEBUG')

# Logging is configured before the remaining imports, as in the original
# layout (presumably so records emitted at import time use this format).
logging.basicConfig(
    format='[%(asctime)s] %(levelname)s %(module)s/%(funcName)s - %(message)s',
    level=logging.DEBUG if DEBUG else logging.INFO)
logging.getLogger('aiohttp').setLevel(logging.DEBUG if DEBUG else logging.WARNING)

import os
import sys
import asyncio
import aiohttp
import RPi.GPIO as GPIO
import time
from signal import *
from aiohttp import ClientSession, CookieJar

import settings
from pyunifiprotect.unifi_protect_server import UpvServer

# Output levels written to the relay pins: "on" is the False (low) level and
# "off" is the True (high) level, i.e. the relay is driven as active-low.
RELAY_ON = False
RELAY_OFF = True

# Becomes True after the first ring (outside debug mode); only then does the
# listener loop start writing to the watchdog device.
allow_watchdog = False


def set_relay(pin, state):
    """Drive the GPIO output ``pin`` to ``state`` and log the new relay state.

    ``state`` should be ``RELAY_ON`` or ``RELAY_OFF``.
    """
    GPIO.output(pin, state)
    logging.info('Set relay on pin %s to %s', pin, 'ON' if state == RELAY_ON else 'OFF')


def pulse_relay(pin):
    """Switch the relay on ``pin`` on for half a second, then off again.

    This call blocks the calling thread for the duration of the pulse.
    """
    set_relay(pin, RELAY_ON)
    time.sleep(0.5)
    set_relay(pin, RELAY_OFF)


def ring_bell(mac):
    """Pulse the relay configured for the doorbell with MAC address ``mac``.

    The first call outside debug mode also enables watchdog feeding. An
    unknown ``mac`` (or an entry without a ``gpio`` key) is logged as an
    error and otherwise ignored.
    """
    global allow_watchdog
    if not allow_watchdog and not DEBUG:
        logging.info('Enabling watchdog...')
        allow_watchdog = True

    # Only the configuration lookup is guarded, so that a KeyError raised
    # while actually driving the relay is not misreported as a missing doorbell.
    try:
        gpio = settings.DOORBELLS[mac]['gpio']
    except KeyError:
        logging.error('Doorbell %s not found!', mac)
    else:
        pulse_relay(gpio)


def subscriber(updated):
    """Handle a websocket update and ring the bell for active ring events.

    ``updated`` is a mapping whose values are dicts read here via the keys
    ``event_type``, ``event_ring_on``, ``mac`` and ``name``.
    """
    logging.debug('Subscription: updated=%s', updated)
    for data in updated.values():
        if data['event_type'] == 'ring' and data['event_ring_on']:
            logging.info('%s: %s is ringing!', data['mac'], data['name'])
            ring_bell(data['mac'])


def feed_watchdog():
    """Write a single character to ``/dev/watchdog``.

    The device is opened and closed again on every call.
    """
    with open('/dev/watchdog', 'w') as wdt:
        wdt.write('1')


async def ws_listener():
    """Connect to UniFi Protect, subscribe to updates and feed the watchdog.

    Runs until the coroutine fails or is cancelled. The websocket
    subscription and the HTTP session are released on the way out; the
    original cleanup statements were placed after the infinite loop and
    could never run.
    """
    async with ClientSession(cookie_jar=CookieJar(unsafe=True)) as session:
        unifiprotect = UpvServer(
            session,
            settings.UFP_ADDRESS,
            settings.UFP_PORT,
            settings.UFP_USERNAME,
            settings.UFP_PASSWORD,
        )

        await unifiprotect.check_unifi_os()
        await unifiprotect.update()

        unsub = unifiprotect.subscribe_websocket(subscriber)
        try:
            while True:
                if allow_watchdog and not DEBUG:
                    feed_watchdog()
                await asyncio.sleep(1)
        finally:
            # Drop the subscription before the session is closed by the
            # enclosing ``async with``.
            unsub()


def disable_relays_on_exit(*args):
    """Signal handler: switch every configured relay off and exit immediately.

    Accepts (and ignores) the ``(signum, frame)`` arguments passed by the
    ``signal`` module. Each relay is handled on a best-effort basis so that a
    failure on one pin neither leaves the remaining relays energised nor
    prevents the process from exiting.
    """
    logging.info('Exiting, disabling relays...')
    for doorbell in settings.DOORBELLS.values():
        try:
            set_relay(doorbell['gpio'], RELAY_OFF)
        except Exception:
            logging.exception('Failed to disable relay for %s', doorbell)
    logging.info('Goodbye.')
    # os._exit terminates the process right away, without normal interpreter
    # shutdown handling.
    os._exit(0)


def init():
    """Configure the GPIO pins and install the exit signal handlers.

    Every configured doorbell pin is set up as an output and pulsed once at
    start-up.
    """
    GPIO.setmode(GPIO.BCM)
    GPIO.setwarnings(False)
    for doorbell in settings.DOORBELLS.values():
        GPIO.setup(doorbell['gpio'], GPIO.OUT)
        pulse_relay(doorbell['gpio'])
    time.sleep(1)
    logging.info('GPIO initialized')

    for sig in (SIGABRT, SIGILL, SIGINT, SIGSEGV, SIGTERM):
        signal(sig, disable_relays_on_exit)
    logging.info('Signals initialized')


if __name__ == '__main__':
    logging.info('')
    logging.info('======================================')
    logging.info('Boot up...')
    init()

    # Create the loop explicitly: calling asyncio.get_event_loop() with no
    # running loop is deprecated on newer Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(ws_listener())
    finally:
        loop.close()