#!/usr/bin/env python3
# /// script
# requires-python = ">=3.10"
# dependencies = [
#     "requests",
#     "beautifulsoup4",
#     "cryptography",
#     "pyopenssl",
# ]
# ///
"""
PKI Health Checker
==================

Discovers CA certificates from IIS directory-browsing sites, direct cert
URLs, or live TLS servers. Builds the CA hierarchy by following AIA chains
upward, validates CDP/AIA for each CA, and checks issuing CA CRLs.

Usage:
    uv run pki_health.py URL [URL ...]

Input types:
    Directory:  http://pki.kinda.se/
    Cert URL:   https://r10.i.lencr.org/
    TLS server: https://letsencrypt.org

Examples:
    uv run pki_health.py http://pki.imy.se/
    uv run pki_health.py https://r10.i.lencr.org/
    uv run pki_health.py https://www.google.com
"""

import select
import socket
import ssl
import sys
import time
from datetime import datetime, timezone
from urllib.parse import urljoin, unquote, urlparse

import requests
from bs4 import BeautifulSoup
from cryptography import x509
from cryptography.x509.oid import ExtensionOID, AuthorityInformationAccessOID
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding, rsa, ec
from cryptography.hazmat.primitives.serialization import Encoding
from cryptography.exceptions import InvalidSignature
from OpenSSL import SSL, crypto

CERT_EXTENSIONS = ('.crt', '.cer', '.pem')
CRL_EXTENSIONS = ('.crl',)
TIMEOUT = 30       # seconds; used for HTTP requests and the TLS connect/handshake
MAX_DEPTH = 5      # maximum directory recursion depth in scrape_directory


# ─── Terminal ────────────────────────────────────────────────────────────────

class C:
    """ANSI escape sequences used to colour terminal output."""
    H = '\033[95m'; B = '\033[94m'; CN = '\033[96m'
    G = '\033[92m'; Y = '\033[93m'; R = '\033[91m'
    BD = '\033[1m'; DM = '\033[2m'; RS = '\033[0m'


def header(t):
    """Print *t* as a banner framed by two 78-character '=' rules."""
    rule = '=' * 78
    print(f"\n{C.BD}{C.H}{rule}{C.RS}")
    print(f"{C.BD}{C.H} {t}{C.RS}")
    print(f"{C.BD}{C.H}{rule}{C.RS}")


def section(t):
    """Print *t* as a section title padded with '─' (72 minus len(t) of them)."""
    print(f"\n{C.BD}{C.CN}── {t} {'─'*(72-len(t))}{C.RS}")


# ─── Helpers ─────────────────────────────────────────────────────────────────

def sha1(cert):
    """Return the SHA-1 fingerprint of *cert* as colon-separated hex."""
    return cert.fingerprint(hashes.SHA1()).hex(':')


def get_cn(name):
    """Return a display name for an X.509 *name*.

    The last COMMON_NAME attribute wins; without one, the first attribute is
    rendered as ``type=value``; an empty name yields ``"(unknown)"``.
    """
    for attr in reversed(list(name)):
        if attr.oid == x509.oid.NameOID.COMMON_NAME:
            return attr.value
    for attr in name:
        # Only the first attribute is used as a fallback.
        return f"{attr.oid._name}={attr.value}"
    return "(unknown)"


def ski_hex(obj):
    """Return the Subject Key Identifier of *obj* as hex, or None if absent."""
    try:
        ext = obj.extensions.get_extension_for_oid(
            ExtensionOID.SUBJECT_KEY_IDENTIFIER)
    except x509.ExtensionNotFound:
        return None
    return ext.value.digest.hex()


def aki_hex(obj):
    """Return the Authority Key Identifier of *obj* (cert or CRL) as hex.

    Returns None when the extension is missing or carries no key identifier.
    """
    try:
        ext = obj.extensions.get_extension_for_oid(
            ExtensionOID.AUTHORITY_KEY_IDENTIFIER)
    except x509.ExtensionNotFound:
        return None
    key_id = ext.value.key_identifier
    return key_id.hex() if key_id else None


def cdp_urls(cert):
    """Return every URI listed in the CRL Distribution Points of *cert*."""
    try:
        ext = cert.extensions.get_extension_for_oid(
            ExtensionOID.CRL_DISTRIBUTION_POINTS)
    except x509.ExtensionNotFound:
        return []
    return [
        general_name.value
        for dist_point in ext.value
        if dist_point.full_name
        for general_name in dist_point.full_name
        if isinstance(general_name, x509.UniformResourceIdentifier)
    ]


def _aia_urls(cert, method):
    """Return the AIA URIs of *cert* whose access method equals *method*."""
    try:
        ext = cert.extensions.get_extension_for_oid(
            ExtensionOID.AUTHORITY_INFORMATION_ACCESS)
    except x509.ExtensionNotFound:
        return []
    return [
        desc.access_location.value
        for desc in ext.value
        if isinstance(desc.access_location, x509.UniformResourceIdentifier)
        and desc.access_method == method
    ]


def aia_ca_issuer_urls(cert):
    """Return the AIA "CA Issuers" URIs of *cert* ([] when there are none)."""
    return _aia_urls(cert, AuthorityInformationAccessOID.CA_ISSUERS)


def aia_ocsp_urls(cert):
    """Return the AIA OCSP responder URIs of *cert* ([] when there are none)."""
    return _aia_urls(cert, AuthorityInformationAccessOID.OCSP)


def is_ca(cert):
    """Return the BasicConstraints CA flag of *cert*; False if not present."""
    try:
        ext = cert.extensions.get_extension_for_oid(
            ExtensionOID.BASIC_CONSTRAINTS)
    except x509.ExtensionNotFound:
        return False
    return ext.value.ca


def is_self_signed(cert):
    """Return True when issuer and subject names are equal.

    This is a name comparison only; the signature is not verified.
    """
    return cert.issuer == cert.subject


def short_ski(h):
    """Abbreviate a hex key identifier to 16 chars plus '...' for display."""
    if h and len(h) > 16:
        return h[:16] + '...'
    return h or '(none)'


# ─── Network ────────────────────────────────────────────────────────────────

def make_session():
    """Create the shared requests session.

    TLS verification is disabled on purpose (the tool inspects PKI endpoints
    whose chains may themselves be broken) and the matching urllib3 warnings
    are silenced.
    """
    s = requests.Session()
    s.verify = False
    s.headers.update({
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                      'AppleWebKit/537.36 Chrome/125.0.0.0 Safari/537.36',
        'Accept': '*/*',
        'Accept-Encoding': 'identity',
    })
    requests.packages.urllib3.disable_warnings()
    return s


def fetch(url, session):
    """GET *url* and return the body bytes; raises on HTTP or network errors."""
    resp = session.get(url, timeout=TIMEOUT)
    resp.raise_for_status()
    return resp.content


def _first_parse(data, loaders):
    """Return the result of the first loader that accepts *data*, else None."""
    for loader in loaders:
        try:
            return loader(data)
        except Exception:
            # Deliberate best effort: a failure only means "not this encoding".
            continue
    return None


def load_cert(data):
    """Parse *data* as a DER, then PEM, certificate; None if neither works."""
    return _first_parse(
        data, (x509.load_der_x509_certificate, x509.load_pem_x509_certificate))


def load_crl(data):
    """Parse *data* as a DER, then PEM, CRL; None if neither works."""
    return _first_parse(
        data, (x509.load_der_x509_crl, x509.load_pem_x509_crl))


# ─── Input Detection & Loading ───────────────────────────────────────────────

def scrape_directory(base_url, session, origin=None, depth=0, visited=None):
    """Recursively scrape IIS directory listing.

    Follows same-host links up to MAX_DEPTH levels and returns
    ``(cert_urls, crl_urls)``. Pages that cannot be fetched contribute
    nothing. *origin*, *depth* and *visited* carry recursion state.
    """
    if visited is None:
        visited = set()
    if not base_url.endswith('/'):
        base_url += '/'
    if base_url in visited or depth > MAX_DEPTH:
        return [], []
    visited.add(base_url)
    if origin is None:
        origin = urlparse(base_url).netloc

    try:
        resp = session.get(base_url, timeout=TIMEOUT)
        resp.raise_for_status()
    except Exception:
        return [], []

    soup = BeautifulSoup(resp.text, 'html.parser')
    certs, crls, dirs = [], [], []
    for a in soup.find_all('a', href=True):
        href = a['href']
        full = urljoin(base_url, href)
        low = unquote(full).lower()
        if urlparse(full).netloc != origin:
            continue                       # never leave the starting host
        if href in ('../', '..'):
            continue                       # parent-directory link
        if full.rstrip('/') == base_url.rstrip('/'):
            continue                       # link back to this page
        if low.endswith(CERT_EXTENSIONS):
            certs.append(full)
        elif low.endswith(CRL_EXTENSIONS):
            crls.append(full)
        elif full.endswith('/') and full not in visited:
            dirs.append(full)

    for d in dirs:
        sub_certs, sub_crls = scrape_directory(
            d, session, origin, depth + 1, visited)
        certs.extend(sub_certs)
        crls.extend(sub_crls)
    return certs, crls


def get_tls_chain(hostname, port=443):
    """Connect to a TLS server and return the certificate chain as cryptography objects.

    The peer is not verified. Raises Exception if the server presents no
    certificate and TimeoutError if the handshake takes longer than TIMEOUT
    seconds. The socket is always closed, including on failure.
    """
    ctx = SSL.Context(SSL.TLS_CLIENT_METHOD)
    ctx.set_verify(SSL.VERIFY_NONE, lambda *a: True)

    sock = socket.create_connection((hostname, port), timeout=TIMEOUT)
    conn = None
    try:
        conn = SSL.Connection(ctx, sock)
        conn.set_tlsext_host_name(hostname.encode())
        conn.set_connect_state()

        # Retry handshake — needed when socket has timeout set. The loop is
        # bounded by an overall deadline so a stalled peer cannot hang us.
        deadline = time.monotonic() + TIMEOUT
        while True:
            try:
                conn.do_handshake()
                break
            except SSL.WantReadError:
                readers, writers = [sock], []
            except SSL.WantWriteError:
                readers, writers = [], [sock]
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                raise TimeoutError(
                    f"TLS handshake with {hostname}:{port} timed out")
            select.select(readers, writers, [], min(5, remaining))

        certs = []
        chain = conn.get_peer_cert_chain()
        if chain:
            for pyopenssl_cert in chain:
                der = crypto.dump_certificate(crypto.FILETYPE_ASN1, pyopenssl_cert)
                certs.append(x509.load_der_x509_certificate(der))
        else:
            # No chain reported: fall back to the peer certificate alone.
            peer = conn.get_peer_certificate()
            if peer:
                der = crypto.dump_certificate(crypto.FILETYPE_ASN1, peer)
                certs.append(x509.load_der_x509_certificate(der))
    finally:
        if conn is not None:
            try:
                conn.shutdown()
            except (SSL.Error, OSError):
                # Best effort: a failed close-notify must not mask the result
                # or the original error.
                pass
            try:
                conn.close()
            except OSError:
                pass
        sock.close()

    if not certs:
        raise Exception("No certificates received from server")
    return certs


def walk_aia_chain(start_cert, session):
    """Follow AIA CA Issuer URLs upward to collect the full chain.

    Returns a dict ``thumbprint -> cert`` that always contains *start_cert*.
    The walk ends at a self-signed certificate, when no HTTP CA-Issuers URL
    is left, or when no URL yields a certificate that was not seen before
    (which also stops cross-signing loops).
    """
    start_tp = sha1(start_cert)
    collected = {start_tp: start_cert}   # thumbprint -> cert
    seen = {start_tp}
    current = start_cert

    while not is_self_signed(current):
        http_urls = [u for u in aia_ca_issuer_urls(current)
                     if u.lower().startswith('http')]
        if not http_urls:
            break

        parent = None
        for url in http_urls:
            try:
                candidate = load_cert(fetch(url, session))
                if candidate is None:
                    continue
                candidate_tp = sha1(candidate)
            except Exception:
                continue
            if candidate_tp not in seen:
                seen.add(candidate_tp)
                collected[candidate_tp] = candidate
                parent = candidate
                break

        if parent is None:
            break
        current = parent

    return collected


def detect_and_load(url, session):
    """
    Detect input type and return (certs_dict, crl_urls, input_type).
    certs_dict: thumbprint -> (cert, source_url_or_description)
    """
    parsed = urlparse(url)
    certs_dict = {}
    crl_urls = []

    # ── Try 1: TLS server (https:// with an empty path) ──────────────────
    # After stripping trailing slashes an empty path is the only case that
    # can match; the original's extra `== '/'` test could never be true.
    if parsed.scheme == 'https' and not parsed.path.lower().rstrip('/'):
        hostname = parsed.hostname
        port = parsed.port or 443
        try:
            print(f" 🔌 TLS connecting to {hostname}:{port}...")
            chain = get_tls_chain(hostname, port)
            if chain:
                print(f" ✔ Got {len(chain)} cert(s) from TLS handshake")
                for cert in chain:
                    certs_dict[sha1(cert)] = (cert, f"TLS:{hostname}:{port}")
                # Walk AIA upward from every certificate the server sent.
                for cert in chain:
                    for tp2, cert2 in walk_aia_chain(cert, session).items():
                        if tp2 not in certs_dict:
                            # The recorded source is cert2's own CA-Issuers
                            # URL, not the URL cert2 was downloaded from.
                            aia_url = aia_ca_issuer_urls(cert2)
                            src = aia_url[0] if aia_url else "AIA chain"
                            certs_dict[tp2] = (cert2, src)
                return certs_dict, crl_urls, "tls"
        except Exception as e:
            print(f" {C.Y}⚠ TLS failed ({e}), trying as URL...{C.RS}")

    # ── Try 2: Direct certificate URL ────────────────────────────────────
    try:
        data = fetch(url, session)
    except Exception as e:
        print(f" {C.R}✘ Failed to fetch {url}: {e}{C.RS}")
        return certs_dict, crl_urls, "error"

    cert = load_cert(data)
    if cert is not None:
        print(f" 📜 Direct certificate: {get_cn(cert.subject)}")
        certs_dict[sha1(cert)] = (cert, url)
        print(" 🔗 Walking AIA chain...")
        for tp2, cert2 in walk_aia_chain(cert, session).items():
            if tp2 not in certs_dict:
                urls = aia_ca_issuer_urls(cert2)
                src = urls[0] if urls else "AIA"
                certs_dict[tp2] = (cert2, src)
                print(f" ↑ {get_cn(cert2.subject)}")
        return certs_dict, crl_urls, "cert"

    # ── Try 3: Directory listing ─────────────────────────────────────────
    # If we got HTML, assume it's a directory.
    # NOTE(review): the source text from this test onward was lost (stripped
    # during extraction). The rest of this function is a reconstruction from
    # scrape_directory's return shape and this function's docstring; the
    # messages and the "directory"/"unknown" labels are placeholders. Confirm
    # against the original file.
    if data[:50].lstrip().lower().startswith(b'<'):
        print(" 📂 Scanning directory listing...")
        cert_urls, crl_urls = scrape_directory(url, session)
        for cert_url in dict.fromkeys(cert_urls):
            try:
                found = load_cert(fetch(cert_url, session))
            except Exception:
                continue
            if found is not None:
                certs_dict.setdefault(sha1(found), (found, cert_url))
        print(f" ✔ Found {len(certs_dict)} cert(s), {len(crl_urls)} CRL(s)")
        return certs_dict, crl_urls, "directory"

    print(f" {C.R}✘ Unrecognized content at {url}{C.RS}")
    return certs_dict, crl_urls, "unknown"


# ─── CA Tree ─────────────────────────────────────────────────────────────────
# NOTE(review): Node, build_tree, verify_crl_sig and the first half of
# check_cdp were missing from the source text (lost during extraction). They
# are reconstructed below from how the surviving code uses them. Verify every
# detail against the original file before relying on them.

class Node:
    """One CA in the hierarchy (reconstructed — see NOTE(review) above).

    Carries the attributes the display code reads: cert, tp, ski, aki, name,
    root, children and alternates.
    """

    def __init__(self, cert, url, tp):
        self.cert = cert
        self.url = url                      # where the certificate came from
        self.tp = tp                        # SHA-1 thumbprint
        self.ski = ski_hex(cert)
        self.aki = aki_hex(cert)
        self.name = get_cn(cert.subject)
        self.root = is_self_signed(cert)
        self.children = []                  # Node objects issued by this CA
        self.alternates = []                # (cert, url, thumbprint) tuples


def build_tree(all_certs):
    """Build the CA hierarchy (reconstructed — see NOTE(review) above).

    *all_certs* maps thumbprint -> (cert, source). Non-CA certificates are
    ignored. Certificates sharing an SKI are folded into one node: the one
    expiring last becomes the node, the others its ``alternates``. Parents
    are found by AKI -> SKI first, then by issuer name.

    Returns ``(roots, nodes, ski_primary)``: the nodes without a known
    parent, a dict thumbprint -> Node, and a dict SKI -> Node.
    """
    groups = {}
    for tp, (cert, src) in all_certs.items():
        if not is_ca(cert):
            continue
        # Certificates without an SKI cannot be grouped; key them uniquely.
        key = ski_hex(cert) or f"tp:{tp}"
        groups.setdefault(key, []).append((cert, src, tp))

    nodes = {}
    ski_primary = {}
    for members in groups.values():
        members.sort(key=lambda m: m[0].not_valid_after_utc, reverse=True)
        cert, src, tp = members[0]
        node = Node(cert, src, tp)
        node.alternates = members[1:]
        nodes[tp] = node
        if node.ski:
            ski_primary[node.ski] = node

    roots = []
    for node in nodes.values():
        parent = None
        if not node.root:
            if node.aki:
                parent = ski_primary.get(node.aki)
            if parent is None:
                parent = next(
                    (other for other in nodes.values()
                     if other is not node
                     and other.cert.subject == node.cert.issuer),
                    None)
        if parent is None or parent is node:
            roots.append(node)
        else:
            parent.children.append(node)
    return roots, nodes, ski_primary


# ─── CRL Check ───────────────────────────────────────────────────────────────

def verify_crl_sig(crl, issuer_cert):
    """Verify the CRL signature (reconstructed — see NOTE(review) above).

    Returns ``(valid, message)`` where *valid* is True, False, or None when
    the signature could not be evaluated; callers colour the message green,
    red or yellow accordingly. The message texts are placeholders.
    """
    pub = issuer_cert.public_key()
    try:
        algo = crl.signature_hash_algorithm
        if isinstance(pub, rsa.RSAPublicKey):
            pub.verify(crl.signature, crl.tbs_certlist_bytes,
                       padding.PKCS1v15(), algo)
        elif isinstance(pub, ec.EllipticCurvePublicKey):
            pub.verify(crl.signature, crl.tbs_certlist_bytes, ec.ECDSA(algo))
        else:
            return None, "Sig: unsupported key type"
    except InvalidSignature:
        return False, "Sig INVALID"
    except Exception as e:
        return None, f"Sig: could not verify ({e})"
    return True, "Sig OK"


def _crl_status(crl, issuer_ski, issuer_cert):
    """Evaluate *crl* and return ``(parts, has_issue, crl_aki)``.

    *parts* are the coloured fragments of the status line: freshness from
    nextUpdate, AKI match against *issuer_ski*, signature check against
    *issuer_cert* (skipped when None), a "Delta" marker and the revoked
    count. *has_issue* is True for an expired CRL, an AKI mismatch or an
    invalid signature.
    """
    parts = []
    has_issue = False
    now = datetime.now(timezone.utc)

    next_update = crl.next_update_utc
    if next_update:
        if next_update < now:
            days = (now - next_update).days
            parts.append(f"{C.R}EXPIRED {days}d ago{C.RS}")
            has_issue = True
        else:
            days = (next_update - now).days
            color = C.G if days > 7 else C.Y
            parts.append(f"{color}Valid ({days}d){C.RS}")

    crl_aki = aki_hex(crl)
    if crl_aki and issuer_ski:
        if crl_aki == issuer_ski:
            parts.append(f"{C.G}AKI match{C.RS}")
        else:
            parts.append(f"{C.R}AKI MISMATCH{C.RS}")
            has_issue = True

    if issuer_cert is not None:
        valid, msg = verify_crl_sig(crl, issuer_cert)
        if valid is True:
            parts.append(f"{C.G}{msg}{C.RS}")
        elif valid is False:
            parts.append(f"{C.R}{msg}{C.RS}")
            has_issue = True
        else:
            parts.append(f"{C.Y}{msg}{C.RS}")

    try:
        crl.extensions.get_extension_for_oid(ExtensionOID.DELTA_CRL_INDICATOR)
        parts.append("Delta")
    except x509.ExtensionNotFound:
        pass

    parts.append(f"{sum(1 for _ in crl)} revoked")
    return parts, has_issue, crl_aki


def check_cdp(url, issuer_ski, issuer_cert, session, p):
    """Download and evaluate the CRL at *url*; return True if there is an issue.

    *p* is the indentation prefix for output.
    NOTE(review): everything up to the status evaluation (the non-HTTP,
    unreachable and parse-failure handling and their messages) is a
    reconstruction modelled on check_aia; only the tail of this function
    survived in the source. Confirm against the original.
    """
    if not url.lower().startswith('http'):
        print(f"{p} {C.DM}LDAP — not checked{C.RS}")
        return False
    try:
        data = fetch(url, session)
    except Exception:
        print(f"{p} {C.R}✘ Unreachable{C.RS}")
        return True
    crl = load_crl(data)
    if crl is None:
        print(f"{p} {C.R}✘ Failed to parse{C.RS}")
        return True

    parts, has_issue, crl_aki = _crl_status(crl, issuer_ski, issuer_cert)
    print(f"{p} {' | '.join(parts)}")
    if crl_aki and issuer_ski and crl_aki != issuer_ski:
        # Show both identifiers so the mismatch can be diagnosed.
        print(f"{p} {C.R}CRL AKI: {crl_aki}{C.RS}")
        print(f"{p} {C.R}Issuer SKI: {issuer_ski}{C.RS}")
    return has_issue


# ─── AIA Check ───────────────────────────────────────────────────────────────

def check_aia(url, expected_tp, session, p):
    """Download the certificate at AIA *url*; return True if there is an issue.

    Non-HTTP URLs are reported as not checked (no issue). An unreachable or
    unparsable URL is an issue. When *expected_tp* is given, a thumbprint
    that differs from it is an issue as well. *p* is the output prefix.
    """
    if not url.lower().startswith('http'):
        print(f"{p} {C.DM}LDAP — not checked{C.RS}")
        return False
    try:
        data = fetch(url, session)
    except Exception:
        print(f"{p} {C.R}✘ Unreachable{C.RS}")
        return True
    cert = load_cert(data)
    if cert is None:
        print(f"{p} {C.R}✘ Failed to parse{C.RS}")
        return True

    got_tp = sha1(cert)
    if not expected_tp:
        print(f"{p} Downloaded: {get_cn(cert.subject)} ({got_tp[:23]}...)")
        return False
    if got_tp == expected_tp:
        print(f"{p} {C.G}✔ Matches issuer{C.RS} ({got_tp[:23]}...)")
        return False
    print(f"{p} {C.Y}⚠ Different thumbprint{C.RS}")
    print(f"{p} Got: {get_cn(cert.subject)} ({got_tp[:23]}...)")
    print(f"{p} Expected: {expected_tp[:23]}...")
    return True


# ─── Display ─────────────────────────────────────────────────────────────────

def _validity_label(not_after, now):
    """Return ``(color, label)`` for a certificate expiring at *not_after*.

    Red "EXPIRED" when past, yellow below 90 remaining days, green otherwise.
    """
    if not_after < now:
        return C.R, "EXPIRED"
    days = (not_after - now).days
    return (C.Y if days < 90 else C.G), f"{days} days"


def show_cert(node, parent, session, p, issues):
    """Print the details of *node* and run its CDP and AIA checks.

    *parent* is the issuing node (None at the top of a chain), *p* the
    output prefix. Human-readable findings are appended to *issues*.
    """
    cert = node.cert
    now = datetime.now(timezone.utc)
    nb = cert.not_valid_before_utc
    na = cert.not_valid_after_utc
    expired = na < now
    vc, vl = _validity_label(na, now)

    print(f"{p}Thumbprint: {node.tp}")
    print(f"{p}SKI: {short_ski(node.ski)}")
    if node.aki and parent and parent.ski and node.aki != parent.ski:
        print(f"{p}AKI: {short_ski(node.aki)} "
              f"{C.R}✘ MISMATCH with parent SKI ({short_ski(parent.ski)}){C.RS}")
        issues.append(f"AKI mismatch: {node.name}")
    print(f"{p}Valid: {nb.strftime('%Y-%m-%d')} → {na.strftime('%Y-%m-%d')} "
          f"{vc}[{vl}]{C.RS}")
    if expired:
        issues.append(f"EXPIRED: {node.name}")

    if node.alternates:
        print(f"{p}{C.Y}Also (renewed, same key):{C.RS}")
        for alt_cert, _alt_url, alt_tp in node.alternates:
            alt_color, alt_label = _validity_label(
                alt_cert.not_valid_after_utc, now)
            print(f"{p} {alt_tp} {alt_color}[{alt_label}]{C.RS}")

    issuer_cert = parent.cert if parent else None
    issuer_ski = parent.ski if parent else None
    issuer_tp = parent.tp if parent else None

    # CDP
    cdps = cdp_urls(cert)
    if cdps:
        for url in cdps:
            print(f"{p}CDP:")
            print(f"{p} {url}")
            if check_cdp(url, issuer_ski, issuer_cert, session, p):
                issues.append(f"CDP issue: {node.name}")
    elif node.root:
        print(f"{p}CDP: (none — root)")
    else:
        print(f"{p}{C.Y}CDP: (none — missing!){C.RS}")
        issues.append(f"No CDP: {node.name}")

    # AIA
    ca_urls = aia_ca_issuer_urls(cert)
    if ca_urls:
        for url in ca_urls:
            print(f"{p}AIA:")
            print(f"{p} {url}")
            if check_aia(url, issuer_tp, session, p):
                issues.append(f"AIA issue: {node.name}")
    elif node.root:
        print(f"{p}AIA: (none — root)")
    else:
        print(f"{p}{C.Y}AIA: (none — missing!){C.RS}")
        issues.append(f"No AIA: {node.name}")

    # OCSP
    for url in aia_ocsp_urls(cert):
        print(f"{p}OCSP: {url}")


