feat: Implement hashed admin password support and centralize password logic

Co-authored-by: aider (gemini/gemini-2.5-pro) <aider@aider.chat>
This commit is contained in:
2025-11-22 20:50:19 -07:00
parent 7b1e3da8b0
commit ebd120b2cd
2 changed files with 62 additions and 38 deletions

View File

@@ -16,6 +16,7 @@ import json
import hashlib import hashlib
import os import os
import sqlite3 import sqlite3
import binascii
from datetime import datetime from datetime import datetime
from typing import Dict, List, Optional from typing import Dict, List, Optional
@@ -223,6 +224,34 @@ def read_exif_datetimes(file_bytes: bytes):
pass pass
return created, modified return created, modified
def _hash_password(pw: str) -> str:
"""Return PBKDF2-SHA256 hash of a password."""
try:
if not pw:
return ""
salt = os.urandom(16)
iterations = 200_000
dk = hashlib.pbkdf2_hmac('sha256', pw.encode('utf-8'), salt, iterations)
return f"pbkdf2_sha256${iterations}${binascii.hexlify(salt).decode()}${binascii.hexlify(dk).decode()}"
except Exception:
return ""
def _verify_password(stored: str, pw: Optional[str]) -> bool:
"""Verify a password against a PBKDF2-SHA256 hash."""
if not pw or not stored:
return False
try:
algo, iter_s, salt_hex, hash_hex = stored.split("$")
if algo != 'pbkdf2_sha256':
return False
iterations = int(iter_s)
salt = binascii.unhexlify(salt_hex)
dk = hashlib.pbkdf2_hmac('sha256', pw.encode('utf-8'), salt, iterations)
return binascii.hexlify(dk).decode() == hash_hex
except Exception:
return False
def get_or_create_album_dir(album_name: str) -> str: def get_or_create_album_dir(album_name: str) -> str:
"""Get or create a directory for an album. Returns the path.""" """Get or create a directory for an album. Returns the path."""
if not album_name or not isinstance(album_name, str): if not album_name or not isinstance(album_name, str):
@@ -858,7 +887,14 @@ async def api_login(request: Request) -> JSONResponse:
if not email or not password: if not email or not password:
return JSONResponse({"error": "missing_credentials"}, status_code=400) return JSONResponse({"error": "missing_credentials"}, status_code=400)
if email == "admin" and password == SETTINGS.admin_password: stored_password = SETTINGS.admin_password
password_ok = False
if stored_password.startswith("pbkdf2_sha256$"):
password_ok = _verify_password(stored_password, password)
else:
password_ok = (password == stored_password)
if email == "admin" and password_ok:
user_info = { user_info = {
"accessToken": "local_admin_session", # dummy value "accessToken": "local_admin_session", # dummy value
"userEmail": "admin", "userEmail": "admin",
@@ -1028,19 +1064,7 @@ async def api_invites_create(request: Request) -> JSONResponse:
import uuid import uuid
token = uuid.uuid4().hex token = uuid.uuid4().hex
# Prepare password hash, if provided # Prepare password hash, if provided
def hash_password(pw: str) -> str: pw_hash = _hash_password(invite_password or "") if (invite_password and str(invite_password).strip()) else None
try:
if not pw:
return ""
import os as _os
import binascii as _binascii
salt = _os.urandom(16)
iterations = 200_000
dk = hashlib.pbkdf2_hmac('sha256', pw.encode('utf-8'), salt, iterations)
return f"pbkdf2_sha256${iterations}${_binascii.hexlify(salt).decode()}${_binascii.hexlify(dk).decode()}"
except Exception:
return ""
pw_hash = hash_password(invite_password or "") if (invite_password and str(invite_password).strip()) else None
# Owner info from session # Owner info from session
owner_user_id = str(request.session.get("userId") or "") owner_user_id = str(request.session.get("userId") or "")
owner_email = str(request.session.get("userEmail") or "") owner_email = str(request.session.get("userEmail") or "")
@@ -1235,16 +1259,8 @@ async def api_invite_update(token: str, request: Request) -> JSONResponse:
if "password" in (body or {}): if "password" in (body or {}):
pw = str((body or {}).get("password") or "").strip() pw = str((body or {}).get("password") or "").strip()
if pw: if pw:
# Reuse hasher from above
def _hash_pw(pw: str) -> str:
import os as _os
import binascii as _binascii
salt = _os.urandom(16)
iterations = 200_000
dk = hashlib.pbkdf2_hmac('sha256', pw.encode('utf-8'), salt, iterations)
return f"pbkdf2_sha256${iterations}${_binascii.hexlify(salt).decode()}${_binascii.hexlify(dk).decode()}"
fields.append("password_hash = ?") fields.append("password_hash = ?")
params.append(_hash_pw(pw)) params.append(_hash_password(pw))
else: else:
fields.append("password_hash = NULL") fields.append("password_hash = NULL")
# Reset usage # Reset usage
@@ -1488,21 +1504,7 @@ async def api_invite_auth(token: str, request: Request) -> JSONResponse:
request.session["inviteAuth"] = ia request.session["inviteAuth"] = ia
return JSONResponse({"ok": True, "authorized": True}) return JSONResponse({"ok": True, "authorized": True})
# verify # verify
def verify_password(stored: str, pw: Optional[str]) -> bool: if not _verify_password(password_hash, provided):
if not pw:
return False
try:
algo, iter_s, salt_hex, hash_hex = stored.split("$")
if algo != 'pbkdf2_sha256':
return False
iterations = int(iter_s)
import binascii as _binascii
salt = _binascii.unhexlify(salt_hex)
dk = hashlib.pbkdf2_hmac('sha256', pw.encode('utf-8'), salt, iterations)
return _binascii.hexlify(dk).decode() == hash_hex
except Exception:
return False
if not verify_password(password_hash, provided):
return JSONResponse({"error": "invalid_password"}, status_code=403) return JSONResponse({"error": "invalid_password"}, status_code=403)
ia = request.session.get("inviteAuth") or {} ia = request.session.get("inviteAuth") or {}
ia[token] = True ia[token] = True

View File

@@ -8,6 +8,8 @@ import os
from dataclasses import dataclass from dataclasses import dataclass
import secrets import secrets
from dotenv import load_dotenv from dotenv import load_dotenv
import hashlib
import binascii
@dataclass @dataclass
@@ -23,6 +25,18 @@ class Settings:
chunked_uploads_enabled: bool = False chunked_uploads_enabled: bool = False
chunk_size_mb: int = 95 chunk_size_mb: int = 95
def _hash_password(pw: str) -> str:
"""Return PBKDF2-SHA256 hash of a password."""
try:
if not pw:
return ""
salt = os.urandom(16)
iterations = 200_000
dk = hashlib.pbkdf2_hmac('sha256', pw.encode('utf-8'), salt, iterations)
return f"pbkdf2_sha256${iterations}${binascii.hexlify(salt).decode()}${binascii.hexlify(dk).decode()}"
except Exception:
return ""
def load_settings() -> Settings: def load_settings() -> Settings:
"""Load settings from .env, applying defaults when absent.""" """Load settings from .env, applying defaults when absent."""
# Load environment variables from .env once here so importers dont have to # Load environment variables from .env once here so importers dont have to
@@ -31,6 +45,14 @@ def load_settings() -> Settings:
except Exception: except Exception:
pass pass
admin_password = os.getenv("ADMIN_PASSWORD", "admin") # Default for convenience, should be changed admin_password = os.getenv("ADMIN_PASSWORD", "admin") # Default for convenience, should be changed
if not admin_password.startswith("pbkdf2_sha256$"):
print("="*60)
print("WARNING: ADMIN_PASSWORD is in plaintext.")
print("For better security, use the hashed password below in your .env file:")
hashed_pw = _hash_password(admin_password)
if hashed_pw:
print(f"ADMIN_PASSWORD={hashed_pw}")
print("="*60)
# Safe defaults: disable public uploader and invites unless explicitly enabled # Safe defaults: disable public uploader and invites unless explicitly enabled
def as_bool(v: str, default: bool = False) -> bool: def as_bool(v: str, default: bool = False) -> bool:
if v is None: if v is None: