"""Poll a Navidrome server for playlist changes and react to them.

The script queries the Subsonic ``getPlaylists`` endpoint in a loop. When a
playlist appears, or the song count of a known playlist changes, it runs
``/app/navidrome pls`` for that playlist inside the Navidrome docker container
and prints the command's output.

Environment variables read by this script:
    NAVIDROME_URL, NAVIDROME_USER: required.
    SUBSONIC_SALT and SUBSONIC_TOKEN, or NAVIDROME_PASSWORD: credentials.
    DEBUG: when set to a non-empty value, enables debug-level logging.
"""
import hashlib
import logging
import os
import random
import string
import subprocess
import time

import requests

DEBUG = os.environ.get('DEBUG')
logging.basicConfig(
    format='[%(asctime)s] %(levelname)s %(module)s/%(funcName)s - %(message)s',
    level=logging.DEBUG if DEBUG else logging.INFO)

# Name of the docker container the `pls` command is executed in.
NAVIDROME_CONTAINER = 'navidrome-navidrome-1'
# Seconds to wait between two polls (also used as the retry delay after errors).
POLL_INTERVAL_SECONDS = 5
# Upper bound for a single HTTP request so a stalled server cannot hang the loop.
REQUEST_TIMEOUT_SECONDS = 10


def run_pls_command(playlist_id, playlist_name):
    """Run ``navidrome pls`` for one playlist via ``docker exec`` and print its output.

    Args:
        playlist_id: Identifier passed to the ``pls`` command.
        playlist_name: Name used only to label the printed output.

    Failures to start the command are logged rather than raised so that the
    polling loop in ``main`` keeps running.
    """
    cmd = [
        'docker', 'exec', NAVIDROME_CONTAINER,
        '/app/navidrome', 'pls', '-l', 'error', '-np', str(playlist_id),
    ]
    # Only the process launch can raise here; keep the try body minimal.
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, check=False)
    except FileNotFoundError:
        logging.error("Command 'docker' not found. Please ensure it is installed and in your PATH.")
        return
    except OSError as e:
        logging.error(f"Could not run command for playlist '{playlist_name}': {e}")
        return

    stdout = result.stdout.strip()
    stderr = result.stderr.strip()
    print(f"\n--- Output for playlist '{playlist_name}' ---")
    if stdout:
        print(stdout)
    if stderr:
        print("STDERR:")
        print(stderr)
    print("--- End output ---\n")

    # check=False means a failing command would otherwise go unnoticed.
    if result.returncode != 0:
        logging.warning(
            f"Command for playlist '{playlist_name}' exited with status {result.returncode}.")


def _build_auth_params(username):
    """Return the Subsonic query parameters for *username*, or None if credentials are missing."""
    salt = os.environ.get('SUBSONIC_SALT')
    token = os.environ.get('SUBSONIC_TOKEN')
    if not all([salt, token]):
        password = os.environ.get('NAVIDROME_PASSWORD')
        if not password:
            logging.error("Either (SUBSONIC_SALT and SUBSONIC_TOKEN) or NAVIDROME_PASSWORD must be set.")
            return None
        # Subsonic API requires a salt and a token (md5(password + salt))
        salt = ''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(10))
        token = hashlib.md5((password + salt).encode('utf-8')).hexdigest()

    return {
        'u': username,
        't': token,
        's': salt,
        'v': '1.16.1',
        'c': 'playlist-script',
        'f': 'json',
    }


def _fetch_playlists(api_url, params):
    """Fetch the playlists once and return them keyed by id.

    Returns None (after logging the reason) when the request fails, the body
    is not valid JSON, or the server reports an API error, so the caller can
    simply retry on the next poll.
    """
    try:
        response = requests.get(api_url, params=params, timeout=REQUEST_TIMEOUT_SECONDS)
        response.raise_for_status()  # Raise an exception for bad status codes
    except requests.exceptions.RequestException as e:
        logging.error(f"Error connecting to Navidrome: {e}")
        return None

    try:
        data = response.json()
    except ValueError:
        # A non-JSON body (e.g. an HTML error page) must not kill the watcher.
        logging.error("Invalid response format from server.")
        return None

    subsonic_response = data.get('subsonic-response') if isinstance(data, dict) else None
    if not subsonic_response:
        logging.error("Invalid response format from server.")
        return None

    if subsonic_response.get('status') == 'failed':
        error = subsonic_response.get('error', {})
        logging.error(f"API Error: {error.get('message')} (code: {error.get('code')})")
        return None

    playlists = subsonic_response.get('playlists', {}).get('playlist', [])
    # Entries without an id cannot be tracked or passed to the command; skip them.
    return {p['id']: p for p in playlists if p.get('id') is not None}


def _process_changes(known_playlists, current_playlists):
    """Log differences between two polls and run the command for new or resized playlists."""
    # Newly created playlists
    for playlist_id in current_playlists.keys() - known_playlists.keys():
        playlist_name = current_playlists[playlist_id].get('name')
        logging.info(f"New playlist detected: '{playlist_name}' ({playlist_id}). Running command.")
        run_pls_command(playlist_id, playlist_name)

    # Deleted playlists
    for playlist_id in known_playlists.keys() - current_playlists.keys():
        playlist_data = known_playlists[playlist_id]
        logging.info(f"Playlist deleted: '{playlist_data.get('name')}' ({playlist_id}).")

    # Check for song count changes in existing playlists
    for playlist_id, playlist_data in current_playlists.items():
        previous_data = known_playlists.get(playlist_id)
        if previous_data is None:
            continue
        current_song_count = playlist_data.get('songCount')
        previous_song_count = previous_data.get('songCount')
        if previous_song_count != current_song_count:
            playlist_name = playlist_data.get('name')
            logging.info(f"Song count changed for '{playlist_name}' ({playlist_id}): {previous_song_count} -> {current_song_count}. Running command.")
            run_pls_command(playlist_id, playlist_name)


def main():
    """Get playlists from a Navidrome server using the Subsonic API.

    Returns early (after logging an error) when required environment
    variables are missing; otherwise polls the server indefinitely.
    """
    navidrome_url = os.environ.get('NAVIDROME_URL')
    username = os.environ.get('NAVIDROME_USER')
    if not all([navidrome_url, username]):
        logging.error("NAVIDROME_URL and NAVIDROME_USER environment variables must be set.")
        return

    params = _build_auth_params(username)
    if params is None:
        return

    api_url = f"{navidrome_url.rstrip('/')}/rest/getPlaylists.view"

    # None means "no successful poll yet". An empty dict is a valid state
    # (server has no playlists) and must not be mistaken for the initial scan,
    # otherwise the first playlist created afterwards would never trigger
    # the command.
    known_playlists = None

    while True:
        current_playlists = _fetch_playlists(api_url, params)

        if current_playlists is None:
            # Error already logged; keep the previous state and retry.
            pass
        elif known_playlists is None:
            # On first successful poll, just populate state and print list
            logging.info("Initial scan. Monitoring playlists for changes.")
            print(f"Playlists for {username}:")
            for playlist_id, playlist_data in current_playlists.items():
                print(f"  - [{playlist_id}] {playlist_data.get('name')} ({playlist_data.get('songCount')} songs)")
            known_playlists = current_playlists
        else:
            _process_changes(known_playlists, current_playlists)
            # Update the state for the next iteration.
            known_playlists = current_playlists

        time.sleep(POLL_INTERVAL_SECONDS)


if __name__ == "__main__":
    main()