From ebd120b2cdb7eaa88c9fee3a258b778c635c6282 Mon Sep 17 00:00:00 2001 From: Tanner Collin Date: Sat, 22 Nov 2025 20:50:19 -0700 Subject: [PATCH] feat: Implement hashed admin password support and centralize password logic Co-authored-by: aider (gemini/gemini-2.5-pro) --- app/app.py | 78 ++++++++++++++++++++++++++------------------------- app/config.py | 22 +++++++++++++++ 2 files changed, 62 insertions(+), 38 deletions(-) diff --git a/app/app.py b/app/app.py index 502eddf..43ed803 100644 --- a/app/app.py +++ b/app/app.py @@ -16,6 +16,7 @@ import json import hashlib import os import sqlite3 +import binascii from datetime import datetime from typing import Dict, List, Optional @@ -223,6 +224,34 @@ def read_exif_datetimes(file_bytes: bytes): pass return created, modified + +def _hash_password(pw: str) -> str: + """Return PBKDF2-SHA256 hash of a password.""" + try: + if not pw: + return "" + salt = os.urandom(16) + iterations = 200_000 + dk = hashlib.pbkdf2_hmac('sha256', pw.encode('utf-8'), salt, iterations) + return f"pbkdf2_sha256${iterations}${binascii.hexlify(salt).decode()}${binascii.hexlify(dk).decode()}" + except Exception: + return "" + +def _verify_password(stored: str, pw: Optional[str]) -> bool: + """Verify a password against a PBKDF2-SHA256 hash.""" + if not pw or not stored: + return False + try: + algo, iter_s, salt_hex, hash_hex = stored.split("$") + if algo != 'pbkdf2_sha256': + return False + iterations = int(iter_s) + salt = binascii.unhexlify(salt_hex) + dk = hashlib.pbkdf2_hmac('sha256', pw.encode('utf-8'), salt, iterations) + return binascii.hexlify(dk).decode() == hash_hex + except Exception: + return False + def get_or_create_album_dir(album_name: str) -> str: """Get or create a directory for an album. Returns the path.""" if not album_name or not isinstance(album_name, str): @@ -858,7 +887,14 @@ async def api_login(request: Request) -> JSONResponse: if not email or not password: return JSONResponse({"error": "missing_credentials"}, status_code=400) - if email == "admin" and password == SETTINGS.admin_password: + stored_password = SETTINGS.admin_password + password_ok = False + if stored_password.startswith("pbkdf2_sha256$"): + password_ok = _verify_password(stored_password, password) + else: + password_ok = (password == stored_password) + + if email == "admin" and password_ok: user_info = { "accessToken": "local_admin_session", # dummy value "userEmail": "admin", @@ -1028,19 +1064,7 @@ async def api_invites_create(request: Request) -> JSONResponse: import uuid token = uuid.uuid4().hex # Prepare password hash, if provided - def hash_password(pw: str) -> str: - try: - if not pw: - return "" - import os as _os - import binascii as _binascii - salt = _os.urandom(16) - iterations = 200_000 - dk = hashlib.pbkdf2_hmac('sha256', pw.encode('utf-8'), salt, iterations) - return f"pbkdf2_sha256${iterations}${_binascii.hexlify(salt).decode()}${_binascii.hexlify(dk).decode()}" - except Exception: - return "" - pw_hash = hash_password(invite_password or "") if (invite_password and str(invite_password).strip()) else None + pw_hash = _hash_password(invite_password or "") if (invite_password and str(invite_password).strip()) else None # Owner info from session owner_user_id = str(request.session.get("userId") or "") owner_email = str(request.session.get("userEmail") or "") @@ -1235,16 +1259,8 @@ async def api_invite_update(token: str, request: Request) -> JSONResponse: if "password" in (body or {}): pw = str((body or {}).get("password") or "").strip() if pw: - # Reuse hasher from above - def _hash_pw(pw: str) -> str: - import os as _os - import binascii as _binascii - salt = _os.urandom(16) - iterations = 200_000 - dk = hashlib.pbkdf2_hmac('sha256', pw.encode('utf-8'), salt, iterations) - return f"pbkdf2_sha256${iterations}${_binascii.hexlify(salt).decode()}${_binascii.hexlify(dk).decode()}" fields.append("password_hash = ?") - params.append(_hash_pw(pw)) + params.append(_hash_password(pw)) else: fields.append("password_hash = NULL") # Reset usage @@ -1488,21 +1504,7 @@ async def api_invite_auth(token: str, request: Request) -> JSONResponse: request.session["inviteAuth"] = ia return JSONResponse({"ok": True, "authorized": True}) # verify - def verify_password(stored: str, pw: Optional[str]) -> bool: - if not pw: - return False - try: - algo, iter_s, salt_hex, hash_hex = stored.split("$") - if algo != 'pbkdf2_sha256': - return False - iterations = int(iter_s) - import binascii as _binascii - salt = _binascii.unhexlify(salt_hex) - dk = hashlib.pbkdf2_hmac('sha256', pw.encode('utf-8'), salt, iterations) - return _binascii.hexlify(dk).decode() == hash_hex - except Exception: - return False - if not verify_password(password_hash, provided): + if not _verify_password(password_hash, provided): return JSONResponse({"error": "invalid_password"}, status_code=403) ia = request.session.get("inviteAuth") or {} ia[token] = True diff --git a/app/config.py b/app/config.py index e99d473..a5ea9b4 100644 --- a/app/config.py +++ b/app/config.py @@ -8,6 +8,8 @@ import os from dataclasses import dataclass import secrets from dotenv import load_dotenv +import hashlib +import binascii @dataclass @@ -23,6 +25,18 @@ class Settings: chunked_uploads_enabled: bool = False chunk_size_mb: int = 95 +def _hash_password(pw: str) -> str: + """Return PBKDF2-SHA256 hash of a password.""" + try: + if not pw: + return "" + salt = os.urandom(16) + iterations = 200_000 + dk = hashlib.pbkdf2_hmac('sha256', pw.encode('utf-8'), salt, iterations) + return f"pbkdf2_sha256${iterations}${binascii.hexlify(salt).decode()}${binascii.hexlify(dk).decode()}" + except Exception: + return "" + def load_settings() -> Settings: """Load settings from .env, applying defaults when absent.""" # Load environment variables from .env once here so importers don’t have to @@ -31,6 +45,14 @@ def load_settings() -> Settings: except Exception: pass admin_password = os.getenv("ADMIN_PASSWORD", "admin") # Default for convenience, should be changed + if not admin_password.startswith("pbkdf2_sha256$"): + print("="*60) + print("WARNING: ADMIN_PASSWORD is in plaintext.") + print("For better security, use the hashed password below in your .env file:") + hashed_pw = _hash_password(admin_password) + if hashed_pw: + print(f"ADMIN_PASSWORD={hashed_pw}") + print("="*60) # Safe defaults: disable public uploader and invites unless explicitly enabled def as_bool(v: str, default: bool = False) -> bool: if v is None: