import asyncio import aiohttp import logging import os import io import torch from torchvision import transforms from PIL import Image from model import (CropLowerRightTriangle, GarageDoorCNN, TRIANGLE_CROP_WIDTH, TRIANGLE_CROP_HEIGHT, RESIZE_DIM) # --- Configuration --- logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s') BLUEIRIS_KEY = os.getenv('BLUEIRIS_KEY') if not BLUEIRIS_KEY: raise ValueError("BLUEIRIS_KEY environment variable not set.") CAMERA_URL = "http://cameras.dns.t0.vc/image/Dump?&w=9999&decode=1" MODEL_PATH = 'garage_door_cnn.pth' CLASS_NAMES = ['closed', 'open'] # From training, sorted alphabetically POLL_INTERVAL_SECONDS = 30 REQUEST_TIMEOUT_SECONDS = 5 # --- Model Inference --- def get_prediction(model, image_bytes, device): """Run model inference on the provided image bytes.""" try: image = Image.open(io.BytesIO(image_bytes)).convert('RGB') except Exception as e: logging.error(f"Failed to open image from bytes: {e}") return None # Define the same transforms as used in validation transform = transforms.Compose([ CropLowerRightTriangle(triangle_width=TRIANGLE_CROP_WIDTH, triangle_height=TRIANGLE_CROP_HEIGHT), transforms.Resize((RESIZE_DIM, RESIZE_DIM)), transforms.ToTensor(), transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]) ]) input_tensor = transform(image).unsqueeze(0).to(device) with torch.no_grad(): output = model(input_tensor) _, pred_idx = torch.max(output, 1) return CLASS_NAMES[pred_idx.item()] # --- Background Task --- async def monitor_garage_door(app): """Periodically fetches an image and logs the garage door status.""" logging.info("Starting garage door monitoring task.") session = app['client_session'] model = app['model'] device = app['device'] headers = {'Authorization': 'Basic ' + BLUEIRIS_KEY} while True: try: await asyncio.sleep(POLL_INTERVAL_SECONDS) logging.debug("Fetching new image from camera...") async with session.get(CAMERA_URL, headers=headers, timeout=REQUEST_TIMEOUT_SECONDS) as response: if response.status == 200: image_bytes = await response.read() prediction = get_prediction(model, image_bytes, device) if prediction: logging.debug(f"Garage door status: {prediction}") else: logging.error(f"Failed to fetch image. Status: {response.status}, Reason: {response.reason}") except asyncio.TimeoutError: logging.warning("Request to camera timed out.") except aiohttp.ClientError as e: logging.error(f"Client error during image fetch: {e}") except asyncio.CancelledError: logging.info("Monitoring task cancelled.") break except Exception as e: logging.error(f"An unexpected error occurred in the monitoring task: {e}", exc_info=True) # Add a small delay before retrying on unexpected errors await asyncio.sleep(5) # --- Web Server --- async def handle_root(request): """Handler for the root GET request.""" return aiohttp.web.Response(text="hello world") async def on_startup(app): """Actions to perform on application startup.""" # Set up device app['device'] = torch.device("cuda" if torch.cuda.is_available() else "cpu") logging.info(f"Using device: {app['device']}") # Load model logging.info(f"Loading model from {MODEL_PATH}...") model = GarageDoorCNN(resize_dim=RESIZE_DIM).to(app['device']) model.load_state_dict(torch.load(MODEL_PATH, map_location=app['device'])) model.eval() app['model'] = model logging.info("Model loaded successfully.") # Create client session app['client_session'] = aiohttp.ClientSession() # Start background task app['monitor_task'] = asyncio.create_task(monitor_garage_door(app)) async def on_cleanup(app): """Actions to perform on application cleanup.""" logging.info("Cleaning up...") app['monitor_task'].cancel() try: await app['monitor_task'] except asyncio.CancelledError: pass await app['client_session'].close() logging.info("Cleanup complete.") def main(): app = aiohttp.web.Application() app.router.add_get('/', handle_root) app.on_startup.append(on_startup) app.on_cleanup.append(on_cleanup) aiohttp.web.run_app(app) if __name__ == '__main__': if not os.path.exists(MODEL_PATH): logging.error(f"Model file '{MODEL_PATH}' not found. Please run train.py first.") else: main()