Files
navidrome-heart-monitor/main.py
2026-02-07 14:18:54 -07:00

228 lines
7.7 KiB
Python

import os, logging
DEBUG = os.environ.get('DEBUG')
logging.basicConfig(
format='[%(asctime)s] %(levelname)s %(module)s/%(funcName)s - %(message)s',
level=logging.DEBUG if DEBUG else logging.INFO)
import asyncio
import re
from datetime import datetime, timedelta, timezone
import aiohttp
import random
import string
import hashlib
import settings
STAR_UNSTAR_WINDOW = timedelta(seconds=4)
# A dictionary to store the timestamp of when a song was starred.
# {song_id: timestamp}
starred_songs = {}
LOG_PATTERN = re.compile(
r'time="(?P<ts>[^"]+)".*msg="Changing starred" ids="\[`(?P<id>[^`]+)`\]".*starred=(?P<starred>true|false)'
)
async def alert_tanner(message):
try:
params = {'navidrome': message}
async with aiohttp.ClientSession() as session:
async with session.get('https://tbot.tannercollin.com/message', params=params, timeout=4) as response:
response.raise_for_status()
except BaseException as e:
logging.error('Problem alerting Tanner: ' + str(e))
def parse_log_line(line):
"""
Parses a log line to find song star/unstar events.
Returns a tuple of (timestamp, song_id, is_starred) or None if not a match.
"""
match = LOG_PATTERN.search(line)
if not match:
return None
data = match.groupdict()
ts_str = data['ts']
song_id = data['id']
is_starred = data['starred'] == 'true'
# fromisoformat doesn't like Z, so we replace it with +00:00 timezone info.
timestamp = datetime.fromisoformat(ts_str.replace('Z', '+00:00'))
return timestamp, song_id, is_starred
async def _call_subsonic_api(endpoint, **kwargs):
"""
A generic helper to call the Navidrome Subsonic API.
"""
navidrome_url = settings.NAVIDROME_URL
username = settings.NAVIDROME_USER
if not all([navidrome_url, username]):
logging.error("NAVIDROME_URL and NAVIDROME_USER must be set in settings.py.")
return None
salt = settings.SUBSONIC_SALT
token = settings.SUBSONIC_TOKEN
if not all([salt, token]):
password = settings.NAVIDROME_PASSWORD
if not password:
logging.error("Either (SUBSONIC_SALT and SUBSONIC_TOKEN) or NAVIDROME_PASSWORD must be set in settings.py.")
return None
# Subsonic API requires a salt and a token (md5(password + salt))
salt = ''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(10))
token = hashlib.md5((password + salt).encode('utf-8')).hexdigest()
params = {
'u': username,
't': token,
's': salt,
'v': '1.16.1',
'c': 'heart-monitor',
'f': 'json',
}
params.update(kwargs)
api_url = f"{navidrome_url.rstrip('/')}/rest/{endpoint}"
try:
async with aiohttp.ClientSession() as session:
async with session.get(api_url, params=params) as response:
response.raise_for_status()
data = await response.json()
except aiohttp.ClientError as e:
logging.error(f"Error calling Navidrome API endpoint {endpoint} for song {kwargs.get('id')}: {e}")
return None
if data.get('subsonic-response', {}).get('status') != 'ok':
logging.error(f"Failed to call Navidrome API endpoint {endpoint} for song {kwargs.get('id')}. Response: {data}")
return None
return data.get('subsonic-response', {})
async def navidrome_get_song_details(song_id):
"""
Gets song details from Navidrome's Subsonic API.
"""
response = await _call_subsonic_api('getSong', id=song_id)
if response:
return response.get('song')
return None
async def navidrome_set_rating(song_id):
"""
Sets a 1-star rating for a song using Navidrome's Subsonic API.
"""
response = await _call_subsonic_api('setRating', id=song_id, rating=1)
return response is not None
async def handle_star_unstar_event(song_id):
"""
Sets song rating to 1 and logs the song details.
"""
success = await navidrome_set_rating(song_id)
if not success:
# _call_subsonic_api already logs errors
return
details = await navidrome_get_song_details(song_id)
if not details:
logging.info(f"Successfully set rating for song {song_id} to 1, but couldn't get song details.")
return
title = details.get('title', 'Unknown Title')
artist = details.get('artist', 'Unknown Artist')
album = details.get('album', 'Unknown Album')
msg = f'Set song "{title}" - {artist} ({album}) rating to 1.'
logging.info(msg)
await alert_tanner(msg)
async def main():
"""
Monitors Navidrome container logs for rapid star/unstar events.
"""
session = None
try:
connector = aiohttp.UnixConnector(path="/var/run/docker.sock")
# Disable all timeouts for the long-polling log stream
timeout = aiohttp.ClientTimeout(total=None, sock_read=None)
session = aiohttp.ClientSession(connector=connector, timeout=timeout)
container_name = settings.NAVIDROME_CONTAINER
# First, check if the container exists.
inspect_url = f"http://localhost/containers/{container_name}/json"
async with session.get(inspect_url) as response:
if response.status == 404:
logging.error(f"Container '{container_name}' not found.")
return
response.raise_for_status()
logging.info(f"Monitoring logs for container '{container_name}'...")
since = int(datetime.now(timezone.utc).timestamp())
params = {
'stdout': 'true',
'stderr': 'true',
'follow': 'true',
'since': str(since),
}
logs_url = f"http://localhost/containers/{container_name}/logs"
async with session.get(logs_url, params=params) as response:
response.raise_for_status()
while True:
line_bytes = await response.content.readline()
if not line_bytes:
break
# Docker's log stream is multiplexed. The first 8 bytes are a header.
# We strip it to get the raw log line.
if len(line_bytes) > 8:
line = line_bytes[8:].decode('utf-8', errors='replace').strip()
parsed = parse_log_line(line)
if not parsed:
continue
timestamp, song_id, is_starred = parsed
if is_starred:
starred_songs[song_id] = timestamp
elif song_id in starred_songs: # is_starred is False (unstarred)
time_diff = timestamp - starred_songs[song_id]
if time_diff <= STAR_UNSTAR_WINDOW:
logging.info(
f"Song {song_id} was starred and then unstarred within {STAR_UNSTAR_WINDOW.seconds} seconds."
)
await handle_star_unstar_event(song_id)
# Remove song from tracking after it has been unstarred
del starred_songs[song_id]
except aiohttp.ClientResponseError as e:
if e.status == 404:
logging.error(f"Container '{settings.NAVIDROME_CONTAINER}' not found.")
else:
logging.error(f"Error with Docker API: {e}")
except aiohttp.ClientError as e:
# For other client errors like connection issues
logging.error(f"Error connecting to Docker: {e}")
except Exception:
logging.exception("An unexpected error occurred")
finally:
if session:
await session.close()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
logging.info("Exiting.")