def _display_node(node, parent, depth, session, issues, ancestors):
    """Print *node* and, recursively, every CA below it.

    *ancestors* holds the ids of the nodes on the path from the chain root;
    a node that is its own ancestor is skipped so a cyclic tree cannot
    recurse forever.
    """
    if id(node) in ancestors:
        return
    lead = " " if depth else ""
    arrows = "→ " * depth
    print(f"\n{lead}{C.BD}{C.B}{arrows}{node.name}{C.RS}")
    show_cert(node, parent, session, " ", issues)
    path = ancestors | {id(node)}
    for child in node.children:
        _display_node(child, node, depth + 1, session, issues, path)


def display_chain(root, num, session, issues):
    """Print chain number *num* starting at *root*, at any depth.

    The original only walked root, children and grandchildren; deeper CAs
    were silently never displayed or checked.
    """
    print(f"\n{C.BD}{C.CN}Chain {num}{C.RS}")
    print(f"{C.BD}{C.CN}{'─'*78}{C.RS}")
    _display_node(root, None, 0, session, issues, frozenset())


# ─── Issuing CA CRLs ────────────────────────────────────────────────────────

def check_issuing_ca_crls(roots, all_crl_urls, session, issues):
    """Check CRLs on the site that are published by CAs in our tree (for leaf validation).

    CRLs whose URL is already a CDP of some certificate in the tree are
    skipped (they are checked together with that certificate). Each remaining
    CRL is matched to a CA by AKI, then by issuer name; unmatched, unparsable
    or unreachable ones are reported as orphans.

    Returns ``(issuing_crls, orphan_crls)``: lists of ``(url, crl, node)``
    and ``(url, crl_or_None, reason_or_None)``.
    """
    ca_nodes = {}             # ski -> node
    referenced_cdps = set()   # normalized CDP URLs used by certs in the tree

    def collect(node):
        if node.ski:
            ca_nodes[node.ski] = node
        for u in cdp_urls(node.cert):
            referenced_cdps.add(unquote(u).lower())
        for ch in node.children:
            collect(ch)

    for r in roots:
        collect(r)

    # Find CRLs on site that are issued by a known CA but NOT referenced as CDP
    issuing_crls = []
    orphan_crls = []
    for url in all_crl_urls:
        if unquote(url).lower() in referenced_cdps:
            continue  # Already checked inline with the cert
        try:
            crl = load_crl(fetch(url, session))
            if crl is None:
                orphan_crls.append((url, None, "Could not parse"))
                continue
            crl_aki = aki_hex(crl)
            if crl_aki and crl_aki in ca_nodes:
                issuing_crls.append((url, crl, ca_nodes[crl_aki]))
                continue
            # Try match by issuer name
            by_name = next(
                (node for node in ca_nodes.values()
                 if crl.issuer == node.cert.subject),
                None)
            if by_name is not None:
                issuing_crls.append((url, crl, by_name))
            else:
                orphan_crls.append((url, crl, None))
        except Exception:
            # Best effort, as in the original: any failure while fetching or
            # matching is reported under this one reason.
            orphan_crls.append((url, None, "Could not fetch"))

    # Display issuing CA CRLs
    if issuing_crls:
        header("Issuing CA CRLs (for leaf certificate validation)")
        for url, crl, node in issuing_crls:
            print(f"\n {C.BD}{node.name}{C.RS}")
            print(f" {url}")
            parts, has_issue, _ = _crl_status(crl, node.ski, node.cert)
            print(f" {' | '.join(parts)}")
            if has_issue:
                issues.append(f"Issuing CA CRL issue: {node.name}")

    # Display orphans
    if orphan_crls:
        header("Orphaned Files")
        print(f"\n {C.Y}⚠ {len(orphan_crls)} CRL(s) not matched to any CA:{C.RS}")
        for url, crl, reason in orphan_crls:
            name = unquote(url.split('/')[-1])
            if crl is None:
                print(f" 📋 {name} ({reason})")
                continue
            now = datetime.now(timezone.utc)
            nu = crl.next_update_utc
            if nu and nu < now:
                exp = f"{C.R}EXPIRED{C.RS}"
            elif nu:
                exp = f"{C.G}{(nu-now).days}d{C.RS}"
            else:
                exp = "?"
            print(f" 📋 {name} ({get_cn(crl.issuer)}, {exp})")
    elif all_crl_urls:
        if not issuing_crls:
            header("Orphaned Files")
        print(f"\n {C.G}✔{C.RS} No orphaned files")

    return issuing_crls, orphan_crls


