#!/usr/bin/env python3
# /// script
# requires-python = ">=3.10"
# dependencies = [
#     "requests",
#     "beautifulsoup4",
#     "cryptography",
# ]
# ///
"""
PKI Health Checker
==================
Discovers CA certificates and CRLs from IIS directory-browsing sites,
builds the CA hierarchy, and validates CDP/AIA for each CA certificate.

Usage:
    uv run pki_health.py URL [URL ...]

Examples:
    uv run pki_health.py http://pki.matas.dk/aia http://pki.matas.dk/cdp
    uv run pki_health.py http://pki.imy.se/
"""

import sys
from collections import deque
from datetime import datetime, timezone
from urllib.parse import urljoin, unquote, urlparse

import requests
from bs4 import BeautifulSoup
from cryptography import x509
from cryptography.x509.oid import ExtensionOID, AuthorityInformationAccessOID
from cryptography.hazmat.primitives import hashes
# NOTE: padding/rsa/ec and InvalidSignature are no longer needed by
# verify_crl_sig (it delegates to CertificateRevocationList.is_signature_valid);
# the imports are kept so nothing that relies on these module names breaks.
from cryptography.hazmat.primitives.asymmetric import padding, rsa, ec
from cryptography.exceptions import InvalidSignature

CERT_EXTENSIONS = ('.crt', '.cer', '.pem')
CRL_EXTENSIONS = ('.crl',)
TIMEOUT = 30      # seconds, passed to every HTTP request
MAX_DEPTH = 5     # maximum directory recursion depth in scrape()


# ─── Terminal ────────────────────────────────────────────────────────────────

class C:
    """ANSI SGR escape sequences used to colour terminal output."""
    H = '\033[95m'    # bright magenta
    B = '\033[94m'    # bright blue
    CN = '\033[96m'   # bright cyan
    G = '\033[92m'    # bright green
    Y = '\033[93m'    # bright yellow
    R = '\033[91m'    # bright red
    BD = '\033[1m'    # bold
    DM = '\033[2m'    # dim
    RS = '\033[0m'    # reset all attributes


def header(t):
    """Print a section banner with title *t* between two rules of '='."""
    print(f"\n{C.BD}{C.H}{'='*78}{C.RS}")
    print(f"{C.BD}{C.H} {t}{C.RS}")
    print(f"{C.BD}{C.H}{'='*78}{C.RS}")


# ─── Helpers ─────────────────────────────────────────────────────────────────

def sha1(cert):
    """Return the SHA-1 fingerprint of *cert* as colon-separated hex."""
    return cert.fingerprint(hashes.SHA1()).hex(':')


def get_cn(name):
    """Return a display name for an x509.Name.

    The last COMMON_NAME attribute wins; without one, the first attribute
    is rendered as ``type=value``; an empty name yields ``"(unknown)"``.
    """
    for attr in reversed(list(name)):
        if attr.oid == x509.oid.NameOID.COMMON_NAME:
            return attr.value
    for attr in name:
        return f"{attr.oid._name}={attr.value}"
    return "(unknown)"


def ski_hex(obj):
    """Return the Subject Key Identifier of *obj* as hex, or None if absent."""
    try:
        return obj.extensions.get_extension_for_oid(
            ExtensionOID.SUBJECT_KEY_IDENTIFIER).value.digest.hex()
    except x509.ExtensionNotFound:
        return None


def aki_hex(obj):
    """Return the Authority Key Identifier of *obj* (cert or CRL) as hex.

    Returns None when the extension is missing or carries no key identifier.
    """
    try:
        ext = obj.extensions.get_extension_for_oid(
            ExtensionOID.AUTHORITY_KEY_IDENTIFIER)
    except x509.ExtensionNotFound:
        return None
    key_id = ext.value.key_identifier
    return key_id.hex() if key_id else None


def cdp_urls(cert):
    """Return every URI listed in the CRL Distribution Points of *cert*."""
    try:
        ext = cert.extensions.get_extension_for_oid(
            ExtensionOID.CRL_DISTRIBUTION_POINTS)
    except x509.ExtensionNotFound:
        return []
    return [
        n.value
        for dp in ext.value
        if dp.full_name
        for n in dp.full_name
        if isinstance(n, x509.UniformResourceIdentifier)
    ]


