feat: Add Telegram bot notification for batch upload completion

Co-authored-by: aider (gemini/gemini-2.5-pro) <aider@aider.chat>
This commit is contained in:
2026-01-21 11:49:48 -07:00
parent 9c70e47232
commit 6322163b10

View File

@@ -20,7 +20,8 @@ import sqlite3
import binascii
import pytz
from datetime import datetime
from typing import Dict, List, Optional
from typing import Dict, List, Optional, Tuple
import math
import logging
import httpx
@@ -173,6 +174,78 @@ hub = SessionHub()
# ---------- Telegram Bot ----------
# Batch upload notifications
_upload_batch: List[Tuple[str, int]] = []
_batch_complete_timer: Optional[asyncio.TimerHandle] = None
_batch_lock = asyncio.Lock()
def human_size(bytes_val: int) -> str:
"""Return a human-readable size string."""
if not bytes_val:
return "0 B"
k = 1024
sizes = ['B', 'KB', 'MB', 'GB', 'TB']
i = 0
if bytes_val > 0:
i = int(math.floor(math.log(bytes_val) / math.log(k)))
if i >= len(sizes):
i = len(sizes) - 1
return f"{(bytes_val / (k**i)):.1f} {sizes[i]}"
async def send_batch_notification():
"""Format and send a summary of the recently completed upload batch."""
async with _batch_lock:
if not _upload_batch:
return
batch_copy = list(_upload_batch)
_upload_batch.clear()
global _batch_complete_timer
if _batch_complete_timer:
_batch_complete_timer.cancel()
_batch_complete_timer = None
num_files = len(batch_copy)
total_size = sum(size for _, size in batch_copy)
file_list_str = ""
if num_files > 0:
filenames = [name or "file" for name, _ in batch_copy]
if num_files > 20:
file_list_str = "\n".join(f"- {name}" for name in filenames[:20])
file_list_str += f"\n... and {num_files - 20} more."
else:
file_list_str = "\n".join(f"- {name}" for name in filenames)
msg = f"✅ Batch upload complete!\n\n- Files: {num_files}\n- Total size: {human_size(total_size)}\n\n{file_list_str}".strip()
await send_telegram_message(TELEGRAM_OWNER_ID, msg)
def _schedule_batch_notification():
# Helper to run async func from sync context of call_later
asyncio.create_task(send_batch_notification())
async def reset_telegram_debounce():
"""Resets the 30s timer for batch completion notification."""
if not SETTINGS.telegram_bot_api_key or not TELEGRAM_OWNER_ID:
return
global _batch_complete_timer
async with _batch_lock:
if _batch_complete_timer:
_batch_complete_timer.cancel()
loop = asyncio.get_event_loop()
_batch_complete_timer = loop.call_later(30, _schedule_batch_notification)
async def add_file_to_batch(filename: str, size: int):
"""Adds a completed file to the batch list."""
if not SETTINGS.telegram_bot_api_key or not TELEGRAM_OWNER_ID:
return
async with _batch_lock:
_upload_batch.append((filename, size))
TELEGRAM_API_URL = f"https://api.telegram.org/bot{SETTINGS.telegram_bot_api_key}"
TELEGRAM_OWNER_ID = SETTINGS.telegram_bot_owner_id
@@ -620,6 +693,8 @@ async def api_upload(
with open(save_path, "wb") as f:
f.write(raw)
db_insert_upload(checksum, file.filename, size, device_asset_id, None, created_iso)
await add_file_to_batch(file.filename, size)
await reset_telegram_debounce()
msg = f"Saved to {album_for_saving}/{os.path.basename(save_path)}"
await send_progress(session_id, item_id, "done", 100, msg)
@@ -761,6 +836,7 @@ async def api_upload_chunk(
except Exception as e:
logger.exception("Chunk write failed: %s", e)
return JSONResponse({"error": "chunk_write_failed"}, status_code=500)
await reset_telegram_debounce()
return JSONResponse({"ok": True})
@app.post("/api/upload/chunk/complete")
@@ -953,6 +1029,7 @@ async def api_upload_chunk_complete(request: Request) -> JSONResponse:
with open(save_path, "wb") as f:
f.write(raw)
db_insert_upload(checksum, file_like_name, file_size, device_asset_id, None, created_iso)
await add_file_to_batch(file_like_name, file_size)
msg = f"Saved to {album_for_saving}/{os.path.basename(save_path)}"
await send_progress(session_id_local, item_id_local, "done", 100, msg)