# ─── Main ────────────────────────────────────────────────────────────────────

def main():
    """Command-line entry point: load every input, build the tree, report.

    Exits with status 1 when no URL is given or no hierarchy can be built.
    """
    if len(sys.argv) < 2:
        # The placeholders in this usage line were lost from the source text
        # ("[ ...]"); restored without angle brackets.
        print(f"Usage: {sys.argv[0]} URL [URL ...]")
        print("\nInput types:")
        print(" Directory: http://pki.kinda.se/")
        print(" Cert URL: https://r10.i.lencr.org/")
        print(" TLS server: https://www.google.com")
        sys.exit(1)

    session = make_session()
    all_certs = {}     # thumbprint -> (cert, source)
    all_crl_urls = []

    # ── Load from all inputs ─────────────────────────────────────────────
    for url in sys.argv[1:]:
        header(f"Input: {url}")
        certs, crls, _input_type = detect_and_load(url, session)
        for tp, entry in certs.items():
            all_certs.setdefault(tp, entry)     # first source wins
        all_crl_urls.extend(crls)
    all_crl_urls = list(dict.fromkeys(all_crl_urls))   # de-dupe, keep order

    # Count CA certs
    ca_count = sum(1 for cert, _src in all_certs.values() if is_ca(cert))
    non_ca = len(all_certs) - ca_count
    print(f"\n Loaded: {ca_count} CA cert(s)", end="")
    if non_ca:
        print(f", {non_ca} non-CA cert(s) (skipped)")
    else:
        print()

    # ── Build tree ───────────────────────────────────────────────────────
    roots, _nodes, _ski_primary = build_tree(all_certs)
    if not roots:
        print(f"\n {C.R}✘ Could not build CA hierarchy{C.RS}")
        sys.exit(1)

    # ── Display chains ───────────────────────────────────────────────────
    header("PKI Health Check")
    all_issues = []
    for i, root in enumerate(roots, 1):
        display_chain(root, i, session, all_issues)

    # ── Issuing CA CRLs & Orphans ────────────────────────────────────────
    if all_crl_urls:
        check_issuing_ca_crls(roots, all_crl_urls, session, all_issues)

    # ── Summary ──────────────────────────────────────────────────────────
    header("Summary")
    print(f" Chains: {len(roots)} | CA certs: {ca_count} | "
          f"Site CRLs: {len(all_crl_urls)}")
    # De-duplicate first so the reported count matches the lines listed.
    unique_issues = list(dict.fromkeys(all_issues))
    if not unique_issues:
        print(f"\n {C.G}{C.BD}✔ All checks passed{C.RS}")
    else:
        print(f"\n {C.R}{C.BD}✘ {len(unique_issues)} issue(s):{C.RS}")
        for issue in unique_issues:
            print(f" {C.R}• {issue}{C.RS}")


if __name__ == '__main__':
    main()