chunks enabled.

MEGASOL\simon.adams
2025-09-16 09:34:43 +02:00
parent 17feda0d2f
commit 69aa1c031e
7 changed files with 600 additions and 34 deletions

@@ -59,6 +59,13 @@ app.add_middleware(SessionMiddleware, secret_key=SETTINGS.session_secret, same_s
FRONTEND_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), "frontend")
app.mount("/static", StaticFiles(directory=FRONTEND_DIR), name="static")
# Chunk upload storage
CHUNK_ROOT = "/data/chunks"
try:
os.makedirs(CHUNK_ROOT, exist_ok=True)
except Exception:
pass
# Album cache
ALBUM_ID: Optional[str] = None
@@ -379,6 +386,8 @@ async def api_config() -> dict:
"""Expose minimal public configuration flags for the frontend."""
return {
"public_upload_page_enabled": SETTINGS.public_upload_page_enabled,
"chunked_uploads_enabled": SETTINGS.chunked_uploads_enabled,
"chunk_size_mb": SETTINGS.chunk_size_mb,
}
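For context, a minimal client-side sketch of how these new flags could be read before choosing an upload strategy (the base URL is an illustrative assumption, not part of this commit):
# Illustrative only: query the public config and decide whether to chunk.
import requests

cfg = requests.get("http://localhost:8000/api/config", timeout=10).json()
if cfg.get("chunked_uploads_enabled"):
    chunk_bytes = int(cfg.get("chunk_size_mb", 95)) * 1024 * 1024
    print(f"chunked uploads enabled, {chunk_bytes} bytes per chunk")
else:
    print("chunked uploads disabled, use the single-request /api/upload flow")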
@app.websocket("/ws")
@@ -469,7 +478,7 @@ async def api_upload(
try:
conn = sqlite3.connect(SETTINGS.state_db)
cur = conn.cursor()
cur.execute("SELECT token, album_id, album_name, max_uses, used_count, expires_at, COALESCE(claimed,0), claimed_by_session FROM invites WHERE token = ?", (invite_token,))
cur.execute("SELECT token, album_id, album_name, max_uses, used_count, expires_at, COALESCE(claimed,0), claimed_by_session, password_hash FROM invites WHERE token = ?", (invite_token,))
row = cur.fetchone()
conn.close()
except Exception as e:
@@ -478,7 +487,17 @@ async def api_upload(
if not row:
await send_progress(session_id, item_id, "error", 100, "Invalid invite token")
return JSONResponse({"error": "invalid_invite"}, status_code=403)
_, album_id, album_name, max_uses, used_count, expires_at, claimed, claimed_by_session = row
_, album_id, album_name, max_uses, used_count, expires_at, claimed, claimed_by_session, password_hash = row
# If invite requires password, ensure this session is authorized
if password_hash:
try:
ia = request.session.get("inviteAuth") or {}
if not ia.get(invite_token):
await send_progress(session_id, item_id, "error", 100, "Password required")
return JSONResponse({"error": "invite_password_required"}, status_code=403)
except Exception:
await send_progress(session_id, item_id, "error", 100, "Password required")
return JSONResponse({"error": "invite_password_required"}, status_code=403)
# Expiry check
if expires_at:
try:
@@ -604,6 +623,314 @@ async def api_upload(
return await do_upload()
# --------- Chunked upload endpoints ---------
def _chunk_dir(session_id: str, item_id: str) -> str:
safe_session = session_id.replace('/', '_')
safe_item = item_id.replace('/', '_')
return os.path.join(CHUNK_ROOT, safe_session, safe_item)
@app.post("/api/upload/chunk/init")
async def api_upload_chunk_init(request: Request) -> JSONResponse:
"""Initialize a chunked upload; creates a temp directory for incoming parts."""
try:
data = await request.json()
except Exception:
return JSONResponse({"error": "invalid_json"}, status_code=400)
item_id = (data or {}).get("item_id")
session_id = (data or {}).get("session_id")
if not item_id or not session_id:
return JSONResponse({"error": "missing_ids"}, status_code=400)
d = _chunk_dir(session_id, item_id)
try:
os.makedirs(d, exist_ok=True)
# Write manifest for later use
meta = {
"name": (data or {}).get("name"),
"size": (data or {}).get("size"),
"last_modified": (data or {}).get("last_modified"),
"invite_token": (data or {}).get("invite_token"),
"content_type": (data or {}).get("content_type") or "application/octet-stream",
"created_at": datetime.utcnow().isoformat(),
}
with open(os.path.join(d, "meta.json"), "w", encoding="utf-8") as f:
json.dump(meta, f)
except Exception as e:
logger.exception("Chunk init failed: %s", e)
return JSONResponse({"error": "init_failed"}, status_code=500)
return JSONResponse({"ok": True})
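For illustration, the meta.json manifest written here would look roughly like the following (all values hypothetical); total_chunks is added later by /api/upload/chunk:
{
  "name": "IMG_0001.HEIC",
  "size": 20481234,
  "last_modified": 1726480000000,
  "invite_token": null,
  "content_type": "image/heic",
  "created_at": "2025-09-16T07:34:43.123456"
}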
@app.post("/api/upload/chunk")
async def api_upload_chunk(
request: Request,
item_id: str = Form(...),
session_id: str = Form(...),
chunk_index: int = Form(...),
total_chunks: int = Form(...),
invite_token: Optional[str] = Form(None),
chunk: UploadFile = Form(...),
) -> JSONResponse:
"""Receive a single chunk; write to disk under chunk directory."""
d = _chunk_dir(session_id, item_id)
try:
os.makedirs(d, exist_ok=True)
# Persist invite token in meta if provided (for consistency)
meta_path = os.path.join(d, "meta.json")
if os.path.exists(meta_path):
try:
with open(meta_path, "r", encoding="utf-8") as f:
meta = json.load(f)
except Exception:
meta = {}
else:
meta = {}
if invite_token:
meta["invite_token"] = invite_token
meta["total_chunks"] = int(total_chunks)
with open(meta_path, "w", encoding="utf-8") as f:
json.dump(meta, f)
# Save chunk
content = await chunk.read()
with open(os.path.join(d, f"part_{int(chunk_index):06d}"), "wb") as f:
f.write(content)
except Exception as e:
logger.exception("Chunk write failed: %s", e)
return JSONResponse({"error": "chunk_write_failed"}, status_code=500)
return JSONResponse({"ok": True})
@app.post("/api/upload/chunk/complete")
async def api_upload_chunk_complete(request: Request) -> JSONResponse:
"""Assemble all parts and run the regular upload flow to Immich."""
try:
data = await request.json()
except Exception:
return JSONResponse({"error": "invalid_json"}, status_code=400)
item_id = (data or {}).get("item_id")
session_id = (data or {}).get("session_id")
name = (data or {}).get("name") or "upload.bin"
last_modified = (data or {}).get("last_modified")
invite_token = (data or {}).get("invite_token")
content_type = (data or {}).get("content_type") or "application/octet-stream"
if not item_id or not session_id:
return JSONResponse({"error": "missing_ids"}, status_code=400)
d = _chunk_dir(session_id, item_id)
meta_path = os.path.join(d, "meta.json")
# Basic validation
try:
with open(meta_path, "r", encoding="utf-8") as f:
meta = json.load(f)
except Exception:
meta = {}
total_chunks = int(meta.get("total_chunks") or (data or {}).get("total_chunks") or 0)
if total_chunks <= 0:
return JSONResponse({"error": "missing_total"}, status_code=400)
# Assemble
parts = []
try:
for i in range(total_chunks):
p = os.path.join(d, f"part_{i:06d}")
if not os.path.exists(p):
return JSONResponse({"error": "missing_part", "index": i}, status_code=400)
with open(p, "rb") as f:
parts.append(f.read())
raw = b"".join(parts)
except Exception as e:
logger.exception("Assemble failed: %s", e)
return JSONResponse({"error": "assemble_failed"}, status_code=500)
# Cleanup parts promptly
try:
for i in range(total_chunks):
try:
os.remove(os.path.join(d, f"part_{i:06d}"))
except Exception:
pass
try:
os.remove(meta_path)
except Exception:
pass
try:
os.rmdir(d)
except Exception:
pass
except Exception:
pass
# Now reuse the core logic from api_upload but with assembled bytes
item_id_local = item_id
session_id_local = session_id
file_like_name = name
file_size = len(raw)
checksum = sha1_hex(raw)
exif_created, exif_modified = read_exif_datetimes(raw)
created_at = exif_created or (datetime.fromtimestamp(last_modified / 1000) if last_modified else datetime.utcnow())
modified_at = exif_modified or created_at
created_iso = created_at.isoformat()
modified_iso = modified_at.isoformat()
device_asset_id = f"{file_like_name}-{last_modified or 0}-{file_size}"
# Local duplicate checks
if db_lookup_checksum(checksum):
await send_progress(session_id_local, item_id_local, "duplicate", 100, "Duplicate (by checksum - local cache)")
return JSONResponse({"status": "duplicate", "id": None}, status_code=200)
if db_lookup_device_asset(device_asset_id):
await send_progress(session_id_local, item_id_local, "duplicate", 100, "Already uploaded from this device (local cache)")
return JSONResponse({"status": "duplicate", "id": None}, status_code=200)
await send_progress(session_id_local, item_id_local, "checking", 2, "Checking duplicates…")
bulk = immich_bulk_check([{ "id": item_id_local, "checksum": checksum }])
if bulk.get(item_id_local, {}).get("action") == "reject" and bulk[item_id_local].get("reason") == "duplicate":
asset_id = bulk[item_id_local].get("assetId")
db_insert_upload(checksum, file_like_name, file_size, device_asset_id, asset_id, created_iso)
await send_progress(session_id_local, item_id_local, "duplicate", 100, "Duplicate (server)", asset_id)
return JSONResponse({"status": "duplicate", "id": asset_id}, status_code=200)
def gen_encoder2() -> MultipartEncoder:
return MultipartEncoder(fields={
"assetData": (file_like_name, io.BytesIO(raw), content_type or "application/octet-stream"),
"deviceAssetId": device_asset_id,
"deviceId": f"python-{session_id_local}",
"fileCreatedAt": created_iso,
"fileModifiedAt": modified_iso,
"isFavorite": "false",
"filename": file_like_name,
})
# Invite validation/gating mirrors api_upload
target_album_id: Optional[str] = None
target_album_name: Optional[str] = None
if invite_token:
try:
conn = sqlite3.connect(SETTINGS.state_db)
cur = conn.cursor()
cur.execute("SELECT token, album_id, album_name, max_uses, used_count, expires_at, COALESCE(claimed,0), claimed_by_session, password_hash FROM invites WHERE token = ?", (invite_token,))
row = cur.fetchone()
conn.close()
except Exception as e:
logger.exception("Invite lookup error: %s", e)
row = None
if not row:
await send_progress(session_id_local, item_id_local, "error", 100, "Invalid invite token")
return JSONResponse({"error": "invalid_invite"}, status_code=403)
_, album_id, album_name, max_uses, used_count, expires_at, claimed, claimed_by_session, password_hash = row
if password_hash:
try:
ia = request.session.get("inviteAuth") or {}
if not ia.get(invite_token):
await send_progress(session_id_local, item_id_local, "error", 100, "Password required")
return JSONResponse({"error": "invite_password_required"}, status_code=403)
except Exception:
await send_progress(session_id_local, item_id_local, "error", 100, "Password required")
return JSONResponse({"error": "invite_password_required"}, status_code=403)
# Expiry check
if expires_at:
try:
if datetime.utcnow() > datetime.fromisoformat(expires_at):
await send_progress(session_id_local, item_id_local, "error", 100, "Invite expired")
return JSONResponse({"error": "invite_expired"}, status_code=403)
except Exception:
pass
try:
max_uses_int = int(max_uses) if max_uses is not None else -1
except Exception:
max_uses_int = -1
if max_uses_int == 1:
if claimed:
if claimed_by_session and claimed_by_session != session_id_local:
await send_progress(session_id_local, item_id_local, "error", 100, "Invite already used")
return JSONResponse({"error": "invite_claimed"}, status_code=403)
else:
try:
connc = sqlite3.connect(SETTINGS.state_db)
curc = connc.cursor()
curc.execute(
"UPDATE invites SET claimed = 1, claimed_at = CURRENT_TIMESTAMP, claimed_by_session = ? WHERE token = ? AND (claimed IS NULL OR claimed = 0)",
(session_id_local, invite_token)
)
connc.commit()
changed = connc.total_changes
connc.close()
except Exception as e:
logger.exception("Invite claim failed: %s", e)
return JSONResponse({"error": "invite_claim_failed"}, status_code=500)
if changed == 0:
try:
conn2 = sqlite3.connect(SETTINGS.state_db)
cur2 = conn2.cursor()
cur2.execute("SELECT claimed_by_session FROM invites WHERE token = ?", (invite_token,))
owner_row = cur2.fetchone()
conn2.close()
owner = owner_row[0] if owner_row else None
except Exception:
owner = None
if not owner or owner != session_id_local:
await send_progress(session_id_local, item_id_local, "error", 100, "Invite already used")
return JSONResponse({"error": "invite_claimed"}, status_code=403)
else:
if (used_count or 0) >= (max_uses_int if max_uses_int >= 0 else 10**9):
await send_progress(session_id_local, item_id_local, "error", 100, "Invite already used up")
return JSONResponse({"error": "invite_exhausted"}, status_code=403)
target_album_id = album_id
target_album_name = album_name
await send_progress(session_id_local, item_id_local, "uploading", 0, "Uploading…")
sent = {"pct": 0}
def cb2(monitor: MultipartEncoderMonitor) -> None:
if monitor.len:
pct = int(monitor.bytes_read * 100 / monitor.len)
if pct != sent["pct"]:
sent["pct"] = pct
asyncio.create_task(send_progress(session_id_local, item_id_local, "uploading", pct))
encoder2 = gen_encoder2()
monitor2 = MultipartEncoderMonitor(encoder2, cb2)
headers = {"Accept": "application/json", "Content-Type": monitor2.content_type, "x-immich-checksum": checksum, **immich_headers(request)}
try:
r = requests.post(f"{SETTINGS.normalized_base_url}/assets", headers=headers, data=monitor2, timeout=120)
if r.status_code in (200, 201):
data_r = r.json()
asset_id = data_r.get("id")
db_insert_upload(checksum, file_like_name, file_size, device_asset_id, asset_id, created_iso)
status = data_r.get("status", "created")
if asset_id:
added = False
if invite_token:
added = add_asset_to_album(asset_id, request=request, album_id_override=target_album_id, album_name_override=target_album_name)
if added:
status += f" (added to album '{target_album_name or target_album_id}')"
elif SETTINGS.album_name:
if add_asset_to_album(asset_id, request=request):
status += f" (added to album '{SETTINGS.album_name}')"
await send_progress(session_id_local, item_id_local, "duplicate" if status == "duplicate" else "done", 100, status, asset_id)
if invite_token:
try:
conn2 = sqlite3.connect(SETTINGS.state_db)
cur2 = conn2.cursor()
cur2.execute("SELECT max_uses FROM invites WHERE token = ?", (invite_token,))
row_mu = cur2.fetchone()
mx = None
try:
mx = int(row_mu[0]) if row_mu and row_mu[0] is not None else None
except Exception:
mx = None
if mx == 1:
cur2.execute("UPDATE invites SET used_count = 1 WHERE token = ?", (invite_token,))
else:
cur2.execute("UPDATE invites SET used_count = used_count + 1 WHERE token = ?", (invite_token,))
conn2.commit()
conn2.close()
except Exception as e:
logger.exception("Failed to increment invite usage: %s", e)
return JSONResponse({"id": asset_id, "status": status}, status_code=200)
else:
try:
msg = r.json().get("message", r.text)
except Exception:
msg = r.text
await send_progress(session_id_local, item_id_local, "error", 100, msg)
return JSONResponse({"error": msg}, status_code=400)
except Exception as e:
await send_progress(session_id_local, item_id_local, "error", 100, str(e))
return JSONResponse({"error": str(e)}, status_code=500)
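Putting the three endpoints together, a hedged client-side sketch of the init, chunk, and complete flow (base URL, file path and ids are illustrative assumptions; progress updates would arrive separately over the /ws websocket for the same session_id):
# Sketch only: drive the chunked upload endpoints from a client.
import os
import uuid
import requests

BASE = "http://localhost:8000"          # illustrative
path = "/tmp/large_video.mp4"           # illustrative
session_id = uuid.uuid4().hex
item_id = uuid.uuid4().hex
chunk_bytes = 95 * 1024 * 1024          # mirrors the CHUNK_SIZE_MB default

size = os.path.getsize(path)
total_chunks = max(1, (size + chunk_bytes - 1) // chunk_bytes)

requests.post(f"{BASE}/api/upload/chunk/init", json={
    "item_id": item_id, "session_id": session_id,
    "name": os.path.basename(path), "size": size,
    "content_type": "video/mp4",
}, timeout=30).raise_for_status()

with open(path, "rb") as fh:
    for index in range(total_chunks):
        part = fh.read(chunk_bytes)
        r = requests.post(
            f"{BASE}/api/upload/chunk",
            data={
                "item_id": item_id, "session_id": session_id,
                "chunk_index": str(index), "total_chunks": str(total_chunks),
            },
            files={"chunk": (os.path.basename(path), part, "application/octet-stream")},
            timeout=300,
        )
        r.raise_for_status()

resp = requests.post(f"{BASE}/api/upload/chunk/complete", json={
    "item_id": item_id, "session_id": session_id,
    "name": os.path.basename(path), "content_type": "video/mp4",
    "total_chunks": total_chunks,
}, timeout=600)
print(resp.status_code, resp.json())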
@app.post("/api/album/reset")
async def api_album_reset() -> dict:
"""Explicit trigger from the UI to clear cached album id."""
@@ -725,6 +1052,11 @@ def ensure_invites_table() -> None:
cur.execute("ALTER TABLE invites ADD COLUMN claimed_by_session TEXT")
except Exception:
pass
# Optional password protection for invites
try:
cur.execute("ALTER TABLE invites ADD COLUMN password_hash TEXT")
except Exception:
pass
conn.commit()
conn.close()
except Exception as e:
@@ -745,6 +1077,7 @@ async def api_invites_create(request: Request) -> JSONResponse:
album_id = (body or {}).get("albumId")
album_name = (body or {}).get("albumName")
max_uses = (body or {}).get("maxUses", 1)
invite_password = (body or {}).get("password")
expires_days = (body or {}).get("expiresDays")
# Normalize max_uses
try:
@@ -775,13 +1108,33 @@ async def api_invites_create(request: Request) -> JSONResponse:
# Generate token
import uuid
token = uuid.uuid4().hex
# Prepare password hash, if provided
def hash_password(pw: str) -> str:
try:
if not pw:
return ""
import os as _os
import binascii as _binascii
salt = _os.urandom(16)
iterations = 200_000
dk = hashlib.pbkdf2_hmac('sha256', pw.encode('utf-8'), salt, iterations)
return f"pbkdf2_sha256${iterations}${_binascii.hexlify(salt).decode()}${_binascii.hexlify(dk).decode()}"
except Exception:
return ""
pw_hash = hash_password(invite_password or "") if (invite_password and str(invite_password).strip()) else None
try:
conn = sqlite3.connect(SETTINGS.state_db)
cur = conn.cursor()
cur.execute(
"INSERT INTO invites (token, album_id, album_name, max_uses, expires_at) VALUES (?,?,?,?,?)",
(token, resolved_album_id, album_name, max_uses, expires_at)
)
if pw_hash:
cur.execute(
"INSERT INTO invites (token, album_id, album_name, max_uses, expires_at, password_hash) VALUES (?,?,?,?,?,?)",
(token, resolved_album_id, album_name, max_uses, expires_at, pw_hash)
)
else:
cur.execute(
"INSERT INTO invites (token, album_id, album_name, max_uses, expires_at) VALUES (?,?,?,?,?)",
(token, resolved_album_id, album_name, max_uses, expires_at)
)
conn.commit()
conn.close()
except Exception as e:
@@ -812,11 +1165,11 @@ async def invite_page(token: str, request: Request) -> HTMLResponse:
return FileResponse(os.path.join(FRONTEND_DIR, "invite.html"))
@app.get("/api/invite/{token}")
async def api_invite_info(token: str) -> JSONResponse:
async def api_invite_info(token: str, request: Request) -> JSONResponse:
try:
conn = sqlite3.connect(SETTINGS.state_db)
cur = conn.cursor()
cur.execute("SELECT token, album_id, album_name, max_uses, used_count, expires_at, COALESCE(claimed,0), claimed_at FROM invites WHERE token = ?", (token,))
cur.execute("SELECT token, album_id, album_name, max_uses, used_count, expires_at, COALESCE(claimed,0), claimed_at, password_hash FROM invites WHERE token = ?", (token,))
row = cur.fetchone()
conn.close()
except Exception as e:
@@ -824,7 +1177,7 @@ async def api_invite_info(token: str) -> JSONResponse:
return JSONResponse({"error": "db_error"}, status_code=500)
if not row:
return JSONResponse({"error": "not_found"}, status_code=404)
_, album_id, album_name, max_uses, used_count, expires_at, claimed, claimed_at = row
_, album_id, album_name, max_uses, used_count, expires_at, claimed, claimed_at, password_hash = row
# Compute remaining uses
remaining = None
try:
@@ -851,6 +1204,14 @@ async def api_invite_info(token: str) -> JSONResponse:
if expired:
deactivated = True
active = not deactivated
# Password requirement + authorization state
password_required = bool(password_hash)
authorized = False
try:
ia = request.session.get("inviteAuth") or {}
authorized = bool(ia.get(token))
except Exception:
authorized = False
return JSONResponse({
"token": token,
"albumId": album_id,
@@ -864,8 +1225,58 @@ async def api_invite_info(token: str) -> JSONResponse:
"claimedAt": claimed_at,
"expired": expired,
"active": active,
"passwordRequired": password_required,
"authorized": authorized,
})
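A client can gate its upload UI on the two new fields; a minimal sketch using a requests.Session so the signed session cookie (where authorization is remembered) persists across calls. The /auth endpoint it posts to is added just below; base URL, token and password are illustrative:
# Sketch: check passwordRequired/authorized before allowing uploads.
import requests

BASE = "http://localhost:8000"                      # illustrative
token = "e4f1c2d3a5b6978012345678abcdef01"          # hypothetical invite token
s = requests.Session()                              # keeps the session cookie

info = s.get(f"{BASE}/api/invite/{token}", timeout=10).json()
if info.get("passwordRequired") and not info.get("authorized"):
    r = s.post(f"{BASE}/api/invite/{token}/auth",
               json={"password": "correct horse battery staple"}, timeout=10)
    if r.status_code == 403:
        raise SystemExit("wrong invite password")
print("invite active:", info.get("active"))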
@app.post("/api/invite/{token}/auth")
async def api_invite_auth(token: str, request: Request) -> JSONResponse:
"""Validate a password for an invite token, and mark this session authorized if valid."""
try:
body = await request.json()
except Exception:
body = None
provided = (body or {}).get("password") if isinstance(body, dict) else None
try:
conn = sqlite3.connect(SETTINGS.state_db)
cur = conn.cursor()
cur.execute("SELECT password_hash FROM invites WHERE token = ?", (token,))
row = cur.fetchone()
conn.close()
except Exception as e:
logger.exception("Invite auth lookup error: %s", e)
return JSONResponse({"error": "db_error"}, status_code=500)
if not row:
return JSONResponse({"error": "not_found"}, status_code=404)
password_hash = row[0]
if not password_hash:
# No password required; mark as authorized to simplify client flow
ia = request.session.get("inviteAuth") or {}
ia[token] = True
request.session["inviteAuth"] = ia
return JSONResponse({"ok": True, "authorized": True})
# Verify the provided password against the stored PBKDF2 hash
def verify_password(stored: str, pw: Optional[str]) -> bool:
if not pw:
return False
try:
algo, iter_s, salt_hex, hash_hex = stored.split("$")
if algo != 'pbkdf2_sha256':
return False
iterations = int(iter_s)
import binascii as _binascii
salt = _binascii.unhexlify(salt_hex)
dk = hashlib.pbkdf2_hmac('sha256', pw.encode('utf-8'), salt, iterations)
return _binascii.hexlify(dk).decode() == hash_hex
except Exception:
return False
if not verify_password(password_hash, provided):
return JSONResponse({"error": "invalid_password"}, status_code=403)
ia = request.session.get("inviteAuth") or {}
ia[token] = True
request.session["inviteAuth"] = ia
return JSONResponse({"ok": True, "authorized": True})
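The stored value has the shape pbkdf2_sha256$&lt;iterations&gt;$&lt;salt hex&gt;$&lt;derived key hex&gt;. A standalone round-trip sketch of the same scheme (module-level copies of the nested helpers above, for illustration only):
# Sketch of the invite password scheme: PBKDF2-HMAC-SHA256, 200k iterations, 16-byte salt.
import binascii, hashlib, hmac, os

def hash_password(pw: str) -> str:
    salt = os.urandom(16)
    iterations = 200_000
    dk = hashlib.pbkdf2_hmac("sha256", pw.encode("utf-8"), salt, iterations)
    return f"pbkdf2_sha256${iterations}${binascii.hexlify(salt).decode()}${binascii.hexlify(dk).decode()}"

def verify_password(stored: str, pw: str) -> bool:
    algo, iter_s, salt_hex, hash_hex = stored.split("$")
    if algo != "pbkdf2_sha256":
        return False
    dk = hashlib.pbkdf2_hmac("sha256", pw.encode("utf-8"), binascii.unhexlify(salt_hex), int(iter_s))
    # compare_digest gives a constant-time comparison; the endpoint above compares hex strings directly
    return hmac.compare_digest(binascii.hexlify(dk).decode(), hash_hex)

stored = hash_password("hunter2")
assert verify_password(stored, "hunter2")
assert not verify_password(stored, "wrong")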
@app.get("/api/qr", response_model=None)
async def api_qr(request: Request):
"""Generate a QR code PNG for a given text (query param 'text')."""

@@ -22,6 +22,8 @@ class Settings:
state_db: str = ""
session_secret: str = ""
log_level: str = "INFO"
chunked_uploads_enabled: bool = False
chunk_size_mb: int = 95
@property
def normalized_base_url(self) -> str:
@@ -51,6 +53,11 @@ def load_settings() -> Settings:
state_db = os.getenv("STATE_DB", "/data/state.db")
session_secret = os.getenv("SESSION_SECRET") or secrets.token_hex(32)
log_level = os.getenv("LOG_LEVEL", "INFO").upper()
chunked_uploads_enabled = as_bool(os.getenv("CHUNKED_UPLOADS_ENABLED", "false"), False)
try:
chunk_size_mb = int(os.getenv("CHUNK_SIZE_MB", "95"))
except ValueError:
chunk_size_mb = 95
return Settings(
immich_base_url=base,
immich_api_key=api_key,
@@ -61,4 +68,6 @@ def load_settings() -> Settings:
state_db=state_db,
session_secret=session_secret,
log_level=log_level,
chunked_uploads_enabled=chunked_uploads_enabled,
chunk_size_mb=chunk_size_mb,
)
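A standalone sketch of the parsing behaviour for the two new variables (the truthy strings accepted by as_bool are an assumption; the ValueError fallback mirrors load_settings):
# Illustrative parsing of the new environment variables.
import os

os.environ["CHUNKED_UPLOADS_ENABLED"] = "true"
os.environ["CHUNK_SIZE_MB"] = "not-a-number"

# as_bool is assumed to accept common truthy strings such as these
enabled = os.getenv("CHUNKED_UPLOADS_ENABLED", "false").lower() in ("1", "true", "yes", "on")
try:
    chunk_size_mb = int(os.getenv("CHUNK_SIZE_MB", "95"))
except ValueError:
    chunk_size_mb = 95  # same fallback as load_settings

print(enabled, chunk_size_mb)  # True 95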