Files
ssl-monitor/main.py
2026-01-14 10:17:53 -07:00

94 lines
2.7 KiB
Python

import os, logging
# Any non-empty DEBUG value in the environment switches logging to debug level.
DEBUG = os.getenv('DEBUG')
logging.basicConfig(
    level=logging.INFO if not DEBUG else logging.DEBUG,
    format='[%(asctime)s] %(levelname)s %(module)s/%(funcName)s - %(message)s',
)
import asyncio
import ssl
from datetime import datetime, timedelta, timezone

import aiohttp
import aiomqtt
# Hostnames whose certificates get checked, grouped by service type.
HOSTS = dict(
    http=[
        'tanner.vc',
        'forum.protospace.ca',
        'protospace.ca',
        'my.protospace.ca',
    ],
    mqtt=[
        'webhost.protospace.ca',
    ],
)
def alert_tanner(msg):
    """Alert hook for certificate problems; currently a no-op placeholder.

    Accepts the alert text in ``msg`` and does nothing with it.
    """
    return None
async def _close_writer(writer, timeout):
    """Close a stream writer, bounding the wait and ignoring close-time errors."""
    writer.close()
    try:
        await asyncio.wait_for(writer.wait_closed(), timeout=timeout)
    except (asyncio.TimeoutError, OSError) as e:
        # ssl.SSLError is an OSError too. A failure while tearing the
        # connection down says nothing about the certificate, so it must not
        # abort the check or raise an alert.
        logging.debug(f"Ignoring error while closing connection: {e}")


async def check_host_cert(host, port, *, timeout=10):
    """Check a single host's TLS certificate and alert when it needs attention.

    Opens a TLS connection to ``host:port`` with the default SSL context
    (certificate validation and hostname checking enabled), reads the peer
    certificate and compares its ``notAfter`` date with the current UTC time.

    Args:
        host: Hostname to connect to.
        port: TCP port of the TLS service.
        timeout: Seconds to wait for the TCP connection plus TLS handshake;
            the same bound is applied to closing the connection.

    Returns:
        None. Outcomes are reported through ``logging`` and ``alert_tanner``:
        verification failures, other SSL errors, a missing certificate,
        expiry within a week and unexpected errors are logged and alerted;
        plain connection errors and timeouts are only logged.
    """
    try:
        # default context does hostname checking and certificate validation
        ssl_context = ssl.create_default_context()

        # open_connection does the TCP connection and TLS handshake
        _reader, writer = await asyncio.wait_for(
            asyncio.open_connection(host, port, ssl=ssl_context),
            timeout=timeout)
        try:
            cert = writer.get_extra_info('peercert')
        finally:
            # Best-effort cleanup: never let it hang or mask the cert result.
            await _close_writer(writer, timeout)

        if not cert:
            # this case should be rare if handshake succeeded
            msg = f"Could not get certificate for {host}:{port}"
            logging.warning(msg)
            alert_tanner(msg)
            return

        # cert_time_to_seconds parses the certificate date format regardless
        # of the process locale and returns seconds since the epoch (UTC).
        expiry_date = datetime.fromtimestamp(
            ssl.cert_time_to_seconds(cert['notAfter']), tz=timezone.utc)
        time_left = expiry_date - datetime.now(timezone.utc)

        if time_left < timedelta(days=7):
            msg = f"Certificate for {host}:{port} expires in less than a week: {expiry_date}"
            logging.warning(msg)
            alert_tanner(msg)
        else:
            logging.info(f"Certificate for {host}:{port} is valid until {expiry_date} ({time_left.days} days left)")
    except ssl.SSLCertVerificationError as e:
        # Must come before SSLError, which it subclasses.
        msg = f"Certificate verification error for {host}:{port}: {e.reason}"
        logging.error(msg)
        alert_tanner(msg)
    except ssl.SSLError as e:
        # Must come before OSError, which it subclasses.
        msg = f"SSL error for {host}:{port}: {e}"
        logging.error(msg)
        alert_tanner(msg)
    except (asyncio.TimeoutError, OSError) as e:
        # Per instructions: log and move on for connection errors
        logging.error(f"Connection error for {host}:{port}: {e}")
    except Exception as e:
        # Catchall for other things
        msg = f"An unexpected error occurred for {host}:{port}: {e}"
        logging.error(msg)
        alert_tanner(msg)
async def main():
    """Run the certificate check for every configured host concurrently.

    Hosts listed under 'http' are checked on port 443 and hosts listed under
    'mqtt' on port 8883 (the standard port for MQTTS).
    """
    checks = [check_host_cert(name, 443) for name in HOSTS['http']]
    checks += [check_host_cert(name, 8883) for name in HOSTS['mqtt']]
    await asyncio.gather(*checks)
# Run all checks once when executed as a script; importing has no such effect.
if __name__ == "__main__":
    asyncio.run(main())