Files
Test/pki-health.py
T
Johan Selmosson 40f9b0ed14 Initial Checkin
2026-05-16 11:38:14 +02:00

628 lines
22 KiB
Python

#!/usr/bin/env python3
# /// script
# requires-python = ">=3.10"
# dependencies = [
# "requests",
# "beautifulsoup4",
# "cryptography",
# ]
# ///
"""
PKI Health Checker
==================
Discovers CA certificates and CRLs from IIS directory-browsing sites,
builds the CA hierarchy, and validates CDP/AIA for each CA certificate.
Usage:
    uv run pki-health.py <url> [<url2> ...]
Examples:
    uv run pki-health.py http://pki.matas.dk/aia http://pki.matas.dk/cdp
    uv run pki-health.py http://pki.imy.se/
"""
import sys
from datetime import datetime, timezone
from urllib.parse import urljoin, unquote, urlparse
import requests
from bs4 import BeautifulSoup
from cryptography import x509
from cryptography.x509.oid import ExtensionOID, AuthorityInformationAccessOID
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding, rsa, ec
from cryptography.exceptions import InvalidSignature
# Link suffixes (compared against the lower-cased URL) treated as certificates.
CERT_EXTENSIONS = ('.crt', '.cer', '.pem')
# Link suffixes (compared against the lower-cased URL) treated as CRLs.
CRL_EXTENSIONS = ('.crl',)
# Value passed as ``timeout=`` to every HTTP request made by this script.
TIMEOUT = 30
# The scraper stops descending once its recursion depth exceeds this value.
MAX_DEPTH = 5
# ─── Terminal ────────────────────────────────────────────────────────────────
class C:
    """ANSI SGR escape sequences used to style terminal output."""

    # Bright foreground colours.
    H = '\033[95m'   # magenta
    B = '\033[94m'   # blue
    CN = '\033[96m'  # cyan
    G = '\033[92m'   # green
    Y = '\033[93m'   # yellow
    R = '\033[91m'   # red

    # Text attributes.
    BD = '\033[1m'   # bold
    DM = '\033[2m'   # dim
    RS = '\033[0m'   # reset all attributes
def header(t):
    """Print *t* as a section banner between two 78-character ``=`` rules."""
    style = f"{C.BD}{C.H}"
    rule = '=' * 78
    # A leading newline separates the banner from whatever was printed before.
    print(f"\n{style}{rule}{C.RS}")
    print(f"{style} {t}{C.RS}")
    print(f"{style}{rule}{C.RS}")
# ─── Helpers ─────────────────────────────────────────────────────────────────
def sha1(cert):
    """Return the SHA-1 fingerprint of *cert* as colon-separated hex."""
    digest = cert.fingerprint(hashes.SHA1())
    return digest.hex(':')
def get_cn(name):
    """Return a short display label for an X.509 name.

    The last COMMON_NAME attribute in *name* is preferred.  Without a common
    name, the first attribute is rendered as ``<oid name>=<value>``.  An
    empty name yields ``"(unknown)"``.
    """
    attributes = list(name)
    common_names = [
        attribute.value
        for attribute in attributes
        if attribute.oid == x509.oid.NameOID.COMMON_NAME
    ]
    if common_names:
        return common_names[-1]
    if attributes:
        first = attributes[0]
        return f"{first.oid._name}={first.value}"
    return "(unknown)"
def ski_hex(obj):
    """Return the Subject Key Identifier of *obj* as hex.

    Returns None when *obj* carries no SubjectKeyIdentifier extension.
    """
    try:
        extension = obj.extensions.get_extension_for_oid(
            ExtensionOID.SUBJECT_KEY_IDENTIFIER)
    except x509.ExtensionNotFound:
        return None
    return extension.value.digest.hex()