def aia_urls(cert):
    """Return ``(ca_issuer_urls, ocsp_urls)`` from the AIA extension of *cert*.

    Both lists are empty when the extension is absent; non-URI access
    locations are ignored.
    """
    try:
        ext = cert.extensions.get_extension_for_oid(
            ExtensionOID.AUTHORITY_INFORMATION_ACCESS)
    except x509.ExtensionNotFound:
        return [], []
    ca, ocsp = [], []
    for d in ext.value:
        if not isinstance(d.access_location, x509.UniformResourceIdentifier):
            continue
        if d.access_method == AuthorityInformationAccessOID.CA_ISSUERS:
            ca.append(d.access_location.value)
        elif d.access_method == AuthorityInformationAccessOID.OCSP:
            ocsp.append(d.access_location.value)
    return ca, ocsp


def is_ca(cert):
    """Return the BasicConstraints ``ca`` flag; False if the extension is absent."""
    try:
        return cert.extensions.get_extension_for_oid(
            ExtensionOID.BASIC_CONSTRAINTS).value.ca
    except x509.ExtensionNotFound:
        return False


def is_self_signed(cert):
    """Return True when issuer and subject names are equal.

    Only the names are compared; the signature itself is not verified.
    """
    return cert.issuer == cert.subject


def short_id(h):
    """Abbreviate a hex identifier to 16 characters; ``'(none)'`` if empty."""
    return h[:16] + '...' if h and len(h) > 16 else (h or '(none)')


# ─── Network ────────────────────────────────────────────────────────────────

def make_session():
    """Create the shared requests.Session.

    TLS verification is disabled on the session and urllib3 warnings are
    silenced process-wide.
    """
    s = requests.Session()
    s.verify = False
    s.headers.update({
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                      'AppleWebKit/537.36 Chrome/125.0.0.0 Safari/537.36',
        'Accept': '*/*',
        'Accept-Encoding': 'identity',
    })
    requests.packages.urllib3.disable_warnings()
    return s


def dl(url, session):
    """GET *url* and return the body bytes; raises on HTTP/transport errors."""
    resp = session.get(url, timeout=TIMEOUT)
    resp.raise_for_status()
    return resp.content


def scrape(base_url, session, origin=None, depth=0, visited=None):
    """Recursively collect certificate and CRL links from a directory listing.

    Args:
        base_url: directory URL to list (a trailing '/' is added if missing).
        session: requests.Session used for the GET.
        origin: netloc that links must stay on; defaults to base_url's.
        depth: current recursion depth (stops beyond MAX_DEPTH).
        visited: set of directory URLs already listed, shared across calls.

    Returns:
        ``(cert_urls, crl_urls)``; both empty if the listing cannot be fetched.
    """
    if visited is None:
        visited = set()
    if not base_url.endswith('/'):
        base_url += '/'
    if base_url in visited or depth > MAX_DEPTH:
        return [], []
    visited.add(base_url)
    if origin is None:
        origin = urlparse(base_url).netloc

    try:
        resp = session.get(base_url, timeout=TIMEOUT)
        resp.raise_for_status()
    except requests.RequestException as exc:
        # Report the failure: otherwise an unreachable site is
        # indistinguishable from an empty one in the scan output.
        print(f" {C.Y}⚠{C.RS} {base_url} — {exc}")
        return [], []

    soup = BeautifulSoup(resp.text, 'html.parser')
    certs, crls, dirs = [], [], []
    for a in soup.find_all('a', href=True):
        href = a['href']
        full = urljoin(base_url, href)
        low = unquote(full).lower()
        if urlparse(full).netloc != origin:
            continue
        if href in ('../', '..'):
            continue
        if full.rstrip('/') == base_url.rstrip('/'):
            continue
        if low.endswith(CERT_EXTENSIONS):
            certs.append(full)
        elif low.endswith(CRL_EXTENSIONS):
            crls.append(full)
        elif full.endswith('/') and full not in visited:
            dirs.append(full)

    for d in dirs:
        sub_certs, sub_crls = scrape(d, session, origin, depth + 1, visited)
        certs.extend(sub_certs)
        crls.extend(sub_crls)
    return certs, crls


def load_cert(data):
    """Parse *data* as a DER, then PEM, certificate; None if neither parses."""
    for fn in (x509.load_der_x509_certificate, x509.load_pem_x509_certificate):
        try:
            return fn(data)
        except ValueError:
            continue
    return None


def load_crl(data):
    """Parse *data* as a DER, then PEM, CRL; None if neither parses."""
    for fn in (x509.load_der_x509_crl, x509.load_pem_x509_crl):
        try:
            return fn(data)
        except ValueError:
            continue
    return None


# ─── CRL Signature ──────────────────────────────────────────────────────────

def verify_crl_sig(crl, issuer_cert):
    """Verify the signature of *crl* with the public key of *issuer_cert*.

    Delegates to ``CertificateRevocationList.is_signature_valid`` so the
    padding/scheme is taken from the CRL's own signature algorithm instead
    of assuming PKCS#1 v1.5 for RSA.

    Returns:
        ``(True, "Signature OK")``, ``(False, "Signature INVALID")``, or
        ``(None, reason)`` when verification could not be attempted.
    """
    try:
        ok = crl.is_signature_valid(issuer_cert.public_key())
    except TypeError:
        # Raised for key types the library cannot verify signatures with.
        return None, "Unsupported key type"
    except Exception as e:
        return None, f"Verify error: {e}"
    if ok:
        return True, "Signature OK"
    return False, "Signature INVALID"


# ─── Tree Builder ────────────────────────────────────────────────────────────

class CANode:
    """One CA in the hierarchy, keyed by its Subject Key Identifier."""

    def __init__(self, cert, url):
        self.cert = cert
        self.url = url                      # where the certificate was fetched
        self.tp = sha1(cert)                # SHA-1 thumbprint
        self.name = get_cn(cert.subject)
        self.ski = ski_hex(cert)
        self.aki = aki_hex(cert)
        self.root = is_self_signed(cert)
        self.children = []
        self.alternates = []  # (cert, url, thumbprint) — renewed same-key certs


def build_tree(certs_dict):
    """Build the CA forest from ``{thumbprint: (cert, url)}``.

    Certificates sharing an SKI are merged into one node: the one with the
    latest notAfter is primary, the rest become ``alternates``.  Certificates
    without an SKI are not placed in the tree.  A node is attached to the
    node whose SKI equals its AKI; self-signed nodes and nodes whose parent
    is not available are returned as roots.
    """
    by_ski = {}
    for tp, (cert, url) in certs_dict.items():
        ski = ski_hex(cert)
        if ski:
            by_ski.setdefault(ski, []).append((tp, cert, url))

    nodes = {}
    ski_primary = {}
    for ski, group in by_ski.items():
        group.sort(key=lambda x: x[1].not_valid_after_utc, reverse=True)
        tp, cert, url = group[0]
        node = CANode(cert, url)
        node.alternates.extend(
            (acert, aurl, atp) for atp, acert, aurl in group[1:])
        nodes[tp] = node
        ski_primary[ski] = tp

    roots = []
    for tp, node in nodes.items():
        parent_tp = ski_primary.get(node.aki) if node.aki else None
        if (not node.root and parent_tp is not None
                and parent_tp in nodes and parent_tp != tp):
            nodes[parent_tp].children.append(node)
        else:
            roots.append(node)
    return roots


# ─── CDP Check ───────────────────────────────────────────────────────────────

def check_cdp(url, issuer_ski, issuer_cert, session, p):
    """Download the CRL at *url* and print a one-line health summary.

    Checks expiry (nextUpdate), AKI against *issuer_ski*, and the signature
    against *issuer_cert* (each only when the needed value is available).
    *p* is the line prefix used for all output.

    Returns:
        True if a problem was found (unreachable, unparsable, expired,
        AKI mismatch, invalid signature), otherwise False.  Non-HTTP URLs
        are skipped and return False.
    """
    if not url.lower().startswith('http'):
        print(f"{p} {C.DM}LDAP — not checked{C.RS}")
        return False

    try:
        data = dl(url, session)
    except requests.RequestException:
        print(f"{p} {C.R}✘ Unreachable{C.RS}")
        return True

    crl = load_crl(data)
    if crl is None:
        print(f"{p} {C.R}✘ Failed to parse{C.RS}")
        return True

    parts = []
    has_issue = False
    now = datetime.now(timezone.utc)

    # Expiry
    if crl.next_update_utc:
        if crl.next_update_utc < now:
            days = (now - crl.next_update_utc).days
            parts.append(f"{C.R}EXPIRED {days}d ago{C.RS}")
            has_issue = True
        else:
            days = (crl.next_update_utc - now).days
            color = C.G if days > 7 else C.Y
            parts.append(f"{color}Valid ({days}d){C.RS}")

    # AKI match
    crl_aki = aki_hex(crl)
    aki_mismatch = bool(crl_aki and issuer_ski and crl_aki != issuer_ski)
    if crl_aki and issuer_ski:
        if aki_mismatch:
            parts.append(f"{C.R}AKI MISMATCH{C.RS}")
            has_issue = True
        else:
            parts.append(f"{C.G}AKI match{C.RS}")

    # Signature
    if issuer_cert is not None:
        valid, msg = verify_crl_sig(crl, issuer_cert)
        if valid is True:
            parts.append(f"{C.G}Sig OK{C.RS}")
        elif valid is False:
            parts.append(f"{C.R}Sig INVALID{C.RS}")
            has_issue = True
        else:
            parts.append(f"{C.Y}{msg}{C.RS}")

    # Delta
    try:
        crl.extensions.get_extension_for_oid(ExtensionOID.DELTA_CRL_INDICATOR)
        parts.append("Delta")
    except x509.ExtensionNotFound:
        pass

    # Revoked count
    parts.append(f"{len(crl)} revoked")

    print(f"{p} {' | '.join(parts)}")

    # Show details on mismatch
    if aki_mismatch:
        print(f"{p} {C.R}CRL AKI: {crl_aki}{C.RS}")
        print(f"{p} {C.R}Issuer SKI: {issuer_ski}{C.RS}")

    return has_issue


# ─── AIA Check ───────────────────────────────────────────────────────────────

def check_aia(url, expected_tp, session, p):
    """Download the certificate at AIA *url* and compare it to the issuer.

    When *expected_tp* is given, the downloaded certificate's SHA-1
    thumbprint must equal it.  *p* is the line prefix for all output.

    Returns:
        True if a problem was found (unreachable, unparsable, thumbprint
        mismatch), otherwise False.  Non-HTTP URLs are skipped (False).
    """
    if not url.lower().startswith('http'):
        print(f"{p} {C.DM}LDAP — not checked{C.RS}")
        return False

    try:
        data = dl(url, session)
    except requests.RequestException:
        print(f"{p} {C.R}✘ Unreachable{C.RS}")
        return True

    cert = load_cert(data)
    if cert is None:
        print(f"{p} {C.R}✘ Failed to parse{C.RS}")
        return True

    got_tp = sha1(cert)
    if not expected_tp:
        print(f"{p} Downloaded: {get_cn(cert.subject)} ({got_tp[:23]}...)")
        return False
    if got_tp == expected_tp:
        print(f"{p} {C.G}✔ Matches issuer{C.RS} ({got_tp[:23]}...)")
        return False
    print(f"{p} {C.R}✘ MISMATCH{C.RS}")
    print(f"{p} Got: {get_cn(cert.subject)} ({got_tp[:23]}...)")
    print(f"{p} Expected: {expected_tp[:23]}...")
    return True


# ─── Display a CA Node ───────────────────────────────────────────────────────

def _validity_label(not_after, now):
    """Return ``(colour, label, expired)`` describing time left until *not_after*."""
    days = (not_after - now).days
    expired = not_after < now
    colour = C.R if expired else (C.Y if days < 90 else C.G)
    label = "EXPIRED" if expired else f"{days} days"
    return colour, label, expired


def show_cert_block(node, parent, session, p, issues):
    """Show one CA certificate block with CDP/AIA checks.

    Prints identifiers, validity, renewed same-key certificates, and the
    result of every CDP and AIA check for *node*, prefixing each line with
    *p*.  Problems found are appended to *issues* as strings.  *parent* is
    the issuing CANode, or None when it is not known.
    """
    cert = node.cert
    now = datetime.now(timezone.utc)
    nb = cert.not_valid_before_utc
    na = cert.not_valid_after_utc
    vc, vl, expired = _validity_label(na, now)

    print(f"{p} Thumbprint: {node.tp}")
    print(f"{p} SKI: {short_id(node.ski)}")
    if node.aki:
        match = ""
        if parent and parent.ski:
            if node.aki == parent.ski:
                match = f" {C.G}✔ matches parent{C.RS}"
            else:
                match = f" {C.R}✘ MISMATCH with parent SKI!{C.RS}"
                issues.append(f"AKI mismatch: {node.name}")
        print(f"{p} AKI: {short_id(node.aki)}{match}")
    print(f"{p} Valid: {nb.strftime('%Y-%m-%d')} → {na.strftime('%Y-%m-%d')} {vc}[{vl}]{C.RS}")
    if expired:
        issues.append(f"EXPIRED: {node.name}")

    # Alternates
    if node.alternates:
        print(f"{p} {C.Y}Also (renewed, same key):{C.RS}")
        for alt_cert, _alt_url, alt_tp in node.alternates:
            alt_colour, alt_label, _ = _validity_label(
                alt_cert.not_valid_after_utc, now)
            print(f"{p} {alt_tp} {alt_colour}[{alt_label}]{C.RS}")

    # Issuer info for checks.  A self-signed root issues its own CRL, so when
    # there is no parent the root itself is used to validate the CDP target.
    crl_issuer = parent if parent else (node if node.root else None)
    issuer_cert = crl_issuer.cert if crl_issuer else None
    issuer_ski = crl_issuer.ski if crl_issuer else None
    issuer_tp = parent.tp if parent else None

    # CDP
    cdps = cdp_urls(cert)
    if cdps:
        for url in cdps:
            print(f"{p} CDP: {url}")
            if check_cdp(url, issuer_ski, issuer_cert, session, p):
                issues.append(f"CDP issue: {node.name} → {url}")
    elif node.root:
        print(f"{p} CDP: (none — root)")
    else:
        print(f"{p} {C.Y}CDP: (none — missing!){C.RS}")
        issues.append(f"No CDP: {node.name}")

    # AIA
    ca_issuers, ocsp = aia_urls(cert)
    if ca_issuers:
        for url in ca_issuers:
            print(f"{p} AIA: {url}")
            if check_aia(url, issuer_tp, session, p):
                issues.append(f"AIA issue: {node.name} → {url}")
    elif node.root:
        print(f"{p} AIA: (none — root)")
    else:
        print(f"{p} {C.Y}AIA: (none — missing!){C.RS}")
        issues.append(f"No AIA: {node.name}")

    for url in ocsp:
        print(f"{p} OCSP: {url}")


# ─── Display Chain ───────────────────────────────────────────────────────────

def _display_descendants(parent, depth, session, issues):
    """Print every CA below *parent*, depth-first, one arrow per tier."""
    arrows = "→ " * depth
    for child in parent.children:
        print(f"\n {C.BD}{C.B}{arrows}{child.name}{C.RS}")
        show_cert_block(child, parent, session, " ", issues)
        _display_descendants(child, depth + 1, session, issues)


def display_chain(root, chain_num, session, issues):
    """Display a full chain starting from root.

    Every tier below the root is shown (not just children and
    grandchildren); problems are appended to *issues*.
    """
    print(f"\n{C.BD}{C.CN}Chain {chain_num}{C.RS}")
    print(f"{C.BD}{C.CN}{'─'*78}{C.RS}")

    # Root
    print(f"\n{C.BD}{C.B}{root.name}{C.RS}")
    show_cert_block(root, None, session, "", issues)

    # All subordinate tiers
    _display_descendants(root, 1, session, issues)


# ─── Orphan Detection ───────────────────────────────────────────────────────

def collect_referenced(roots):
    """Return the lower-cased, unquoted CDP and CA-issuer URLs used in the tree."""
    urls = set()

    def walk(node):
        ca, _ = aia_urls(node.cert)
        for u in (*cdp_urls(node.cert), *ca):
            urls.add(unquote(u).lower())
        for ch in node.children:
            walk(ch)

    for r in roots:
        walk(r)
    return urls


def collect_tree_sources(roots):
    """Return the source URLs of every certificate (and alternate) in the tree."""
    srcs = set()

    def walk(node):
        srcs.add(node.url)
        srcs.update(u for _, u, _ in node.alternates)
        for ch in node.children:
            walk(ch)

    for r in roots:
        walk(r)
    return srcs


# ─── Main ────────────────────────────────────────────────────────────────────

def main():
    """Scan the sites given on the command line and print the health report.

    Exits with status 1 when no URL is given or no hierarchy can be built.
    """
    if len(sys.argv) < 2:
        print(f"Usage: {sys.argv[0]} URL [URL ...]")
        print(f"Example: {sys.argv[0]} http://pki.imy.se/")
        sys.exit(1)

    session = make_session()
    all_cert_urls, all_crl_urls = [], []

    # ── Discover ─────────────────────────────────────────────────────────
    for base_url in sys.argv[1:]:
        header(f"Scanning: {base_url}")
        cu, cl = scrape(base_url, session)
        for u in cu:
            print(f" 📜 {unquote(u.split('/')[-1])}")
        for u in cl:
            print(f" 📋 {unquote(u.split('/')[-1])}")
        all_cert_urls.extend(cu)
        all_crl_urls.extend(cl)

    # De-duplicate while keeping discovery order.
    all_cert_urls = list(dict.fromkeys(all_cert_urls))
    all_crl_urls = list(dict.fromkeys(all_crl_urls))
    print(f"\n Found {len(all_cert_urls)} cert(s), {len(all_crl_urls)} CRL(s) on site")

    # ── Load CA certs ────────────────────────────────────────────────────
    header("Loading CA Certificates")
    certs = {}
    for url in all_cert_urls:
        try:
            data = dl(url, session)
            cert = load_cert(data)
            if cert is not None and is_ca(cert):
                tp = sha1(cert)
                if tp not in certs:
                    certs[tp] = (cert, url)
                    print(f" {C.G}✔{C.RS} {get_cn(cert.subject)}")
        except Exception as e:
            # Per-file boundary: report and carry on with the next file.
            print(f" {C.R}✘{C.RS} {url} — {e}")

    # Follow AIA to get parent certs we might not have.  Newly fetched
    # parents are queued too, so the whole chain up to the root is pulled in.
    pending = deque(cert for cert, _ in certs.values())
    tried = set()
    while pending:
        ca_issuers, _ = aia_urls(pending.popleft())
        for aia_url in ca_issuers:
            if aia_url in tried or not aia_url.lower().startswith('http'):
                continue
            tried.add(aia_url)
            try:
                c = load_cert(dl(aia_url, session))
                if c is None or not is_ca(c):
                    continue
                t = sha1(c)
            except Exception:
                # Best effort only: check_aia reports unreachable or
                # unparsable AIA targets for every cert shown in the tree.
                continue
            if t not in certs:
                certs[t] = (c, aia_url)
                pending.append(c)
                print(f" {C.G}✔{C.RS} {get_cn(c.subject)} (via AIA)")

    print(f"\n {len(certs)} CA certificate(s) loaded")

    # ── Build and display ────────────────────────────────────────────────
    roots = build_tree(certs)
    if not roots:
        print(f"\n {C.R}✘ Could not build CA hierarchy{C.RS}")
        sys.exit(1)

    header("PKI Health Check")
    all_issues = []
    for i, root in enumerate(roots, 1):
        display_chain(root, i, session, all_issues)

    # ── Orphans ──────────────────────────────────────────────────────────
    header("Orphaned Files")
    referenced = collect_referenced(roots)
    tree_srcs = collect_tree_sources(roots)

    orphan_crls = [u for u in all_crl_urls
                   if unquote(u).lower() not in referenced]
    orphan_certs = [u for u in all_cert_urls if u not in tree_srcs]

    if orphan_crls:
        print(f"\n {C.Y}⚠{C.RS} {len(orphan_crls)} CRL(s) on site not referenced by any cert CDP:")
        now = datetime.now(timezone.utc)
        for url in orphan_crls:
            name = unquote(url.split('/')[-1])
            try:
                data = dl(url, session)
            except requests.RequestException:
                print(f" 📋 {name} (could not fetch)")
                continue
            crl = load_crl(data)
            if crl is None:
                print(f" 📋 {name} (could not parse)")
                continue
            nu = crl.next_update_utc
            if nu and nu < now:
                exp = f"{C.R}EXPIRED{C.RS}"
            elif nu:
                exp = f"{C.G}{(nu-now).days}d{C.RS}"
            else:
                exp = "?"
            print(f" 📋 {name} (issuer: {get_cn(crl.issuer)}, {exp})")
    else:
        print(f"\n {C.G}✔{C.RS} No orphaned CRLs")

    if orphan_certs:
        print(f"\n {C.Y}⚠{C.RS} {len(orphan_certs)} cert(s) on site not in hierarchy:")
        for url in orphan_certs:
            print(f" 📜 {unquote(url.split('/')[-1])}")
    else:
        print(f" {C.G}✔{C.RS} No orphaned certificates")

    # ── Summary ──────────────────────────────────────────────────────────
    header("Summary")
    print(f" Chains: {len(roots)} | CA certs: {len(certs)} | "
          f"Site CRLs: {len(all_crl_urls)}")

    if not all_issues:
        print(f"\n {C.G}{C.BD}✔ All checks passed{C.RS}")
    else:
        print(f"\n {C.R}{C.BD}✘ {len(all_issues)} issue(s):{C.RS}")
        for issue in all_issues:
            print(f" {C.R}• {issue}{C.RS}")


if __name__ == '__main__':
    main()