#!/usr/bin/env python3
# /// script
# requires-python = ">=3.10"
# dependencies = [
#     "requests",
#     "beautifulsoup4",
#     "cryptography",
# ]
# ///
"""
PKI Site Analyzer v8
====================
Scrapes IIS PKI sites recursively, builds certificate chains,
fetches and validates CRLs referenced from each certificate's CDP,
and flags orphaned CRLs that may need cleanup.

Usage:
    uv run pki_analyzer.py <url> [<url> ...]

Examples:
    uv run pki_analyzer.py http://pki.matas.dk/aia http://pki.matas.dk/cdp
    uv run pki_analyzer.py http://pki.imy.se/
"""

import sys
from datetime import datetime, timezone
from urllib.parse import urljoin, unquote, urlparse
from collections import defaultdict

import requests
from bs4 import BeautifulSoup
from cryptography import x509
from cryptography.x509.oid import ExtensionOID, AuthorityInformationAccessOID
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding, rsa, ec
from cryptography.exceptions import InvalidSignature

CERT_EXTENSIONS = ('.crt', '.cer', '.pem')
CRL_EXTENSIONS = ('.crl',)
TIMEOUT = 30            # passed as `timeout=` to every HTTP request
MAX_DEPTH = 5           # maximum directory recursion depth while scraping
DEBUG_SCRAPE = False    # when True, print_debug() emits per-link details


class C:
    """ANSI escape sequences used to colour terminal output."""

    HEADER = '\033[95m'
    BLUE = '\033[94m'
    CYAN = '\033[96m'
    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    RED = '\033[91m'
    BOLD = '\033[1m'
    DIM = '\033[2m'
    RESET = '\033[0m'


def print_header(text):
    """Print *text* as a top-level banner framed by two 80-column rules."""
    rule = '=' * 80
    print(f"\n{C.BOLD}{C.HEADER}{rule}{C.RESET}")
    print(f"{C.BOLD}{C.HEADER} {text}{C.RESET}")
    print(f"{C.BOLD}{C.HEADER}{rule}{C.RESET}")


def print_section(text):
    """Print *text* as a section heading padded with a horizontal rule."""
    # A negative repeat count yields '', so over-long titles simply get no rule.
    print(f"\n{C.BOLD}{C.CYAN}── {text} {'─'*(74-len(text))}{C.RESET}")


def print_ok(text):
    """Print *text* prefixed with a green check mark."""
    print(f" {C.GREEN}✔{C.RESET} {text}")


def print_warn(text):
    """Print *text* prefixed with a yellow warning sign."""
    print(f" {C.YELLOW}⚠{C.RESET} {text}")


def print_err(text):
    """Print *text* prefixed with a red cross."""
    print(f" {C.RED}✘{C.RESET} {text}")


def print_info(label, value):
    """Print a ``label....... value`` line; the label is dot-padded to 40 columns."""
    print(f" {C.BOLD}{label:.<40}{C.RESET} {value}")


def print_debug(text):
    """Print *text* dimmed, but only when DEBUG_SCRAPE is enabled."""
    if DEBUG_SCRAPE:
        print(f" {C.DIM}DBG: {text}{C.RESET}")


def sha1_fingerprint(cert):
    """Return the certificate's SHA-1 fingerprint as colon-separated hex."""
    return cert.fingerprint(hashes.SHA1()).hex(':')


def subject_str(name):
    """Render an X.509 name as ``attr=value, attr=value`` or ``(empty)``."""
    # NOTE: `oid._name` is a private attribute of cryptography's
    # ObjectIdentifier; it is kept so the output format stays unchanged.
    parts = [f"{attr.oid._name}={attr.value}" for attr in name]
    return ', '.join(parts) if parts else '(empty)'


# ─── Scraping ────────────────────────────────────────────────────────────────

def scrape_recursive(base_url, session, origin_host=None, depth=0, visited=None):
    """Walk a directory-listing site and collect certificate and CRL links.

    Args:
        base_url: Directory URL to scan; a trailing ``/`` is added if missing.
        session: ``requests.Session`` used for all fetches.
        origin_host: Host (netloc) links must match to be followed. Defaults
            to the host of *base_url* on the first call.
        depth: Current recursion depth; scanning stops beyond MAX_DEPTH.
        visited: Set of directory URLs already scanned (shared across the
            recursion so that no directory is fetched twice).

    Returns:
        ``(cert_urls, crl_urls)`` — two lists of absolute URLs. A directory
        that cannot be fetched contributes two empty lists.
    """
    if visited is None:
        visited = set()
    if not base_url.endswith('/'):
        base_url += '/'
    if base_url in visited or depth > MAX_DEPTH:
        return [], []
    visited.add(base_url)
    if origin_host is None:
        origin_host = urlparse(base_url).netloc

    indent = " " * depth
    print(f"{indent} 🔍 Scanning: {base_url}")
    try:
        resp = session.get(base_url, timeout=TIMEOUT)
        resp.raise_for_status()
    except Exception as e:
        # Best effort: one unreachable directory must not abort the whole scan.
        print_warn(f"{indent} Could not fetch {base_url}: {e}")
        return [], []

    soup = BeautifulSoup(resp.text, 'html.parser')
    cert_urls, crl_urls, subdirs = [], [], []
    for a_tag in soup.find_all('a', href=True):
        href = a_tag['href']
        full_url = urljoin(base_url, href)
        # File-type detection is done on the percent-decoded, lower-cased URL.
        decoded_url = unquote(full_url).lower()
        decoded_href = unquote(href)
        print_debug(f" href='{href}' → '{full_url}'")

        if urlparse(full_url).netloc != origin_host:
            continue  # never leave the site we were asked to scan
        if href in ('../', '..'):
            continue  # relative parent-directory link
        if full_url.rstrip('/') == base_url.rstrip('/'):
            continue  # link back to the current directory

        if decoded_url.endswith(CERT_EXTENSIONS):
            cert_urls.append(full_url)
            print(f"{indent} 📜 {decoded_href}")
        elif decoded_url.endswith(CRL_EXTENSIONS):
            crl_urls.append(full_url)
            print(f"{indent} 📋 {decoded_href}")
        elif full_url.endswith('/') and full_url not in visited:
            subdirs.append(full_url)
            print(f"{indent} 📁 {decoded_href}")

    for subdir in subdirs:
        sub_certs, sub_crls = scrape_recursive(
            subdir, session, origin_host, depth + 1, visited)
        cert_urls.extend(sub_certs)
        crl_urls.extend(sub_crls)
    return cert_urls, crl_urls


def download_file(url, session):
    """Fetch *url* and return the response body as bytes.

    Raises whatever ``session.get`` / ``raise_for_status`` raise on failure.
    """
    resp = session.get(url, timeout=TIMEOUT)
    resp.raise_for_status()
    return resp.content


# ─── Parsing ─────────────────────────────────────────────────────────────────

def load_certificate(data, url=""):
    """Parse *data* as a DER, then PEM, certificate; return None if neither works.

    *url* is accepted for call-site symmetry and is not used.
    """
    for loader in (x509.load_der_x509_certificate, x509.load_pem_x509_certificate):
        try:
            return loader(data)
        except Exception:
            # Deliberate best effort: any failure just means "try the next format".
            continue
    return None


def load_crl(data, url=""):
    """Parse *data* as a DER, then PEM, CRL; return None if neither works.

    *url* is accepted for call-site symmetry and is not used.
    """
    for loader in (x509.load_der_x509_crl, x509.load_pem_x509_crl):
        try:
            return loader(data)
        except Exception:
            # Deliberate best effort: any failure just means "try the next format".
            continue
    return None


# ─── Extensions ──────────────────────────────────────────────────────────────

def get_extension_value(obj, oid):
    """Return the value of extension *oid* on *obj*, or None when it is absent."""
    try:
        return obj.extensions.get_extension_for_oid(oid).value
    except x509.ExtensionNotFound:
        return None


def get_cdp_urls(cert):
    """Return every URI found in the certificate's CRL Distribution Points."""
    cdp = get_extension_value(cert, ExtensionOID.CRL_DISTRIBUTION_POINTS)
    urls = []
    if cdp:
        for dp in cdp:
            # Distribution points given only as a relative name have no full_name.
            if dp.full_name:
                urls.extend(
                    n.value for n in dp.full_name
                    if isinstance(n, x509.UniformResourceIdentifier)
                )
    return urls


def get_aia_info(cert):
    """Return ``(ca_issuer_urls, ocsp_urls)`` from Authority Information Access."""
    aia = get_extension_value(cert, ExtensionOID.AUTHORITY_INFORMATION_ACCESS)
    ocsp, ca = [], []
    if aia:
        for desc in aia:
            if not isinstance(desc.access_location, x509.UniformResourceIdentifier):
                continue
            if desc.access_method == AuthorityInformationAccessOID.OCSP:
                ocsp.append(desc.access_location.value)
            elif desc.access_method == AuthorityInformationAccessOID.CA_ISSUERS:
                ca.append(desc.access_location.value)
    return ca, ocsp


def get_subject_key_id(cert):
    """Return the Subject Key Identifier as hex, or None if absent."""
    ski = get_extension_value(cert, ExtensionOID.SUBJECT_KEY_IDENTIFIER)
    return ski.digest.hex() if ski else None


def get_authority_key_id(obj):
    """Return the Authority Key Identifier's key id as hex, or None if absent."""
    aki = get_extension_value(obj, ExtensionOID.AUTHORITY_KEY_IDENTIFIER)
    return aki.key_identifier.hex() if aki and aki.key_identifier else None


def is_self_signed(cert):
    """Return True when issuer and subject names are equal.

    Only the names are compared; the signature itself is not verified.
    """
    return cert.issuer == cert.subject


def is_ca(cert):
    """Return the Basic Constraints CA flag, or False when the extension is absent."""
    bc = get_extension_value(cert, ExtensionOID.BASIC_CONSTRAINTS)
    return bc.ca if bc else False


def get_key_usage(cert):
    """Return the asserted Key Usage bits as a comma-separated string.

    Returns None when the certificate has no Key Usage extension.
    ``encipher_only`` / ``decipher_only`` are intentionally not reported.
    """
    ku = get_extension_value(cert, ExtensionOID.KEY_USAGE)
    if not ku:
        return None
    flags = ('digital_signature', 'key_encipherment', 'key_cert_sign',
             'crl_sign', 'content_commitment', 'data_encipherment',
             'key_agreement')
    return ', '.join(flag for flag in flags if getattr(ku, flag))


def get_eku(cert):
    """Return Extended Key Usage OIDs (dotted form), comma-separated, or None."""
    eku = get_extension_value(cert, ExtensionOID.EXTENDED_KEY_USAGE)
    return ', '.join(u.dotted_string for u in eku) if eku else None


def get_san(cert):
    """Return Subject Alternative Name values, comma-separated, or None."""
    san = get_extension_value(cert, ExtensionOID.SUBJECT_ALTERNATIVE_NAME)
    return ', '.join(str(n.value) for n in san) if san else None


# ─── Chain Building ──────────────────────────────────────────────────────────

def _index_newest_by_ski(certs_dict):
    """Map each Subject Key Identifier to the fingerprint of one certificate.

    When several certificates share a SKI, the one with the latest
    ``not_valid_after_utc`` wins (the first seen is kept on a tie).
    """
    index = {}
    for fp, (cert, _url) in certs_dict.items():
        ski = get_subject_key_id(cert)
        if not ski:
            continue
        current_fp = index.get(ski)
        if (current_fp is None
                or cert.not_valid_after_utc
                > certs_dict[current_fp][0].not_valid_after_utc):
            index[ski] = fp
    return index


def build_chains(certs_dict):
    """Group certificates into issuer → subject chains via AKI/SKI matching.

    Args:
        certs_dict: Mapping ``fingerprint -> (certificate, source_url)``.

    Returns:
        A list of chains, each a list of fingerprints ordered from the top of
        the chain downwards. Chains that start at a self-signed certificate
        come first, followed by partial chains whose topmost issuer is not
        among the loaded certificates. Every certificate appears in at least
        one chain; anything unreachable is returned as a one-element chain.
    """
    ski_index = _index_newest_by_ski(certs_dict)

    # child fingerprint -> parent fingerprint
    parent_of = {}
    for fp, (cert, _url) in certs_dict.items():
        if is_self_signed(cert):
            continue
        aki = get_authority_key_id(cert)
        parent_fp = ski_index.get(aki) if aki else None
        if parent_fp is not None and parent_fp != fp:
            parent_of[fp] = parent_fp

    children_of = defaultdict(list)
    for child, parent in parent_of.items():
        children_of[parent].append(child)

    chains = []

    def walk(fp, chain):
        """Depth-first walk that records one chain per leaf below *fp*."""
        chain.append(fp)
        kids = children_of.get(fp, [])
        if kids:
            for kid in kids:
                walk(kid, chain)
        else:
            chains.append(list(chain))
        chain.pop()

    # 1) Complete chains, anchored at self-signed certificates.
    for fp, (cert, _url) in certs_dict.items():
        if is_self_signed(cert):
            walk(fp, [])

    # 2) Partial chains: the topmost certificate's issuer was not loaded.
    #    Every node has at most one parent, so a walk that starts at a
    #    parent-less node can never run into a cycle.
    for fp, (cert, _url) in certs_dict.items():
        if fp not in parent_of and not is_self_signed(cert):
            walk(fp, [])

    # 3) Anything still unseen (e.g. certificates that reference each other
    #    in a loop) is reported on its own so that nothing is dropped.
    seen = {fp for chain in chains for fp in chain}
    chains.extend([fp] for fp in certs_dict if fp not in seen)
    return chains


# ─── CRL Operations ──────────────────────────────────────────────────────────

def verify_crl_signature(crl, issuer_cert):
    """Verify the CRL's signature with the issuer certificate's public key.

    Returns:
        ``(True, msg)`` when valid, ``(False, msg)`` when the signature does
        not verify, and ``(None, msg)`` when verification could not be
        performed (unsupported key type or an unexpected error).
    """
    try:
        pub = issuer_cert.public_key()
        if isinstance(pub, rsa.RSAPublicKey):
            pub.verify(crl.signature, crl.tbs_certlist_bytes,
                       padding.PKCS1v15(), crl.signature_hash_algorithm)
        elif isinstance(pub, ec.EllipticCurvePublicKey):
            pub.verify(crl.signature, crl.tbs_certlist_bytes,
                       ec.ECDSA(crl.signature_hash_algorithm))
        else:
            return None, "Unsupported key type"
        return True, "Signature valid"
    except InvalidSignature:
        return False, "Signature INVALID"
    except Exception as e:
        return None, f"Verification error: {e}"


def fetch_and_validate_crl(cdp_url, issuer_cert, session):
    """Fetch a CRL from a CDP URL, validate it, and return a results dict.

    Args:
        cdp_url: URL taken from a certificate's CRL Distribution Points.
        issuer_cert: Certificate used to verify the CRL signature, or None
            when no issuer certificate is available.
        session: ``requests.Session`` used for the download.

    Returns:
        A dict with the keys initialised below. ``error`` is set (and the
        remaining fields keep their defaults) when the URL is not HTTP(S),
        the download fails, or the payload cannot be parsed.
    """
    result = {
        'url': cdp_url,
        'reachable': False,
        'parseable': False,
        'issuer': None,
        'last_update': None,
        'next_update': None,
        'expired': None,
        'is_delta': False,
        'sig_valid': None,
        'sig_msg': None,
        'revoked_count': 0,
        'error': None,
        'crl': None,
    }

    # Skip non-HTTP (e.g. ldap://, file://). Callers detect this case by the
    # "Non-HTTP" marker in the error text, so that wording must stay stable.
    if not cdp_url.lower().startswith(('http://', 'https://')):
        result['error'] = f"Non-HTTP CDP (skipped): {cdp_url}"
        return result

    # Download
    try:
        data = download_file(cdp_url, session)
    except Exception as e:
        result['error'] = f"Download failed: {e}"
        return result
    result['reachable'] = True

    # Parse
    crl = load_crl(data, cdp_url)
    if crl is None:
        result['error'] = f"Parse failed ({len(data)} bytes)"
        return result

    result['parseable'] = True
    result['crl'] = crl
    result['issuer'] = subject_str(crl.issuer)
    result['last_update'] = crl.last_update_utc
    result['next_update'] = crl.next_update_utc
    result['revoked_count'] = len(crl)

    # A CRL without nextUpdate leaves 'expired' as None (unknown).
    if crl.next_update_utc:
        result['expired'] = crl.next_update_utc < datetime.now(timezone.utc)

    # Delta check
    result['is_delta'] = (
        get_extension_value(crl, ExtensionOID.DELTA_CRL_INDICATOR) is not None
    )

    # Signature verification
    if issuer_cert is not None:
        result['sig_valid'], result['sig_msg'] = verify_crl_signature(crl, issuer_cert)
    else:
        result['sig_msg'] = "No issuer cert available"
    return result


def is_cert_revoked(cert, crl):
    """Check if a certificate's serial is on the CRL."""
    return crl.get_revoked_certificate_by_serial_number(cert.serial_number) is not None


# ─── Display ─────────────────────────────────────────────────────────────────

def display_cert(cert, url, indent=0):
    """Print a detailed, indented description of *cert*.

    Args:
        cert: Certificate to describe.
        url: Where the certificate was downloaded from (printed as "Source").
        indent: Number of leading spaces for every printed line.
    """
    p = " " * indent
    now = datetime.now(timezone.utc)
    nb = cert.not_valid_before_utc
    na = cert.not_valid_after_utc
    days_left = (na - now).days

    print(f"{p}{C.BOLD}Subject:{C.RESET} {subject_str(cert.subject)}")
    print(f"{p}{C.BOLD}Issuer:{C.RESET} {subject_str(cert.issuer)}")
    print(f"{p}{C.BOLD}Serial:{C.RESET} {format(cert.serial_number, 'x')}")
    print(f"{p}{C.BOLD}Not Before:{C.RESET} {nb}")
    print(f"{p}{C.BOLD}Not After:{C.RESET} {na}", end="")
    if na < now:
        print(f" {C.RED}[EXPIRED]{C.RESET}")
    elif days_left < 90:
        print(f" {C.YELLOW}[{days_left} days remaining]{C.RESET}")
    else:
        print(f" {C.GREEN}[{days_left} days remaining]{C.RESET}")
    if nb > now:
        print(f"{p} {C.RED}[NOT YET VALID]{C.RESET}")

    print(f"{p}{C.BOLD}SHA1:{C.RESET} {sha1_fingerprint(cert)}")
    print(f"{p}{C.BOLD}Self-signed:{C.RESET} {'Yes' if is_self_signed(cert) else 'No'}")
    print(f"{p}{C.BOLD}CA:{C.RESET} {'Yes' if is_ca(cert) else 'No'}")

    ku = get_key_usage(cert)
    if ku:
        print(f"{p}{C.BOLD}Key Usage:{C.RESET} {ku}")
    eku = get_eku(cert)
    if eku:
        print(f"{p}{C.BOLD}EKU:{C.RESET} {eku}")
    san = get_san(cert)
    if san:
        print(f"{p}{C.BOLD}SAN:{C.RESET} {san}")
    ski = get_subject_key_id(cert)
    if ski:
        print(f"{p}{C.BOLD}SKI:{C.RESET} {ski}")
    aki = get_authority_key_id(cert)
    if aki:
        print(f"{p}{C.BOLD}AKI:{C.RESET} {aki}")

    # CDP
    cdps = get_cdp_urls(cert)
    if cdps:
        print(f"{p}{C.BOLD}CDP:{C.RESET}")
        for u in cdps:
            print(f"{p} → {u}")
    else:
        print(f"{p}{C.BOLD}CDP:{C.RESET} (none)")

    # AIA
    ca_issuers, ocsp_urls = get_aia_info(cert)
    if ca_issuers or ocsp_urls:
        print(f"{p}{C.BOLD}AIA:{C.RESET}")
        for u in ca_issuers:
            print(f"{p} → CA Issuer: {u}")
        for u in ocsp_urls:
            print(f"{p} → OCSP: {u}")
    else:
        print(f"{p}{C.BOLD}AIA:{C.RESET} (none)")

    # Policies
    policies = get_extension_value(cert, ExtensionOID.CERTIFICATE_POLICIES)
    if policies:
        print(f"{p}{C.BOLD}Policies:{C.RESET}")
        for pol in policies:
            print(f"{p} → {pol.policy_identifier.dotted_string}")
            # Only plain-string qualifiers are shown (printed as "CPS").
            for q in pol.policy_qualifiers or ():
                if isinstance(q, str):
                    print(f"{p} CPS: {q}")

    print(f"{p}{C.BOLD}Source:{C.RESET} {url}")


def display_crl_inline(result, indent=0):
    """Display CRL validation results inline with its parent certificate.

    Args:
        result: Dict produced by ``fetch_and_validate_crl``.
        indent: Number of leading spaces for every printed line.
    """
    p = " " * indent
    r = result

    if r['error']:
        if 'Non-HTTP' in r['error']:
            print(f"{p}{C.DIM} ↳ CRL: {r['url']} (LDAP — not checked){C.RESET}")
        else:
            print(f"{p} {C.RED}↳ CRL: {r['url']}{C.RESET}")
            print(f"{p} {C.RED}✘ {r['error']}{C.RESET}")
        return

    crl_type = "Delta" if r['is_delta'] else "Base"
    print(f"{p} ↳ CRL: {r['url']}")
    print(f"{p} Type: {crl_type} | Revoked: {r['revoked_count']}", end="")
    if r['expired']:
        print(f" | {C.RED}EXPIRED (next update: {r['next_update']}){C.RESET}")
    elif r['next_update']:
        rem = (r['next_update'] - datetime.now(timezone.utc)).days
        print(f" | {C.GREEN}Valid ({rem} days){C.RESET}")
    else:
        print()  # terminate the line left open by end=""
    print(f"{p} Published: {r['last_update']}")

    if r['sig_valid'] is True:
        print(f"{p} {C.GREEN}✔ {r['sig_msg']}{C.RESET}")
    elif r['sig_valid'] is False:
        print(f"{p} {C.RED}✘ {r['sig_msg']}{C.RESET}")
    else:
        print(f"{p} {C.YELLOW}⚠ {r['sig_msg']}{C.RESET}")


# ─── Main ────────────────────────────────────────────────────────────────────

def _make_session():
    """Create the HTTP session shared by every request the script makes."""
    session = requests.Session()
    # SECURITY NOTE: TLS certificate verification is disabled and the
    # resulting urllib3 warnings are silenced, so https:// downloads are not
    # authenticated. Kept as-is to preserve the script's existing behaviour.
    session.verify = False
    session.headers.update({
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                      'AppleWebKit/537.36 (KHTML, like Gecko) '
                      'Chrome/125.0.0.0 Safari/537.36',
        'Accept': '*/*',
        'Accept-Encoding': 'identity',
    })
    requests.packages.urllib3.disable_warnings()
    return session


def _discover_files(base_urls, session):
    """Step 1: scrape every base URL; return de-duplicated (cert_urls, crl_urls)."""
    all_cert_urls, all_crl_urls = [], []
    for base_url in base_urls:
        print_header(f"Scanning: {base_url}")
        found_certs, found_crls = scrape_recursive(base_url, session)
        all_cert_urls.extend(found_certs)
        all_crl_urls.extend(found_crls)

    # dict.fromkeys de-duplicates while preserving discovery order.
    all_cert_urls = list(dict.fromkeys(all_cert_urls))
    all_crl_urls = list(dict.fromkeys(all_crl_urls))

    print_section("Discovery Summary")
    print_ok(f"Certificate files: {len(all_cert_urls)}")
    print_ok(f"CRL files on site: {len(all_crl_urls)}")
    return all_cert_urls, all_crl_urls


def _load_site_certificates(cert_urls, session):
    """Step 2: download and parse certificates; return fingerprint -> (cert, url)."""
    print_header("Loading Certificates")
    certs_dict = {}
    for url in cert_urls:
        try:
            data = download_file(url, session)
            cert = load_certificate(data, url)
            if cert is not None:
                certs_dict[sha1_fingerprint(cert)] = (cert, url)
                print_ok(f"{subject_str(cert.subject)}")
            else:
                print_warn(f"Could not parse: {url}")
        except Exception as e:
            print_err(f"Download failed: {url} — {e}")
    return certs_dict


def _fetch_aia_certificates(certs_dict, session):
    """Step 3: add issuer certificates referenced by AIA that are not yet loaded.

    Mutates *certs_dict* in place. Only the AIA URLs of certificates already
    present when the step starts are followed (a single pass).
    """
    print_header("Fetching CA Certs from AIA Extensions")
    aia_urls = set()
    for cert, _url in certs_dict.values():
        ca_issuers, _ocsp = get_aia_info(cert)
        aia_urls.update(
            u for u in ca_issuers if u.lower().startswith('http')
        )

    loaded_urls = {url for _cert, url in certs_dict.values()}
    new_aia = aia_urls - loaded_urls
    if not new_aia:
        print_info("No additional certs needed", "All issuers already loaded")
        return

    # Sorted so that the order of requests and output is reproducible.
    for url in sorted(new_aia):
        try:
            data = download_file(url, session)
            cert = load_certificate(data, url)
            if cert is None:
                print_warn(f"Could not parse: {url}")
                continue
            fp = sha1_fingerprint(cert)
            if fp not in certs_dict:
                certs_dict[fp] = (cert, url)
                print_ok(f"Fetched: {subject_str(cert.subject)}")
        except Exception as e:
            print_err(f"Failed: {url} — {e}")


def _display_chains(chains, certs_dict, ski_to_cert, session):
    """Step 5: print each chain and validate the CRLs its certificates reference.

    Returns ``(validated_cdp_urls, all_crl_results, revocation_issues)``.
    """
    print_header("Certificate Chains & CRL Status")
    validated_cdp_urls = set()   # every CDP URL seen (to find orphans later)
    all_crl_results = []         # one result dict per (certificate, CDP URL)
    revocation_issues = []       # (cert, cert_url, cdp_url) for revoked certs

    for i, chain in enumerate(chains, 1):
        print_section(f"Chain {i} ({len(chain)} certificate(s))")
        for depth, fp in enumerate(chain):
            cert, url = certs_dict[fp]
            if depth == 0 and is_self_signed(cert):
                role = "Root CA"
            elif is_ca(cert):
                role = "Intermediate CA"
            else:
                role = "End Entity"
            print(f"\n{' '*depth}{C.BOLD}{C.BLUE}[{depth}] {role}{C.RESET}")
            display_cert(cert, url, indent=depth)

            # Fetch and validate each CDP for this certificate
            cdp_urls = get_cdp_urls(cert)
            if not cdp_urls:
                if not is_self_signed(cert):
                    print(f"{' '*depth} {C.YELLOW}⚠ No CDP — cannot check revocation{C.RESET}")
                continue

            # Find issuer cert for signature verification
            aki = get_authority_key_id(cert)
            issuer_cert = ski_to_cert.get(aki) if aki else None
            for cdp_url in cdp_urls:
                validated_cdp_urls.add(cdp_url)
                result = fetch_and_validate_crl(cdp_url, issuer_cert, session)
                all_crl_results.append(result)
                display_crl_inline(result, indent=depth)

                # Check if THIS certificate is revoked
                if result['crl'] is not None and is_cert_revoked(cert, result['crl']):
                    revocation_issues.append((cert, url, cdp_url))
                    print(f"{' '*depth} {C.RED}{C.BOLD}"
                          f"⚠ THIS CERTIFICATE IS REVOKED!{C.RESET}")

    return validated_cdp_urls, all_crl_results, revocation_issues


def _normalize_url(u):
    """Normalise a URL for comparison: percent-decode, lower-case, strip '/'."""
    return unquote(u).lower().rstrip('/')


def _report_orphans(all_crl_urls, validated_cdp_urls, session):
    """Step 6: report CRL files on the site that no certificate CDP references.

    Returns the list of orphaned CRL URLs.
    """
    print_header("Orphaned CRL Files (on site but not referenced by any certificate)")
    referenced = {_normalize_url(u) for u in validated_cdp_urls}
    orphans = [u for u in all_crl_urls if _normalize_url(u) not in referenced]

    if not orphans:
        print_ok("No orphaned CRL files — all CRLs on site are referenced by certificates")
        return orphans

    print_warn(f"Found {len(orphans)} CRL file(s) not referenced by any certificate CDP:")
    for url in orphans:
        # Try to load and show basic info
        try:
            data = download_file(url, session)
            crl = load_crl(data, url)
            if crl is not None:
                now = datetime.now(timezone.utc)
                expired = ""
                if crl.next_update_utc and crl.next_update_utc < now:
                    expired = f" {C.RED}[EXPIRED]{C.RESET}"
                print(f" 📋 {unquote(url)}")
                print(f" Issuer: {subject_str(crl.issuer)}")
                print(f" Next update: {crl.next_update_utc}{expired}")
                print(f" Revoked: {len(crl)}")
            else:
                print(f" 📋 {unquote(url)} (could not parse)")
        except Exception:
            print(f" 📋 {unquote(url)} (could not download)")
    print()
    print_warn("These may be leftover files that should be reviewed/removed.")
    return orphans


def _is_unreachable(result):
    """True for an HTTP CDP that failed to download (non-HTTP skips excluded)."""
    return (not result['reachable'] and result['error']
            and 'Non-HTTP' not in result['error'])


def _print_summary(certs_dict, chains, all_crl_results, revocation_issues, orphans):
    """Step 7: print the final counts and the lists of problems found."""
    print_header("Summary")
    print_info("Certificates parsed", str(len(certs_dict)))
    print_info("Chains found", str(len(chains)))

    # CRL stats
    total_cdps = len(all_crl_results)
    reachable = sum(1 for r in all_crl_results if r['reachable'])
    parsed = sum(1 for r in all_crl_results if r['parseable'])
    sig_ok = sum(1 for r in all_crl_results if r['sig_valid'] is True)
    sig_fail = sum(1 for r in all_crl_results if r['sig_valid'] is False)
    unreachable = sum(1 for r in all_crl_results if _is_unreachable(r))
    ldap_skipped = sum(1 for r in all_crl_results
                       if r['error'] and 'Non-HTTP' in r['error'])

    print_info("CDP URLs checked", str(total_cdps))
    if ldap_skipped:
        print_info(" LDAP (skipped)", str(ldap_skipped))
    if reachable:
        print_info(" Reachable", str(reachable))
    if unreachable:
        print_info(" Unreachable", str(unreachable))
    if parsed:
        print_info(" Parsed OK", str(parsed))
    if sig_ok:
        print_info(" Signature valid", str(sig_ok))
    if sig_fail:
        print_info(" Signature INVALID", str(sig_fail))

    # Expiry
    now = datetime.now(timezone.utc)
    exp_certs = [c for c, _url in certs_dict.values() if c.not_valid_after_utc < now]
    exp_crls = [r for r in all_crl_results if r['expired']]

    if exp_certs:
        print_err(f"Expired certificates: {len(exp_certs)}")
        for c in exp_certs:
            print(f" → {subject_str(c.subject)} (expired {c.not_valid_after_utc})")
    else:
        print_ok("No expired certificates")

    if exp_crls:
        print_err(f"Expired CRLs: {len(exp_crls)}")
        for r in exp_crls:
            print(f" → {r['url']} (expired {r['next_update']})")
    else:
        print_ok("No expired CRLs")

    if unreachable:
        print_err(f"Unreachable CDPs: {unreachable}")
        for r in all_crl_results:
            if _is_unreachable(r):
                print(f" → {r['url']}")
                print(f" {r['error']}")

    if revocation_issues:
        print_err(f"REVOKED CERTIFICATES: {len(revocation_issues)}")
        for cert, _url, cdp in revocation_issues:
            print(f" → {subject_str(cert.subject)}")
            print(f" Serial: {format(cert.serial_number, 'x')}")
            print(f" CRL: {cdp}")
    else:
        print_ok("No downloaded certificates are revoked")

    if orphans:
        print_warn(f"Orphaned CRL files: {len(orphans)}")
    else:
        print_ok("No orphaned CRL files")


def main():
    """Command-line entry point: analyse every PKI site URL given in sys.argv."""
    if len(sys.argv) < 2:
        print(f"Usage: {sys.argv[0]} <url> [<url> ...]")
        print(f"Example: {sys.argv[0]} http://pki.imy.se/")
        sys.exit(1)

    base_urls = sys.argv[1:]
    session = _make_session()

    # ── Step 1: Discover files ───────────────────────────────────────────
    all_cert_urls, all_crl_urls = _discover_files(base_urls, session)

    # ── Step 2: Download and parse certificates ──────────────────────────
    certs_dict = _load_site_certificates(all_cert_urls, session)

    # ── Step 3: Fetch additional certs from AIA ──────────────────────────
    _fetch_aia_certificates(certs_dict, session)

    # ── Step 4: Build chains ─────────────────────────────────────────────
    chains = build_chains(certs_dict)
    # Issuer lookup: a certificate's AKI is matched against this SKI index.
    ski_to_cert = {
        ski: certs_dict[fp][0]
        for ski, fp in _index_newest_by_ski(certs_dict).items()
    }

    # ── Step 5: Display chains with inline CRL validation ────────────────
    validated_cdp_urls, all_crl_results, revocation_issues = _display_chains(
        chains, certs_dict, ski_to_cert, session)

    # ── Step 6: Find orphaned CRLs on the site ───────────────────────────
    orphans = _report_orphans(all_crl_urls, validated_cdp_urls, session)

    # ── Step 7: Summary ──────────────────────────────────────────────────
    _print_summary(certs_dict, chains, all_crl_results, revocation_issues, orphans)


if __name__ == '__main__':
    main()