""" Immich Drop Uploader – Backend (FastAPI, simplified) ---------------------------------------------------- - Serves static frontend (no settings UI) - Uploads to Immich using values from .env ONLY - Duplicate checks (local SHA-1 DB + optional Immich bulk-check) - WebSocket progress per session - Ephemeral "Connected" banner via /api/ping """ from __future__ import annotations import asyncio import io import json import hashlib import os import sqlite3 import binascii import pytz from datetime import datetime from typing import Dict, List, Optional import logging from fastapi import FastAPI, UploadFile, WebSocket, WebSocketDisconnect, Request, Form from fastapi.responses import HTMLResponse, JSONResponse, FileResponse, RedirectResponse, Response from fastapi.middleware.cors import CORSMiddleware from fastapi.staticfiles import StaticFiles from starlette.websockets import WebSocketState from starlette.middleware.sessions import SessionMiddleware from PIL import Image, ExifTags try: import qrcode except Exception: qrcode = None from app.config import Settings, load_settings # ---- App & static ---- app = FastAPI(title="Immich Drop Uploader (Python)") app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Global settings (read-only at runtime) SETTINGS: Settings = load_settings() # Basic logging setup using settings logging.basicConfig(level=SETTINGS.log_level, format="%(asctime)s %(levelname)s %(name)s: %(message)s") logger = logging.getLogger("immich_drop") # Cookie-based session for short-lived auth token storage (no persistence) app.add_middleware(SessionMiddleware, secret_key=SETTINGS.session_secret, same_site="lax") FRONTEND_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), "frontend") app.mount("/static", StaticFiles(directory=FRONTEND_DIR), name="static") # Chunk upload storage CHUNK_ROOT = "./data/chunks" try: os.makedirs(CHUNK_ROOT, exist_ok=True) except Exception: pass # ---------- DB (local dedupe cache) ---------- def db_init() -> None: """Create the local SQLite table used for duplicate checks (idempotent).""" conn = sqlite3.connect(SETTINGS.state_db) cur = conn.cursor() cur.execute( """ CREATE TABLE IF NOT EXISTS uploads ( id INTEGER PRIMARY KEY AUTOINCREMENT, checksum TEXT UNIQUE, filename TEXT, size INTEGER, device_asset_id TEXT, immich_asset_id TEXT, created_at TEXT, inserted_at TEXT DEFAULT CURRENT_TIMESTAMP ); """ ) conn.commit() conn.close() def db_lookup_checksum(checksum: str) -> Optional[dict]: """Return a record for the given checksum if seen before (None if not).""" conn = sqlite3.connect(SETTINGS.state_db) cur = conn.cursor() cur.execute("SELECT checksum, immich_asset_id FROM uploads WHERE checksum = ?", (checksum,)) row = cur.fetchone() conn.close() if row: return {"checksum": row[0], "immich_asset_id": row[1]} return None def db_lookup_device_asset(device_asset_id: str) -> bool: """True if a deviceAssetId has been uploaded by this service previously.""" conn = sqlite3.connect(SETTINGS.state_db) cur = conn.cursor() cur.execute("SELECT 1 FROM uploads WHERE device_asset_id = ?", (device_asset_id,)) row = cur.fetchone() conn.close() return bool(row) def db_insert_upload(checksum: str, filename: str, size: int, device_asset_id: str, immich_asset_id: Optional[str], created_at: str) -> None: """Insert a newly-uploaded asset into the local cache (ignore on duplicates).""" conn = sqlite3.connect(SETTINGS.state_db) cur = conn.cursor() cur.execute( "INSERT OR IGNORE INTO uploads (checksum, filename, size, device_asset_id, immich_asset_id, created_at) VALUES (?,?,?,?,?,?)", (checksum, filename, size, device_asset_id, immich_asset_id, created_at) ) conn.commit() conn.close() db_init() # ---------- WebSocket hub ---------- class SessionHub: """Holds WebSocket connections per session and broadcasts progress updates.""" def __init__(self) -> None: self.sessions: Dict[str, List[WebSocket]] = {} async def connect(self, session_id: str, ws: WebSocket) -> None: """Register a newly accepted WebSocket under the given session id.""" self.sessions.setdefault(session_id, []).append(ws) def _cleanup_closed(self, session_id: str) -> None: """Drop closed sockets and cleanup empty session buckets.""" if session_id not in self.sessions: return self.sessions[session_id] = [w for w in self.sessions[session_id] if w.client_state == WebSocketState.CONNECTED] if not self.sessions[session_id]: del self.sessions[session_id] async def send(self, session_id: str, payload: dict) -> None: """Broadcast a JSON payload to all sockets for one session.""" conns = self.sessions.get(session_id, []) for ws in list(conns): try: await ws.send_text(json.dumps(payload)) except Exception: try: await ws.close() except Exception: pass self._cleanup_closed(session_id) async def disconnect(self, session_id: str, ws: WebSocket) -> None: """Remove a socket from the hub and close it (best-effort).""" if session_id in self.sessions and ws in self.sessions[session_id]: self.sessions[session_id].remove(ws) self._cleanup_closed(session_id) # Only try to close if the connection is still open if ws.client_state == WebSocketState.CONNECTED: try: await ws.close() except Exception: pass hub = SessionHub() # ---------- Helpers ---------- def sha1_hex(file_bytes: bytes) -> str: """Return SHA-1 hex digest of file_bytes.""" h = hashlib.sha1() h.update(file_bytes) return h.hexdigest() def sanitize_filename(name: Optional[str]) -> str: """Return a minimally sanitized filename that preserves the original name. - Removes control characters (\x00-\x1F, \x7F) - Replaces path separators ('/' and '\\') with underscore - Falls back to 'file' if empty Other Unicode characters and spaces are preserved. """ if not name: return "file" cleaned_chars = [] for ch in str(name): o = ord(ch) if o < 32 or o == 127: continue if ch in ('/', '\\'): cleaned_chars.append('_') else: cleaned_chars.append(ch) cleaned = ''.join(cleaned_chars).strip() return cleaned or "file" def read_exif_datetimes(file_bytes: bytes): """ Extract EXIF DateTimeOriginal / ModifyDate values when possible. Returns (created, modified) as datetime or (None, None) on failure. """ created = modified = None try: with Image.open(io.BytesIO(file_bytes)) as im: exif = getattr(im, "_getexif", lambda: None)() or {} if exif: tags = {ExifTags.TAGS.get(k, k): v for k, v in exif.items()} dt_original = tags.get("DateTimeOriginal") or tags.get("CreateDate") dt_modified = tags.get("ModifyDate") or dt_original def parse_dt(s: str): try: return datetime.strptime(s, "%Y:%m:%d %H:%M:%S") except Exception: return None if isinstance(dt_original, str): created = parse_dt(dt_original) if isinstance(dt_modified, str): modified = parse_dt(dt_modified) except Exception: pass return created, modified def _hash_password(pw: str) -> str: """Return PBKDF2-SHA256 hash of a password.""" try: if not pw: return "" salt = os.urandom(16) iterations = 200_000 dk = hashlib.pbkdf2_hmac('sha256', pw.encode('utf-8'), salt, iterations) return f"pbkdf2_sha256${iterations}${binascii.hexlify(salt).decode()}${binascii.hexlify(dk).decode()}" except Exception: return "" def _verify_password(stored: str, pw: Optional[str]) -> bool: """Verify a password against a PBKDF2-SHA256 hash.""" if not pw or not stored: return False try: algo, iter_s, salt_hex, hash_hex = stored.split("$") if algo != 'pbkdf2_sha256': return False iterations = int(iter_s) salt = binascii.unhexlify(salt_hex) dk = hashlib.pbkdf2_hmac('sha256', pw.encode('utf-8'), salt, iterations) return binascii.hexlify(dk).decode() == hash_hex except Exception: return False def get_or_create_album_dir(album_name: str) -> str: """Get or create a directory for an album. Returns the path.""" if not album_name or not isinstance(album_name, str): album_name = "public" if album_name == "public": try: tz = pytz.timezone(SETTINGS.timezone) today = datetime.now(tz).strftime('%Y-%m-%d') save_dir = os.path.join("./data/uploads", "public", today) except Exception as e: logger.warning("Timezone logic failed, falling back to 'public' album. Timezone: %s. Error: %s", SETTINGS.timezone, e) save_dir = os.path.join("./data/uploads", "public") else: safe_album_name = sanitize_filename(album_name) save_dir = os.path.join("./data/uploads", safe_album_name) os.makedirs(save_dir, exist_ok=True) return save_dir async def send_progress(session_id: str, item_id: str, status: str, progress: int = 0, message: str = "", response_id: Optional[str] = None) -> None: """Push a progress update over WebSocket for one queue item.""" await hub.send(session_id, { "item_id": item_id, "status": status, "progress": progress, "message": message, "responseId": response_id, }) # ---------- Routes ---------- @app.get("/", response_class=HTMLResponse) async def index(request: Request) -> HTMLResponse: """Serve the SPA (frontend/index.html) or redirect to login if disabled.""" if not SETTINGS.public_upload_page_enabled: return RedirectResponse(url="/login") return FileResponse(os.path.join(FRONTEND_DIR, "index.html")) @app.get("/login", response_class=HTMLResponse) async def login_page(_: Request) -> HTMLResponse: """Serve the login page.""" return FileResponse(os.path.join(FRONTEND_DIR, "login.html")) @app.get("/menu", response_class=HTMLResponse) async def menu_page(request: Request) -> HTMLResponse: """Serve the menu page for creating invite links. Requires login.""" if not request.session.get("accessToken"): return RedirectResponse(url="/login") return FileResponse(os.path.join(FRONTEND_DIR, "menu.html")) @app.get("/favicon.ico") async def favicon() -> Response: """Serve favicon from /static/favicon.png if present (avoids 404 noise).""" path = os.path.join(FRONTEND_DIR, "favicon.png") if os.path.exists(path): with open(path, "rb") as f: return Response(content=f.read(), media_type="image/png") return Response(status_code=204) @app.post("/api/ping") async def api_ping() -> dict: """Connectivity test endpoint used by the UI to display a temporary banner.""" return { "ok": True, "base_url": "Local Save Mode", "album_name": None } @app.get("/api/config") async def api_config() -> dict: """Expose minimal public configuration flags for the frontend.""" return { "public_upload_page_enabled": SETTINGS.public_upload_page_enabled, "chunked_uploads_enabled": SETTINGS.chunked_uploads_enabled, "chunk_size_mb": SETTINGS.chunk_size_mb, } @app.websocket("/ws") async def ws_endpoint(ws: WebSocket) -> None: """WebSocket endpoint for pushing per-item upload progress.""" await ws.accept() try: init = await ws.receive_text() data = json.loads(init) session_id = data.get("session_id") or "default" except Exception: session_id = "default" await hub.connect(session_id, ws) # keepalive to avoid proxy idle timeouts try: while True: msg_task = asyncio.create_task(ws.receive_text()) keep_task = asyncio.create_task(asyncio.sleep(30)) done, pending = await asyncio.wait({msg_task, keep_task}, return_when=asyncio.FIRST_COMPLETED) if msg_task in done: _ = msg_task.result() else: await ws.send_text('{"type":"ping"}') for t in pending: t.cancel() except WebSocketDisconnect: await hub.disconnect(session_id, ws) @app.post("/api/upload") async def api_upload( request: Request, file: UploadFile, item_id: str = Form(...), session_id: str = Form(...), last_modified: Optional[int] = Form(None), invite_token: Optional[str] = Form(None), fingerprint: Optional[str] = Form(None), ): """Receive a file, check duplicates, forward to Immich; stream progress via WS.""" raw = await file.read() size = len(raw) checksum = sha1_hex(raw) exif_created, exif_modified = read_exif_datetimes(raw) created_at = exif_created or (datetime.fromtimestamp(last_modified / 1000) if last_modified else datetime.utcnow()) modified_at = exif_modified or created_at created_iso = created_at.isoformat() modified_iso = modified_at.isoformat() device_asset_id = f"{file.filename}-{last_modified or 0}-{size}" if db_lookup_checksum(checksum): await send_progress(session_id, item_id, "duplicate", 100, "Duplicate (by checksum - local cache)") return JSONResponse({"status": "duplicate", "id": None}, status_code=200) if db_lookup_device_asset(device_asset_id): await send_progress(session_id, item_id, "duplicate", 100, "Already uploaded from this device (local cache)") return JSONResponse({"status": "duplicate", "id": None}, status_code=200) # Invite token validation (if provided) target_album_name: Optional[str] = None if invite_token: try: conn = sqlite3.connect(SETTINGS.state_db) cur = conn.cursor() cur.execute("SELECT token, album_id, album_name, max_uses, used_count, expires_at, COALESCE(claimed,0), claimed_by_session, password_hash, COALESCE(disabled,0) FROM invites WHERE token = ?", (invite_token,)) row = cur.fetchone() conn.close() except Exception as e: logger.exception("Invite lookup error: %s", e) row = None if not row: await send_progress(session_id, item_id, "error", 100, "Invalid invite token") return JSONResponse({"error": "invalid_invite"}, status_code=403) _, album_id, album_name, max_uses, used_count, expires_at, claimed, claimed_by_session, password_hash, disabled = row # Admin deactivation check try: if int(disabled) == 1: await send_progress(session_id, item_id, "error", 100, "Invite disabled") return JSONResponse({"error": "invite_disabled"}, status_code=403) except Exception: pass # If invite requires password, ensure this session is authorized if password_hash: try: ia = request.session.get("inviteAuth") or {} if not ia.get(invite_token): await send_progress(session_id, item_id, "error", 100, "Password required") return JSONResponse({"error": "invite_password_required"}, status_code=403) except Exception: await send_progress(session_id, item_id, "error", 100, "Password required") return JSONResponse({"error": "invite_password_required"}, status_code=403) # Expiry check if expires_at: try: if datetime.utcnow() > datetime.fromisoformat(expires_at): await send_progress(session_id, item_id, "error", 100, "Invite expired") return JSONResponse({"error": "invite_expired"}, status_code=403) except Exception: pass # One-time claim or multi-use enforcement try: max_uses_int = int(max_uses) if max_uses is not None else -1 except Exception: max_uses_int = -1 if max_uses_int == 1: # Already claimed? if claimed: # Allow same session to continue; block different sessions if claimed_by_session and claimed_by_session != session_id: await send_progress(session_id, item_id, "error", 100, "Invite already used") return JSONResponse({"error": "invite_claimed"}, status_code=403) # claimed by same session (or unknown): allow else: # Atomically claim the one-time invite to prevent concurrent use try: connc = sqlite3.connect(SETTINGS.state_db) curc = connc.cursor() curc.execute( "UPDATE invites SET claimed = 1, claimed_at = CURRENT_TIMESTAMP, claimed_by_session = ? WHERE token = ? AND (claimed IS NULL OR claimed = 0)", (session_id, invite_token) ) connc.commit() changed = connc.total_changes connc.close() except Exception as e: logger.exception("Invite claim failed: %s", e) return JSONResponse({"error": "invite_claim_failed"}, status_code=500) if changed == 0: # Someone else just claimed; re-check owner try: conn2 = sqlite3.connect(SETTINGS.state_db) cur2 = conn2.cursor() cur2.execute("SELECT claimed_by_session FROM invites WHERE token = ?", (invite_token,)) owner_row = cur2.fetchone() conn2.close() owner = owner_row[0] if owner_row else None except Exception: owner = None if not owner or owner != session_id: await send_progress(session_id, item_id, "error", 100, "Invite already used") return JSONResponse({"error": "invite_claimed"}, status_code=403) else: # Usage check for multi-use (max_uses < 0 => indefinite) if (used_count or 0) >= (max_uses_int if max_uses_int >= 0 else 10**9): await send_progress(session_id, item_id, "error", 100, "Invite already used up") return JSONResponse({"error": "invite_exhausted"}, status_code=403) target_album_name = album_name album_for_saving = target_album_name if invite_token else "public" if not invite_token and not SETTINGS.public_upload_page_enabled: await send_progress(session_id, item_id, "error", 100, "Public uploads disabled") return JSONResponse({"error": "public_upload_disabled"}, status_code=403) try: save_dir = get_or_create_album_dir(album_for_saving) safe_name = sanitize_filename(file.filename) save_path = os.path.join(save_dir, safe_name) # Avoid overwriting if os.path.exists(save_path): base, ext = os.path.splitext(safe_name) i = 1 while os.path.exists(save_path): save_path = os.path.join(save_dir, f"{base}_{i}{ext}") i += 1 with open(save_path, "wb") as f: f.write(raw) db_insert_upload(checksum, file.filename, size, device_asset_id, None, created_iso) msg = f"Saved to {album_for_saving}/{os.path.basename(save_path)}" await send_progress(session_id, item_id, "done", 100, msg) # Increment invite usage on success if invite_token: try: conn2 = sqlite3.connect(SETTINGS.state_db) cur2 = conn2.cursor() # Keep one-time used_count at 1; multi-use increments per asset cur2.execute("SELECT max_uses FROM invites WHERE token = ?", (invite_token,)) row_mu = cur2.fetchone() mx = None try: mx = int(row_mu[0]) if row_mu and row_mu[0] is not None else None except Exception: mx = None if mx == 1: cur2.execute("UPDATE invites SET used_count = 1 WHERE token = ?", (invite_token,)) else: cur2.execute("UPDATE invites SET used_count = used_count + 1 WHERE token = ?", (invite_token,)) conn2.commit() conn2.close() except Exception as e: logger.exception("Failed to increment invite usage: %s", e) # Log uploader identity and file metadata try: connlg = sqlite3.connect(SETTINGS.state_db) curlg = connlg.cursor() curlg.execute( """ CREATE TABLE IF NOT EXISTS upload_events ( id INTEGER PRIMARY KEY AUTOINCREMENT, token TEXT, uploaded_at TEXT DEFAULT CURRENT_TIMESTAMP, ip TEXT, user_agent TEXT, fingerprint TEXT, filename TEXT, size INTEGER, checksum TEXT, immich_asset_id TEXT ); """ ) ip = None try: ip = (request.client.host if request and request.client else None) or request.headers.get('x-forwarded-for') except Exception: ip = None ua = request.headers.get('user-agent', '') if request else '' curlg.execute( "INSERT INTO upload_events (token, ip, user_agent, fingerprint, filename, size, checksum, immich_asset_id) VALUES (?,?,?,?,?,?,?,?)", (invite_token or '', ip, ua, fingerprint or '', file.filename, size, checksum, None) ) connlg.commit() connlg.close() except Exception: pass return JSONResponse({"id": None, "status": "done"}, status_code=200) except Exception as e: logger.exception("Local save failed: %s", e) await send_progress(session_id, item_id, "error", 100, "Failed to save file locally") return JSONResponse({"error": "local_save_failed"}, status_code=500) # --------- Chunked upload endpoints --------- def _chunk_dir(session_id: str, item_id: str) -> str: safe_session = session_id.replace('/', '_') safe_item = item_id.replace('/', '_') return os.path.join(CHUNK_ROOT, safe_session, safe_item) @app.post("/api/upload/chunk/init") async def api_upload_chunk_init(request: Request) -> JSONResponse: """Initialize a chunked upload; creates a temp directory for incoming parts.""" try: data = await request.json() except Exception: return JSONResponse({"error": "invalid_json"}, status_code=400) item_id = (data or {}).get("item_id") session_id = (data or {}).get("session_id") if not item_id or not session_id: return JSONResponse({"error": "missing_ids"}, status_code=400) d = _chunk_dir(session_id, item_id) try: os.makedirs(d, exist_ok=True) # Write manifest for later use meta = { "name": (data or {}).get("name"), "size": (data or {}).get("size"), "last_modified": (data or {}).get("last_modified"), "invite_token": (data or {}).get("invite_token"), "content_type": (data or {}).get("content_type") or "application/octet-stream", "created_at": datetime.utcnow().isoformat(), } with open(os.path.join(d, "meta.json"), "w", encoding="utf-8") as f: json.dump(meta, f) except Exception as e: logger.exception("Chunk init failed: %s", e) return JSONResponse({"error": "init_failed"}, status_code=500) return JSONResponse({"ok": True}) @app.post("/api/upload/chunk") async def api_upload_chunk( request: Request, item_id: str = Form(...), session_id: str = Form(...), chunk_index: int = Form(...), total_chunks: int = Form(...), invite_token: Optional[str] = Form(None), chunk: UploadFile = Form(...), ) -> JSONResponse: """Receive a single chunk; write to disk under chunk directory.""" d = _chunk_dir(session_id, item_id) try: os.makedirs(d, exist_ok=True) # Persist invite token in meta if provided (for consistency) meta_path = os.path.join(d, "meta.json") if os.path.exists(meta_path): try: with open(meta_path, "r", encoding="utf-8") as f: meta = json.load(f) except Exception: meta = {} else: meta = {} if invite_token: meta["invite_token"] = invite_token meta["total_chunks"] = int(total_chunks) with open(meta_path, "w", encoding="utf-8") as f: json.dump(meta, f) # Save chunk content = await chunk.read() with open(os.path.join(d, f"part_{int(chunk_index):06d}"), "wb") as f: f.write(content) except Exception as e: logger.exception("Chunk write failed: %s", e) return JSONResponse({"error": "chunk_write_failed"}, status_code=500) return JSONResponse({"ok": True}) @app.post("/api/upload/chunk/complete") async def api_upload_chunk_complete(request: Request) -> JSONResponse: """Assemble all parts and run the regular upload flow to Immich.""" try: data = await request.json() except Exception: return JSONResponse({"error": "invalid_json"}, status_code=400) item_id = (data or {}).get("item_id") session_id = (data or {}).get("session_id") name = (data or {}).get("name") or "upload.bin" last_modified = (data or {}).get("last_modified") invite_token = (data or {}).get("invite_token") fingerprint = (data or {}).get("fingerprint") content_type = (data or {}).get("content_type") or "application/octet-stream" if not item_id or not session_id: return JSONResponse({"error": "missing_ids"}, status_code=400) d = _chunk_dir(session_id, item_id) meta_path = os.path.join(d, "meta.json") # Basic validation try: with open(meta_path, "r", encoding="utf-8") as f: meta = json.load(f) except Exception: meta = {} total_chunks = int(meta.get("total_chunks") or (data or {}).get("total_chunks") or 0) if total_chunks <= 0: return JSONResponse({"error": "missing_total"}, status_code=400) # Prefer the name captured at init if request did not include it if not name: try: name = meta.get("name") or name except Exception: pass if not name: name = "upload.bin" # Assemble parts = [] try: for i in range(total_chunks): p = os.path.join(d, f"part_{i:06d}") if not os.path.exists(p): return JSONResponse({"error": "missing_part", "index": i}, status_code=400) with open(p, "rb") as f: parts.append(f.read()) raw = b"".join(parts) except Exception as e: logger.exception("Assemble failed: %s", e) return JSONResponse({"error": "assemble_failed"}, status_code=500) # Cleanup parts promptly try: for i in range(total_chunks): try: os.remove(os.path.join(d, f"part_{i:06d}")) except Exception: pass try: os.remove(meta_path) except Exception: pass try: os.rmdir(d) except Exception: pass except Exception: pass # Now reuse the core logic from api_upload but with assembled bytes item_id_local = item_id session_id_local = session_id file_like_name = name file_size = len(raw) checksum = sha1_hex(raw) exif_created, exif_modified = read_exif_datetimes(raw) created_at = exif_created or (datetime.fromtimestamp(last_modified / 1000) if last_modified else datetime.utcnow()) modified_at = exif_modified or created_at created_iso = created_at.isoformat() modified_iso = modified_at.isoformat() device_asset_id = f"{file_like_name}-{last_modified or 0}-{file_size}" # Local duplicate checks if db_lookup_checksum(checksum): await send_progress(session_id_local, item_id_local, "duplicate", 100, "Duplicate (by checksum - local cache)") return JSONResponse({"status": "duplicate", "id": None}, status_code=200) if db_lookup_device_asset(device_asset_id): await send_progress(session_id_local, item_id_local, "duplicate", 100, "Already uploaded from this device (local cache)") return JSONResponse({"status": "duplicate", "id": None}, status_code=200) # Invite validation/gating mirrors api_upload target_album_name: Optional[str] = None if invite_token: try: conn = sqlite3.connect(SETTINGS.state_db) cur = conn.cursor() cur.execute("SELECT token, album_id, album_name, max_uses, used_count, expires_at, COALESCE(claimed,0), claimed_by_session, password_hash, COALESCE(disabled,0) FROM invites WHERE token = ?", (invite_token,)) row = cur.fetchone() conn.close() except Exception as e: logger.exception("Invite lookup error: %s", e) row = None if not row: await send_progress(session_id_local, item_id_local, "error", 100, "Invalid invite token") return JSONResponse({"error": "invalid_invite"}, status_code=403) _, album_id, album_name, max_uses, used_count, expires_at, claimed, claimed_by_session, password_hash, disabled = row # Admin deactivation check try: if int(disabled) == 1: await send_progress(session_id_local, item_id_local, "error", 100, "Invite disabled") return JSONResponse({"error": "invite_disabled"}, status_code=403) except Exception: pass if password_hash: try: ia = request.session.get("inviteAuth") or {} if not ia.get(invite_token): await send_progress(session_id_local, item_id_local, "error", 100, "Password required") return JSONResponse({"error": "invite_password_required"}, status_code=403) except Exception: await send_progress(session_id_local, item_id_local, "error", 100, "Password required") return JSONResponse({"error": "invite_password_required"}, status_code=403) # expiry if expires_at: try: if datetime.utcnow() > datetime.fromisoformat(expires_at): await send_progress(session_id_local, item_id_local, "error", 100, "Invite expired") return JSONResponse({"error": "invite_expired"}, status_code=403) except Exception: pass try: max_uses_int = int(max_uses) if max_uses is not None else -1 except Exception: max_uses_int = -1 if max_uses_int == 1: if claimed: if claimed_by_session and claimed_by_session != session_id_local: await send_progress(session_id_local, item_id_local, "error", 100, "Invite already used") return JSONResponse({"error": "invite_claimed"}, status_code=403) else: try: connc = sqlite3.connect(SETTINGS.state_db) curc = connc.cursor() curc.execute( "UPDATE invites SET claimed = 1, claimed_at = CURRENT_TIMESTAMP, claimed_by_session = ? WHERE token = ? AND (claimed IS NULL OR claimed = 0)", (session_id_local, invite_token) ) connc.commit() changed = connc.total_changes connc.close() except Exception as e: logger.exception("Invite claim failed: %s", e) return JSONResponse({"error": "invite_claim_failed"}, status_code=500) if changed == 0: try: conn2 = sqlite3.connect(SETTINGS.state_db) cur2 = conn2.cursor() cur2.execute("SELECT claimed_by_session FROM invites WHERE token = ?", (invite_token,)) owner_row = cur2.fetchone() conn2.close() owner = owner_row[0] if owner_row else None except Exception: owner = None if not owner or owner != session_id_local: await send_progress(session_id_local, item_id_local, "error", 100, "Invite already used") return JSONResponse({"error": "invite_claimed"}, status_code=403) else: if (used_count or 0) >= (max_uses_int if max_uses_int >= 0 else 10**9): await send_progress(session_id_local, item_id_local, "error", 100, "Invite already used up") return JSONResponse({"error": "invite_exhausted"}, status_code=403) target_album_name = album_name album_for_saving = target_album_name if invite_token else "public" if not invite_token and not SETTINGS.public_upload_page_enabled: await send_progress(session_id_local, item_id_local, "error", 100, "Public uploads disabled") return JSONResponse({"error": "public_upload_disabled"}, status_code=403) try: save_dir = get_or_create_album_dir(album_for_saving) safe_name = sanitize_filename(file_like_name) save_path = os.path.join(save_dir, safe_name) if os.path.exists(save_path): base, ext = os.path.splitext(safe_name) i = 1 while os.path.exists(save_path): save_path = os.path.join(save_dir, f"{base}_{i}{ext}") i += 1 with open(save_path, "wb") as f: f.write(raw) db_insert_upload(checksum, file_like_name, file_size, device_asset_id, None, created_iso) msg = f"Saved to {album_for_saving}/{os.path.basename(save_path)}" await send_progress(session_id_local, item_id_local, "done", 100, msg) if invite_token: try: conn2 = sqlite3.connect(SETTINGS.state_db) cur2 = conn2.cursor() cur2.execute("SELECT max_uses FROM invites WHERE token = ?", (invite_token,)) row_mu = cur2.fetchone() mx = None try: mx = int(row_mu[0]) if row_mu and row_mu[0] is not None else None except Exception: mx = None if mx == 1: cur2.execute("UPDATE invites SET used_count = 1 WHERE token = ?", (invite_token,)) else: cur2.execute("UPDATE invites SET used_count = used_count + 1 WHERE token = ?", (invite_token,)) conn2.commit() conn2.close() except Exception as e: logger.exception("Failed to increment invite usage: %s", e) # Log uploader identity and file metadata try: connlg = sqlite3.connect(SETTINGS.state_db) curlg = connlg.cursor() curlg.execute( """ CREATE TABLE IF NOT EXISTS upload_events ( id INTEGER PRIMARY KEY AUTOINCREMENT, token TEXT, uploaded_at TEXT DEFAULT CURRENT_TIMESTAMP, ip TEXT, user_agent TEXT, fingerprint TEXT, filename TEXT, size INTEGER, checksum TEXT, immich_asset_id TEXT ); """ ) ip = None try: ip = (request.client.host if request and request.client else None) or request.headers.get('x-forwarded-for') except Exception: ip = None ua = request.headers.get('user-agent', '') if request else '' curlg.execute( "INSERT INTO upload_events (token, ip, user_agent, fingerprint, filename, size, checksum, immich_asset_id) VALUES (?,?,?,?,?,?,?,?)", (invite_token or '', ip, ua, fingerprint or '', file_like_name, file_size, checksum, None) ) connlg.commit() connlg.close() except Exception: pass return JSONResponse({"id": None, "status": "done"}, status_code=200) except Exception as e: await send_progress(session_id_local, item_id_local, "error", 100, "Failed to save file locally") return JSONResponse({"error": "local_save_failed"}, status_code=500) # ---------- Auth & Albums & Invites APIs ---------- @app.post("/api/login") async def api_login(request: Request) -> JSONResponse: """Authenticate against the local admin password.""" try: body = await request.json() except Exception: return JSONResponse({"error": "invalid_json"}, status_code=400) email = (body or {}).get("email") password = (body or {}).get("password") if not email or not password: return JSONResponse({"error": "missing_credentials"}, status_code=400) stored_password = SETTINGS.admin_password password_ok = False if stored_password.startswith("pbkdf2_sha256$"): password_ok = _verify_password(stored_password, password) else: password_ok = (password == stored_password) if email == "admin" and password_ok: user_info = { "accessToken": "local_admin_session", # dummy value "userEmail": "admin", "userId": "admin", "name": "Admin", "isAdmin": True, } request.session.update(user_info) logger.info("Admin user logged in") return JSONResponse({"ok": True, **{k: user_info.get(k) for k in ("userEmail","userId","name","isAdmin")}}) logger.warning("Failed login attempt for user %s", email) return JSONResponse({"error": "unauthorized"}, status_code=401) @app.post("/api/logout") async def api_logout(request: Request) -> dict: request.session.clear() return {"ok": True} @app.get("/logout") async def logout_get(request: Request) -> RedirectResponse: request.session.clear() return RedirectResponse(url="/login") @app.get("/api/albums") async def api_albums(request: Request) -> JSONResponse: """Return list of albums (directories) if authorized.""" if not request.session.get("accessToken"): return JSONResponse({"error": "unauthorized"}, status_code=401) upload_root = "./data/uploads" try: os.makedirs(upload_root, exist_ok=True) # also make public dir os.makedirs(os.path.join(upload_root, "public"), exist_ok=True) albums = [] for name in os.listdir(upload_root): if os.path.isdir(os.path.join(upload_root, name)): albums.append({"id": name, "albumName": name}) return JSONResponse(albums) except Exception as e: logger.exception("Failed to list album directories: %s", e) return JSONResponse({"error": "list_albums_failed"}, status_code=500) @app.post("/api/albums") async def api_albums_create(request: Request) -> JSONResponse: if not request.session.get("accessToken"): return JSONResponse({"error": "unauthorized"}, status_code=401) try: body = await request.json() except Exception: return JSONResponse({"error": "invalid_json"}, status_code=400) name = (body or {}).get("name") if not name: return JSONResponse({"error": "missing_name"}, status_code=400) try: get_or_create_album_dir(name) return JSONResponse({"id": name, "albumName": name}, status_code=201) except Exception as e: logger.exception("Create album directory failed: %s", e) return JSONResponse({"error": "create_album_failed"}, status_code=500) # ---------- Invites (one-time/expiring links) ---------- def ensure_invites_table() -> None: try: conn = sqlite3.connect(SETTINGS.state_db) cur = conn.cursor() cur.execute( """ CREATE TABLE IF NOT EXISTS invites ( token TEXT PRIMARY KEY, album_id TEXT, album_name TEXT, max_uses INTEGER DEFAULT 1, used_count INTEGER DEFAULT 0, expires_at TEXT, created_at TEXT DEFAULT CURRENT_TIMESTAMP ); """ ) # Attempt to add new columns for claiming semantics try: cur.execute("ALTER TABLE invites ADD COLUMN claimed INTEGER DEFAULT 0") except Exception: pass try: cur.execute("ALTER TABLE invites ADD COLUMN claimed_at TEXT") except Exception: pass try: cur.execute("ALTER TABLE invites ADD COLUMN claimed_by_session TEXT") except Exception: pass # Optional password protection for invites try: cur.execute("ALTER TABLE invites ADD COLUMN password_hash TEXT") except Exception: pass # Ownership and management fields (best-effort migrations) try: cur.execute("ALTER TABLE invites ADD COLUMN owner_user_id TEXT") except Exception: pass try: cur.execute("ALTER TABLE invites ADD COLUMN owner_email TEXT") except Exception: pass try: cur.execute("ALTER TABLE invites ADD COLUMN owner_name TEXT") except Exception: pass try: cur.execute("ALTER TABLE invites ADD COLUMN name TEXT") except Exception: pass try: cur.execute("ALTER TABLE invites ADD COLUMN disabled INTEGER DEFAULT 0") except Exception: pass conn.commit() conn.close() except Exception as e: logger.exception("Failed to ensure invites table: %s", e) ensure_invites_table() @app.post("/api/invites") async def api_invites_create(request: Request) -> JSONResponse: """Create an invite link for uploads with optional expiry and max uses.""" # Require a logged-in session to create invites if not request.session.get("accessToken"): return JSONResponse({"error": "unauthorized"}, status_code=401) try: body = await request.json() except Exception: return JSONResponse({"error": "invalid_json"}, status_code=400) album_id = (body or {}).get("albumId") album_name = (body or {}).get("albumName") max_uses = (body or {}).get("maxUses", 1) invite_password = (body or {}).get("password") expires_days = (body or {}).get("expiresDays") # Normalize max_uses try: max_uses = int(max_uses) except Exception: max_uses = 1 # Allow blank album for invites (will default to public) if not album_name: album_name = "public" # Ensure album directory exists get_or_create_album_dir(album_name) resolved_album_id = None # not used # Compute expiry expires_at = None if expires_days is not None: try: days = int(expires_days) expires_at = (datetime.utcnow()).replace(microsecond=0).isoformat() # Use timedelta from datetime import timedelta expires_at = (datetime.utcnow() + timedelta(days=days)).replace(microsecond=0).isoformat() except Exception: expires_at = None # Generate token import uuid token = uuid.uuid4().hex # Prepare password hash, if provided pw_hash = _hash_password(invite_password or "") if (invite_password and str(invite_password).strip()) else None # Owner info from session owner_user_id = str(request.session.get("userId") or "") owner_email = str(request.session.get("userEmail") or "") owner_name = str(request.session.get("name") or "") # Friendly name: default to album + creation timestamp if not provided in future updates # Here we set a default immediately now_tag = datetime.utcnow().strftime("%Y%m%d-%H%M") default_link_name = f"{album_name or 'NoAlbum'}-{now_tag}" try: conn = sqlite3.connect(SETTINGS.state_db) cur = conn.cursor() if pw_hash: cur.execute( "INSERT INTO invites (token, album_id, album_name, max_uses, expires_at, password_hash, owner_user_id, owner_email, owner_name, name) VALUES (?,?,?,?,?,?,?,?,?,?)", (token, None, album_name, max_uses, expires_at, pw_hash, owner_user_id, owner_email, owner_name, default_link_name) ) else: cur.execute( "INSERT INTO invites (token, album_id, album_name, max_uses, expires_at, owner_user_id, owner_email, owner_name, name) VALUES (?,?,?,?,?,?,?,?,?)", (token, None, album_name, max_uses, expires_at, owner_user_id, owner_email, owner_name, default_link_name) ) conn.commit() conn.close() except Exception as e: logger.exception("Failed to create invite: %s", e) return JSONResponse({"error": "db_error"}, status_code=500) # Build absolute URL using PUBLIC_BASE_URL if set, else request base try: base_url = SETTINGS.public_base_url.strip().rstrip('/') if SETTINGS.public_base_url else str(request.base_url).rstrip('/') except Exception: base_url = str(request.base_url).rstrip('/') absolute = f"{base_url}/invite/{token}" return JSONResponse({ "ok": True, "token": token, "url": f"/invite/{token}", "absoluteUrl": absolute, "albumId": None, "albumName": album_name, "maxUses": max_uses, "expiresAt": expires_at, "name": default_link_name }) @app.get("/api/invites") async def api_invites_list(request: Request) -> JSONResponse: """List invites owned by the logged-in user, with optional q/sort filters.""" if not request.session.get("accessToken"): return JSONResponse({"error": "unauthorized"}, status_code=401) owner_user_id = str(request.session.get("userId") or "") q = (request.query_params.get("q") or "").strip() sort = (request.query_params.get("sort") or "-created").strip() # Map sort tokens to SQL sort_sql = "created_at DESC" if sort in ("created", "+created"): sort_sql = "created_at ASC" elif sort in ("-created",): sort_sql = "created_at DESC" elif sort in ("expires", "+expires"): sort_sql = "expires_at ASC" elif sort in ("-expires",): sort_sql = "expires_at DESC" elif sort in ("name", "+name"): sort_sql = "name ASC" elif sort in ("-name",): sort_sql = "name DESC" try: conn = sqlite3.connect(SETTINGS.state_db) cur = conn.cursor() if q: like = f"%{q}%" cur.execute( """ SELECT token, name, album_id, album_name, max_uses, used_count, expires_at, COALESCE(claimed,0), COALESCE(disabled,0), created_at FROM invites WHERE owner_user_id = ? AND ( COALESCE(name,'') LIKE ? OR COALESCE(album_name,'') LIKE ? OR token LIKE ? ) ORDER BY """ + sort_sql, (owner_user_id, like, like, like) ) else: cur.execute( f"SELECT token, name, album_id, album_name, max_uses, used_count, expires_at, COALESCE(claimed,0), COALESCE(disabled,0), created_at FROM invites WHERE owner_user_id = ? ORDER BY {sort_sql}", (owner_user_id,) ) rows = cur.fetchall() conn.close() except Exception as e: logger.exception("List invites failed: %s", e) return JSONResponse({"error": "db_error"}, status_code=500) items = [] now = datetime.utcnow() for (token, name, album_id, album_name, max_uses, used_count, expires_at, claimed, disabled, created_at) in rows: try: max_uses_int = int(max_uses) if max_uses is not None else -1 except Exception: max_uses_int = -1 remaining = None try: if max_uses_int >= 0: remaining = int(max_uses_int) - int(used_count or 0) except Exception: remaining = None expired = False if expires_at: try: expired = now > datetime.fromisoformat(expires_at) except Exception: expired = False inactive_reason = None active = True if (max_uses_int == 1 and claimed) or (remaining is not None and remaining <= 0): active = False inactive_reason = "claimed" if max_uses_int == 1 else "exhausted" if expired: active = False inactive_reason = inactive_reason or "expired" try: if int(disabled) == 1: active = False inactive_reason = "disabled" except Exception: pass items.append({ "token": token, "name": name, "albumId": album_id, "albumName": album_name, "maxUses": max_uses, "used": used_count or 0, "remaining": remaining, "expiresAt": expires_at, "active": active, "inactiveReason": inactive_reason, "createdAt": created_at, }) return JSONResponse({"items": items}) @app.patch("/api/invite/{token}") async def api_invite_update(token: str, request: Request) -> JSONResponse: """Update invite fields: name, disabled, maxUses, expiresAt/expiresDays, password, resetUsage.""" if not request.session.get("accessToken"): return JSONResponse({"error": "unauthorized"}, status_code=401) try: body = await request.json() except Exception: body = {} owner_user_id = str(request.session.get("userId") or "") # Build dynamic update fields = [] params = [] # Name if "name" in (body or {}): fields.append("name = ?") params.append(str((body or {}).get("name") or "").strip()) # Disabled toggle if "disabled" in (body or {}): try: disabled = 1 if (bool((body or {}).get("disabled")) is True) else 0 except Exception: disabled = 0 fields.append("disabled = ?") params.append(disabled) # Max uses if "maxUses" in (body or {}): try: mu = int((body or {}).get("maxUses")) except Exception: mu = 1 fields.append("max_uses = ?") params.append(mu) # Expiration if "expiresAt" in (body or {}) or "expiresDays" in (body or {}): expires_at = None if (body or {}).get("expiresAt"): try: # trust provided ISO string expires_at = str((body or {}).get("expiresAt")) except Exception: expires_at = None else: try: days = int((body or {}).get("expiresDays")) from datetime import timedelta expires_at = (datetime.utcnow() + timedelta(days=days)).replace(microsecond=0).isoformat() except Exception: expires_at = None fields.append("expires_at = ?") params.append(expires_at) # Password if "password" in (body or {}): pw = str((body or {}).get("password") or "").strip() if pw: fields.append("password_hash = ?") params.append(_hash_password(pw)) else: fields.append("password_hash = NULL") # Reset usage reset_usage = bool((body or {}).get("resetUsage")) try: if fields: conn = sqlite3.connect(SETTINGS.state_db) cur = conn.cursor() cur.execute( f"UPDATE invites SET {', '.join(fields)} WHERE token = ? AND owner_user_id = ?", (*params, token, owner_user_id) ) if reset_usage: cur.execute("UPDATE invites SET used_count = 0, claimed = 0, claimed_at = NULL, claimed_by_session = NULL WHERE token = ? AND owner_user_id = ?", (token, owner_user_id)) conn.commit() updated = conn.total_changes conn.close() else: updated = 0 except Exception as e: logger.exception("Invite update failed: %s", e) return JSONResponse({"error": "db_error"}, status_code=500) if updated == 0: return JSONResponse({"ok": False, "updated": 0}, status_code=404) return JSONResponse({"ok": True, "updated": updated}) @app.post("/api/invites/bulk") async def api_invites_bulk(request: Request) -> JSONResponse: """Bulk enable/disable invites owned by current user. Body: {tokens:[], action:'disable'|'enable'}""" if not request.session.get("accessToken"): return JSONResponse({"error": "unauthorized"}, status_code=401) try: body = await request.json() except Exception: body = {} tokens = list((body or {}).get("tokens") or []) action = str((body or {}).get("action") or "disable").lower().strip() if not tokens: return JSONResponse({"error": "missing_tokens"}, status_code=400) val = 1 if action == "disable" else 0 owner_user_id = str(request.session.get("userId") or "") try: conn = sqlite3.connect(SETTINGS.state_db) cur = conn.cursor() # Build query with correct number of placeholders placeholders = ",".join(["?"] * len(tokens)) cur.execute( f"UPDATE invites SET disabled = ? WHERE owner_user_id = ? AND token IN ({placeholders})", (val, owner_user_id, *tokens) ) conn.commit() changed = conn.total_changes conn.close() except Exception as e: logger.exception("Bulk update failed: %s", e) return JSONResponse({"error": "db_error"}, status_code=500) return JSONResponse({"ok": True, "updated": changed}) @app.post("/api/invites/delete") async def api_invites_delete(request: Request) -> JSONResponse: """Hard delete invites owned by the current user and their upload logs. Body: { tokens: ["...", ...] } """ if not request.session.get("accessToken"): return JSONResponse({"error": "unauthorized"}, status_code=401) try: body = await request.json() except Exception: body = {} tokens = list((body or {}).get("tokens") or []) if not tokens: return JSONResponse({"error": "missing_tokens"}, status_code=400) owner_user_id = str(request.session.get("userId") or "") try: conn = sqlite3.connect(SETTINGS.state_db) cur = conn.cursor() placeholders = ",".join(["?"] * len(tokens)) # Delete upload events first to avoid orphan rows cur.execute( f"DELETE FROM upload_events WHERE token IN ({placeholders})", (*tokens,) ) # Delete invites scoped to owner cur.execute( f"DELETE FROM invites WHERE owner_user_id = ? AND token IN ({placeholders})", (owner_user_id, *tokens) ) conn.commit() changed = conn.total_changes conn.close() except Exception as e: logger.exception("Bulk delete failed: %s", e) return JSONResponse({"error": "db_error"}, status_code=500) return JSONResponse({"ok": True, "deleted": changed}) @app.get("/api/invite/{token}/uploads") async def api_invite_uploads(token: str, request: Request) -> JSONResponse: """Return upload events for a given token (owner-only).""" if not request.session.get("accessToken"): return JSONResponse({"error": "unauthorized"}, status_code=401) owner_user_id = str(request.session.get("userId") or "") try: conn = sqlite3.connect(SETTINGS.state_db) cur = conn.cursor() # Verify ownership cur.execute("SELECT 1 FROM invites WHERE token = ? AND owner_user_id = ?", (token, owner_user_id)) row = cur.fetchone() if not row: conn.close() return JSONResponse({"error": "forbidden"}, status_code=403) cur.execute("SELECT uploaded_at, ip, user_agent, fingerprint, filename, size, checksum, immich_asset_id FROM upload_events WHERE token = ? ORDER BY uploaded_at DESC LIMIT 500", (token,)) rows = cur.fetchall() conn.close() except Exception as e: logger.exception("Fetch uploads failed: %s", e) return JSONResponse({"error": "db_error"}, status_code=500) items = [] for uploaded_at, ip, ua, fp, filename, size, checksum, asset_id in rows: items.append({ "uploadedAt": uploaded_at, "ip": ip, "userAgent": ua, "fingerprint": fp, "filename": filename, "size": size, "checksum": checksum, "assetId": asset_id, }) return JSONResponse({"items": items}) @app.get("/invite/{token}", response_class=HTMLResponse) async def invite_page(token: str, request: Request) -> HTMLResponse: # If public invites disabled and no user session, require login #if not request.session.get("accessToken"): # return RedirectResponse(url="/login") return FileResponse(os.path.join(FRONTEND_DIR, "invite.html")) @app.get("/api/invite/{token}") async def api_invite_info(token: str, request: Request) -> JSONResponse: try: conn = sqlite3.connect(SETTINGS.state_db) cur = conn.cursor() cur.execute("SELECT token, album_id, album_name, max_uses, used_count, expires_at, COALESCE(claimed,0), claimed_at, password_hash, COALESCE(disabled,0), name FROM invites WHERE token = ?", (token,)) row = cur.fetchone() conn.close() except Exception as e: logger.exception("Invite info error: %s", e) return JSONResponse({"error": "db_error"}, status_code=500) if not row: return JSONResponse({"error": "not_found"}, status_code=404) _, album_id, album_name, max_uses, used_count, expires_at, claimed, claimed_at, password_hash, disabled, link_name = row # compute remaining remaining = None try: if max_uses is not None and int(max_uses) >= 0: remaining = int(max_uses) - int(used_count or 0) except Exception: remaining = None # compute state flags try: one_time = (int(max_uses) == 1) except Exception: one_time = False expired = False if expires_at: try: expired = datetime.utcnow() > datetime.fromisoformat(expires_at) except Exception: expired = False deactivated = False reason = None if one_time and claimed: deactivated = True reason = "claimed" elif remaining is not None and remaining <= 0: deactivated = True reason = "exhausted" if expired: deactivated = True reason = reason or "expired" # Admin disabled flag try: if int(disabled) == 1: deactivated = True reason = "disabled" except Exception: pass active = not deactivated # Password requirement + authorization state password_required = bool(password_hash) authorized = False try: ia = request.session.get("inviteAuth") or {} authorized = bool(ia.get(token)) except Exception: authorized = False return JSONResponse({ "token": token, "albumId": album_id, "albumName": album_name, "name": link_name, "maxUses": max_uses, "used": used_count or 0, "remaining": remaining, "expiresAt": expires_at, "oneTime": one_time, "claimed": bool(claimed), "claimedAt": claimed_at, "expired": expired, "active": active, "inactiveReason": (None if active else (reason or "inactive")), "passwordRequired": password_required, "authorized": authorized, }) @app.post("/api/invite/{token}/auth") async def api_invite_auth(token: str, request: Request) -> JSONResponse: """Validate a password for an invite token, and mark this session authorized if valid.""" try: body = await request.json() except Exception: body = None provided = (body or {}).get("password") if isinstance(body, dict) else None try: conn = sqlite3.connect(SETTINGS.state_db) cur = conn.cursor() cur.execute("SELECT password_hash FROM invites WHERE token = ?", (token,)) row = cur.fetchone() conn.close() except Exception as e: logger.exception("Invite auth lookup error: %s", e) return JSONResponse({"error": "db_error"}, status_code=500) if not row: return JSONResponse({"error": "not_found"}, status_code=404) password_hash = row[0] if not password_hash: # No password required; mark as authorized to simplify client flow ia = request.session.get("inviteAuth") or {} ia[token] = True request.session["inviteAuth"] = ia return JSONResponse({"ok": True, "authorized": True}) # verify if not _verify_password(password_hash, provided): return JSONResponse({"error": "invalid_password"}, status_code=403) ia = request.session.get("inviteAuth") or {} ia[token] = True request.session["inviteAuth"] = ia return JSONResponse({"ok": True, "authorized": True}) @app.get("/api/qr", response_model=None) async def api_qr(request: Request): """Generate a QR code PNG for a given text (query param 'text').""" text = request.query_params.get("text") if not text: return JSONResponse({"error": "missing_text"}, status_code=400) if qrcode is None: logger.warning("qrcode library not installed; cannot generate QR") return JSONResponse({"error": "qr_not_available"}, status_code=501) import io as _io img = qrcode.make(text) buf = _io.BytesIO() img.save(buf, format="PNG") buf.seek(0) return Response(content=buf.read(), media_type="image/png") """ Note: Do not run this module directly. Use `python main.py` from project root, which starts `uvicorn app.app:app` with reload. """