import os, logging DEBUG = os.environ.get('DEBUG') logging.basicConfig( format='[%(asctime)s] %(levelname)s %(module)s/%(funcName)s - %(message)s', level=logging.DEBUG if DEBUG else logging.INFO) logging.getLogger('aiohttp').setLevel(logging.DEBUG if DEBUG else logging.WARNING) import json import os import sys import asyncio try: import RPi.GPIO as GPIO IS_PI = True except ModuleNotFoundError: logging.info('RPi.GPIO not found, running without GPIO.') IS_PI = False import time from signal import * import unifi import settings RELAY_ON = False RELAY_OFF = True cooldown_time = time.time() def set_relay(pin, state): if IS_PI: GPIO.output(pin, state) logging.info('Set relay on pin %s to %s', pin, 'ON' if state == RELAY_ON else 'OFF') def pulse_relay(pin): set_relay(pin, RELAY_ON) time.sleep(0.25) # atomic set_relay(pin, RELAY_OFF) def ring_bell(camera): global cooldown_time if time.time() - cooldown_time < 2: logging.info('Cooldown skipping.') return cooldown_time = time.time() try: doorbell = settings.DOORBELLS[camera] pulse_relay(doorbell['gpio']) except KeyError: logging.error('Doorbell %s not found!', camera) async def process_message(msg): if msg.get('type', '') != 'ring': return logging.info('Ring message: %s', msg) ring_bell(msg['camera']) async def main(): while True: try: async for msg in unifi.connect(): await process_message(msg) except BaseException as e: logging.exception('Error connecting to Unifi Protect: %s. Trying again...', str(e)) await asyncio.sleep(5) def disable_relays_on_exit(*args): logging.info('Exiting, disabling relays...') for _, doorbell in settings.DOORBELLS.items(): set_relay(doorbell['gpio'], RELAY_OFF) logging.info('Goodbye.') os._exit(0) def init(): if IS_PI: GPIO.setmode(GPIO.BCM) if IS_PI: GPIO.setwarnings(False) for _, doorbell in settings.DOORBELLS.items(): if IS_PI: GPIO.setup(doorbell['gpio'], GPIO.OUT) set_relay(doorbell['gpio'], RELAY_OFF) #pulse_relay(doorbell['gpio']) time.sleep(1) logging.info('GPIO initialized') for sig in (SIGABRT, SIGILL, SIGINT, SIGSEGV, SIGTERM): signal(sig, disable_relays_on_exit) logging.info('Signals initialized') if __name__ == '__main__': logging.info('') logging.info('======================================') logging.info('Boot up...') init() loop = asyncio.get_event_loop() loop.run_until_complete(main()) loop.close()