def aki_hex(obj):
    """Return the Authority Key Identifier of *obj* as hex.

    Returns None when the extension is absent or when it is present but
    carries no key identifier.
    """
    try:
        extension = obj.extensions.get_extension_for_oid(
            ExtensionOID.AUTHORITY_KEY_IDENTIFIER)
    except x509.ExtensionNotFound:
        return None
    key_id = extension.value.key_identifier
    if key_id:
        return key_id.hex()
    return None
def cdp_urls(cert):
    """Return the URI values listed in *cert*'s CRL Distribution Points.

    Distribution points without a ``full_name`` and general names that are
    not URIs are skipped.  A certificate without the extension gives ``[]``.
    """
    try:
        extension = cert.extensions.get_extension_for_oid(
            ExtensionOID.CRL_DISTRIBUTION_POINTS)
    except x509.ExtensionNotFound:
        return []
    return [
        general_name.value
        for point in extension.value
        if point.full_name
        for general_name in point.full_name
        if isinstance(general_name, x509.UniformResourceIdentifier)
    ]
def aia_urls(cert):
    """Return ``(ca_issuer_urls, ocsp_urls)`` from *cert*'s AIA extension.

    Only access descriptions whose location is a URI are considered.  Both
    lists are empty when the certificate has no Authority Information
    Access extension.
    """
    try:
        extension = cert.extensions.get_extension_for_oid(
            ExtensionOID.AUTHORITY_INFORMATION_ACCESS)
    except x509.ExtensionNotFound:
        return [], []

    issuers, responders = [], []
    for description in extension.value:
        location = description.access_location
        if not isinstance(location, x509.UniformResourceIdentifier):
            continue
        method = description.access_method
        if method == AuthorityInformationAccessOID.CA_ISSUERS:
            issuers.append(location.value)
        elif method == AuthorityInformationAccessOID.OCSP:
            responders.append(location.value)
    return issuers, responders
def is_ca(cert):
    """Return the ``ca`` flag of *cert*'s BasicConstraints extension.

    A certificate without BasicConstraints is reported as not a CA.
    """
    try:
        constraints = cert.extensions.get_extension_for_oid(
            ExtensionOID.BASIC_CONSTRAINTS)
    except x509.ExtensionNotFound:
        return False
    return constraints.value.ca
def is_self_signed(cert):
    """Return True when *cert*'s issuer name equals its subject name.

    Only the two names are compared; no signature is verified here.
    """
    issuer, subject = cert.issuer, cert.subject
    return issuer == subject
def short_id(h):
    """Abbreviate a hex identifier for display.

    Values longer than 16 characters are cut to 16 and suffixed with
    ``...``; shorter values are returned unchanged; a missing or empty
    value is shown as ``(none)``.
    """
    if not h:
        return '(none)'
    if len(h) > 16:
        return h[:16] + '...'
    return h
