import os, logging DEBUG = os.environ.get('DEBUG') logging.basicConfig( format='[%(asctime)s] %(levelname)s %(module)s/%(funcName)s - %(message)s', level=logging.DEBUG if DEBUG else logging.INFO) logging.getLogger('aiohttp').setLevel(logging.DEBUG if DEBUG else logging.WARNING) import asyncio import aiohttp from aiohttp import web import io from datetime import datetime, timedelta import torch import torch.nn.functional as F from torchvision import transforms from PIL import Image import mysecrets from model import (CropLowerRightTriangle, GarageDoorCNN, TRIANGLE_CROP_WIDTH, TRIANGLE_CROP_HEIGHT, RESIZE_DIM) # --- Configuration --- logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s') CAMERA_URL = "http://cameras.dns.t0.vc/image/SE-S?&w=9999&decode=1" MODEL_PATH = 'garage_door_cnn.pth' CLASS_NAMES = ['closed', 'open'] # From training, sorted alphabetically POLL_INTERVAL_SECONDS = 10 REQUEST_TIMEOUT_SECONDS = 5 UNSURE_CONFIDENCE_THRESHOLD = 0.97 PREDICTION_HISTORY = [] PREDICTION_HISTORY_MAX_LENGTH = 3 PREVIOUS_STATE = "unknown" LAST_OPEN_SAVE_TIME = None DOOR_OPEN_START_TIME = None OPEN_ALERT_THRESHOLDS_MINUTES = [5, 15, 30, 60, 120] OPEN_ALERTS_SENT_FOR_CURRENT_OPENING = [] async def controller_message(app, message): payload = {mysecrets.CONTROLLER_KEY: message} session = app['client_session'] try: async with session.post(mysecrets.CONTROLLER_URL, data=payload, timeout=10) as response: if response.status == 200: return True else: logging.error(f'Unable to communicate with controller! Message: {message}, Status: {response.status}') return False except Exception: logging.exception('Unable to communicate with controller! Message: ' + message) return False def get_derived_state(): """Derives the state from the prediction history.""" state = "unknown" if len(PREDICTION_HISTORY) == PREDICTION_HISTORY_MAX_LENGTH: if all(s == "open" for s in PREDICTION_HISTORY): state = "open" elif all(s == "closed" for s in PREDICTION_HISTORY): state = "closed" return state # --- Model Inference --- def get_prediction(model, image_bytes, device): """Run model inference on the provided image bytes.""" try: image = Image.open(io.BytesIO(image_bytes)).convert('RGB') except Exception as e: logging.error(f"Failed to open image from bytes: {e}") return None # Define the same transforms as used in validation transform = transforms.Compose([ CropLowerRightTriangle(triangle_width=TRIANGLE_CROP_WIDTH, triangle_height=TRIANGLE_CROP_HEIGHT), transforms.Resize((RESIZE_DIM, RESIZE_DIM)), transforms.ToTensor(), transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]) ]) input_tensor = transform(image).unsqueeze(0).to(device) with torch.no_grad(): output = model(input_tensor) probabilities = F.softmax(output, dim=1) confidence, pred_idx = torch.max(probabilities, 1) return CLASS_NAMES[pred_idx.item()], confidence.item() # --- Background Task --- async def monitor_garage_door(app): """Periodically fetches an image and logs the garage door status.""" global LAST_OPEN_SAVE_TIME logging.info("Starting garage door monitoring task.") session = app['client_session'] model = app['model'] device = app['device'] headers = {'Authorization': 'Basic ' + mysecrets.BLUEIRIS_KEY} while True: try: await asyncio.sleep(POLL_INTERVAL_SECONDS) logging.debug("Fetching new image from camera...") async with session.get(CAMERA_URL, headers=headers, timeout=REQUEST_TIMEOUT_SECONDS) as response: if response.status == 200: image_bytes = await response.read() result = get_prediction(model, image_bytes, device) if result: prediction, confidence = result logging.debug(f"Garage door status: {prediction} (confidence: {confidence:.4f})") # Update prediction history if confidence >= UNSURE_CONFIDENCE_THRESHOLD: PREDICTION_HISTORY.append(prediction) else: PREDICTION_HISTORY.append('unknown') # Trim history if it's too long if len(PREDICTION_HISTORY) > PREDICTION_HISTORY_MAX_LENGTH: PREDICTION_HISTORY.pop(0) timestamp = datetime.now().isoformat().replace(':', '-') filename = f"{timestamp}.jpg" if confidence < UNSURE_CONFIDENCE_THRESHOLD: # Construct path and save file unsure_dir = os.path.join('data', 'unsure', prediction) os.makedirs(unsure_dir, exist_ok=True) filepath = os.path.join(unsure_dir, filename) with open(filepath, 'wb') as f: f.write(image_bytes) logging.info(f"Low confidence prediction: {prediction} ({confidence:.4f}). Saved for review: {filepath}") else: # High confidence, save to sorted if get_derived_state() == 'open': if LAST_OPEN_SAVE_TIME is None or (datetime.now() - LAST_OPEN_SAVE_TIME) > timedelta(minutes=5): sorted_dir = os.path.join('data', 'sorted', 'open') os.makedirs(sorted_dir, exist_ok=True) filepath = os.path.join(sorted_dir, filename) with open(filepath, 'wb') as f: f.write(image_bytes) LAST_OPEN_SAVE_TIME = datetime.now() logging.info(f"Saved high-confidence 'open' image: {filepath}") elif get_derived_state() == 'closed': open_dir = os.path.join('data', 'sorted', 'open') closed_dir = os.path.join('data', 'sorted', 'closed') os.makedirs(open_dir, exist_ok=True) os.makedirs(closed_dir, exist_ok=True) num_open = len(os.listdir(open_dir)) num_closed = len(os.listdir(closed_dir)) if num_closed < num_open: filepath = os.path.join(closed_dir, filename) with open(filepath, 'wb') as f: f.write(image_bytes) logging.info(f"Saved high-confidence 'closed' image: {filepath}") else: logging.error(f"Failed to fetch image. Status: {response.status}, Reason: {response.reason}") except asyncio.TimeoutError: logging.warning("Request to camera timed out.") PREDICTION_HISTORY.append('unknown') if len(PREDICTION_HISTORY) > PREDICTION_HISTORY_MAX_LENGTH: PREDICTION_HISTORY.pop(0) except aiohttp.ClientError as e: logging.error(f"Client error during image fetch: {e}") PREDICTION_HISTORY.append('unknown') if len(PREDICTION_HISTORY) > PREDICTION_HISTORY_MAX_LENGTH: PREDICTION_HISTORY.pop(0) except asyncio.CancelledError: logging.info("Monitoring task cancelled.") break except Exception as e: logging.error(f"An unexpected error occurred in the monitoring task: {e}", exc_info=True) # Add a small delay before retrying on unexpected errors await asyncio.sleep(5) async def monitor_state_transitions(app): """Periodically checks for state transitions and logs them.""" global PREVIOUS_STATE, DOOR_OPEN_START_TIME, OPEN_ALERTS_SENT_FOR_CURRENT_OPENING logging.info("Starting state transition monitoring task.") while True: try: await asyncio.sleep(5) current_state = get_derived_state() if current_state != "unknown" and current_state != PREVIOUS_STATE: logging.info(f"State transitioned from '{PREVIOUS_STATE}' to '{current_state}'.") PREVIOUS_STATE = current_state if current_state == 'open': if DOOR_OPEN_START_TIME is None: DOOR_OPEN_START_TIME = datetime.now() OPEN_ALERTS_SENT_FOR_CURRENT_OPENING = [] open_duration = datetime.now() - DOOR_OPEN_START_TIME open_duration_minutes = open_duration.total_seconds() / 60 for threshold in OPEN_ALERT_THRESHOLDS_MINUTES: if open_duration_minutes >= threshold and threshold not in OPEN_ALERTS_SENT_FOR_CURRENT_OPENING: msg = f"Doormind: Garage door has been open for {threshold} minutes." await controller_message(app, msg) logging.info(msg) OPEN_ALERTS_SENT_FOR_CURRENT_OPENING.append(threshold) elif current_state == 'closed': DOOR_OPEN_START_TIME = None OPEN_ALERTS_SENT_FOR_CURRENT_OPENING = [] except asyncio.CancelledError: logging.info("State transition monitoring task cancelled.") break except Exception as e: logging.error(f"An unexpected error occurred in the state monitoring task: {e}", exc_info=True) await asyncio.sleep(5) # --- Web Server --- async def handle_root(request): """Handler for the root GET request.""" return web.Response(text="hello world") async def handle_state(request): """Handler for the /state GET request.""" state = get_derived_state() return web.json_response({'door': state}) async def on_startup(app): """Actions to perform on application startup.""" # Set up device app['device'] = torch.device("cuda" if torch.cuda.is_available() else "cpu") logging.info(f"Using device: {app['device']}") # Load model logging.info(f"Loading model from {MODEL_PATH}...") model = GarageDoorCNN(resize_dim=RESIZE_DIM).to(app['device']) model.load_state_dict(torch.load(MODEL_PATH, map_location=app['device'])) model.eval() app['model'] = model logging.info("Model loaded successfully.") # Create client session app['client_session'] = aiohttp.ClientSession() # Start background task app['monitor_task'] = asyncio.create_task(monitor_garage_door(app)) app['state_monitor_task'] = asyncio.create_task(monitor_state_transitions(app)) async def on_cleanup(app): """Actions to perform on application cleanup.""" logging.info("Cleaning up...") app['monitor_task'].cancel() app['state_monitor_task'].cancel() try: await app['monitor_task'] await app['state_monitor_task'] except asyncio.CancelledError: pass await app['client_session'].close() logging.info("Cleanup complete.") def main(): app = web.Application() app.router.add_get('/', handle_root) app.router.add_get('/state', handle_state) app.on_startup.append(on_startup) app.on_cleanup.append(on_cleanup) web.run_app(app, port=8081) if __name__ == '__main__': if not os.path.exists(MODEL_PATH): logging.error(f"Model file '{MODEL_PATH}' not found. Please run train.py first.") else: main()