import os, logging import hashlib import random import string import subprocess import time import urllib.parse import requests DEBUG = os.environ.get('DEBUG') logging.basicConfig( format='[%(asctime)s] %(levelname)s %(module)s/%(funcName)s - %(message)s', level=logging.DEBUG if DEBUG else logging.INFO) def transform_m3u_to_m3u8(m3u_content): """Transforms M3U content to M3U8 format.""" transformed_lines = [] for line in m3u_content.strip().splitlines(): if line.strip() and not line.startswith('#'): if line.startswith('/music/'): path = line[len('/music/'):] encoded_path = urllib.parse.quote(path) transformed_lines.append(f"local:track:{encoded_path}") return '\n'.join(transformed_lines) def run_pls_command(playlist_id): """Runs the docker exec command and returns combined stdout/stderr.""" cmd = ['docker', 'exec', 'navidrome-navidrome-1', '/app/navidrome', 'pls', '-l', 'error', '-np', playlist_id] try: result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, check=False) return result.stdout except FileNotFoundError: logging.error("Command 'docker' not found. Please ensure it is installed and in your PATH.") return "" def save_playlist_file(playlist_dir, playlist_name, content): """Saves the transformed playlist content to a file.""" filename = f"{playlist_name}.m3u8" filepath = os.path.join(playlist_dir, filename) try: with open(filepath, 'w', encoding='utf-8') as f: f.write(content) logging.info(f"Saved playlist to {filepath}") except IOError as e: logging.error(f"Error writing to file {filepath}: {e}") def delete_playlist_file(playlist_dir, playlist_name): """Deletes a playlist file.""" filename = f"{playlist_name}.m3u8" filepath = os.path.join(playlist_dir, filename) if os.path.exists(filepath): try: os.remove(filepath) logging.info(f"Deleted playlist file {filepath}") except OSError as e: logging.error(f"Error deleting file {filepath}: {e}") def main(): """Get playlists from a Navidrome server using the Subsonic API.""" mopidy_playlist_dir = os.environ.get('MOPIDY_PLAYLIST_DIR') if not mopidy_playlist_dir: logging.error("MOPIDY_PLAYLIST_DIR environment variable must be set.") return if not os.path.isdir(mopidy_playlist_dir): logging.error(f"Mopidy playlist directory not found: {mopidy_playlist_dir}") return navidrome_url = os.environ.get('NAVIDROME_URL') username = os.environ.get('NAVIDROME_USER') if not all([navidrome_url, username]): logging.error("NAVIDROME_URL and NAVIDROME_USER environment variables must be set.") return salt = os.environ.get('SUBSONIC_SALT') token = os.environ.get('SUBSONIC_TOKEN') if not all([salt, token]): password = os.environ.get('NAVIDROME_PASSWORD') if not password: logging.error("Either (SUBSONIC_SALT and SUBSONIC_TOKEN) or NAVIDROME_PASSWORD must be set.") return # Subsonic API requires a salt and a token (md5(password + salt)) salt = ''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(10)) token = hashlib.md5((password + salt).encode('utf-8')).hexdigest() params = { 'u': username, 't': token, 's': salt, 'v': '1.16.1', 'c': 'playlist-script', 'f': 'json' } known_playlists = {} while True: api_url = f"{navidrome_url.rstrip('/')}/rest/getPlaylists.view" try: response = requests.get(api_url, params=params) response.raise_for_status() # Raise an exception for bad status codes except requests.exceptions.RequestException as e: logging.error(f"Error connecting to Navidrome: {e}") time.sleep(5) continue data = response.json() subsonic_response = data.get('subsonic-response') if not subsonic_response: logging.error("Invalid response format from server.") time.sleep(5) continue if subsonic_response.get('status') == 'failed': error = subsonic_response.get('error', {}) logging.error(f"API Error: {error.get('message')} (code: {error.get('code')})") time.sleep(5) continue playlists = subsonic_response.get('playlists', {}).get('playlist', []) current_playlists = {p.get('id'): p for p in playlists} # On first successful poll, just populate state and print list if not known_playlists and playlists: logging.info("Initial scan. Monitoring playlists for changes.") print(f"Playlists for {username}:") for playlist_id, playlist_data in current_playlists.items(): print(f" - [{playlist_id}] {playlist_data.get('name')} ({playlist_data.get('songCount')} songs)") known_playlists = current_playlists time.sleep(5) continue # --- Detect changes after initial scan --- # Newly created playlists newly_added_ids = set(current_playlists.keys()) - set(known_playlists.keys()) for playlist_id in newly_added_ids: playlist_data = current_playlists[playlist_id] playlist_name = playlist_data.get('name') logging.info(f"New playlist detected: '{playlist_name}' ({playlist_id}). Generating file.") raw_output = run_pls_command(playlist_id) if raw_output: transformed_output = transform_m3u_to_m3u8(raw_output) if transformed_output: save_playlist_file(mopidy_playlist_dir, playlist_name, transformed_output) # Deleted playlists deleted_ids = set(known_playlists.keys()) - set(current_playlists.keys()) for playlist_id in deleted_ids: playlist_data = known_playlists[playlist_id] playlist_name = playlist_data.get('name') logging.info(f"Playlist deleted: '{playlist_name}' ({playlist_id}). Removing file.") delete_playlist_file(mopidy_playlist_dir, playlist_name) # Check for song count or name changes in existing playlists for playlist_id, playlist_data in current_playlists.items(): if playlist_id in known_playlists: previous_data = known_playlists[playlist_id] current_song_count = playlist_data.get('songCount') previous_song_count = previous_data.get('songCount') current_name = playlist_data.get('name') previous_name = previous_data.get('name') song_count_changed = previous_song_count != current_song_count name_changed = previous_name != current_name if song_count_changed or name_changed: log_msg = f"Playlist '{previous_name}' ({playlist_id}) changed." if name_changed: log_msg += f" Name: '{previous_name}' -> '{current_name}'." delete_playlist_file(mopidy_playlist_dir, previous_name) if song_count_changed: log_msg += f" Song count: {previous_song_count} -> {current_song_count}." logging.info(log_msg + " Regenerating file.") raw_output = run_pls_command(playlist_id) if raw_output: transformed_output = transform_m3u_to_m3u8(raw_output) if transformed_output: save_playlist_file(mopidy_playlist_dir, current_name, transformed_output) # Update the state for the next iteration. known_playlists = current_playlists time.sleep(5) if __name__ == "__main__": main()