Files
ssl-monitor/main.py
Tanner Collin 25fddcc413 refactor: Convert alert_tanner to use aiohttp
Co-authored-by: aider (gemini/gemini-2.5-pro) <aider@aider.chat>
2026-01-14 11:32:20 -07:00

103 lines
3.6 KiB
Python

import os, logging
# Verbose logging is switched on by setting the DEBUG environment variable
# to any non-empty value; otherwise the root logger runs at INFO.
DEBUG = os.environ.get('DEBUG')
_log_level = logging.INFO
if DEBUG:
    _log_level = logging.DEBUG
logging.basicConfig(
    level=_log_level,
    format='[%(asctime)s] %(levelname)s %(module)s/%(funcName)s - %(message)s',
)
import asyncio
import json
import ssl
from datetime import datetime, timedelta, timezone

import aiohttp
import aiomqtt
# Load the host inventory once at start-up. The parsed JSON is kept in HOSTS
# and is indexed by protocol name ('http', 'mqtt') by main().
try:
    # JSON is UTF-8 by specification; do not depend on the locale default.
    with open('hosts.json', 'r', encoding='utf-8') as f:
        HOSTS = json.load(f)
except FileNotFoundError:
    logging.error("hosts.json not found. Please copy hosts.json.example to hosts.json and configure it.")
    # Raise SystemExit directly: the exit() helper is injected by the `site`
    # module for interactive use and is absent under `python -S` / frozen apps.
    raise SystemExit(1)
async def alert_tanner(message):
    """Send an alert message to Tanner via the tbot HTTP endpoint (best effort).

    The text is passed as the ``ssl-monitor`` query parameter of a GET request
    to ``https://tbot.tannercollin.com/message`` with a 4 second total timeout.
    Any ordinary failure (connection problem, timeout, non-success HTTP
    status) is logged and swallowed so that a broken alert channel never
    interrupts the certificate checks.

    Args:
        message: Text of the alert to deliver.

    Returns:
        None.
    """
    params = {'ssl-monitor': message}
    # ClientTimeout is aiohttp's documented way to express a request timeout;
    # plain numbers are only accepted for backward compatibility.
    timeout = aiohttp.ClientTimeout(total=4)
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(
                'https://tbot.tannercollin.com/message',
                params=params,
                timeout=timeout,
            ) as response:
                response.raise_for_status()
    except Exception as e:
        # Deliberately Exception rather than BaseException: task cancellation,
        # KeyboardInterrupt and SystemExit must propagate to the caller.
        logging.error('Problem alerting Tanner: ' + str(e))
async def check_host_cert(host, port, seen_serials):
    """Check the TLS certificate presented by a single host.

    Connects to ``host:port`` with the default SSL context (hostname checking
    and certificate validation enabled), reads the peer certificate and then:

    * logs a warning if the certificate's serial number is already present in
      ``seen_serials``, otherwise adds the serial to that set;
    * logs a warning and calls ``alert_tanner`` if the certificate expires in
      less than seven days, otherwise logs the remaining validity at INFO.

    Failure handling:

    * certificate verification errors, other SSL errors and unexpected
      exceptions are logged and sent to ``alert_tanner``;
    * connection problems (timeout, ``OSError``) are only logged.

    Expiry timestamps are handled as timezone-aware UTC datetimes, so the
    dates shown in messages carry an explicit ``+00:00`` offset.

    Args:
        host: Host name to connect to.
        port: TCP port to connect to.
        seen_serials: Mutable set of certificate serial numbers seen so far;
            updated in place.

    Returns:
        None.
    """
    try:
        # default context does hostname checking and certificate validation
        ssl_context = ssl.create_default_context()

        # open_connection does the TCP connection and TLS handshake
        _reader, writer = await asyncio.wait_for(
            asyncio.open_connection(host, port, ssl=ssl_context), timeout=10)
        cert = writer.get_extra_info('peercert')
        writer.close()
        await writer.wait_closed()

        if not cert:
            # this case should be rare if handshake succeeded
            msg = f"Could not get certificate for {host}:{port}"
            logging.warning(msg)
            await alert_tanner(msg)
            return

        serial_number = cert.get('serialNumber')
        if serial_number:
            if serial_number in seen_serials:
                logging.warning(f"Duplicate certificate with serial number {serial_number} found for host {host}:{port}")
            else:
                seen_serials.add(serial_number)

        # cert_time_to_seconds parses the certificate's GMT timestamp without
        # depending on the process locale (strptime's %b does), and the aware
        # datetimes avoid the deprecated, naive datetime.utcnow().
        expiry_date = datetime.fromtimestamp(
            ssl.cert_time_to_seconds(cert['notAfter']), tz=timezone.utc)
        time_left = expiry_date - datetime.now(timezone.utc)

        if time_left < timedelta(days=7):
            msg = f"Certificate for {host}:{port} expires in less than a week: {expiry_date}"
            logging.warning(msg)
            await alert_tanner(msg)
        else:
            logging.info(f"Certificate for {host}:{port} is valid until {expiry_date} ({time_left.days} days left)")

    except ssl.SSLCertVerificationError as e:
        msg = f"Certificate verification error for {host}:{port}: {e.reason}"
        logging.error(msg)
        await alert_tanner(msg)
    except ssl.SSLError as e:
        msg = f"SSL error for {host}:{port}: {e}"
        logging.error(msg)
        await alert_tanner(msg)
    except (asyncio.TimeoutError, OSError) as e:
        # Per instructions: log and move on for connection errors.
        # A timeout stringifies to '', so fall back to the exception name.
        reason = str(e) or type(e).__name__
        logging.error(f"Connection error for {host}:{port}: {reason}")
    except Exception as e:
        # Catchall for other things
        msg = f"An unexpected error occurred for {host}:{port}: {e}"
        logging.error(msg)
        await alert_tanner(msg)
async def main():
    """Run the certificate check for every configured host concurrently.

    One serial-number set is created per protocol key found in HOSTS, so
    duplicate certificates are tracked separately per protocol. Hosts under
    'http' are checked on port 443 and hosts under 'mqtt' on port 8883.
    """
    serials_by_proto = {}
    for proto in HOSTS:
        serials_by_proto[proto] = set()

    checks = [
        check_host_cert(name, 443, serials_by_proto['http'])
        for name in HOSTS['http']
    ]
    # standard port for MQTTS is 8883
    checks += [
        check_host_cert(name, 8883, serials_by_proto['mqtt'])
        for name in HOSTS['mqtt']
    ]

    await asyncio.gather(*checks)
# Script entry point: start the event loop only when executed directly,
# not when the module is imported.
if __name__ == "__main__":
    asyncio.run(main())