# ─── Network ────────────────────────────────────────────────────────────────
def make_session():
    """Build the ``requests`` session shared by every download in the script.

    The session skips TLS certificate verification, silences the urllib3
    warnings, sends a browser-like User-Agent and asks for an uncompressed
    response body.
    """
    session = requests.Session()
    # NOTE: verification is off for every request made through this session.
    session.verify = False
    user_agent = ('Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                  'AppleWebKit/537.36 Chrome/125.0.0.0 Safari/537.36')
    session.headers.update({
        'User-Agent': user_agent,
        'Accept': '*/*',
        'Accept-Encoding': 'identity',
    })
    requests.packages.urllib3.disable_warnings()
    return session
def dl(url, session):
    """Download *url* through *session* and return the raw body bytes.

    ``raise_for_status()`` is called on the response, so an HTTP error
    status surfaces as an exception instead of a body.
    """
    response = session.get(url, timeout=TIMEOUT)
    response.raise_for_status()
    body = response.content
    return body
def scrape(base_url, session, origin=None, depth=0, visited=None):
    """Recursively collect certificate and CRL links from a directory listing.

    Args:
        base_url: Directory URL to list; a trailing ``/`` is added if missing.
        session: ``requests`` session used for the HTTP GET.
        origin: Host (``netloc``) that links must stay on.  Defaults to the
            host of *base_url* on the first call.
        depth: Current recursion depth; listing stops beyond ``MAX_DEPTH``.
        visited: Set of directory URLs already listed, shared across the
            recursion to prevent loops.

    Returns:
        ``(cert_urls, crl_urls)`` — two lists of absolute URLs found in this
        directory and the directories beneath it.
    """
    if visited is None:
        visited = set()
    if not base_url.endswith('/'):
        base_url += '/'
    if base_url in visited or depth > MAX_DEPTH:
        return [], []
    visited.add(base_url)
    if origin is None:
        origin = urlparse(base_url).netloc

    try:
        resp = session.get(base_url, timeout=TIMEOUT)
        resp.raise_for_status()
    except requests.RequestException as exc:
        # Still best-effort, but tell the user why nothing was found here.
        print(f" {C.Y}⚠ Could not list {base_url}: {exc}{C.RS}")
        return [], []

    # Case-insensitive form of the current directory, used to recognise
    # links that really point *below* it.
    here = unquote(base_url).lower()
    soup = BeautifulSoup(resp.text, 'html.parser')
    certs, crls, dirs = [], [], []
    for a in soup.find_all('a', href=True):
        href = a['href']
        full = urljoin(base_url, href)
        target = urlparse(full)
        if target.netloc != origin:
            continue
        if href in ('../', '..'):
            continue
        if full.rstrip('/') == base_url.rstrip('/'):
            continue
        # Match the extension on the path only so that a query string or
        # fragment does not hide a certificate/CRL link.
        path = unquote(target.path).lower()
        if path.endswith(CERT_EXTENSIONS):
            certs.append(full)
        elif path.endswith(CRL_EXTENSIONS):
            crls.append(full)
        elif (full.endswith('/') and full not in visited
              and unquote(full).lower().startswith(here)):
            # Only descend; parent links written as absolute paths would
            # otherwise pull the crawl out of the requested directory.
            dirs.append(full)

    for d in dirs:
        sub_certs, sub_crls = scrape(d, session, origin, depth + 1, visited)
        certs.extend(sub_certs)
        crls.extend(sub_crls)
    return certs, crls
def load_cert(data):
    """Parse *data* as an X.509 certificate, trying DER first and then PEM.

    Returns the parsed certificate, or None when neither parser accepts
    the bytes.
    """
    parsers = (x509.load_der_x509_certificate, x509.load_pem_x509_certificate)
    for parse in parsers:
        try:
            certificate = parse(data)
        except Exception:
            # Wrong encoding for this parser; fall through to the next one.
            continue
        return certificate
    return None
def load_crl(data):
    """Parse *data* as an X.509 CRL, trying DER first and then PEM.

    Returns the parsed CRL, or None when neither parser accepts the bytes.
    """
    parsers = (x509.load_der_x509_crl, x509.load_pem_x509_crl)
    for parse in parsers:
        try:
            revocation_list = parse(data)
        except Exception:
            # Wrong encoding for this parser; fall through to the next one.
            continue
        return revocation_list
    return None
# ─── CRL Signature ──────────────────────────────────────────────────────────
def verify_crl_sig(crl, issuer_cert):
    """Check that *crl* was signed by the key in *issuer_cert*.

    Verification is delegated to the CRL object's ``is_signature_valid``
    method, which selects the algorithm from the CRL itself rather than
    from a hard-coded list of key types.

    Args:
        crl: Parsed CRL to verify.
        issuer_cert: Certificate whose public key should have signed it.

    Returns:
        ``(True, "Signature OK")`` when the signature verifies,
        ``(False, "Signature INVALID")`` when it does not, and
        ``(None, <reason>)`` when verification could not be performed.
        This function never raises.
    """
    try:
        valid = crl.is_signature_valid(issuer_cert.public_key())
    except TypeError:
        # Presumably raised by the library for key types it cannot verify
        # with; reported the same way the caller has always seen it.
        return None, "Unsupported key type"
    except Exception as e:
        return None, f"Verify error: {e}"
    if valid:
        return True, "Signature OK"
    return False, "Signature INVALID"
# ─── Tree Builder ────────────────────────────────────────────────────────────
class CANode:
    """One CA certificate placed in the hierarchy built by ``build_tree``."""

    def __init__(self, cert, url):
        """Wrap *cert* (downloaded from *url*) and precompute its identifiers."""
        self.cert, self.url = cert, url

        # Display identity.
        self.tp = sha1(cert)
        self.name = get_cn(cert.subject)

        # Key identifiers used to link a certificate to its issuer.
        self.ski = ski_hex(cert)
        self.aki = aki_hex(cert)

        # True when issuer and subject names are equal.
        self.root = is_self_signed(cert)

        self.children = []
        # (cert, url, thumbprint) — renewed same-key certs
        self.alternates = []
def build_tree(certs_dict):
    """Arrange loaded CA certificates into issuer/subordinate trees.

    Args:
        certs_dict: Mapping of thumbprint -> ``(cert, url)``.

    Returns:
        List of ``CANode`` objects that have no parent in the set: nodes
        whose issuer equals their subject, and nodes whose issuer could not
        be located by key identifier.

    Certificates without a Subject Key Identifier are left out.
    Certificates sharing an SKI become one node: the one that expires last
    is the node's certificate, the rest are recorded as its alternates.
    """
    # 1. Group certificates by Subject Key Identifier.
    groups = {}
    for thumbprint, (cert, url) in certs_dict.items():
        key_id = ski_hex(cert)
        if not key_id:
            continue
        groups.setdefault(key_id, []).append((thumbprint, cert, url))

    # 2. One node per key; newest expiry wins.
    nodes = {}
    primary_by_ski = {}
    for key_id, members in groups.items():
        ranked = sorted(members,
                        key=lambda member: member[1].not_valid_after_utc,
                        reverse=True)
        thumbprint, cert, url = ranked[0]
        node = CANode(cert, url)
        node.alternates.extend(
            (older_cert, older_url, sha1(older_cert))
            for _, older_cert, older_url in ranked[1:]
        )
        nodes[thumbprint] = node
        primary_by_ski[key_id] = thumbprint

    # 3. Attach each node to the node whose SKI equals its AKI.
    roots = []
    for thumbprint, node in nodes.items():
        parent = None
        if not node.root and node.aki:
            parent_tp = primary_by_ski.get(node.aki)
            # A node is never attached to itself.
            if parent_tp is not None and parent_tp != thumbprint:
                parent = nodes.get(parent_tp)
        if parent is None:
            roots.append(node)
        else:
            parent.children.append(node)
    return roots
# ─── CDP Check ───────────────────────────────────────────────────────────────
def check_cdp(url, issuer_ski, issuer_cert, session, p):
    """Download the CRL at *url*, print a one-line verdict, report problems.

    Args:
        url: CRL distribution point URL taken from a certificate.
        issuer_ski: Hex SKI of the expected CRL issuer, or None.
        issuer_cert: Certificate of the expected CRL issuer, or None.
        session: ``requests`` session used for the download.
        p: Prefix printed at the start of every output line.

    Returns:
        True when a problem was found (unreachable, unparsable, expired,
        AKI mismatch or invalid signature), otherwise False.  URLs that do
        not start with ``http`` are not checked and count as no problem.
    """
    if not url.lower().startswith('http'):
        print(f"{p} {C.DM}LDAP — not checked{C.RS}")
        return False

    try:
        raw = dl(url, session)
    except Exception:
        print(f"{p} {C.R}✘ Unreachable{C.RS}")
        return True

    crl = load_crl(raw)
    if crl is None:
        print(f"{p} {C.R}✘ Failed to parse{C.RS}")
        return True

    summary = []
    problems = 0
    now = datetime.now(timezone.utc)

    # Freshness, judged by the CRL's nextUpdate.
    next_update = crl.next_update_utc
    if next_update:
        if next_update < now:
            overdue = (now - next_update).days
            summary.append(f"{C.R}EXPIRED {overdue}d ago{C.RS}")
            problems += 1
        else:
            remaining = (next_update - now).days
            tone = C.Y if remaining <= 7 else C.G
            summary.append(f"{tone}Valid ({remaining}d){C.RS}")

    # The CRL's AKI must equal the issuer's SKI (when both are known).
    crl_aki = aki_hex(crl)
    comparable = bool(crl_aki and issuer_ski)
    mismatch = comparable and crl_aki != issuer_ski
    if comparable:
        if mismatch:
            summary.append(f"{C.R}AKI MISMATCH{C.RS}")
            problems += 1
        else:
            summary.append(f"{C.G}AKI match{C.RS}")

    # Signature, only when the issuer certificate is available.
    if issuer_cert is not None:
        verdict, detail = verify_crl_sig(crl, issuer_cert)
        if verdict is True:
            summary.append(f"{C.G}Sig OK{C.RS}")
        elif verdict is False:
            summary.append(f"{C.R}Sig INVALID{C.RS}")
            problems += 1
        else:
            # Could not verify: shown as a warning, not counted as a problem.
            summary.append(f"{C.Y}{detail}{C.RS}")

    # Flag delta CRLs.
    try:
        crl.extensions.get_extension_for_oid(ExtensionOID.DELTA_CRL_INDICATOR)
    except x509.ExtensionNotFound:
        pass
    else:
        summary.append("Delta")

    revoked = sum(1 for _entry in crl)
    summary.append(f"{revoked} revoked")
    print(f"{p} {' | '.join(summary)}")

    # Spell out both identifiers when they disagree.
    if mismatch:
        print(f"{p} {C.R}CRL AKI: {crl_aki}{C.RS}")
        print(f"{p} {C.R}Issuer SKI: {issuer_ski}{C.RS}")
    return problems > 0
# ─── AIA Check ───────────────────────────────────────────────────────────────
def check_aia(url, expected_tp, session, p):
    """Download the certificate at an AIA URL and compare it to the issuer.

    Args:
        url: CA Issuers URL taken from a certificate's AIA extension.
        expected_tp: Thumbprint the downloaded certificate should have, or
            a falsy value when the issuer is unknown.
        session: ``requests`` session used for the download.
        p: Prefix printed at the start of every output line.

    Returns:
        True when the URL is unreachable, the body cannot be parsed, or the
        thumbprint differs from *expected_tp*; otherwise False.  URLs that
        do not start with ``http`` are not checked and return False.
    """
    if not url.lower().startswith('http'):
        print(f"{p} {C.DM}LDAP — not checked{C.RS}")
        return False

    try:
        raw = dl(url, session)
    except Exception:
        print(f"{p} {C.R}✘ Unreachable{C.RS}")
        return True

    cert = load_cert(raw)
    if cert is None:
        print(f"{p} {C.R}✘ Failed to parse{C.RS}")
        return True

    fingerprint = sha1(cert)
    shown = f"{fingerprint[:23]}..."

    # Nothing to compare against: just report what was served.
    if not expected_tp:
        print(f"{p} Downloaded: {get_cn(cert.subject)} ({shown})")
        return False

    if fingerprint == expected_tp:
        print(f"{p} {C.G}✔ Matches issuer{C.RS} ({shown})")
        return False

    print(f"{p} {C.R}✘ MISMATCH{C.RS}")
    print(f"{p} Got: {get_cn(cert.subject)} ({shown})")
    print(f"{p} Expected: {expected_tp[:23]}...")
    return True
# ─── Display a CA Node ───────────────────────────────────────────────────────
def _validity_status(not_after, now):
    """Return ``(expired, colour, label)`` for a certificate expiry date."""
    if not_after < now:
        return True, C.R, "EXPIRED"
    days_left = (not_after - now).days
    # Less than 90 days left is shown as a warning.
    colour = C.Y if days_left < 90 else C.G
    return False, colour, f"{days_left} days"


def show_cert_block(node, parent, session, p, issues):
    """Show one CA certificate block with CDP/AIA checks.

    Args:
        node: ``CANode`` to display.
        parent: ``CANode`` of the issuer, or None when it is unknown or the
            node is shown as a chain root.
        session: ``requests`` session used for CDP/AIA downloads.
        p: Prefix printed at the start of every output line.
        issues: List that receives one message per problem found; it is
            appended to in place.
    """
    cert = node.cert
    now = datetime.now(timezone.utc)
    nb = cert.not_valid_before_utc
    na = cert.not_valid_after_utc
    expired, vc, vl = _validity_status(na, now)

    print(f"{p} Thumbprint: {node.tp}")
    print(f"{p} SKI: {short_id(node.ski)}")
    if node.aki:
        match = ""
        if parent and parent.ski:
            if node.aki == parent.ski:
                match = f" {C.G}✔ matches parent{C.RS}"
            else:
                match = f" {C.R}✘ MISMATCH with parent SKI!{C.RS}"
                issues.append(f"AKI mismatch: {node.name}")
        print(f"{p} AKI: {short_id(node.aki)}{match}")
    # The two dates need a visible separator to be readable.
    print(f"{p} Valid: {nb.strftime('%Y-%m-%d')} → {na.strftime('%Y-%m-%d')} "
          f"{vc}[{vl}]{C.RS}")
    if expired:
        issues.append(f"EXPIRED: {node.name}")

    # Alternates: other certificates issued for the same key.
    if node.alternates:
        print(f"{p} {C.Y}Also (renewed, same key):{C.RS}")
        for alt_cert, _alt_url, alt_tp in node.alternates:
            _, colour, label = _validity_status(alt_cert.not_valid_after_utc, now)
            print(f"{p} {alt_tp} {colour}[{label}]{C.RS}")

    # Issuer info for checks; all None when the issuer is not known.
    issuer_cert = parent.cert if parent else None
    issuer_ski = parent.ski if parent else None
    issuer_tp = parent.tp if parent else None

    # CDP
    cdps = cdp_urls(cert)
    if cdps:
        for url in cdps:
            print(f"{p} CDP: {url}")
            if check_cdp(url, issuer_ski, issuer_cert, session, p):
                issues.append(f"CDP issue: {node.name} → {url}")
    elif node.root:
        print(f"{p} CDP: (none — root)")
    else:
        print(f"{p} {C.Y}CDP: (none — missing!){C.RS}")
        issues.append(f"No CDP: {node.name}")

    # AIA
    ca_issuers, ocsp = aia_urls(cert)
    if ca_issuers:
        for url in ca_issuers:
            print(f"{p} AIA: {url}")
            if check_aia(url, issuer_tp, session, p):
                issues.append(f"AIA issue: {node.name} → {url}")
    elif node.root:
        print(f"{p} AIA: (none — root)")
    else:
        print(f"{p} {C.Y}AIA: (none — missing!){C.RS}")
        issues.append(f"No AIA: {node.name}")

    # OCSP responders are listed but not contacted.
    for url in ocsp:
        print(f"{p} OCSP: {url}")
# ─── Display Chain ───────────────────────────────────────────────────────────
def _show_descendants(node, depth, session, issues):
    """Print every CA beneath *node*, depth-first, at any number of tiers."""
    for child in node.children:
        # The first tier below the root has no marker; deeper tiers get one
        # arrow per level so the nesting stays visible.
        arrows = "→ " * depth if depth >= 2 else ""
        print(f"\n {C.BD}{C.B}{arrows}{child.name}{C.RS}")
        show_cert_block(child, node, session, " ", issues)
        _show_descendants(child, depth + 1, session, issues)


def display_chain(root, chain_num, session, issues):
    """Display a full chain starting from root.

    Args:
        root: Top ``CANode`` of the chain.
        chain_num: Number shown in the chain heading.
        session: ``requests`` session used for CDP/AIA downloads.
        issues: List that collects problem messages; appended to in place.
    """
    print(f"\n{C.BD}{C.CN}Chain {chain_num}{C.RS}")
    print(f"{C.BD}{C.CN}{'─' * 78}{C.RS}")

    # Root
    print(f"\n{C.BD}{C.B}{root.name}{C.RS}")
    show_cert_block(root, None, session, "", issues)

    # Every subordinate tier, however deep the hierarchy is.
    _show_descendants(root, 1, session, issues)
# ─── Orphan Detection ───────────────────────────────────────────────────────
def collect_referenced(roots):
    """Return every CDP and CA-Issuers URL referenced by the trees in *roots*.

    URLs are percent-decoded and lower-cased so they can be compared
    case-insensitively with the URLs found on the site.
    """
    referenced = set()
    pending = list(roots)
    while pending:
        node = pending.pop()
        issuer_urls, _ocsp = aia_urls(node.cert)
        for url in (*cdp_urls(node.cert), *issuer_urls):
            referenced.add(unquote(url).lower())
        pending.extend(node.children)
    return referenced
def collect_tree_sources(roots):
    """Return the download URLs of every certificate placed in the trees.

    Covers each node's own URL and the URLs of its alternates, for all
    nodes reachable from *roots*.
    """
    sources = set()
    pending = list(roots)
    while pending:
        node = pending.pop()
        sources.add(node.url)
        sources.update(alt_url for _cert, alt_url, _tp in node.alternates)
        pending.extend(node.children)
    return sources
# ─── Main ────────────────────────────────────────────────────────────────────
def _discover(base_urls, session):
    """Scrape each start URL; return de-duplicated ``(cert_urls, crl_urls)``."""
    cert_urls, crl_urls = [], []
    for base_url in base_urls:
        header(f"Scanning: {base_url}")
        found_certs, found_crls = scrape(base_url, session)
        for u in found_certs:
            print(f" 📜 {unquote(u.split('/')[-1])}")
        for u in found_crls:
            print(f" 📋 {unquote(u.split('/')[-1])}")
        cert_urls.extend(found_certs)
        crl_urls.extend(found_crls)
    # dict.fromkeys drops duplicates while keeping first-seen order.
    return list(dict.fromkeys(cert_urls)), list(dict.fromkeys(crl_urls))


def _load_ca_certs(cert_urls, session):
    """Download *cert_urls*; return ``{thumbprint: (cert, url)}`` of CA certs."""
    certs = {}
    for url in cert_urls:
        try:
            cert = load_cert(dl(url, session))
            if cert is not None and is_ca(cert):
                tp = sha1(cert)
                if tp not in certs:
                    certs[tp] = (cert, url)
                    print(f" {C.G}✔{C.RS} {get_cn(cert.subject)}")
        except Exception as e:
            # One bad file must not stop the scan; report it and move on.
            print(f" {C.R}✘{C.RS} {url}: {e}")
    return certs


def _follow_aia(certs, session):
    """Add issuer certificates reachable through AIA links to *certs*.

    Newly found certificates are followed in turn, so a whole chain is
    collected even when only its lowest CA is published on the site.
    """
    attempted = set()
    frontier = list(certs.values())
    while frontier:
        discovered = []
        for cert, _source in frontier:
            ca_issuers, _ocsp = aia_urls(cert)
            for aia_url in ca_issuers:
                if aia_url in attempted or not aia_url.lower().startswith('http'):
                    continue
                attempted.add(aia_url)
                try:
                    issuer = load_cert(dl(aia_url, session))
                    if issuer is None or not is_ca(issuer):
                        continue
                    tp = sha1(issuer)
                except Exception:
                    # Deliberately quiet: the same URL is checked again by
                    # the per-certificate AIA check, which prints the
                    # failure.
                    continue
                if tp not in certs:
                    certs[tp] = (issuer, aia_url)
                    discovered.append((issuer, aia_url))
                    print(f" {C.G}✔{C.RS} {get_cn(issuer.subject)} (via AIA)")
        frontier = discovered


def _report_orphans(roots, cert_urls, crl_urls, session):
    """Print site files that no certificate in the hierarchy accounts for."""
    referenced = collect_referenced(roots)
    tree_srcs = collect_tree_sources(roots)
    orphan_crls = [u for u in crl_urls if unquote(u).lower() not in referenced]
    orphan_certs = [u for u in cert_urls if u not in tree_srcs]

    if orphan_crls:
        print(f"\n {C.Y}⚠{C.RS} {len(orphan_crls)} CRL(s) on site "
              f"not referenced by any cert CDP:")
        now = datetime.now(timezone.utc)
        for url in orphan_crls:
            name = unquote(url.split('/')[-1])
            try:
                crl = load_crl(dl(url, session))
            except Exception:
                print(f" 📋 {name} (could not fetch)")
                continue
            if crl is None:
                print(f" 📋 {name} (could not parse)")
                continue
            nu = crl.next_update_utc
            if nu and nu < now:
                exp = f"{C.R}EXPIRED{C.RS}"
            elif nu:
                exp = f"{C.G}{(nu - now).days}d{C.RS}"
            else:
                exp = "?"
            print(f" 📋 {name} (issuer: {get_cn(crl.issuer)}, {exp})")
    else:
        print(f"\n {C.G}✔{C.RS} No orphaned CRLs")

    if orphan_certs:
        print(f"\n {C.Y}⚠{C.RS} {len(orphan_certs)} cert(s) on site not in hierarchy:")
        for url in orphan_certs:
            print(f" 📜 {unquote(url.split('/')[-1])}")
    else:
        print(f" {C.G}✔{C.RS} No orphaned certificates")


def main():
    """Run the health check for the URLs given on the command line.

    Scans each URL for certificates and CRLs, loads the CA certificates,
    builds and displays the hierarchy with its CDP/AIA checks, lists
    orphaned files and prints a summary.  Exits with status 1 when no URL
    is given or no hierarchy can be built.
    """
    if len(sys.argv) < 2:
        print(f"Usage: {sys.argv[0]} <url> [<url2> ...]")
        print(f"Example: {sys.argv[0]} http://pki.imy.se/")
        sys.exit(1)
    session = make_session()

    # ── Discover ─────────────────────────────────────────────────────────
    all_cert_urls, all_crl_urls = _discover(sys.argv[1:], session)
    print(f"\n Found {len(all_cert_urls)} cert(s), {len(all_crl_urls)} CRL(s) on site")

    # ── Load CA certs ────────────────────────────────────────────────────
    header("Loading CA Certificates")
    certs = _load_ca_certs(all_cert_urls, session)
    # Follow AIA to get parent certs we might not have
    _follow_aia(certs, session)
    print(f"\n {len(certs)} CA certificate(s) loaded")

    # ── Build and display ────────────────────────────────────────────────
    roots = build_tree(certs)
    if not roots:
        print(f"\n {C.R}✘ Could not build CA hierarchy{C.RS}")
        sys.exit(1)
    header("PKI Health Check")
    all_issues = []
    for i, root in enumerate(roots, 1):
        display_chain(root, i, session, all_issues)

    # ── Orphans ──────────────────────────────────────────────────────────
    header("Orphaned Files")
    _report_orphans(roots, all_cert_urls, all_crl_urls, session)

    # ── Summary ──────────────────────────────────────────────────────────
    header("Summary")
    print(f" Chains: {len(roots)} | CA certs: {len(certs)} | "
          f"Site CRLs: {len(all_crl_urls)}")
    if not all_issues:
        print(f"\n {C.G}{C.BD}✔ All checks passed{C.RS}")
    else:
        print(f"\n {C.R}{C.BD}{len(all_issues)} issue(s):{C.RS}")
        for issue in all_issues:
            print(f" {C.R}{issue}{C.RS}")


if __name__ == '__main__':
    main()