feat: Add admin file viewer with thumbnail generation and gallery
Co-authored-by: aider (gemini/gemini-2.5-pro) <aider@aider.chat>
This commit is contained in:
157
app/app.py
157
app/app.py
@@ -18,6 +18,8 @@ import hashlib
|
||||
import os
|
||||
import sqlite3
|
||||
import binascii
|
||||
import base64
|
||||
import mimetypes
|
||||
import pytz
|
||||
from datetime import datetime
|
||||
from typing import Dict, List, Optional, Tuple
|
||||
@@ -1228,6 +1230,161 @@ async def api_albums_create(request: Request) -> JSONResponse:
|
||||
logger.exception("Create album directory failed: %s", e)
|
||||
return JSONResponse({"error": "create_album_failed"}, status_code=500)
|
||||
|
||||
|
||||
# ---------- File Viewer APIs ----------
|
||||
UPLOAD_ROOT = "./data/uploads"
|
||||
|
||||
def _is_safe_path(base, path):
|
||||
"""Check that resolved path is under base path."""
|
||||
try:
|
||||
# After resolving symlinks, the path must be inside the base.
|
||||
return os.path.realpath(path).startswith(os.path.realpath(base))
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
@app.get("/api/files/dirs")
|
||||
async def api_files_dirs(request: Request):
|
||||
if not request.session.get("accessToken"):
|
||||
return JSONResponse({"error": "unauthorized"}, status_code=401)
|
||||
|
||||
q = (request.query_params.get("q") or "").strip().lower()
|
||||
sort = (request.query_params.get("sort") or "-modified").strip()
|
||||
|
||||
dirs = []
|
||||
try:
|
||||
os.makedirs(UPLOAD_ROOT, exist_ok=True)
|
||||
for root, _, filenames in os.walk(UPLOAD_ROOT):
|
||||
if not filenames:
|
||||
continue
|
||||
|
||||
dir_path = root
|
||||
rel_path = os.path.relpath(dir_path, UPLOAD_ROOT)
|
||||
|
||||
if rel_path == '.':
|
||||
continue
|
||||
|
||||
if q and q not in rel_path.lower():
|
||||
continue
|
||||
|
||||
try:
|
||||
total_size = sum(os.path.getsize(os.path.join(dir_path, f)) for f in filenames)
|
||||
file_mtimes = [os.path.getmtime(os.path.join(dir_path, f)) for f in filenames]
|
||||
last_modified = max(file_mtimes) if file_mtimes else os.path.getmtime(dir_path)
|
||||
|
||||
dirs.append({
|
||||
"path": rel_path,
|
||||
"path_b64": base64.urlsafe_b64encode(rel_path.encode()).decode(),
|
||||
"name": os.path.basename(dir_path),
|
||||
"file_count": len(filenames),
|
||||
"total_size": total_size,
|
||||
"modified": last_modified
|
||||
})
|
||||
except Exception:
|
||||
continue
|
||||
except Exception as e:
|
||||
logger.exception("Failed to list directories: %s", e)
|
||||
return JSONResponse({"error": "server_error"}, status_code=500)
|
||||
|
||||
# Sort
|
||||
reverse = sort.startswith('-')
|
||||
sort_field = sort.lstrip('-+')
|
||||
if sort_field not in ('name', 'file_count', 'total_size', 'modified'):
|
||||
sort_field = 'modified'
|
||||
|
||||
dirs.sort(key=lambda x: x.get(sort_field, 0), reverse=reverse)
|
||||
|
||||
for d in dirs:
|
||||
d['modified'] = datetime.fromtimestamp(d['modified']).isoformat()
|
||||
|
||||
return JSONResponse({"items": dirs})
|
||||
|
||||
|
||||
@app.get("/api/files/list/{path_b64}")
|
||||
async def api_files_list(path_b64: str, request: Request):
|
||||
if not request.session.get("accessToken"):
|
||||
return JSONResponse({"error": "unauthorized"}, status_code=401)
|
||||
|
||||
try:
|
||||
rel_path = base64.urlsafe_b64decode(path_b64).decode()
|
||||
dir_path = os.path.join(UPLOAD_ROOT, rel_path)
|
||||
except Exception:
|
||||
return JSONResponse({"error": "invalid_path"}, status_code=400)
|
||||
|
||||
if not _is_safe_path(UPLOAD_ROOT, dir_path) or not os.path.isdir(dir_path):
|
||||
return JSONResponse({"error": "not_found"}, status_code=404)
|
||||
|
||||
files = []
|
||||
try:
|
||||
for filename in sorted(os.listdir(dir_path)):
|
||||
file_path = os.path.join(dir_path, filename)
|
||||
if os.path.isfile(file_path):
|
||||
mime_type, _ = mimetypes.guess_type(filename)
|
||||
is_image = bool(mime_type and mime_type.startswith('image/'))
|
||||
rel_file_path = os.path.relpath(file_path, UPLOAD_ROOT)
|
||||
files.append({
|
||||
"name": filename,
|
||||
"path_b64": base64.urlsafe_b64encode(rel_file_path.encode()).decode(),
|
||||
"size": os.path.getsize(file_path),
|
||||
"modified": datetime.fromtimestamp(os.path.getmtime(file_path)).isoformat(),
|
||||
"is_image": is_image,
|
||||
})
|
||||
except Exception as e:
|
||||
logger.exception("Failed to list files: %s", e)
|
||||
return JSONResponse({"error": "server_error"}, status_code=500)
|
||||
|
||||
return JSONResponse({"items": files})
|
||||
|
||||
|
||||
@app.get("/api/files/thumb/{path_b64}")
|
||||
async def api_files_thumb(path_b64: str, request: Request):
|
||||
if not request.session.get("accessToken"):
|
||||
return Response(status_code=401)
|
||||
|
||||
try:
|
||||
rel_path = base64.urlsafe_b64decode(path_b64).decode()
|
||||
file_path = os.path.join(UPLOAD_ROOT, rel_path)
|
||||
except Exception:
|
||||
return Response(status_code=400)
|
||||
|
||||
if not _is_safe_path(UPLOAD_ROOT, file_path) or not os.path.isfile(file_path):
|
||||
return Response(status_code=404)
|
||||
|
||||
mime_type, _ = mimetypes.guess_type(os.path.basename(file_path))
|
||||
if not mime_type or not mime_type.startswith('image/'):
|
||||
return Response(status_code=404)
|
||||
|
||||
try:
|
||||
with Image.open(file_path) as img:
|
||||
img.thumbnail((256, 256))
|
||||
buf = io.BytesIO()
|
||||
# Convert to RGB to avoid issues with saving palette-based images (e.g. some GIFs) as JPEG
|
||||
if img.mode not in ("RGB", "L"):
|
||||
img = img.convert("RGB")
|
||||
img.save(buf, format="JPEG", quality=85)
|
||||
buf.seek(0)
|
||||
return Response(content=buf.read(), media_type="image/jpeg")
|
||||
except Exception as e:
|
||||
logger.warning("Failed to generate thumbnail for %s: %s", file_path, e)
|
||||
return Response(status_code=500)
|
||||
|
||||
|
||||
@app.get("/api/files/full/{path_b64}")
|
||||
async def api_files_full(path_b64: str, request: Request):
|
||||
if not request.session.get("accessToken"):
|
||||
return Response(status_code=401)
|
||||
|
||||
try:
|
||||
rel_path = base64.urlsafe_b64decode(path_b64).decode()
|
||||
file_path = os.path.join(UPLOAD_ROOT, rel_path)
|
||||
except Exception:
|
||||
return Response(status_code=400)
|
||||
|
||||
if not _is_safe_path(UPLOAD_ROOT, file_path) or not os.path.isfile(file_path):
|
||||
return Response(status_code=404)
|
||||
|
||||
return FileResponse(file_path)
|
||||
|
||||
|
||||
# ---------- Invites (one-time/expiring links) ----------
|
||||
|
||||
def ensure_invites_table() -> None:
|
||||
|
||||
Reference in New Issue
Block a user