Files
doormind/server.py
Tanner Collin de4c99bc1d feat: add background task to log state transitions
Co-authored-by: aider (gemini/gemini-2.5-pro-preview-05-06) <aider@aider.chat>
2025-07-31 19:37:21 -06:00

217 lines
8.4 KiB
Python

import os, logging
# Verbose logging is switched on by setting the DEBUG environment variable to
# any non-empty value (the check is plain truthiness, so even "0" enables it).
DEBUG = os.environ.get('DEBUG')
# Configure the root logger before the remaining imports run.
# NOTE(review): presumably this ordering is deliberate so that anything logged
# at import time by the libraries below uses this format -- confirm.
logging.basicConfig(
format='[%(asctime)s] %(levelname)s %(module)s/%(funcName)s - %(message)s',
level=logging.DEBUG if DEBUG else logging.INFO)
# The 'aiohttp' logger hierarchy only reports warnings and above unless DEBUG is set.
logging.getLogger('aiohttp').setLevel(logging.DEBUG if DEBUG else logging.WARNING)
import asyncio
import aiohttp
from aiohttp import web
import io
from datetime import datetime
import torch
import torch.nn.functional as F
from torchvision import transforms
from PIL import Image
import mysecrets
from model import (CropLowerRightTriangle, GarageDoorCNN, TRIANGLE_CROP_WIDTH,
TRIANGLE_CROP_HEIGHT, RESIZE_DIM)
# --- Configuration ---
# NOTE: logging is already configured right after `import logging` at the top
# of this file. A second logging.basicConfig() call used to live here, but
# basicConfig() does nothing once the root logger has a handler, so it never
# took effect and only suggested a level/format that was not actually in use.

# Camera snapshot endpoint polled by the monitoring task.
CAMERA_URL = "http://cameras.dns.t0.vc/image/SE-S?&w=9999&decode=1"
# Path of the trained model weights loaded at startup.
MODEL_PATH = 'garage_door_cnn.pth'
CLASS_NAMES = ['closed', 'open'] # From training, sorted alphabetically
# Seconds to wait between camera polls.
POLL_INTERVAL_SECONDS = 10
# Timeout, in seconds, applied to each camera request.
REQUEST_TIMEOUT_SECONDS = 5
# Predictions below this confidence are recorded as 'unknown' and the frame is
# saved for manual review.
UNSURE_CONFIDENCE_THRESHOLD = 0.97
# Rolling window of the most recent readings ('open', 'closed' or 'unknown').
# It is mutated in place by the monitoring task and read when deriving state.
PREDICTION_HISTORY = []
# Number of readings kept; the derived state requires all of them to agree.
PREDICTION_HISTORY_MAX_LENGTH = 3
# Last definite state that was logged by the state-transition task.
PREVIOUS_STATE = "unknown"
# --- Model Inference ---
def get_prediction(model, image_bytes, device):
    """Classify an encoded camera frame with the garage-door model.

    Args:
        model: The network to run; it is called directly on the input batch.
        image_bytes: Raw bytes of an encoded image, as fetched from the camera.
        device: Torch device the input tensor is moved to before inference.

    Returns:
        A ``(class_name, confidence)`` tuple, where ``class_name`` comes from
        ``CLASS_NAMES`` and ``confidence`` is the softmax probability of that
        class, or ``None`` when the bytes cannot be opened as an image.
    """
    try:
        decoded = Image.open(io.BytesIO(image_bytes))
        image = decoded.convert('RGB')
    except Exception as e:
        logging.error(f"Failed to open image from bytes: {e}")
        return None

    # Preprocessing is meant to mirror the transforms used for validation.
    preprocessing_steps = [
        CropLowerRightTriangle(
            triangle_width=TRIANGLE_CROP_WIDTH,
            triangle_height=TRIANGLE_CROP_HEIGHT,
        ),
        transforms.Resize((RESIZE_DIM, RESIZE_DIM)),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
        ),
    ]
    preprocess = transforms.Compose(preprocessing_steps)

    # Add a leading batch dimension of one before handing the frame to the model.
    batch = preprocess(image).unsqueeze(0).to(device)

    # Inference only: no gradients are needed.
    with torch.no_grad():
        logits = model(batch)
        probabilities = F.softmax(logits, dim=1)
        confidence, pred_idx = torch.max(probabilities, 1)

    label = CLASS_NAMES[pred_idx.item()]
    return label, confidence.item()
# --- Background Task ---
def _record_prediction(label):
    """Append *label* to the rolling prediction history and trim it to size."""
    PREDICTION_HISTORY.append(label)
    # Mutate in place: other code reads the same module-level list.
    while len(PREDICTION_HISTORY) > PREDICTION_HISTORY_MAX_LENGTH:
        PREDICTION_HISTORY.pop(0)


def _save_unsure_image(prediction, image_bytes):
    """Write a low-confidence frame under data/unsure/<prediction>/ and return its path."""
    # Sanitize timestamp for use in filename
    timestamp = datetime.now().isoformat().replace(':', '-')
    unsure_dir = os.path.join('data', 'unsure', prediction)
    os.makedirs(unsure_dir, exist_ok=True)
    filepath = os.path.join(unsure_dir, f"{timestamp}.jpg")
    with open(filepath, 'wb') as f:
        f.write(image_bytes)
    return filepath


async def monitor_garage_door(app):
    """Periodically fetch a camera image, classify it and record the result.

    Every ``POLL_INTERVAL_SECONDS`` the camera is polled and one entry is
    pushed onto ``PREDICTION_HISTORY``:

    * the predicted class when confidence is at least
      ``UNSURE_CONFIDENCE_THRESHOLD``;
    * ``'unknown'`` for a low-confidence prediction (the frame is also saved
      to disk for review);
    * ``'unknown'`` whenever the poll fails -- request timeout, client error,
      non-200 response, or an image that cannot be decoded -- so a stale state
      is not reported while the camera is unreachable.

    Args:
        app: Mapping providing ``'client_session'``, ``'model'`` and
            ``'device'`` (populated during application startup).

    The loop runs until the task is cancelled.
    """
    logging.info("Starting garage door monitoring task.")
    session = app['client_session']
    model = app['model']
    device = app['device']
    headers = {'Authorization': 'Basic ' + mysecrets.BLUEIRIS_KEY}
    # aiohttp expects a ClientTimeout object; bare numbers are deprecated.
    timeout = aiohttp.ClientTimeout(total=REQUEST_TIMEOUT_SECONDS)

    while True:
        try:
            await asyncio.sleep(POLL_INTERVAL_SECONDS)
            logging.debug("Fetching new image from camera...")
            async with session.get(CAMERA_URL, headers=headers, timeout=timeout) as response:
                if response.status != 200:
                    logging.error(f"Failed to fetch image. Status: {response.status}, Reason: {response.reason}")
                    # A failed fetch must invalidate the derived state, just
                    # like a timeout or connection error does.
                    _record_prediction('unknown')
                    continue

                image_bytes = await response.read()
                result = get_prediction(model, image_bytes, device)
                if result is None:
                    # The image could not be decoded (already logged by
                    # get_prediction); treat it as a failed reading.
                    _record_prediction('unknown')
                    continue

                prediction, confidence = result
                logging.debug(f"Garage door status: {prediction} (confidence: {confidence:.4f})")

                # Update prediction history
                if confidence >= UNSURE_CONFIDENCE_THRESHOLD:
                    _record_prediction(prediction)
                else:
                    _record_prediction('unknown')

                if confidence < UNSURE_CONFIDENCE_THRESHOLD:
                    # Keep the frame so it can be reviewed (and labelled) later.
                    filepath = _save_unsure_image(prediction, image_bytes)
                    logging.info(f"Low confidence prediction: {prediction} ({confidence:.4f}). Saved for review: {filepath}")
        except asyncio.TimeoutError:
            logging.warning("Request to camera timed out.")
            _record_prediction('unknown')
        except aiohttp.ClientError as e:
            logging.error(f"Client error during image fetch: {e}")
            _record_prediction('unknown')
        except asyncio.CancelledError:
            logging.info("Monitoring task cancelled.")
            break
        except Exception as e:
            logging.error(f"An unexpected error occurred in the monitoring task: {e}", exc_info=True)
            # Add a small delay before retrying on unexpected errors
            await asyncio.sleep(5)
async def monitor_state_transitions(app):
    """Log every change of the derived garage-door state.

    Every five seconds the derived state is recomputed. When it is a definite
    state (anything other than ``"unknown"``) that differs from
    ``PREVIOUS_STATE``, the transition is logged and ``PREVIOUS_STATE`` is
    updated. ``"unknown"`` readings never overwrite the remembered state.

    Args:
        app: The application object; not used by this task.

    The loop runs until the task is cancelled.
    """
    global PREVIOUS_STATE
    logging.info("Starting state transition monitoring task.")
    while True:
        try:
            await asyncio.sleep(5)
            derived = get_derived_state()
            # Nothing to report for indeterminate or unchanged states.
            if derived in ("unknown", PREVIOUS_STATE):
                continue
            logging.info(f"State transitioned from '{PREVIOUS_STATE}' to '{derived}'.")
            PREVIOUS_STATE = derived
        except asyncio.CancelledError:
            logging.info("State transition monitoring task cancelled.")
            break
        except Exception as e:
            logging.error(f"An unexpected error occurred in the state monitoring task: {e}", exc_info=True)
            # Back off briefly before the next attempt.
            await asyncio.sleep(5)
