import os, logging DEBUG = os.environ.get('DEBUG') logging.basicConfig( format='[%(asctime)s] %(levelname)s %(module)s/%(funcName)s - %(message)s', level=logging.DEBUG if DEBUG else logging.INFO) import asyncio import aiohttp import aiomqtt import ssl from datetime import datetime, timedelta HOSTS = { 'http': [ 'tanner.vc', 'forum.protospace.ca', 'protospace.ca', 'my.protospace.ca', ], 'mqtt': [ 'webhost.protospace.ca', ], } def alert_tanner(msg): pass async def check_host_cert(host, port): "check a single host's cert" try: # default context does hostname checking and certificate validation ssl_context = ssl.create_default_context() # open_connection does the TCP connection and TLS handshake _reader, writer = await asyncio.wait_for(asyncio.open_connection( host, port, ssl=ssl_context), timeout=10) cert = writer.get_extra_info('peercert') writer.close() await writer.wait_closed() if not cert: # this case should be rare if handshake succeeded msg = f"Could not get certificate for {host}:{port}" logging.warning(msg) alert_tanner(msg) return expiry_date_str = cert['notAfter'] expiry_date = datetime.strptime(expiry_date_str, '%b %d %H:%M:%S %Y %Z') time_left = expiry_date - datetime.utcnow() if time_left < timedelta(days=7): msg = f"Certificate for {host}:{port} expires in less than a week: {expiry_date}" logging.warning(msg) alert_tanner(msg) else: logging.info(f"Certificate for {host}:{port} is valid until {expiry_date} ({time_left.days} days left)") except ssl.SSLCertVerificationError as e: msg = f"Certificate verification error for {host}:{port}: {e.reason}" logging.error(msg) alert_tanner(msg) except ssl.SSLError as e: msg = f"SSL error for {host}:{port}: {e}" logging.error(msg) alert_tanner(msg) except (asyncio.TimeoutError, OSError) as e: # Per instructions: log and move on for connection errors logging.error(f"Connection error for {host}:{port}: {e}") except Exception as e: # Catchall for other things msg = f"An unexpected error occurred for {host}:{port}: {e}" logging.error(msg) alert_tanner(msg) async def main(): tasks = [] for host in HOSTS['http']: tasks.append(check_host_cert(host, 443)) for host in HOSTS['mqtt']: # standard port for MQTTS is 8883 tasks.append(check_host_cert(host, 8883)) await asyncio.gather(*tasks) if __name__ == '__main__': asyncio.run(main())