# Files
# navidrome-heart-monitor/main.py
# 2026-02-07 14:14:59 -07:00
#
# 215 lines
# 6.9 KiB
# Python

import os, logging
# Any non-empty DEBUG environment variable switches the root logger to
# DEBUG verbosity; otherwise INFO is used.
DEBUG = os.environ.get('DEBUG')

# Logging is configured here, ahead of the remaining imports in this file.
logging.basicConfig(
    level=logging.DEBUG if DEBUG else logging.INFO,
    format='[%(asctime)s] %(levelname)s %(module)s/%(funcName)s - %(message)s',
)
import asyncio
import hashlib
import random
import re
import secrets
import string
from datetime import datetime, timedelta, timezone

import aiodocker
import aiohttp
from aiodocker.connector import DockerConnector

import settings
# Maximum gap between a star and the following unstar of the same song for
# the pair to count as a "rapid star/unstar" event.
STAR_UNSTAR_WINDOW = timedelta(seconds=4)

# Pending stars, keyed by song id, holding the timestamp of the star event:
# {song_id: timestamp}
starred_songs = {}

# Matches a "Changing starred" log line and captures the quoted time field,
# the single backtick-quoted id inside the ids list, and the starred flag.
LOG_PATTERN = re.compile(
    r'time="(?P<ts>[^"]+)"'
    r'.*msg="Changing starred" ids="\[`(?P<id>[^`]+)`\]"'
    r'.*starred=(?P<starred>true|false)'
)
async def alert_tanner(message):
    """
    Send a best-effort notification containing *message*.

    Issues a GET to the tbot message endpoint with the text in the
    ``navidrome`` query parameter. Any failure (connection error, timeout,
    non-2xx status) is logged and swallowed so that alerting problems never
    interrupt the caller.

    Args:
        message: Text to deliver.
    """
    params = {'navidrome': message}
    # Explicit ClientTimeout instead of a bare number, matching how timeouts
    # are built elsewhere in this file.
    timeout = aiohttp.ClientTimeout(total=4)
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get('https://tbot.tannercollin.com/message', params=params, timeout=timeout) as response:
                response.raise_for_status()
    except Exception as e:
        # Deliberately not BaseException: task cancellation, KeyboardInterrupt
        # and SystemExit must propagate instead of being logged and dropped.
        # Timeout errors stringify to '', so fall back to repr() for those.
        logging.error('Problem alerting Tanner: ' + (str(e) or repr(e)))
def parse_log_line(line):
    """
    Extract a star/unstar event from a single log line.

    Returns ``(timestamp, song_id, is_starred)`` when the line matches
    LOG_PATTERN, otherwise ``None``. ``timestamp`` is the parsed ``time``
    field and ``is_starred`` is True for ``starred=true``.
    """
    found = LOG_PATTERN.search(line)
    if found is None:
        return None

    # A trailing "Z" is rewritten to an explicit UTC offset because
    # fromisoformat doesn't like Z.
    iso_text = found.group('ts').replace('Z', '+00:00')
    return (
        datetime.fromisoformat(iso_text),
        found.group('id'),
        found.group('starred') == 'true',
    )
async def _call_subsonic_api(endpoint, **kwargs):
    """
    Call a Navidrome Subsonic REST endpoint and return its response payload.

    Configuration is read from ``settings``. When both SUBSONIC_SALT and
    SUBSONIC_TOKEN are set they are used as-is; otherwise a fresh salt is
    generated for this call and the token is derived from NAVIDROME_PASSWORD.

    Args:
        endpoint: Endpoint name appended to ``<NAVIDROME_URL>/rest/``
            (for example ``'getSong'``).
        **kwargs: Extra query parameters. They are merged over the standard
            ones, so a matching key overrides the default.

    Returns:
        The ``subsonic-response`` dict when the HTTP request succeeds and the
        response status is ``'ok'``. ``None`` when configuration is missing,
        the request fails or times out, the body is not valid JSON, or the
        status is not ``'ok'``. Every failure is logged.
    """
    navidrome_url = settings.NAVIDROME_URL
    username = settings.NAVIDROME_USER
    if not (navidrome_url and username):
        logging.error("NAVIDROME_URL and NAVIDROME_USER must be set in settings.py.")
        return None

    salt = settings.SUBSONIC_SALT
    token = settings.SUBSONIC_TOKEN
    if not (salt and token):
        password = settings.NAVIDROME_PASSWORD
        if not password:
            logging.error("Either (SUBSONIC_SALT and SUBSONIC_TOKEN) or NAVIDROME_PASSWORD must be set in settings.py.")
            return None
        # Subsonic API requires a salt and a token (md5(password + salt)).
        # The salt feeds authentication, so draw it from the `secrets` CSPRNG
        # rather than the predictable `random` generator.
        alphabet = string.ascii_lowercase + string.digits
        salt = ''.join(secrets.choice(alphabet) for _ in range(10))
        token = hashlib.md5((password + salt).encode('utf-8')).hexdigest()

    params = {
        'u': username,
        't': token,
        's': salt,
        'v': '1.16.1',
        'c': 'heart-monitor',
        'f': 'json',
    }
    params.update(kwargs)
    api_url = f"{navidrome_url.rstrip('/')}/rest/{endpoint}"

    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(api_url, params=params) as response:
                response.raise_for_status()
                data = await response.json()
    except (aiohttp.ClientError, asyncio.TimeoutError, ValueError) as e:
        # asyncio.TimeoutError (request timeout) and ValueError (body that is
        # not valid JSON) are not aiohttp.ClientError subclasses; without them
        # here a flaky server would crash the caller instead of yielding None.
        logging.error(f"Error calling Navidrome API endpoint {endpoint} for song {kwargs.get('id')}: {e!r}")
        return None

    # Guard against a JSON body that is not the expected object shape.
    payload = data.get('subsonic-response', {}) if isinstance(data, dict) else None
    if not isinstance(payload, dict) or payload.get('status') != 'ok':
        logging.error(f"Failed to call Navidrome API endpoint {endpoint} for song {kwargs.get('id')}. Response: {data}")
        return None
    return payload
async def navidrome_get_song_details(song_id):
    """
    Fetch a song's metadata through the Subsonic ``getSong`` endpoint.

    Returns the ``song`` entry of the API response, or ``None`` when the
    call failed or returned an empty payload.
    """
    payload = await _call_subsonic_api('getSong', id=song_id)
    return payload.get('song') if payload else None
async def navidrome_set_rating(song_id, rating=1):
    """
    Set a song's rating using Navidrome's Subsonic ``setRating`` endpoint.

    Args:
        song_id: Id of the song to rate.
        rating: Rating value forwarded to the API. Defaults to 1, which is
            what existing callers rely on.

    Returns:
        True when the API call succeeded, False otherwise (the API helper
        logs the reason).
    """
    response = await _call_subsonic_api('setRating', id=song_id, rating=rating)
    return response is not None
async def handle_star_unstar_event(song_id):
    """
    Rate a song 1 star, then log and announce which song was changed.

    Nothing further happens if setting the rating fails. If the rating was
    set but the song's details cannot be fetched, only a log entry is
    written and no alert is sent.
    """
    if not await navidrome_set_rating(song_id):
        # _call_subsonic_api already logs errors
        return

    details = await navidrome_get_song_details(song_id)
    if details:
        title = details.get('title', 'Unknown Title')
        artist = details.get('artist', 'Unknown Artist')
        album = details.get('album', 'Unknown Album')
        msg = f'Set song "{title}" - {artist} ({album}) rating to 1.'
        logging.info(msg)
        await alert_tanner(msg)
    else:
        logging.info(f"Successfully set rating for song {song_id} to 1, but couldn't get song details.")
async def main():
    """
    Monitors Navidrome container logs for rapid star/unstar events.

    Follows the log stream of the container named by
    ``settings.NAVIDROME_CONTAINER``, starting from the current time. Each
    star event records its timestamp in ``starred_songs``; an unstar of the
    same song no more than ``STAR_UNSTAR_WINDOW`` later triggers
    ``handle_star_unstar_event``.

    Docker errors and unexpected exceptions are logged rather than raised.
    The Docker client and its HTTP session are closed on the way out.
    """
    docker = None
    session = None
    try:
        # We need a custom session with no timeout for the long-polling log stream.
        # We also need to use aiodocker's connector to handle Unix sockets correctly.
        connector = DockerConnector()
        timeout = aiohttp.ClientTimeout(total=None, sock_read=None)
        session = connector.aiohttp_session(timeout=timeout)
        docker = aiodocker.Docker(session=session)

        container = await docker.containers.get(settings.NAVIDROME_CONTAINER)
        logging.info(f"Monitoring logs for container '{settings.NAVIDROME_CONTAINER}'...")
        logs = container.log(
            stdout=True,
            stderr=True,
            follow=True,
            since=datetime.now(timezone.utc).timestamp(),
        )

        async for line in logs:
            parsed = parse_log_line(line)
            if not parsed:
                continue
            timestamp, song_id, is_starred = parsed

            if is_starred:
                # Stars that were never unstarred would otherwise pile up for
                # the whole lifetime of the process.
                _prune_stale_stars(timestamp)
                starred_songs[song_id] = timestamp
                continue

            # Unstarred: the song stops being tracked whether or not the
            # unstar came quickly enough to count.
            starred_at = starred_songs.pop(song_id, None)
            if starred_at is None:
                continue
            if timestamp - starred_at <= STAR_UNSTAR_WINDOW:
                logging.info(
                    f"Song {song_id} was starred and then unstarred within {STAR_UNSTAR_WINDOW.seconds} seconds."
                )
                await handle_star_unstar_event(song_id)
    except aiodocker.exceptions.DockerError as e:
        if e.status == 404:
            logging.error(f"Container '{settings.NAVIDROME_CONTAINER}' not found.")
        else:
            logging.error(f"Error connecting to Docker or getting container: {e}")
    except Exception:
        logging.exception("An unexpected error occurred")
    finally:
        # Nested so the session is still closed if closing the client raises.
        try:
            if docker:
                await docker.close()
        finally:
            if session:
                # aiodocker doesn't close sessions it doesn't own.
                await session.close()


def _prune_stale_stars(now):
    """Forget tracked stars that are older than STAR_UNSTAR_WINDOW at *now*."""
    stale_ids = [
        tracked_id
        for tracked_id, starred_at in starred_songs.items()
        if now - starred_at > STAR_UNSTAR_WINDOW
    ]
    for tracked_id in stale_ids:
        del starred_songs[tracked_id]
if __name__ == "__main__":
    # Script entry point: run the monitor until it returns, and treat Ctrl-C
    # as a normal shutdown instead of printing a traceback.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logging.info("Exiting.")