feat: Include protocol in host certificate logs and alerts

Co-authored-by: aider (gemini/gemini-2.5-pro) <aider@aider.chat>
This commit is contained in:
2026-01-14 11:36:04 -07:00
parent 25fddcc413
commit 6c6ad49c5f

22
main.py
View File

@@ -28,7 +28,7 @@ async def alert_tanner(message):
except BaseException as e:
logging.error('Problem alerting Tanner: ' + str(e))
async def check_host_cert(host, port, seen_serials):
async def check_host_cert(proto, host, port, seen_serials):
"check a single host's cert"
try:
# default context does hostname checking and certificate validation
@@ -43,7 +43,7 @@ async def check_host_cert(host, port, seen_serials):
if not cert:
# this case should be rare if handshake succeeded
msg = f"Could not get certificate for {host}:{port}"
msg = f"Could not get certificate for {proto.upper()} {host}:{port}"
logging.warning(msg)
await alert_tanner(msg)
return
@@ -51,7 +51,7 @@ async def check_host_cert(host, port, seen_serials):
serial_number = cert.get('serialNumber')
if serial_number:
if serial_number in seen_serials:
logging.warning(f"Duplicate certificate with serial number {serial_number} found for host {host}:{port}")
logging.warning(f"Duplicate certificate with serial number {serial_number} found for host {proto.upper()} {host}:{port}")
else:
seen_serials.add(serial_number)
@@ -61,26 +61,26 @@ async def check_host_cert(host, port, seen_serials):
time_left = expiry_date - datetime.utcnow()
if time_left < timedelta(days=7):
msg = f"Certificate for {host}:{port} expires in less than a week: {expiry_date}"
msg = f"Certificate for {proto.upper()} {host}:{port} expires in less than a week: {expiry_date}"
logging.warning(msg)
await alert_tanner(msg)
else:
logging.info(f"Certificate for {host}:{port} is valid until {expiry_date} ({time_left.days} days left)")
logging.info(f"Certificate for {proto.upper()} {host}:{port} is valid until {expiry_date} ({time_left.days} days left)")
except ssl.SSLCertVerificationError as e:
msg = f"Certificate verification error for {host}:{port}: {e.reason}"
msg = f"Certificate verification error for {proto.upper()} {host}:{port}: {e.reason}"
logging.error(msg)
await alert_tanner(msg)
except ssl.SSLError as e:
msg = f"SSL error for {host}:{port}: {e}"
msg = f"SSL error for {proto.upper()} {host}:{port}: {e}"
logging.error(msg)
await alert_tanner(msg)
except (asyncio.TimeoutError, OSError) as e:
# Per instructions: log and move on for connection errors
logging.error(f"Connection error for {host}:{port}: {e}")
logging.error(f"Connection error for {proto.upper()} {host}:{port}: {e}")
except Exception as e:
# Catchall for other things
msg = f"An unexpected error occurred for {host}:{port}: {e}"
msg = f"An unexpected error occurred for {proto.upper()} {host}:{port}: {e}"
logging.error(msg)
await alert_tanner(msg)
@@ -89,11 +89,11 @@ async def main():
seen_serials = {proto: set() for proto in HOSTS}
tasks = []
for host in HOSTS['http']:
tasks.append(check_host_cert(host, 443, seen_serials['http']))
tasks.append(check_host_cert('http', host, 443, seen_serials['http']))
for host in HOSTS['mqtt']:
# standard port for MQTTS is 8883
tasks.append(check_host_cert(host, 8883, seen_serials['mqtt']))
tasks.append(check_host_cert('mqtt', host, 8883, seen_serials['mqtt']))
await asyncio.gather(*tasks)