90 lines
2.8 KiB
Python
90 lines
2.8 KiB
Python
import os, logging
|
|
DEBUG = os.environ.get('DEBUG')
|
|
logging.basicConfig(
|
|
format='[%(asctime)s] %(levelname)s %(module)s/%(funcName)s - %(message)s',
|
|
level=logging.DEBUG if DEBUG else logging.INFO)
|
|
|
|
import asyncio
|
|
import aiohttp
|
|
import aiomqtt
|
|
import ssl
|
|
import json
|
|
from datetime import datetime, timedelta
|
|
|
|
|
|
try:
|
|
with open('hosts.json', 'r') as f:
|
|
HOSTS = json.load(f)
|
|
except FileNotFoundError:
|
|
logging.error("hosts.json not found. Please copy hosts.json.example to hosts.json and configure it.")
|
|
exit(1)
|
|
|
|
|
|
def alert_tanner(msg):
|
|
pass
|
|
|
|
async def check_host_cert(host, port):
|
|
"check a single host's cert"
|
|
try:
|
|
# default context does hostname checking and certificate validation
|
|
ssl_context = ssl.create_default_context()
|
|
# open_connection does the TCP connection and TLS handshake
|
|
_reader, writer = await asyncio.wait_for(asyncio.open_connection(
|
|
host, port, ssl=ssl_context), timeout=10)
|
|
|
|
cert = writer.get_extra_info('peercert')
|
|
writer.close()
|
|
await writer.wait_closed()
|
|
|
|
if not cert:
|
|
# this case should be rare if handshake succeeded
|
|
msg = f"Could not get certificate for {host}:{port}"
|
|
logging.warning(msg)
|
|
alert_tanner(msg)
|
|
return
|
|
|
|
expiry_date_str = cert['notAfter']
|
|
expiry_date = datetime.strptime(expiry_date_str, '%b %d %H:%M:%S %Y %Z')
|
|
|
|
time_left = expiry_date - datetime.utcnow()
|
|
|
|
if time_left < timedelta(days=7):
|
|
msg = f"Certificate for {host}:{port} expires in less than a week: {expiry_date}"
|
|
logging.warning(msg)
|
|
alert_tanner(msg)
|
|
else:
|
|
logging.info(f"Certificate for {host}:{port} is valid until {expiry_date} ({time_left.days} days left)")
|
|
|
|
except ssl.SSLCertVerificationError as e:
|
|
msg = f"Certificate verification error for {host}:{port}: {e.reason}"
|
|
logging.error(msg)
|
|
alert_tanner(msg)
|
|
except ssl.SSLError as e:
|
|
msg = f"SSL error for {host}:{port}: {e}"
|
|
logging.error(msg)
|
|
alert_tanner(msg)
|
|
except (asyncio.TimeoutError, OSError) as e:
|
|
# Per instructions: log and move on for connection errors
|
|
logging.error(f"Connection error for {host}:{port}: {e}")
|
|
except Exception as e:
|
|
# Catchall for other things
|
|
msg = f"An unexpected error occurred for {host}:{port}: {e}"
|
|
logging.error(msg)
|
|
alert_tanner(msg)
|
|
|
|
|
|
async def main():
|
|
tasks = []
|
|
for host in HOSTS['http']:
|
|
tasks.append(check_host_cert(host, 443))
|
|
|
|
for host in HOSTS['mqtt']:
|
|
# standard port for MQTTS is 8883
|
|
tasks.append(check_host_cert(host, 8883))
|
|
|
|
await asyncio.gather(*tasks)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
asyncio.run(main())
|