From 6c6ad49c5fd5e066399782bafaaef15276d369c0 Mon Sep 17 00:00:00 2001 From: Tanner Collin Date: Wed, 14 Jan 2026 11:36:04 -0700 Subject: [PATCH] feat: Include protocol in host certificate logs and alerts Co-authored-by: aider (gemini/gemini-2.5-pro) --- main.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/main.py b/main.py index 610f0d0..399a46b 100644 --- a/main.py +++ b/main.py @@ -28,7 +28,7 @@ async def alert_tanner(message): except BaseException as e: logging.error('Problem alerting Tanner: ' + str(e)) -async def check_host_cert(host, port, seen_serials): +async def check_host_cert(proto, host, port, seen_serials): "check a single host's cert" try: # default context does hostname checking and certificate validation @@ -43,7 +43,7 @@ async def check_host_cert(host, port, seen_serials): if not cert: # this case should be rare if handshake succeeded - msg = f"Could not get certificate for {host}:{port}" + msg = f"Could not get certificate for {proto.upper()} {host}:{port}" logging.warning(msg) await alert_tanner(msg) return @@ -51,7 +51,7 @@ async def check_host_cert(host, port, seen_serials): serial_number = cert.get('serialNumber') if serial_number: if serial_number in seen_serials: - logging.warning(f"Duplicate certificate with serial number {serial_number} found for host {host}:{port}") + logging.warning(f"Duplicate certificate with serial number {serial_number} found for host {proto.upper()} {host}:{port}") else: seen_serials.add(serial_number) @@ -61,26 +61,26 @@ async def check_host_cert(host, port, seen_serials): time_left = expiry_date - datetime.utcnow() if time_left < timedelta(days=7): - msg = f"Certificate for {host}:{port} expires in less than a week: {expiry_date}" + msg = f"Certificate for {proto.upper()} {host}:{port} expires in less than a week: {expiry_date}" logging.warning(msg) await alert_tanner(msg) else: - logging.info(f"Certificate for {host}:{port} is valid until {expiry_date} ({time_left.days} days left)") + logging.info(f"Certificate for {proto.upper()} {host}:{port} is valid until {expiry_date} ({time_left.days} days left)") except ssl.SSLCertVerificationError as e: - msg = f"Certificate verification error for {host}:{port}: {e.reason}" + msg = f"Certificate verification error for {proto.upper()} {host}:{port}: {e.reason}" logging.error(msg) await alert_tanner(msg) except ssl.SSLError as e: - msg = f"SSL error for {host}:{port}: {e}" + msg = f"SSL error for {proto.upper()} {host}:{port}: {e}" logging.error(msg) await alert_tanner(msg) except (asyncio.TimeoutError, OSError) as e: # Per instructions: log and move on for connection errors - logging.error(f"Connection error for {host}:{port}: {e}") + logging.error(f"Connection error for {proto.upper()} {host}:{port}: {e}") except Exception as e: # Catchall for other things - msg = f"An unexpected error occurred for {host}:{port}: {e}" + msg = f"An unexpected error occurred for {proto.upper()} {host}:{port}: {e}" logging.error(msg) await alert_tanner(msg) @@ -89,11 +89,11 @@ async def main(): seen_serials = {proto: set() for proto in HOSTS} tasks = [] for host in HOSTS['http']: - tasks.append(check_host_cert(host, 443, seen_serials['http'])) + tasks.append(check_host_cert('http', host, 443, seen_serials['http'])) for host in HOSTS['mqtt']: # standard port for MQTTS is 8883 - tasks.append(check_host_cert(host, 8883, seen_serials['mqtt'])) + tasks.append(check_host_cert('mqtt', host, 8883, seen_serials['mqtt'])) await asyncio.gather(*tasks)