#!/usr/bin/env python3
# /// script
# requires-python = ">=3.10"
# dependencies = [
#     "requests",
#     "beautifulsoup4",
#     "cryptography",
# ]
# ///
"""
PKI Site Analyzer v6
====================

Scrapes IIS sites with directory browsing enabled (recursively), downloads
all .crt/.cer/.crl files, parses certificates, builds chain relationships,
and validates CRLs.

Usage:
    uv run pki_analyzer.py <url> [<url> ...]

Examples:
    uv run pki_analyzer.py http://pki.matas.dk/cdp http://pki.matas.dk/aia
    uv run pki_analyzer.py http://pki.imy.se/
"""

import sys
import traceback
from collections import defaultdict
from datetime import datetime, timezone
from urllib.parse import urljoin, unquote, urlparse

import requests
from bs4 import BeautifulSoup
from cryptography import x509
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding, rsa, ec
from cryptography.x509.oid import ExtensionOID, AuthorityInformationAccessOID

# ─── Configuration ───────────────────────────────────────────────────────────

CERT_EXTENSIONS = ('.crt', '.cer', '.pem')
CRL_EXTENSIONS = ('.crl',)
TIMEOUT = 30        # per-request timeout, seconds
MAX_DEPTH = 5       # recursion limit for directory scraping
DEBUG_SCRAPE = False  # Set True to see all <a> tags during scraping


class C:
    """ANSI escape codes for colored terminal output."""
    HEADER = '\033[95m'
    BLUE = '\033[94m'
    CYAN = '\033[96m'
    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    RED = '\033[91m'
    BOLD = '\033[1m'
    DIM = '\033[2m'
    RESET = '\033[0m'


# ─── Utility ─────────────────────────────────────────────────────────────────

def print_header(text):
    """Print a prominent banner line for a major processing step."""
    print(f"\n{C.BOLD}{C.HEADER}{'=' * 80}{C.RESET}")
    print(f"{C.BOLD}{C.HEADER}  {text}{C.RESET}")
    print(f"{C.BOLD}{C.HEADER}{'=' * 80}{C.RESET}")


def print_section(text):
    """Print a lighter sub-section divider."""
    print(f"\n{C.BOLD}{C.CYAN}── {text} {'─' * (74 - len(text))}{C.RESET}")


def print_ok(text):
    print(f"  {C.GREEN}✔{C.RESET} {text}")


def print_warn(text):
    print(f"  {C.YELLOW}⚠{C.RESET} {text}")


def print_err(text):
    print(f"  {C.RED}✘{C.RESET} {text}")


def print_info(label, value):
    """Print an aligned 'label....: value' row."""
    print(f"  {C.BOLD}{label:.<40}{C.RESET} {value}")


def print_debug(text):
    """Dim diagnostic output, only when DEBUG_SCRAPE is enabled."""
    if DEBUG_SCRAPE:
        print(f"  {C.DIM}DBG: {text}{C.RESET}")


def sha1_fingerprint(cert):
    """Colon-separated hex SHA-1 fingerprint of a certificate."""
    return cert.fingerprint(hashes.SHA1()).hex(':')


def subject_str(name):
    """Render an x509.Name as 'oid=value, ...' (or '(empty)')."""
    parts = []
    for attr in name:
        parts.append(f"{attr.oid._name}={attr.value}")
    return ', '.join(parts) if parts else '(empty)'


# ─── Recursive Scraping ──────────────────────────────────────────────────────

def scrape_recursive(base_url, session, origin_host=None, depth=0, visited=None):
    """Recursively scrape an IIS directory listing for cert/CRL files.

    Returns (cert_urls, crl_urls). Stays on the origin host, skips parent
    and self links, and descends into subdirectories up to MAX_DEPTH.
    """
    if visited is None:
        visited = set()
    if not base_url.endswith('/'):
        base_url += '/'
    if base_url in visited or depth > MAX_DEPTH:
        return [], []
    visited.add(base_url)
    if origin_host is None:
        origin_host = urlparse(base_url).netloc

    indent = "  " * depth
    print(f"{indent}  🔍 Scanning: {base_url}")
    try:
        resp = session.get(base_url, timeout=TIMEOUT)
        resp.raise_for_status()
    except Exception as e:
        print_warn(f"{indent}Could not fetch {base_url}: {e}")
        return [], []

    soup = BeautifulSoup(resp.text, 'html.parser')
    cert_urls = []
    crl_urls = []
    subdirs = []

    all_links = soup.find_all('a', href=True)
    print_debug(f"Found {len(all_links)} <a> tags in {base_url}")

    for a_tag in all_links:
        href = a_tag['href']
        full_url = urljoin(base_url, href)
        decoded_url = unquote(full_url).lower()
        decoded_href = unquote(href)
        print_debug(f"  href='{href}' → '{full_url}'")

        # Never wander off the host we started on.
        if urlparse(full_url).netloc != origin_host:
            print_debug(f"    Skipped: different host")
            continue
        # '[To Parent Directory]' links would cause infinite loops.
        if href in ('../', '..'):
            print_debug(f"    Skipped: parent link")
            continue
        if full_url.rstrip('/') == base_url.rstrip('/'):
            print_debug(f"    Skipped: self link")
            continue

        if any(decoded_url.endswith(ext) for ext in CERT_EXTENSIONS):
            cert_urls.append(full_url)
            print(f"{indent}    📜 {decoded_href}")
        elif any(decoded_url.endswith(ext) for ext in CRL_EXTENSIONS):
            crl_urls.append(full_url)
            print(f"{indent}    📋 {decoded_href}")
        elif full_url.endswith('/') and full_url not in visited:
            subdirs.append(full_url)
            print(f"{indent}    📁 {decoded_href}")
        else:
            print_debug(f"    Skipped: not a cert/crl/dir")

    for subdir in subdirs:
        sub_certs, sub_crls = scrape_recursive(
            subdir, session, origin_host, depth + 1, visited
        )
        cert_urls.extend(sub_certs)
        crl_urls.extend(sub_crls)

    return cert_urls, crl_urls


def download_file(url, session):
    """Download a URL and return the raw bytes, logging size and content type."""
    resp = session.get(url, timeout=TIMEOUT)
    resp.raise_for_status()
    data = resp.content
    content_type = resp.headers.get('content-type', 'unknown')
    print(f"    ↓ {len(data)} bytes | HTTP {resp.status_code} | {content_type}")
    return data


# ─── Certificate Parsing ─────────────────────────────────────────────────────

def _looks_like_html(data):
    """True if the payload appears to be an HTML page (e.g. an error page)."""
    return data[:50].lstrip().lower().startswith((b'<!doctype', b'<html', b'<'))


def load_certificate(data, url=""):
    """Parse certificate bytes as DER, then PEM. Returns None on failure.

    NOTE(review): the original body was garbled in the source we received
    (the HTML check and loader calls were cut mid-expression); the DER→PEM
    fallback order was reconstructed — confirm against the upstream file.
    """
    if _looks_like_html(data):
        print_warn(f"  Response looks like HTML, not a certificate: {url}")
        return None
    for loader in (x509.load_der_x509_certificate, x509.load_pem_x509_certificate):
        try:
            return loader(data)
        except Exception:
            continue
    # Neither format parsed — emit diagnostics about what we actually got.
    print_warn(f"  Could not parse certificate data from {url}")
    if len(data) > 0:
        if data[0] == 0x30:
            print_warn(f"  Starts with 0x30 (ASN.1 SEQUENCE)")
        elif data[:5] == b'-----':
            print_warn(f"  Starts with PEM header")
        else:
            print_warn(f"  First byte: 0x{data[0]:02x}")
    return None


def load_crl(data, url=""):
    """Parse CRL bytes as DER, then PEM. Returns None on failure.

    NOTE(review): this definition was missing from the garbled source but is
    called from main(); reconstructed to mirror load_certificate — confirm
    against the upstream file.
    """
    if _looks_like_html(data):
        print_warn(f"  Response looks like HTML, not a CRL: {url}")
        return None
    for loader in (x509.load_der_x509_crl, x509.load_pem_x509_crl):
        try:
            return loader(data)
        except Exception:
            continue
    return None


# ─── Extension Helpers ───────────────────────────────────────────────────────

def get_extension_value(obj, oid):
    """Return the extension value for `oid`, or None if absent."""
    try:
        return obj.extensions.get_extension_for_oid(oid).value
    except x509.ExtensionNotFound:
        return None


