Files
immich-library-watcher/main.py
Tanner Collin d811483e94 fix: Only trigger Immich scan on file events
Co-authored-by: aider (gemini/gemini-2.5-pro) <aider@aider.chat>
2025-12-17 11:15:31 -07:00

98 lines
3.8 KiB
Python

import os, logging
DEBUG = os.environ.get('DEBUG')
logging.basicConfig(
format='[%(asctime)s] %(levelname)s %(module)s/%(funcName)s - %(message)s',
level=logging.DEBUG if DEBUG else logging.INFO)
import secrets
import threading
import requests
from inotify_simple import INotify, flags
SCAN_DELAY_SECONDS = 10
timer = None
def trigger_scan():
"""Trigger a library scan in Immich."""
logging.info("Triggering Immich library scan.")
url = f"{secrets.IMMICH_URL}/api/libraries/{secrets.LIBRARY_UUID}/scan"
headers = {
'Accept': 'application/json',
'x-api-key': secrets.IMMICH_API_KEY,
}
try:
response = requests.post(url, headers=headers)
response.raise_for_status()
logging.info(f"Scan triggered successfully: {response.status_code}")
except requests.exceptions.RequestException as e:
logging.error(f"Failed to trigger scan: {e}")
def add_watch_recursive(inotify, path, watch_flags, wd_to_path):
"""Walk a directory path and add watches recursively."""
try:
for root, _, _ in os.walk(path):
try:
logging.info(f"Watching {root}")
wd = inotify.add_watch(root, watch_flags)
wd_to_path[wd] = root
except OSError as e:
logging.error(f"Error adding watch for {root}: {e}")
except FileNotFoundError:
logging.warning(f"Could not scan {path} as it was not found.")
def main():
global timer
watch_flags = flags.MODIFY | flags.CREATE | flags.DELETE | flags.MOVED_FROM | flags.MOVED_TO
wd_to_path = {}
with INotify() as inotify:
for path in secrets.WATCH_PATHS:
logging.info(f"Watching {path} and its subdirectories recursively.")
add_watch_recursive(inotify, path, watch_flags, wd_to_path)
try:
while True:
events = inotify.read(timeout=1000)
if events:
scan_needed = False
for event in events:
logging.debug(f"Event: {event!r}")
if event.mask & flags.IGNORED:
logging.info(f"Watch for wd {event.wd} was removed (directory deleted?).")
if event.wd in wd_to_path:
logging.info(f"Removing path '{wd_to_path[event.wd]}' from watch mapping.")
del wd_to_path[event.wd]
continue
if event.mask & flags.ISDIR and (event.mask & flags.CREATE or event.mask & flags.MOVED_TO):
parent_dir_path = wd_to_path.get(event.wd)
if not parent_dir_path:
logging.warning(f"Could not find path for watch descriptor {event.wd}")
continue
new_dir_path = os.path.join(parent_dir_path, event.name)
logging.info(f"New directory {new_dir_path} detected, adding watches.")
add_watch_recursive(inotify, new_dir_path, watch_flags, wd_to_path)
continue
scan_needed = True
if scan_needed:
if timer:
timer.cancel()
logging.info('Debounce cancelled timer.')
timer = threading.Timer(SCAN_DELAY_SECONDS, trigger_scan)
timer.start()
logging.info(f"Modification detected. Triggering scan in {SCAN_DELAY_SECONDS} seconds.")
except KeyboardInterrupt:
logging.info("Shutting down...")
finally:
if timer:
timer.cancel()
if __name__ == '__main__':
main()