upload unique links + default upload page + login
This commit is contained in:
511
app/app.py
511
app/app.py
@@ -21,22 +21,21 @@ from typing import Dict, List, Optional
|
||||
|
||||
import requests
|
||||
from requests_toolbelt.multipart.encoder import MultipartEncoder, MultipartEncoderMonitor
|
||||
import logging
|
||||
from fastapi import FastAPI, UploadFile, WebSocket, WebSocketDisconnect, Request, Form
|
||||
from fastapi.responses import HTMLResponse, JSONResponse, FileResponse
|
||||
from fastapi.responses import HTMLResponse, JSONResponse, FileResponse, RedirectResponse, Response
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
from fastapi.staticfiles import StaticFiles
|
||||
from starlette.websockets import WebSocketState
|
||||
from starlette.middleware.sessions import SessionMiddleware
|
||||
from PIL import Image, ExifTags
|
||||
from dotenv import load_dotenv
|
||||
try:
|
||||
import qrcode
|
||||
except Exception:
|
||||
qrcode = None
|
||||
|
||||
from app.config import Settings, load_settings
|
||||
|
||||
# ---- Load environment / defaults ----
|
||||
load_dotenv()
|
||||
HOST = os.getenv("HOST", "127.0.0.1")
|
||||
PORT = int(os.getenv("PORT", "8080"))
|
||||
STATE_DB = os.getenv("STATE_DB", "./state.db")
|
||||
|
||||
# ---- App & static ----
|
||||
app = FastAPI(title="Immich Drop Uploader (Python)")
|
||||
app.add_middleware(
|
||||
@@ -47,12 +46,19 @@ app.add_middleware(
|
||||
allow_headers=["*"],
|
||||
)
|
||||
|
||||
FRONTEND_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), "frontend")
|
||||
app.mount("/static", StaticFiles(directory=FRONTEND_DIR), name="static")
|
||||
|
||||
# Global settings (read-only at runtime)
|
||||
SETTINGS: Settings = load_settings()
|
||||
|
||||
# Basic logging setup using settings
|
||||
logging.basicConfig(level=SETTINGS.log_level, format="%(asctime)s %(levelname)s %(name)s: %(message)s")
|
||||
logger = logging.getLogger("immich_drop")
|
||||
|
||||
# Cookie-based session for short-lived auth token storage (no persistence)
|
||||
app.add_middleware(SessionMiddleware, secret_key=SETTINGS.session_secret, same_site="lax")
|
||||
|
||||
FRONTEND_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), "frontend")
|
||||
app.mount("/static", StaticFiles(directory=FRONTEND_DIR), name="static")
|
||||
|
||||
# Album cache
|
||||
ALBUM_ID: Optional[str] = None
|
||||
|
||||
@@ -65,7 +71,7 @@ def reset_album_cache() -> None:
|
||||
|
||||
def db_init() -> None:
|
||||
"""Create the local SQLite table used for duplicate checks (idempotent)."""
|
||||
conn = sqlite3.connect(STATE_DB)
|
||||
conn = sqlite3.connect(SETTINGS.state_db)
|
||||
cur = conn.cursor()
|
||||
cur.execute(
|
||||
"""
|
||||
@@ -86,7 +92,7 @@ def db_init() -> None:
|
||||
|
||||
def db_lookup_checksum(checksum: str) -> Optional[dict]:
|
||||
"""Return a record for the given checksum if seen before (None if not)."""
|
||||
conn = sqlite3.connect(STATE_DB)
|
||||
conn = sqlite3.connect(SETTINGS.state_db)
|
||||
cur = conn.cursor()
|
||||
cur.execute("SELECT checksum, immich_asset_id FROM uploads WHERE checksum = ?", (checksum,))
|
||||
row = cur.fetchone()
|
||||
@@ -97,7 +103,7 @@ def db_lookup_checksum(checksum: str) -> Optional[dict]:
|
||||
|
||||
def db_lookup_device_asset(device_asset_id: str) -> bool:
|
||||
"""True if a deviceAssetId has been uploaded by this service previously."""
|
||||
conn = sqlite3.connect(STATE_DB)
|
||||
conn = sqlite3.connect(SETTINGS.state_db)
|
||||
cur = conn.cursor()
|
||||
cur.execute("SELECT 1 FROM uploads WHERE device_asset_id = ?", (device_asset_id,))
|
||||
row = cur.fetchone()
|
||||
@@ -106,7 +112,7 @@ def db_lookup_device_asset(device_asset_id: str) -> bool:
|
||||
|
||||
def db_insert_upload(checksum: str, filename: str, size: int, device_asset_id: str, immich_asset_id: Optional[str], created_at: str) -> None:
|
||||
"""Insert a newly-uploaded asset into the local cache (ignore on duplicates)."""
|
||||
conn = sqlite3.connect(STATE_DB)
|
||||
conn = sqlite3.connect(SETTINGS.state_db)
|
||||
cur = conn.cursor()
|
||||
cur.execute(
|
||||
"INSERT OR IGNORE INTO uploads (checksum, filename, size, device_asset_id, immich_asset_id, created_at) VALUES (?,?,?,?,?,?)",
|
||||
@@ -197,67 +203,87 @@ def read_exif_datetimes(file_bytes: bytes):
|
||||
pass
|
||||
return created, modified
|
||||
|
||||
def immich_headers() -> dict:
|
||||
"""Headers for Immich API calls (keeps key server-side)."""
|
||||
return {"Accept": "application/json", "x-api-key": SETTINGS.immich_api_key}
|
||||
def immich_headers(request: Optional[Request] = None) -> dict:
|
||||
"""Headers for Immich API calls using either session access token or API key."""
|
||||
headers = {"Accept": "application/json"}
|
||||
token = None
|
||||
try:
|
||||
if request is not None:
|
||||
token = request.session.get("accessToken")
|
||||
except Exception:
|
||||
token = None
|
||||
if token:
|
||||
headers["Authorization"] = f"Bearer {token}"
|
||||
elif SETTINGS.immich_api_key:
|
||||
headers["x-api-key"] = SETTINGS.immich_api_key
|
||||
return headers
|
||||
|
||||
def get_or_create_album() -> Optional[str]:
|
||||
def get_or_create_album(request: Optional[Request] = None, album_name_override: Optional[str] = None) -> Optional[str]:
|
||||
"""Get existing album by name or create a new one. Returns album ID or None."""
|
||||
global ALBUM_ID
|
||||
|
||||
album_name = album_name_override if album_name_override is not None else SETTINGS.album_name
|
||||
# Skip if no album name configured
|
||||
if not SETTINGS.album_name:
|
||||
if not album_name:
|
||||
return None
|
||||
|
||||
# Return cached album ID if already fetched
|
||||
if ALBUM_ID:
|
||||
# Return cached album ID if already fetched and using default settings name
|
||||
if album_name_override is None and ALBUM_ID:
|
||||
return ALBUM_ID
|
||||
|
||||
try:
|
||||
# First, try to find existing album
|
||||
url = f"{SETTINGS.normalized_base_url}/albums"
|
||||
r = requests.get(url, headers=immich_headers(), timeout=10)
|
||||
r = requests.get(url, headers=immich_headers(request), timeout=10)
|
||||
|
||||
if r.status_code == 200:
|
||||
albums = r.json()
|
||||
for album in albums:
|
||||
if album.get("albumName") == SETTINGS.album_name:
|
||||
ALBUM_ID = album.get("id")
|
||||
print(f"Found existing album '{SETTINGS.album_name}' with ID: {ALBUM_ID}")
|
||||
return ALBUM_ID
|
||||
if album.get("albumName") == album_name:
|
||||
found_id = album.get("id")
|
||||
if album_name_override is None:
|
||||
ALBUM_ID = found_id
|
||||
logger.info(f"Found existing album '%s' with ID: %s", album_name, ALBUM_ID)
|
||||
return ALBUM_ID
|
||||
else:
|
||||
return found_id
|
||||
|
||||
# Album doesn't exist, create it
|
||||
create_url = f"{SETTINGS.normalized_base_url}/albums"
|
||||
payload = {
|
||||
"albumName": SETTINGS.album_name,
|
||||
"albumName": album_name,
|
||||
"description": "Auto-created album for Immich Drop uploads"
|
||||
}
|
||||
r = requests.post(create_url, headers={**immich_headers(), "Content-Type": "application/json"},
|
||||
json=payload, timeout=10)
|
||||
r = requests.post(create_url, headers={**immich_headers(request), "Content-Type": "application/json"},
|
||||
json=payload, timeout=10)
|
||||
|
||||
if r.status_code in (200, 201):
|
||||
data = r.json()
|
||||
ALBUM_ID = data.get("id")
|
||||
print(f"Created new album '{SETTINGS.album_name}' with ID: {ALBUM_ID}")
|
||||
return ALBUM_ID
|
||||
new_id = data.get("id")
|
||||
if album_name_override is None:
|
||||
ALBUM_ID = new_id
|
||||
logger.info("Created new album '%s' with ID: %s", album_name, ALBUM_ID)
|
||||
return ALBUM_ID
|
||||
else:
|
||||
logger.info("Created new album '%s' with ID: %s", album_name, new_id)
|
||||
return new_id
|
||||
else:
|
||||
print(f"Failed to create album: {r.status_code} - {r.text}")
|
||||
logger.warning("Failed to create album: %s - %s", r.status_code, r.text)
|
||||
except Exception as e:
|
||||
print(f"Error managing album: {e}")
|
||||
logger.exception("Error managing album: %s", e)
|
||||
|
||||
return None
|
||||
|
||||
def add_asset_to_album(asset_id: str) -> bool:
|
||||
def add_asset_to_album(asset_id: str, request: Optional[Request] = None, album_id_override: Optional[str] = None, album_name_override: Optional[str] = None) -> bool:
|
||||
"""Add an asset to the configured album. Returns True on success."""
|
||||
album_id = get_or_create_album()
|
||||
album_id = album_id_override
|
||||
if not album_id:
|
||||
album_id = get_or_create_album(request=request, album_name_override=album_name_override)
|
||||
if not album_id or not asset_id:
|
||||
return False
|
||||
|
||||
try:
|
||||
url = f"{SETTINGS.normalized_base_url}/albums/{album_id}/assets"
|
||||
payload = {"ids": [asset_id]}
|
||||
r = requests.put(url, headers={**immich_headers(), "Content-Type": "application/json"},
|
||||
json=payload, timeout=10)
|
||||
r = requests.put(url, headers={**immich_headers(request), "Content-Type": "application/json"},
|
||||
json=payload, timeout=10)
|
||||
|
||||
if r.status_code == 200:
|
||||
results = r.json()
|
||||
@@ -270,7 +296,7 @@ def add_asset_to_album(asset_id: str) -> bool:
|
||||
return True
|
||||
return False
|
||||
except Exception as e:
|
||||
print(f"Error adding asset to album: {e}")
|
||||
logger.exception("Error adding asset to album: %s", e)
|
||||
return False
|
||||
|
||||
def immich_ping() -> bool:
|
||||
@@ -312,10 +338,24 @@ async def send_progress(session_id: str, item_id: str, status: str, progress: in
|
||||
# ---------- Routes ----------
|
||||
|
||||
@app.get("/", response_class=HTMLResponse)
|
||||
async def index(_: Request) -> HTMLResponse:
|
||||
"""Serve the SPA (frontend/index.html)."""
|
||||
async def index(request: Request) -> HTMLResponse:
|
||||
"""Serve the SPA (frontend/index.html) or redirect to login if disabled."""
|
||||
if not SETTINGS.public_upload_page_enabled:
|
||||
return RedirectResponse(url="/login")
|
||||
return FileResponse(os.path.join(FRONTEND_DIR, "index.html"))
|
||||
|
||||
@app.get("/login", response_class=HTMLResponse)
|
||||
async def login_page(_: Request) -> HTMLResponse:
|
||||
"""Serve the login page."""
|
||||
return FileResponse(os.path.join(FRONTEND_DIR, "login.html"))
|
||||
|
||||
@app.get("/menu", response_class=HTMLResponse)
|
||||
async def menu_page(request: Request) -> HTMLResponse:
|
||||
"""Serve the menu page for creating invite links. Requires login."""
|
||||
if not request.session.get("accessToken"):
|
||||
return RedirectResponse(url="/login")
|
||||
return FileResponse(os.path.join(FRONTEND_DIR, "menu.html"))
|
||||
|
||||
@app.post("/api/ping")
|
||||
async def api_ping() -> dict:
|
||||
"""Connectivity test endpoint used by the UI to display a temporary banner."""
|
||||
@@ -325,6 +365,13 @@ async def api_ping() -> dict:
|
||||
"album_name": SETTINGS.album_name if SETTINGS.album_name else None
|
||||
}
|
||||
|
||||
@app.get("/api/config")
|
||||
async def api_config() -> dict:
|
||||
"""Expose minimal public configuration flags for the frontend."""
|
||||
return {
|
||||
"public_upload_page_enabled": SETTINGS.public_upload_page_enabled,
|
||||
}
|
||||
|
||||
@app.websocket("/ws")
|
||||
async def ws_endpoint(ws: WebSocket) -> None:
|
||||
"""WebSocket endpoint for pushing per-item upload progress."""
|
||||
@@ -358,11 +405,12 @@ async def ws_endpoint(ws: WebSocket) -> None:
|
||||
|
||||
@app.post("/api/upload")
|
||||
async def api_upload(
|
||||
_: Request,
|
||||
request: Request,
|
||||
file: UploadFile,
|
||||
item_id: str = Form(...),
|
||||
session_id: str = Form(...),
|
||||
last_modified: Optional[int] = Form(None),
|
||||
invite_token: Optional[str] = Form(None),
|
||||
):
|
||||
"""Receive a file, check duplicates, forward to Immich; stream progress via WS."""
|
||||
raw = await file.read()
|
||||
@@ -405,6 +453,65 @@ async def api_upload(
|
||||
|
||||
encoder = gen_encoder()
|
||||
|
||||
# Invite token validation (if provided)
|
||||
target_album_id: Optional[str] = None
|
||||
target_album_name: Optional[str] = None
|
||||
if invite_token:
|
||||
try:
|
||||
conn = sqlite3.connect(SETTINGS.state_db)
|
||||
cur = conn.cursor()
|
||||
cur.execute("SELECT token, album_id, album_name, max_uses, used_count, expires_at, COALESCE(claimed,0), claimed_by_session FROM invites WHERE token = ?", (invite_token,))
|
||||
row = cur.fetchone()
|
||||
conn.close()
|
||||
except Exception as e:
|
||||
logger.exception("Invite lookup error: %s", e)
|
||||
row = None
|
||||
if not row:
|
||||
await send_progress(session_id, item_id, "error", 100, "Invalid invite token")
|
||||
return JSONResponse({"error": "invalid_invite"}, status_code=403)
|
||||
_, album_id, album_name, max_uses, used_count, expires_at, claimed, claimed_by_session = row
|
||||
# Expiry check
|
||||
if expires_at:
|
||||
try:
|
||||
if datetime.utcnow() > datetime.fromisoformat(expires_at):
|
||||
await send_progress(session_id, item_id, "error", 100, "Invite expired")
|
||||
return JSONResponse({"error": "invite_expired"}, status_code=403)
|
||||
except Exception:
|
||||
pass
|
||||
# One-time claim or multi-use enforcement
|
||||
try:
|
||||
max_uses_int = int(max_uses) if max_uses is not None else -1
|
||||
except Exception:
|
||||
max_uses_int = -1
|
||||
if max_uses_int == 1:
|
||||
if claimed and claimed_by_session and claimed_by_session != session_id:
|
||||
await send_progress(session_id, item_id, "error", 100, "Invite already used")
|
||||
return JSONResponse({"error": "invite_claimed"}, status_code=403)
|
||||
# Atomically claim the one-time invite to prevent concurrent use
|
||||
try:
|
||||
connc = sqlite3.connect(SETTINGS.state_db)
|
||||
curc = connc.cursor()
|
||||
curc.execute(
|
||||
"UPDATE invites SET claimed = 1, claimed_at = CURRENT_TIMESTAMP, claimed_by_session = ? WHERE token = ? AND (claimed IS NULL OR claimed = 0)",
|
||||
(session_id, invite_token)
|
||||
)
|
||||
connc.commit()
|
||||
changed = connc.total_changes
|
||||
connc.close()
|
||||
if changed == 0 and (claimed_by_session or claimed):
|
||||
await send_progress(session_id, item_id, "error", 100, "Invite already used")
|
||||
return JSONResponse({"error": "invite_claimed"}, status_code=403)
|
||||
except Exception as e:
|
||||
logger.exception("Invite claim failed: %s", e)
|
||||
return JSONResponse({"error": "invite_claim_failed"}, status_code=500)
|
||||
else:
|
||||
# Usage check for multi-use (max_uses < 0 => indefinite)
|
||||
if (used_count or 0) >= (max_uses_int if max_uses_int >= 0 else 10**9):
|
||||
await send_progress(session_id, item_id, "error", 100, "Invite already used up")
|
||||
return JSONResponse({"error": "invite_exhausted"}, status_code=403)
|
||||
target_album_id = album_id
|
||||
target_album_name = album_name
|
||||
|
||||
async def do_upload():
|
||||
await send_progress(session_id, item_id, "uploading", 0, "Uploading…")
|
||||
sent = {"pct": 0}
|
||||
@@ -415,7 +522,7 @@ async def api_upload(
|
||||
sent["pct"] = pct
|
||||
asyncio.create_task(send_progress(session_id, item_id, "uploading", pct))
|
||||
monitor = MultipartEncoderMonitor(encoder, cb)
|
||||
headers = {"Accept": "application/json", "Content-Type": monitor.content_type, "x-immich-checksum": checksum, **immich_headers()}
|
||||
headers = {"Accept": "application/json", "Content-Type": monitor.content_type, "x-immich-checksum": checksum, **immich_headers(request)}
|
||||
try:
|
||||
r = requests.post(f"{SETTINGS.normalized_base_url}/assets", headers=headers, data=monitor, timeout=120)
|
||||
if r.status_code in (200, 201):
|
||||
@@ -424,12 +531,40 @@ async def api_upload(
|
||||
db_insert_upload(checksum, file.filename, size, device_asset_id, asset_id, created_iso)
|
||||
status = data.get("status", "created")
|
||||
|
||||
# Add to album if configured
|
||||
if SETTINGS.album_name and asset_id:
|
||||
if add_asset_to_album(asset_id):
|
||||
status += f" (added to album '{SETTINGS.album_name}')"
|
||||
|
||||
# Add to album if configured (invite overrides .env)
|
||||
if asset_id:
|
||||
added = False
|
||||
if invite_token:
|
||||
added = add_asset_to_album(asset_id, request=request, album_id_override=target_album_id, album_name_override=target_album_name)
|
||||
if added:
|
||||
status += f" (added to album '{target_album_name or target_album_id}')"
|
||||
elif SETTINGS.album_name:
|
||||
if add_asset_to_album(asset_id, request=request):
|
||||
status += f" (added to album '{SETTINGS.album_name}')"
|
||||
|
||||
await send_progress(session_id, item_id, "duplicate" if status == "duplicate" else "done", 100, status, asset_id)
|
||||
|
||||
# Increment invite usage on success
|
||||
if invite_token:
|
||||
try:
|
||||
conn2 = sqlite3.connect(SETTINGS.state_db)
|
||||
cur2 = conn2.cursor()
|
||||
# Keep one-time used_count at 1; multi-use increments per asset
|
||||
cur2.execute("SELECT max_uses FROM invites WHERE token = ?", (invite_token,))
|
||||
row_mu = cur2.fetchone()
|
||||
mx = None
|
||||
try:
|
||||
mx = int(row_mu[0]) if row_mu and row_mu[0] is not None else None
|
||||
except Exception:
|
||||
mx = None
|
||||
if mx == 1:
|
||||
cur2.execute("UPDATE invites SET used_count = 1 WHERE token = ?", (invite_token,))
|
||||
else:
|
||||
cur2.execute("UPDATE invites SET used_count = used_count + 1 WHERE token = ?", (invite_token,))
|
||||
conn2.commit()
|
||||
conn2.close()
|
||||
except Exception as e:
|
||||
logger.exception("Failed to increment invite usage: %s", e)
|
||||
return JSONResponse({"id": asset_id, "status": status}, status_code=200)
|
||||
else:
|
||||
try:
|
||||
@@ -450,6 +585,278 @@ async def api_album_reset() -> dict:
|
||||
reset_album_cache()
|
||||
return {"ok": True}
|
||||
|
||||
# ---------- Auth & Albums & Invites APIs ----------
|
||||
|
||||
@app.post("/api/login")
|
||||
async def api_login(request: Request) -> JSONResponse:
|
||||
"""Authenticate against Immich using email/password; store token in session."""
|
||||
try:
|
||||
body = await request.json()
|
||||
except Exception:
|
||||
return JSONResponse({"error": "invalid_json"}, status_code=400)
|
||||
email = (body or {}).get("email")
|
||||
password = (body or {}).get("password")
|
||||
if not email or not password:
|
||||
return JSONResponse({"error": "missing_credentials"}, status_code=400)
|
||||
try:
|
||||
r = requests.post(f"{SETTINGS.normalized_base_url}/auth/login", headers={"Content-Type": "application/json", "Accept": "application/json"}, json={"email": email, "password": password}, timeout=15)
|
||||
except Exception as e:
|
||||
logger.exception("Login request failed: %s", e)
|
||||
return JSONResponse({"error": "login_failed"}, status_code=502)
|
||||
if r.status_code not in (200, 201):
|
||||
logger.warning("Auth rejected: %s - %s", r.status_code, r.text)
|
||||
return JSONResponse({"error": "unauthorized"}, status_code=401)
|
||||
data = r.json() if r.content else {}
|
||||
token = data.get("accessToken")
|
||||
if not token:
|
||||
logger.warning("Auth response missing accessToken")
|
||||
return JSONResponse({"error": "invalid_response"}, status_code=502)
|
||||
# Store only token and basic info in cookie session
|
||||
request.session.update({
|
||||
"accessToken": token,
|
||||
"userEmail": data.get("userEmail"),
|
||||
"userId": data.get("userId"),
|
||||
"name": data.get("name"),
|
||||
"isAdmin": data.get("isAdmin", False),
|
||||
})
|
||||
logger.info("User %s logged in", data.get("userEmail"))
|
||||
return JSONResponse({"ok": True, **{k: data.get(k) for k in ("userEmail","userId","name","isAdmin")}})
|
||||
|
||||
@app.post("/api/logout")
|
||||
async def api_logout(request: Request) -> dict:
|
||||
request.session.clear()
|
||||
return {"ok": True}
|
||||
|
||||
@app.get("/logout")
|
||||
async def logout_get(request: Request) -> RedirectResponse:
|
||||
request.session.clear()
|
||||
return RedirectResponse(url="/login")
|
||||
|
||||
@app.get("/api/albums")
|
||||
async def api_albums(request: Request) -> JSONResponse:
|
||||
"""Return list of albums if authorized; logs on 401/403."""
|
||||
try:
|
||||
r = requests.get(f"{SETTINGS.normalized_base_url}/albums", headers=immich_headers(request), timeout=10)
|
||||
except Exception as e:
|
||||
logger.exception("Albums request failed: %s", e)
|
||||
return JSONResponse({"error": "request_failed"}, status_code=502)
|
||||
if r.status_code == 200:
|
||||
return JSONResponse(r.json())
|
||||
if r.status_code in (401, 403):
|
||||
logger.warning("Album list not allowed: %s - %s", r.status_code, r.text)
|
||||
return JSONResponse({"error": "forbidden"}, status_code=403)
|
||||
return JSONResponse({"error": "unexpected_status", "status": r.status_code}, status_code=502)
|
||||
|
||||
@app.post("/api/albums")
|
||||
async def api_albums_create(request: Request) -> JSONResponse:
|
||||
try:
|
||||
body = await request.json()
|
||||
except Exception:
|
||||
return JSONResponse({"error": "invalid_json"}, status_code=400)
|
||||
name = (body or {}).get("name")
|
||||
if not name:
|
||||
return JSONResponse({"error": "missing_name"}, status_code=400)
|
||||
try:
|
||||
r = requests.post(f"{SETTINGS.normalized_base_url}/albums", headers={**immich_headers(request), "Content-Type": "application/json"}, json={"albumName": name}, timeout=10)
|
||||
except Exception as e:
|
||||
logger.exception("Create album failed: %s", e)
|
||||
return JSONResponse({"error": "request_failed"}, status_code=502)
|
||||
if r.status_code in (200, 201):
|
||||
return JSONResponse(r.json(), status_code=201)
|
||||
if r.status_code in (401, 403):
|
||||
logger.warning("Create album forbidden: %s - %s", r.status_code, r.text)
|
||||
return JSONResponse({"error": "forbidden"}, status_code=403)
|
||||
return JSONResponse({"error": "unexpected_status", "status": r.status_code, "body": r.text}, status_code=502)
|
||||
|
||||
# ---------- Invites (one-time/expiring links) ----------
|
||||
|
||||
def ensure_invites_table() -> None:
|
||||
try:
|
||||
conn = sqlite3.connect(SETTINGS.state_db)
|
||||
cur = conn.cursor()
|
||||
cur.execute(
|
||||
"""
|
||||
CREATE TABLE IF NOT EXISTS invites (
|
||||
token TEXT PRIMARY KEY,
|
||||
album_id TEXT,
|
||||
album_name TEXT,
|
||||
max_uses INTEGER DEFAULT 1,
|
||||
used_count INTEGER DEFAULT 0,
|
||||
expires_at TEXT,
|
||||
created_at TEXT DEFAULT CURRENT_TIMESTAMP
|
||||
);
|
||||
"""
|
||||
)
|
||||
# Attempt to add new columns for claiming semantics
|
||||
try:
|
||||
cur.execute("ALTER TABLE invites ADD COLUMN claimed INTEGER DEFAULT 0")
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
cur.execute("ALTER TABLE invites ADD COLUMN claimed_at TEXT")
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
cur.execute("ALTER TABLE invites ADD COLUMN claimed_by_session TEXT")
|
||||
except Exception:
|
||||
pass
|
||||
conn.commit()
|
||||
conn.close()
|
||||
except Exception as e:
|
||||
logger.exception("Failed to ensure invites table: %s", e)
|
||||
|
||||
ensure_invites_table()
|
||||
|
||||
@app.post("/api/invites")
|
||||
async def api_invites_create(request: Request) -> JSONResponse:
|
||||
"""Create an invite link for uploads with optional expiry and max uses."""
|
||||
# Require a logged-in session to create invites
|
||||
if not request.session.get("accessToken"):
|
||||
return JSONResponse({"error": "unauthorized"}, status_code=401)
|
||||
try:
|
||||
body = await request.json()
|
||||
except Exception:
|
||||
return JSONResponse({"error": "invalid_json"}, status_code=400)
|
||||
album_id = (body or {}).get("albumId")
|
||||
album_name = (body or {}).get("albumName")
|
||||
max_uses = (body or {}).get("maxUses", 1)
|
||||
expires_days = (body or {}).get("expiresDays")
|
||||
# Normalize max_uses
|
||||
try:
|
||||
max_uses = int(max_uses)
|
||||
except Exception:
|
||||
max_uses = 1
|
||||
if not album_id and not album_name and not SETTINGS.album_name:
|
||||
return JSONResponse({"error": "missing_album"}, status_code=400)
|
||||
if not album_name and SETTINGS.album_name:
|
||||
album_name = SETTINGS.album_name
|
||||
# If only album_name provided, resolve or create now to fix to an ID
|
||||
resolved_album_id = None
|
||||
if not album_id and album_name:
|
||||
resolved_album_id = get_or_create_album(request=request, album_name_override=album_name)
|
||||
else:
|
||||
resolved_album_id = album_id
|
||||
# Compute expiry
|
||||
expires_at = None
|
||||
if expires_days is not None:
|
||||
try:
|
||||
days = int(expires_days)
|
||||
expires_at = (datetime.utcnow()).replace(microsecond=0).isoformat()
|
||||
# Use timedelta
|
||||
from datetime import timedelta
|
||||
expires_at = (datetime.utcnow() + timedelta(days=days)).replace(microsecond=0).isoformat()
|
||||
except Exception:
|
||||
expires_at = None
|
||||
# Generate token
|
||||
import uuid
|
||||
token = uuid.uuid4().hex
|
||||
try:
|
||||
conn = sqlite3.connect(SETTINGS.state_db)
|
||||
cur = conn.cursor()
|
||||
cur.execute(
|
||||
"INSERT INTO invites (token, album_id, album_name, max_uses, expires_at) VALUES (?,?,?,?,?)",
|
||||
(token, resolved_album_id, album_name, max_uses, expires_at)
|
||||
)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
except Exception as e:
|
||||
logger.exception("Failed to create invite: %s", e)
|
||||
return JSONResponse({"error": "db_error"}, status_code=500)
|
||||
# Build absolute URL using PUBLIC_BASE_URL if set, else request base
|
||||
try:
|
||||
base_url = SETTINGS.public_base_url.strip().rstrip('/') if SETTINGS.public_base_url else str(request.base_url).rstrip('/')
|
||||
except Exception:
|
||||
base_url = str(request.base_url).rstrip('/')
|
||||
absolute = f"{base_url}/invite/{token}"
|
||||
return JSONResponse({
|
||||
"ok": True,
|
||||
"token": token,
|
||||
"url": f"/invite/{token}",
|
||||
"absoluteUrl": absolute,
|
||||
"albumId": resolved_album_id,
|
||||
"albumName": album_name,
|
||||
"maxUses": max_uses,
|
||||
"expiresAt": expires_at
|
||||
})
|
||||
|
||||
@app.get("/invite/{token}", response_class=HTMLResponse)
|
||||
async def invite_page(token: str, request: Request) -> HTMLResponse:
|
||||
# If public invites disabled and no user session, require login
|
||||
#if not request.session.get("accessToken"):
|
||||
# return RedirectResponse(url="/login")
|
||||
return FileResponse(os.path.join(FRONTEND_DIR, "invite.html"))
|
||||
|
||||
@app.get("/api/invite/{token}")
|
||||
async def api_invite_info(token: str) -> JSONResponse:
|
||||
try:
|
||||
conn = sqlite3.connect(SETTINGS.state_db)
|
||||
cur = conn.cursor()
|
||||
cur.execute("SELECT token, album_id, album_name, max_uses, used_count, expires_at, COALESCE(claimed,0), claimed_at FROM invites WHERE token = ?", (token,))
|
||||
row = cur.fetchone()
|
||||
conn.close()
|
||||
except Exception as e:
|
||||
logger.exception("Invite info error: %s", e)
|
||||
return JSONResponse({"error": "db_error"}, status_code=500)
|
||||
if not row:
|
||||
return JSONResponse({"error": "not_found"}, status_code=404)
|
||||
_, album_id, album_name, max_uses, used_count, expires_at, claimed, claimed_at = row
|
||||
# compute remaining
|
||||
remaining = None
|
||||
try:
|
||||
if max_uses is not None and int(max_uses) >= 0:
|
||||
remaining = int(max_uses) - int(used_count or 0)
|
||||
except Exception:
|
||||
remaining = None
|
||||
# compute state flags
|
||||
try:
|
||||
one_time = (int(max_uses) == 1)
|
||||
except Exception:
|
||||
one_time = False
|
||||
expired = False
|
||||
if expires_at:
|
||||
try:
|
||||
expired = datetime.utcnow() > datetime.fromisoformat(expires_at)
|
||||
except Exception:
|
||||
expired = False
|
||||
deactivated = False
|
||||
if one_time and claimed:
|
||||
deactivated = True
|
||||
elif remaining is not None and remaining <= 0:
|
||||
deactivated = True
|
||||
if expired:
|
||||
deactivated = True
|
||||
active = not deactivated
|
||||
return JSONResponse({
|
||||
"token": token,
|
||||
"albumId": album_id,
|
||||
"albumName": album_name,
|
||||
"maxUses": max_uses,
|
||||
"used": used_count or 0,
|
||||
"remaining": remaining,
|
||||
"expiresAt": expires_at,
|
||||
"oneTime": one_time,
|
||||
"claimed": bool(claimed),
|
||||
"claimedAt": claimed_at,
|
||||
"expired": expired,
|
||||
"active": active,
|
||||
})
|
||||
|
||||
@app.get("/api/qr", response_model=None)
|
||||
async def api_qr(request: Request):
|
||||
"""Generate a QR code PNG for a given text (query param 'text')."""
|
||||
text = request.query_params.get("text")
|
||||
if not text:
|
||||
return JSONResponse({"error": "missing_text"}, status_code=400)
|
||||
if qrcode is None:
|
||||
logger.warning("qrcode library not installed; cannot generate QR")
|
||||
return JSONResponse({"error": "qr_not_available"}, status_code=501)
|
||||
import io as _io
|
||||
img = qrcode.make(text)
|
||||
buf = _io.BytesIO()
|
||||
img.save(buf, format="PNG")
|
||||
buf.seek(0)
|
||||
return Response(content=buf.read(), media_type="image/png")
|
||||
|
||||
"""
|
||||
Note: Do not run this module directly. Use `python main.py` from
|
||||
project root, which starts `uvicorn app.app:app` with reload.
|
||||
|
||||
@@ -6,6 +6,8 @@ Reads ONLY from .env; there is NO runtime mutation from the UI.
|
||||
from __future__ import annotations
|
||||
import os
|
||||
from dataclasses import dataclass
|
||||
import secrets
|
||||
from dotenv import load_dotenv
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -15,6 +17,11 @@ class Settings:
|
||||
immich_api_key: str
|
||||
max_concurrent: int = 3
|
||||
album_name: str = ""
|
||||
public_upload_page_enabled: bool = False
|
||||
public_base_url: str = ""
|
||||
state_db: str = "./state.db"
|
||||
session_secret: str = ""
|
||||
log_level: str = "INFO"
|
||||
|
||||
@property
|
||||
def normalized_base_url(self) -> str:
|
||||
@@ -23,11 +30,35 @@ class Settings:
|
||||
|
||||
def load_settings() -> Settings:
|
||||
"""Load settings from .env, applying defaults when absent."""
|
||||
# Load environment variables from .env once here so importers don’t have to
|
||||
try:
|
||||
load_dotenv()
|
||||
except Exception:
|
||||
pass
|
||||
base = os.getenv("IMMICH_BASE_URL", "http://127.0.0.1:2283/api")
|
||||
api_key = os.getenv("IMMICH_API_KEY", "")
|
||||
album_name = os.getenv("IMMICH_ALBUM_NAME", "")
|
||||
# Safe defaults: disable public uploader and invites unless explicitly enabled
|
||||
def as_bool(v: str, default: bool = False) -> bool:
|
||||
if v is None:
|
||||
return default
|
||||
return str(v).strip().lower() in {"1","true","yes","on"}
|
||||
public_upload = as_bool(os.getenv("PUBLIC_UPLOAD_PAGE_ENABLED", "false"), False)
|
||||
try:
|
||||
maxc = int(os.getenv("MAX_CONCURRENT", "3"))
|
||||
except ValueError:
|
||||
maxc = 3
|
||||
return Settings(immich_base_url=base, immich_api_key=api_key, max_concurrent=maxc, album_name=album_name)
|
||||
state_db = os.getenv("STATE_DB", "./state.db")
|
||||
session_secret = os.getenv("SESSION_SECRET") or secrets.token_hex(32)
|
||||
log_level = os.getenv("LOG_LEVEL", "INFO").upper()
|
||||
return Settings(
|
||||
immich_base_url=base,
|
||||
immich_api_key=api_key,
|
||||
max_concurrent=maxc,
|
||||
album_name=album_name,
|
||||
public_upload_page_enabled=public_upload,
|
||||
public_base_url=os.getenv("PUBLIC_BASE_URL", ""),
|
||||
state_db=state_db,
|
||||
session_secret=session_secret,
|
||||
log_level=log_level,
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user