def get_cdp_urls(cert):
    """All URI distribution points from the CRL Distribution Points extension."""
    cdp = get_extension_value(cert, ExtensionOID.CRL_DISTRIBUTION_POINTS)
    urls = []
    if cdp:
        for dp in cdp:
            if dp.full_name:
                for name in dp.full_name:
                    if isinstance(name, x509.UniformResourceIdentifier):
                        urls.append(name.value)
    return urls


def get_aia_info(cert):
    """Return (ca_issuer_urls, ocsp_urls) from the AIA extension."""
    aia = get_extension_value(cert, ExtensionOID.AUTHORITY_INFORMATION_ACCESS)
    ocsp_urls = []
    ca_issuer_urls = []
    if aia:
        for desc in aia:
            if isinstance(desc.access_location, x509.UniformResourceIdentifier):
                if desc.access_method == AuthorityInformationAccessOID.OCSP:
                    ocsp_urls.append(desc.access_location.value)
                elif desc.access_method == AuthorityInformationAccessOID.CA_ISSUERS:
                    ca_issuer_urls.append(desc.access_location.value)
    return ca_issuer_urls, ocsp_urls


def get_subject_key_id(cert):
    """Subject Key Identifier as lowercase hex, or None."""
    ski = get_extension_value(cert, ExtensionOID.SUBJECT_KEY_IDENTIFIER)
    return ski.digest.hex() if ski else None


def get_authority_key_id(obj):
    """Authority Key Identifier (works for certs and CRLs) as hex, or None."""
    aki = get_extension_value(obj, ExtensionOID.AUTHORITY_KEY_IDENTIFIER)
    return aki.key_identifier.hex() if aki and aki.key_identifier else None


def is_self_signed(cert):
    """Heuristic: subject == issuer (does not verify the signature)."""
    return cert.issuer == cert.subject


def is_ca(cert):
    """True if Basic Constraints marks this certificate as a CA."""
    bc = get_extension_value(cert, ExtensionOID.BASIC_CONSTRAINTS)
    return bc.ca if bc else False


def get_key_usage(cert):
    """Comma-joined names of the asserted Key Usage bits, or None."""
    ku = get_extension_value(cert, ExtensionOID.KEY_USAGE)
    if not ku:
        return None
    usages = []
    for attr in ['digital_signature', 'key_encipherment', 'key_cert_sign',
                 'crl_sign', 'content_commitment', 'data_encipherment',
                 'key_agreement']:
        # Some attributes raise when the corresponding bit combination is
        # invalid (e.g. encipher_only without key_agreement) — skip those.
        try:
            if getattr(ku, attr):
                usages.append(attr)
        except Exception:
            pass
    return ', '.join(usages)


def get_eku(cert):
    """Comma-joined dotted OIDs from Extended Key Usage, or None."""
    eku = get_extension_value(cert, ExtensionOID.EXTENDED_KEY_USAGE)
    if not eku:
        return None
    return ', '.join(u.dotted_string for u in eku)


def get_san(cert):
    """Comma-joined Subject Alternative Name values, or None."""
    san = get_extension_value(cert, ExtensionOID.SUBJECT_ALTERNATIVE_NAME)
    if not san:
        return None
    return ', '.join(str(n.value) for n in san)


# ─── Chain Building ──────────────────────────────────────────────────────────

def build_chains(certs_dict):
    """Group certificates into issuance chains via SKI/AKI linkage.

    `certs_dict` maps fingerprint -> (cert, url). Returns a list of chains,
    each a list of fingerprints ordered root-first. Certificates that link
    to nothing are appended as singleton chains.
    """
    # Index certs by Subject Key Identifier; when two certs share a SKI
    # (e.g. a renewed CA cert), prefer the one expiring latest.
    ski_index = {}
    for fp, (cert, url) in certs_dict.items():
        ski = get_subject_key_id(cert)
        if ski:
            if ski in ski_index:
                existing_cert, _ = certs_dict[ski_index[ski]]
                if cert.not_valid_after_utc > existing_cert.not_valid_after_utc:
                    ski_index[ski] = fp
            else:
                ski_index[ski] = fp

    # Link each non-self-signed cert to its issuer via AKI → SKI.
    parent_of = {}
    for fp, (cert, url) in certs_dict.items():
        if is_self_signed(cert):
            continue
        aki = get_authority_key_id(cert)
        if aki and aki in ski_index:
            parent_fp = ski_index[aki]
            if parent_fp != fp:
                parent_of[fp] = parent_fp

    roots = [fp for fp, (cert, _) in certs_dict.items() if is_self_signed(cert)]
    children_of = defaultdict(list)
    for child_fp, par_fp in parent_of.items():
        children_of[par_fp].append(child_fp)

    chains = []

    def walk(fp, current_chain):
        # Depth-first: each leaf produces one complete root-to-leaf chain.
        current_chain.append(fp)
        kids = children_of.get(fp, [])
        if not kids:
            chains.append(list(current_chain))
        else:
            for kid in kids:
                walk(kid, current_chain)
        current_chain.pop()

    for root_fp in roots:
        walk(root_fp, [])

    # Any certificate not reachable from a root becomes its own chain.
    all_in_chains = set()
    for chain in chains:
        all_in_chains.update(chain)
    for fp in certs_dict:
        if fp not in all_in_chains:
            chains.append([fp])

    return chains


# ─── CRL Validation ──────────────────────────────────────────────────────────

def verify_crl_signature(crl, issuer_cert):
    """Verify a CRL signature against a candidate issuer certificate.

    Returns (True, msg) if valid, (False, msg) if invalid, (None, msg) if
    verification could not be attempted.
    """
    try:
        pub_key = issuer_cert.public_key()
        if isinstance(pub_key, rsa.RSAPublicKey):
            pub_key.verify(
                crl.signature,
                crl.tbs_certlist_bytes,
                padding.PKCS1v15(),
                crl.signature_hash_algorithm,
            )
        elif isinstance(pub_key, ec.EllipticCurvePublicKey):
            pub_key.verify(
                crl.signature,
                crl.tbs_certlist_bytes,
                ec.ECDSA(crl.signature_hash_algorithm),
            )
        else:
            return None, "Unsupported key type"
        return True, "Signature valid"
    except InvalidSignature:
        return False, "Signature INVALID"
    except Exception as e:
        return None, f"Verification error: {e}"


def validate_crl(crl, crl_url, certs_dict):
    """Validate one CRL: freshness, delta status, and signature.

    Tries to find the issuing cert in `certs_dict` by matching the CRL's AKI
    to a cert SKI, or by subject/issuer name match.
    """
    now = datetime.now(timezone.utc)
    results = {
        'url': crl_url,
        'issuer': subject_str(crl.issuer),
        'last_update': crl.last_update_utc,
        'next_update': crl.next_update_utc,
        'expired': crl.next_update_utc < now if crl.next_update_utc else None,
        'sig_valid': None,
        'sig_msg': 'No matching issuer found',
        'revoked_count': len(list(crl)),
        'revoked_serials': [],
        'is_delta': False,
    }

    try:
        crl.extensions.get_extension_for_oid(ExtensionOID.DELTA_CRL_INDICATOR)
        results['is_delta'] = True
    except x509.ExtensionNotFound:
        pass

    for revoked in crl:
        results['revoked_serials'].append(format(revoked.serial_number, 'x'))

    crl_aki = get_authority_key_id(crl)
    for fp, (cert, url) in certs_dict.items():
        ski = get_subject_key_id(cert)
        match = (crl_aki and ski == crl_aki) or (cert.subject == crl.issuer)
        if match:
            valid, msg = verify_crl_signature(crl, cert)
            results['sig_valid'] = valid
            results['sig_msg'] = msg
            results['verified_by'] = subject_str(cert.subject)
            break

    return results


# ─── Display Functions ───────────────────────────────────────────────────────

def display_cert(cert, url, indent=0):
    """Pretty-print one certificate's key fields, extensions, and source URL."""
    prefix = "  " * indent
    now = datetime.now(timezone.utc)
    not_before = cert.not_valid_before_utc
    not_after = cert.not_valid_after_utc
    expired = not_after < now
    days_left = (not_after - now).days

    print(f"{prefix}{C.BOLD}Subject:{C.RESET} {subject_str(cert.subject)}")
    print(f"{prefix}{C.BOLD}Issuer:{C.RESET} {subject_str(cert.issuer)}")
    print(f"{prefix}{C.BOLD}Serial:{C.RESET} {format(cert.serial_number, 'x')}")
    print(f"{prefix}{C.BOLD}Not Before:{C.RESET} {not_before}")
    print(f"{prefix}{C.BOLD}Not After:{C.RESET} {not_after}", end="")
    if expired:
        print(f" {C.RED}[EXPIRED]{C.RESET}")
    elif days_left < 90:
        print(f" {C.YELLOW}[{days_left} days remaining]{C.RESET}")
    else:
        print(f" {C.GREEN}[{days_left} days remaining]{C.RESET}")
    if not_before > now:
        print(f"{prefix}  {C.RED}[NOT YET VALID]{C.RESET}")

    print(f"{prefix}{C.BOLD}SHA1:{C.RESET} {sha1_fingerprint(cert)}")
    print(f"{prefix}{C.BOLD}Self-signed:{C.RESET} {'Yes' if is_self_signed(cert) else 'No'}")
    print(f"{prefix}{C.BOLD}CA:{C.RESET} {'Yes' if is_ca(cert) else 'No'}")

    ku = get_key_usage(cert)
    if ku:
        print(f"{prefix}{C.BOLD}Key Usage:{C.RESET} {ku}")
    eku = get_eku(cert)
    if eku:
        print(f"{prefix}{C.BOLD}EKU:{C.RESET} {eku}")
    san = get_san(cert)
    if san:
        print(f"{prefix}{C.BOLD}SAN:{C.RESET} {san}")
    ski = get_subject_key_id(cert)
    if ski:
        print(f"{prefix}{C.BOLD}SKI:{C.RESET} {ski}")
    aki = get_authority_key_id(cert)
    if aki:
        print(f"{prefix}{C.BOLD}AKI:{C.RESET} {aki}")

    cdp_urls = get_cdp_urls(cert)
    if cdp_urls:
        print(f"{prefix}{C.BOLD}CDP:{C.RESET}")
        for u in cdp_urls:
            print(f"{prefix}  → {u}")
    else:
        print(f"{prefix}{C.BOLD}CDP:{C.RESET} (none)")

    ca_issuers, ocsp_urls = get_aia_info(cert)
    if ca_issuers or ocsp_urls:
        print(f"{prefix}{C.BOLD}AIA:{C.RESET}")
        for u in ca_issuers:
            print(f"{prefix}  → CA Issuer: {u}")
        for u in ocsp_urls:
            print(f"{prefix}  → OCSP: {u}")
    else:
        print(f"{prefix}{C.BOLD}AIA:{C.RESET} (none)")

    policies = get_extension_value(cert, ExtensionOID.CERTIFICATE_POLICIES)
    if policies:
        print(f"{prefix}{C.BOLD}Policies:{C.RESET}")
        for pol in policies:
            oid = pol.policy_identifier.dotted_string
            print(f"{prefix}  → {oid}")
            if pol.policy_qualifiers:
                for q in pol.policy_qualifiers:
                    if isinstance(q, str):
                        print(f"{prefix}    CPS: {q}")

    print(f"{prefix}{C.BOLD}Source:{C.RESET} {url}")


def display_crl_results(results):
    """Pretty-print the dict produced by validate_crl()."""
    r = results
    crl_type = "Delta CRL" if r['is_delta'] else "Base CRL"
    print(f"\n  {C.BOLD}CRL:{C.RESET} {r['url']}")
    print(f"  {C.BOLD}Type:{C.RESET} {crl_type}")
    print(f"  {C.BOLD}Issuer:{C.RESET} {r['issuer']}")
    print(f"  {C.BOLD}Last Update:{C.RESET} {r['last_update']}")
    print(f"  {C.BOLD}Next Update:{C.RESET} {r['next_update']}", end="")
    if r['expired']:
        print(f" {C.RED}[EXPIRED]{C.RESET}")
    elif r['next_update'] is None:
        # BUGFIX: a CRL may omit nextUpdate; the original subtracted from None.
        print(f" {C.YELLOW}[no next update]{C.RESET}")
    else:
        remaining = (r['next_update'] - datetime.now(timezone.utc)).days
        print(f" {C.GREEN}[valid, {remaining} days remaining]{C.RESET}")

    if r['sig_valid'] is True:
        print_ok(f"Signature: {r['sig_msg']} (by {r.get('verified_by', 'unknown')})")
    elif r['sig_valid'] is False:
        print_err(f"Signature: {r['sig_msg']}")
    else:
        print_warn(f"Signature: {r['sig_msg']}")

    count = r['revoked_count']
    print_info("Revoked certificates", str(count))
    if count > 0:
        for serial in r['revoked_serials'][:10]:
            print(f"    Serial: {serial}")
        if count > 10:
            print(f"    ... and {count - 10} more")


# ─── Main ────────────────────────────────────────────────────────────────────

def main():
    if len(sys.argv) < 2:
        print(f"Usage: {sys.argv[0]} <url> [<url> ...]")
        print(f"Example: {sys.argv[0]} http://pki.matas.dk/cdp http://pki.matas.dk/aia")
        print(f"         {sys.argv[0]} http://pki.imy.se/")
        print(f"\nRecursively scans directories up to {MAX_DEPTH} levels deep.")
        sys.exit(1)

    base_urls = sys.argv[1:]
    session = requests.Session()
    session.verify = False
    session.headers.update({
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                      'AppleWebKit/537.36 (KHTML, like Gecko) '
                      'Chrome/125.0.0.0 Safari/537.36',
        'Accept': '*/*',
        'Accept-Encoding': 'identity',
    })
    requests.packages.urllib3.disable_warnings()

    all_cert_urls = []
    all_crl_urls = []

    # ── Step 1: Recursively scrape all provided URLs ─────────────────────
    for base_url in base_urls:
        print_header(f"Scanning: {base_url}")
        cert_urls, crl_urls = scrape_recursive(base_url, session)
        all_cert_urls.extend(cert_urls)
        all_crl_urls.extend(crl_urls)

    # Deduplicate while preserving discovery order.
    all_cert_urls = list(dict.fromkeys(all_cert_urls))
    all_crl_urls = list(dict.fromkeys(all_crl_urls))

    print_section("Discovery Summary")
    print_ok(f"Total certificate files: {len(all_cert_urls)}")
    print_ok(f"Total CRL files: {len(all_crl_urls)}")

    # ── Step 2: Download and parse certificates ──────────────────────────
    print_header("Parsing Certificates")
    certs_dict = {}
    for url in all_cert_urls:
        try:
            data = download_file(url, session)
            cert = load_certificate(data, url)
            if cert:
                fp = sha1_fingerprint(cert)
                certs_dict[fp] = (cert, url)
                print_ok(f"Loaded: {subject_str(cert.subject)}")
            else:
                print_warn(f"Could not parse certificate: {url}")
        except Exception as e:
            print_err(f"Failed to download {url}: {e}")

    # ── Step 3: Download and parse CRLs ──────────────────────────────────
    print_header("Parsing CRLs")
    crls_list = []
    for url in all_crl_urls:
        try:
            data = download_file(url, session)
            crl = load_crl(data, url)
            if crl:
                crls_list.append((crl, url))
                print_ok(f"Loaded CRL: {subject_str(crl.issuer)}")
            else:
                print_warn(f"Could not parse CRL: {url}")
        except Exception as e:
            print_err(f"Failed to download CRL {url}: {e}")
            traceback.print_exc()

    # ── Step 4: Fetch CRLs from CDP extensions ───────────────────────────
    print_header("Fetching CRLs from CDP Extensions")
    referenced_crl_urls = set()
    for fp, (cert, url) in certs_dict.items():
        for cdp_url in get_cdp_urls(cert):
            if cdp_url.lower().startswith('http'):
                referenced_crl_urls.add(cdp_url)

    already_fetched = set(u for _, u in crls_list)
    new_crl_urls = referenced_crl_urls - already_fetched
    if new_crl_urls:
        for url in new_crl_urls:
            try:
                data = download_file(url, session)
                crl = load_crl(data, url)
                if crl:
                    crls_list.append((crl, url))
                    print_ok(f"Fetched CDP CRL: {subject_str(crl.issuer)}")
                else:
                    print_warn(f"Could not parse CDP CRL: {url}")
            except Exception as e:
                print_err(f"Failed to fetch CDP CRL {url}: {e}")
    else:
        print_info("No additional CRLs", "All CDP CRLs already downloaded")

    # ── Step 5: Fetch CA certs from AIA extensions ───────────────────────
    print_header("Fetching CA Certs from AIA Extensions")
    referenced_aia_urls = set()
    for fp, (cert, url) in certs_dict.items():
        ca_issuers, _ = get_aia_info(cert)
        for aia_url in ca_issuers:
            if aia_url.lower().startswith('http'):
                referenced_aia_urls.add(aia_url)

    already_loaded_urls = set(u for _, (c, u) in certs_dict.items())
    new_aia_urls = referenced_aia_urls - already_loaded_urls
    if new_aia_urls:
        for url in new_aia_urls:
            try:
                data = download_file(url, session)
                cert = load_certificate(data, url)
                if cert:
                    fp = sha1_fingerprint(cert)
                    if fp not in certs_dict:
                        certs_dict[fp] = (cert, url)
                        print_ok(f"Fetched AIA cert: {subject_str(cert.subject)}")
                    else:
                        print_info("Already loaded", subject_str(cert.subject))
                else:
                    print_warn(f"Could not parse AIA cert: {url}")
            except Exception as e:
                print_err(f"Failed to fetch AIA cert {url}: {e}")
    else:
        print_info("No additional certs", "All AIA certs already loaded")

    # ── Step 6: Build and display chains ─────────────────────────────────
    print_header("Certificate Chains")
    chains = build_chains(certs_dict)
    if not chains:
        print_warn("No certificate chains could be built (no certificates loaded)")
    else:
        for i, chain in enumerate(chains, 1):
            print_section(f"Chain {i} ({len(chain)} certificate(s))")
            for depth, fp in enumerate(chain):
                cert, url = certs_dict[fp]
                if depth == 0 and is_self_signed(cert):
                    role = "Root CA"
                elif is_ca(cert):
                    role = "Intermediate CA"
                else:
                    role = "End Entity"
                print(f"\n{'  ' * depth}{C.BOLD}{C.BLUE}[{depth}] {role}{C.RESET}")
                display_cert(cert, url, indent=depth)

    # ── Step 7: Validate CRLs ────────────────────────────────────────────
    print_header("CRL Validation")
    if not crls_list:
        print_warn("No CRLs to validate")
    else:
        for crl, crl_url in crls_list:
            results = validate_crl(crl, crl_url, certs_dict)
            display_crl_results(results)

    # ── Step 8: Revocation cross-check ───────────────────────────────────
    print_header("Revocation Check — Are Any Downloaded Certs Revoked?")
    revoked_serials_by_aki = defaultdict(set)
    for crl, crl_url in crls_list:
        aki = get_authority_key_id(crl)
        # Key by AKI when present, else by issuer name (loop-invariant).
        key = aki or subject_str(crl.issuer)
        for revoked in crl:
            revoked_serials_by_aki[key].add(revoked.serial_number)

    found_revoked = False
    for fp, (cert, url) in certs_dict.items():
        aki = get_authority_key_id(cert)
        serial = cert.serial_number
        if aki and aki in revoked_serials_by_aki:
            if serial in revoked_serials_by_aki[aki]:
                print_err(
                    f"REVOKED: {subject_str(cert.subject)} "
                    f"(serial {format(serial, 'x')})"
                )
                found_revoked = True
    if not found_revoked:
        print_ok("None of the downloaded certificates appear on the downloaded CRLs")

    # ── Summary ──────────────────────────────────────────────────────────
    print_header("Summary")
    print_info("Certificates parsed", str(len(certs_dict)))
    print_info("CRLs parsed", str(len(crls_list)))
    print_info("Chains found", str(len(chains)))

    now = datetime.now(timezone.utc)
    expired_certs = [fp for fp, (c, u) in certs_dict.items()
                     if c.not_valid_after_utc < now]
    expired_crls = [(crl, u) for crl, u in crls_list
                    if crl.next_update_utc and crl.next_update_utc < now]

    if expired_certs:
        print_err(f"Expired certificates: {len(expired_certs)}")
        for fp in expired_certs:
            cert, url = certs_dict[fp]
            print(f"    → {subject_str(cert.subject)} (expired {cert.not_valid_after_utc})")
    else:
        print_ok("No expired certificates")

    if expired_crls:
        print_err(f"Expired CRLs: {len(expired_crls)}")
        for crl, url in expired_crls:
            print(f"    → {url} (expired {crl.next_update_utc})")
    else:
        print_ok("No expired CRLs")


if __name__ == '__main__':
    main()