# --- Web Server ---
def get_derived_state():
    """Return the door state agreed on by the whole prediction history.

    Returns:
        ``"open"`` or ``"closed"`` when the history holds exactly
        ``PREDICTION_HISTORY_MAX_LENGTH`` entries and every one of them is
        that value; ``"unknown"`` otherwise.
    """
    # A partially filled window is never trusted.
    if len(PREDICTION_HISTORY) != PREDICTION_HISTORY_MAX_LENGTH:
        return "unknown"
    for candidate in ("open", "closed"):
        if all(entry == candidate for entry in PREDICTION_HISTORY):
            return candidate
    return "unknown"
async def handle_root(request):
    """Answer ``GET /`` with a fixed plain-text greeting."""
    greeting = "hello world"
    return web.Response(text=greeting)
async def handle_state(request):
    """Answer ``GET /state`` with the current derived door state as plain text."""
    return web.Response(text=get_derived_state())
async def on_startup(app):
    """Prepare shared resources and launch the background tasks.

    Stores the following on ``app``: the torch ``'device'`` (CUDA when
    available, otherwise CPU), the loaded ``'model'`` in eval mode, an aiohttp
    ``'client_session'``, and the two background tasks ``'monitor_task'`` and
    ``'state_monitor_task'``.
    """
    # Pick the compute device.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    app['device'] = device
    logging.info(f"Using device: {app['device']}")

    # Build the network and restore its trained weights.
    logging.info(f"Loading model from {MODEL_PATH}...")
    net = GarageDoorCNN(resize_dim=RESIZE_DIM).to(device)
    weights = torch.load(MODEL_PATH, map_location=device)
    net.load_state_dict(weights)
    net.eval()
    app['model'] = net
    logging.info("Model loaded successfully.")

    # HTTP session shared by the camera poller.
    app['client_session'] = aiohttp.ClientSession()

    # Launch the background coroutines, keeping handles for cleanup.
    background = {
        'monitor_task': monitor_garage_door,
        'state_monitor_task': monitor_state_transitions,
    }
    for key, coroutine_fn in background.items():
        app[key] = asyncio.create_task(coroutine_fn(app))
async def on_cleanup(app):
    """Stop the background tasks and release the HTTP client session.

    Both tasks are cancelled and then awaited together, so a
    ``CancelledError`` surfacing from one of them cannot prevent the other
    from being awaited. The client session is closed even if waiting for the
    tasks fails.

    Args:
        app: Mapping holding ``'monitor_task'``, ``'state_monitor_task'`` and
            ``'client_session'`` as stored during startup.
    """
    logging.info("Cleaning up...")
    tasks = [app['monitor_task'], app['state_monitor_task']]
    for task in tasks:
        task.cancel()
    try:
        # return_exceptions=True collects each task's outcome (including
        # CancelledError) instead of aborting on the first one raised.
        outcomes = await asyncio.gather(*tasks, return_exceptions=True)
        for outcome in outcomes:
            # Cancellation is the expected outcome; anything else is worth surfacing.
            if isinstance(outcome, Exception) and not isinstance(outcome, asyncio.CancelledError):
                logging.error(f"Background task failed during shutdown: {outcome}", exc_info=outcome)
    finally:
        await app['client_session'].close()
    logging.info("Cleanup complete.")
def main():
    """Build the aiohttp application and serve it on port 8081."""
    app = web.Application()

    # Register the GET routes.
    for path, handler in (('/', handle_root), ('/state', handle_state)):
        app.router.add_get(path, handler)

    # Lifecycle hooks: load resources on start, release them on shutdown.
    app.on_startup.append(on_startup)
    app.on_cleanup.append(on_cleanup)

    web.run_app(app, port=8081)
if __name__ == '__main__':
    # Only start the server when the trained weights are present on disk.
    if os.path.exists(MODEL_PATH):
        main()
    else:
        logging.error(f"Model file '{MODEL_PATH}' not found. Please run train.py first.")