Files
Test/pki_healthv2.py
Johan Selmosson 40f9b0ed14 Initial Checkin
2026-05-16 11:38:14 +02:00

881 lines
30 KiB
Python

#!/usr/bin/env python3
# /// script
# requires-python = ">=3.10"
# dependencies = [
# "requests",
# "beautifulsoup4",
# "cryptography",
# "pyopenssl",
# ]
# ///
"""
PKI Health Checker
==================
Discovers CA certificates from IIS directory-browsing sites, direct cert URLs,
or live TLS servers. Builds the CA hierarchy by following AIA chains upward,
validates CDP/AIA for each CA, and checks issuing CA CRLs.
Usage:
uv run pki_health.py <url> [<url2> ...]
Input types:
Directory: http://pki.kinda.se/
Cert URL: https://r10.i.lencr.org/
TLS server: https://letsencrypt.org
Examples:
uv run pki_health.py http://pki.imy.se/
uv run pki_health.py https://r10.i.lencr.org/
uv run pki_health.py https://www.google.com
"""
import ssl
import socket
import sys
from datetime import datetime, timezone
from urllib.parse import urljoin, unquote, urlparse
import requests
from bs4 import BeautifulSoup
from cryptography import x509
from cryptography.x509.oid import ExtensionOID, AuthorityInformationAccessOID
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding, rsa, ec
from cryptography.hazmat.primitives.serialization import Encoding
from cryptography.exceptions import InvalidSignature
from OpenSSL import SSL, crypto
# URL suffixes treated as certificate files (callers lower-case the URL before matching).
CERT_EXTENSIONS = ('.crt', '.cer', '.pem')
# URL suffixes treated as CRL files.
CRL_EXTENSIONS = ('.crl',)
# Timeout, in seconds, passed to every HTTP request and to the TLS socket connect.
TIMEOUT = 30
# Deepest level of nested directory listings that scrape_directory() will descend into.
MAX_DEPTH = 5
# ─── Terminal ────────────────────────────────────────────────────────────────
class C:
    """ANSI SGR escape sequences used to colour and style terminal output."""

    # Bright foreground colours.
    H = '\033[95m'   # magenta (headers)
    B = '\033[94m'   # blue
    CN = '\033[96m'  # cyan
    G = '\033[92m'   # green
    Y = '\033[93m'   # yellow
    R = '\033[91m'   # red

    # Text attributes.
    BD = '\033[1m'   # bold
    DM = '\033[2m'   # dim
    RS = '\033[0m'   # reset all attributes
def header(t):
    """Print *t* as a bold banner framed by two 78-character ``=`` rules."""
    rule = f"{C.BD}{C.H}{'=' * 78}{C.RS}"
    # The leading newline separates the banner from whatever was printed before.
    print(f"\n{rule}")
    print(f"{C.BD}{C.H} {t}{C.RS}")
    print(rule)
def section(t):
    """Print *t* as a section title followed by a horizontal rule.

    The rule pads the line out to a fixed width; titles longer than 72
    characters simply get no padding.
    """
    # The fill character was an empty string, so no rule was ever drawn.
    # Use the same box-drawing dash that already prefixes the title.
    fill = '─' * max(0, 72 - len(t))
    print(f"\n{C.BD}{C.CN}── {t} {fill}{C.RS}")
# ─── Helpers ─────────────────────────────────────────────────────────────────
def sha1(cert):
    """Return the SHA-1 fingerprint of *cert* as colon-separated lowercase hex."""
    digest = cert.fingerprint(hashes.SHA1())
    return digest.hex(':')
def get_cn(name):
    """Return a short display label for an X.509 name.

    Prefers the last Common Name attribute in iteration order. Without a CN
    it falls back to ``<type>=<value>`` of the first attribute, and to
    ``"(unknown)"`` for an empty name.
    """
    attributes = list(name)
    common_name = next(
        (a for a in reversed(attributes)
         if a.oid == x509.oid.NameOID.COMMON_NAME),
        None,
    )
    if common_name is not None:
        return common_name.value
    if attributes:
        first = attributes[0]
        return f"{first.oid._name}={first.value}"
    return "(unknown)"
def ski_hex(obj):
    """Return the Subject Key Identifier of *obj* as hex, or None if the extension is absent."""
    try:
        extension = obj.extensions.get_extension_for_oid(
            ExtensionOID.SUBJECT_KEY_IDENTIFIER)
    except x509.ExtensionNotFound:
        return None
    return extension.value.digest.hex()
def aki_hex(obj):
    """Return the Authority Key Identifier of *obj* (cert or CRL) as hex.

    Returns None when the extension is missing or carries no key identifier.
    """
    try:
        authority = obj.extensions.get_extension_for_oid(
            ExtensionOID.AUTHORITY_KEY_IDENTIFIER).value
    except x509.ExtensionNotFound:
        return None
    key_id = authority.key_identifier
    if not key_id:
        return None
    return key_id.hex()
def cdp_urls(cert):
    """Return every URI listed in the CRL Distribution Points extension of *cert*.

    Distribution points without a ``full_name`` and non-URI names are
    skipped. An absent extension yields an empty list.
    """
    try:
        points = cert.extensions.get_extension_for_oid(
            ExtensionOID.CRL_DISTRIBUTION_POINTS).value
    except x509.ExtensionNotFound:
        return []
    found = []
    for point in points:
        if not point.full_name:
            continue
        found.extend(
            general_name.value
            for general_name in point.full_name
            if isinstance(general_name, x509.UniformResourceIdentifier)
        )
    return found
def aia_ca_issuer_urls(cert):
    """Return the "CA Issuers" URIs from the Authority Information Access extension.

    An absent AIA extension yields an empty list.
    """
    try:
        descriptions = cert.extensions.get_extension_for_oid(
            ExtensionOID.AUTHORITY_INFORMATION_ACCESS).value
    except x509.ExtensionNotFound:
        return []
    return [
        entry.access_location.value
        for entry in descriptions
        if isinstance(entry.access_location, x509.UniformResourceIdentifier)
        and entry.access_method == AuthorityInformationAccessOID.CA_ISSUERS
    ]
def aia_ocsp_urls(cert):
    """Return the OCSP responder URIs from the Authority Information Access extension.

    An absent AIA extension yields an empty list.
    """
    try:
        descriptions = cert.extensions.get_extension_for_oid(
            ExtensionOID.AUTHORITY_INFORMATION_ACCESS).value
    except x509.ExtensionNotFound:
        return []
    return [
        entry.access_location.value
        for entry in descriptions
        if isinstance(entry.access_location, x509.UniformResourceIdentifier)
        and entry.access_method == AuthorityInformationAccessOID.OCSP
    ]
def is_ca(cert):
    """Return the ``ca`` flag of the Basic Constraints extension, or False if it is absent."""
    try:
        constraints = cert.extensions.get_extension_for_oid(
            ExtensionOID.BASIC_CONSTRAINTS)
    except x509.ExtensionNotFound:
        return False
    return constraints.value.ca
def is_self_signed(cert):
    """Return True when the issuer name of *cert* equals its subject name.

    Only the two names are compared; the signature itself is not verified.
    """
    issuer, subject = cert.issuer, cert.subject
    return issuer == subject
def short_ski(h):
    """Abbreviate a hex key identifier for display.

    Empty or missing values become ``"(none)"``; values longer than 16
    characters are cut to 16 and suffixed with ``"..."``.
    """
    if not h:
        return '(none)'
    if len(h) > 16:
        return h[:16] + '...'
    return h
# ─── Network ────────────────────────────────────────────────────────────────
def make_session():
    """Create the shared ``requests`` session used for all HTTP downloads.

    TLS verification is switched off (and the resulting urllib3 warnings are
    silenced), and browser-like headers are sent with every request.
    """
    browser_headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                      'AppleWebKit/537.36 Chrome/125.0.0.0 Safari/537.36',
        'Accept': '*/*',
        # Ask for an uncompressed body.
        'Accept-Encoding': 'identity',
    }
    session = requests.Session()
    session.verify = False
    session.headers.update(browser_headers)
    requests.packages.urllib3.disable_warnings()
    return session
def fetch(url, session):
    """Download *url* and return the raw response body.

    Raises whatever ``session.get`` raises, and an HTTP error for non-2xx
    responses via ``raise_for_status``.
    """
    response = session.get(url, timeout=TIMEOUT)
    response.raise_for_status()
    body = response.content
    return body
def load_cert(data):
    """Parse *data* as an X.509 certificate, trying DER first and then PEM.

    Returns None when neither encoding parses.
    """
    try:
        return x509.load_der_x509_certificate(data)
    except Exception:
        pass  # not DER; fall through to PEM
    try:
        return x509.load_pem_x509_certificate(data)
    except Exception:
        return None
def load_crl(data):
    """Parse *data* as an X.509 CRL, trying DER first and then PEM.

    Returns None when neither encoding parses.
    """
    try:
        return x509.load_der_x509_crl(data)
    except Exception:
        pass  # not DER; fall through to PEM
    try:
        return x509.load_pem_x509_crl(data)
    except Exception:
        return None
# ─── Input Detection & Loading ───────────────────────────────────────────────
def scrape_directory(base_url, session, origin=None, depth=0, visited=None):
    """Recursively collect certificate and CRL links from a directory listing.

    Args:
        base_url: URL of the listing page; a trailing slash is added if missing.
        session: HTTP session used for the requests.
        origin: Host (netloc) that links must stay on; defaults to base_url's.
        depth: Current recursion depth; pages deeper than MAX_DEPTH are skipped.
        visited: Set of listing URLs already processed, shared across recursion.

    Returns:
        ``(cert_urls, crl_urls)``. Both are empty when the page was already
        visited, is too deep, or cannot be fetched.
    """
    visited = set() if visited is None else visited
    if not base_url.endswith('/'):
        base_url += '/'
    if base_url in visited or depth > MAX_DEPTH:
        return [], []
    visited.add(base_url)
    if origin is None:
        origin = urlparse(base_url).netloc

    try:
        page = session.get(base_url, timeout=TIMEOUT)
        page.raise_for_status()
    except Exception:
        return [], []

    cert_links, crl_links, subdirs = [], [], []
    here = base_url.rstrip('/')
    for anchor in BeautifulSoup(page.text, 'html.parser').find_all('a', href=True):
        href = anchor['href']
        target = urljoin(base_url, href)
        # Skip off-site links, parent-directory links and self-references.
        if (urlparse(target).netloc != origin
                or href in ('../', '..')
                or target.rstrip('/') == here):
            continue
        lowered = unquote(target).lower()
        if lowered.endswith(CERT_EXTENSIONS):
            cert_links.append(target)
        elif lowered.endswith(CRL_EXTENSIONS):
            crl_links.append(target)
        elif target.endswith('/') and target not in visited:
            subdirs.append(target)

    for subdir in subdirs:
        sub_certs, sub_crls = scrape_directory(
            subdir, session, origin, depth + 1, visited)
        cert_links.extend(sub_certs)
        crl_links.extend(sub_crls)
    return cert_links, crl_links
def get_tls_chain(hostname, port=443):
    """Connect to a TLS server and return its certificate chain.

    The peer certificate is deliberately not verified: the goal is to
    inspect whatever chain the server presents.

    Args:
        hostname: Server name; also sent as the SNI host name.
        port: TCP port, 443 by default.

    Returns:
        A non-empty list of ``cryptography`` certificate objects, in the
        order returned by the TLS connection.

    Raises:
        TimeoutError: if the handshake does not finish within TIMEOUT seconds.
        Exception: if the server presented no certificate at all; socket
            and OpenSSL errors propagate unchanged.
    """
    import select
    import time

    ctx = SSL.Context(SSL.TLS_CLIENT_METHOD)
    ctx.set_verify(SSL.VERIFY_NONE, lambda *a: True)
    sock = socket.create_connection((hostname, port), timeout=TIMEOUT)
    try:
        conn = SSL.Connection(ctx, sock)
        conn.set_tlsext_host_name(hostname.encode())
        conn.set_connect_state()

        # A socket with a timeout makes pyOpenSSL raise Want*Error, so the
        # handshake has to be retried. Bound the retries with one overall
        # deadline so a silent peer cannot keep the loop spinning forever.
        deadline = time.monotonic() + TIMEOUT
        while True:
            try:
                conn.do_handshake()
                break
            except SSL.WantReadError:
                readers, writers = [sock], []
            except SSL.WantWriteError:
                readers, writers = [], [sock]
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                raise TimeoutError(
                    f"TLS handshake with {hostname}:{port} timed out")
            select.select(readers, writers, [], min(5, remaining))

        certs = []
        chain = conn.get_peer_cert_chain()
        if chain:
            for pyopenssl_cert in chain:
                der = crypto.dump_certificate(crypto.FILETYPE_ASN1, pyopenssl_cert)
                certs.append(x509.load_der_x509_certificate(der))
        else:
            peer = conn.get_peer_certificate()
            if peer:
                der = crypto.dump_certificate(crypto.FILETYPE_ASN1, peer)
                certs.append(x509.load_der_x509_certificate(der))

        # The close_notify exchange is best-effort: a failure here must not
        # throw away the certificates that were already collected.
        try:
            conn.shutdown()
        except SSL.Error:
            pass
    finally:
        # Always release the socket, including when the handshake fails.
        sock.close()

    if not certs:
        raise Exception("No certificates received from server")
    return certs
def walk_aia_chain(start_cert, session):
    """Follow AIA "CA Issuers" URLs upward from *start_cert*.

    At each step the first HTTP(S) URL that yields a parsable certificate
    not seen before becomes the next certificate to follow. The walk stops
    at a self-issued certificate, when there is no HTTP(S) issuer URL, or
    when no URL produces a new certificate.

    Returns:
        dict mapping SHA-1 thumbprint -> certificate, including *start_cert*.
    """
    chain = {sha1(start_cert): start_cert}
    cursor = start_cert
    while not is_self_signed(cursor):
        candidates = [
            u for u in aia_ca_issuer_urls(cursor)
            if u.lower().startswith('http')
        ]
        if not candidates:
            break

        issuer = None
        for candidate in candidates:
            try:
                fetched = load_cert(fetch(candidate, session))
                if fetched is None:
                    continue
                fetched_tp = sha1(fetched)
            except Exception:
                continue
            # A thumbprint we already hold would loop; try the next URL.
            if fetched_tp in chain:
                continue
            chain[fetched_tp] = fetched
            issuer = fetched
            break

        if issuer is None:
            break
        cursor = issuer
    return chain
def _merge_aia_chain(certs_dict, cert, session, fallback_source, announce=False):
    """Add certs found by walking *cert*'s AIA chain to *certs_dict* if not yet present."""
    for tp, found in walk_aia_chain(cert, session).items():
        if tp in certs_dict:
            continue
        issuer_urls = aia_ca_issuer_urls(found)
        certs_dict[tp] = (found, issuer_urls[0] if issuer_urls else fallback_source)
        if announce:
            print(f"{get_cn(found.subject)}")


def detect_and_load(url, session):
    """Detect what kind of input *url* is and load certificates from it.

    The input is tried, in order, as:
      1. a TLS server (``https://`` URL whose path is empty or only slashes),
      2. a direct certificate download,
      3. an HTML directory listing to scrape.
    In every case the AIA chains of the loaded certificates are followed.

    Returns:
        ``(certs_dict, crl_urls, input_type)`` where ``certs_dict`` maps
        thumbprint -> ``(cert, source_url_or_description)``, ``crl_urls``
        lists CRL links found in a directory listing, and ``input_type`` is
        one of ``"tls"``, ``"cert"``, ``"directory"``, ``"error"`` or
        ``"unknown"``.
    """
    parsed = urlparse(url)
    certs_dict = {}
    crl_urls = []

    # ── Try 1: TLS server (https:// URL without a path) ──────────────────
    if parsed.scheme == 'https' and not parsed.path.rstrip('/'):
        hostname = parsed.hostname
        port = parsed.port or 443
        try:
            print(f" 🔌 TLS connecting to {hostname}:{port}...")
            chain = get_tls_chain(hostname, port)
            if chain:
                print(f" ✔ Got {len(chain)} cert(s) from TLS handshake")
                source = f"TLS:{hostname}:{port}"
                for cert in chain:
                    certs_dict[sha1(cert)] = (cert, source)
                # Walk AIA upward from every cert the server presented.
                for cert in chain:
                    _merge_aia_chain(certs_dict, cert, session, "AIA chain")
                return certs_dict, crl_urls, "tls"
        except Exception as e:
            print(f" {C.Y}⚠ TLS failed ({e}), trying as URL...{C.RS}")

    # ── Try 2: Direct certificate URL ────────────────────────────────────
    try:
        data = fetch(url, session)
    except Exception as e:
        print(f" {C.R}✘ Failed to fetch {url}: {e}{C.RS}")
        return certs_dict, crl_urls, "error"

    cert = load_cert(data)
    if cert is not None:
        print(f" 📜 Direct certificate: {get_cn(cert.subject)}")
        certs_dict[sha1(cert)] = (cert, url)
        print(" 🔗 Walking AIA chain...")
        _merge_aia_chain(certs_dict, cert, session, "AIA", announce=True)
        return certs_dict, crl_urls, "cert"

    # ── Try 3: Directory listing ─────────────────────────────────────────
    # Anything that starts with a tag is assumed to be an HTML listing.
    if data[:50].lstrip().lower().startswith(b'<'):
        print(" 📁 Directory listing detected")
        cert_urls, crl_found = scrape_directory(url, session)
        for u in cert_urls:
            print(f" 📜 {unquote(u.split('/')[-1])}")
        for u in crl_found:
            print(f" 📋 {unquote(u.split('/')[-1])}")
        crl_urls.extend(crl_found)

        for cert_url in cert_urls:
            # A published cert that cannot be fetched or parsed is itself a
            # health finding, so report it instead of skipping it silently.
            try:
                loaded = load_cert(fetch(cert_url, session))
            except Exception as e:
                print(f" {C.Y}⚠ Could not fetch {cert_url}: {e}{C.RS}")
                continue
            if loaded is None:
                print(f" {C.Y}⚠ Not a parsable certificate: {cert_url}{C.RS}")
                continue
            certs_dict.setdefault(sha1(loaded), (loaded, cert_url))

        # Follow AIA for the certs loaded from the site. Iterate a snapshot
        # because the walk adds entries to certs_dict.
        for _, (loaded, _) in list(certs_dict.items()):
            _merge_aia_chain(certs_dict, loaded, session, "AIA")
        return certs_dict, crl_urls, "directory"

    print(f" {C.Y}⚠ Could not determine input type for {url}{C.RS}")
    return certs_dict, crl_urls, "unknown"
# ─── CRL Signature ──────────────────────────────────────────────────────────
def verify_crl_sig(crl, issuer_cert):
    """Verify the signature on *crl* with the public key of *issuer_cert*.

    RSA keys are checked with PKCS#1 v1.5 padding and EC keys with ECDSA,
    both using the CRL's own signature hash algorithm.

    Returns:
        ``(True, "Sig OK")``, ``(False, "Sig INVALID")``, or ``(None, reason)``
        when the key type is unsupported or verification could not be run.
    """
    try:
        key = issuer_cert.public_key()
        if isinstance(key, rsa.RSAPublicKey):
            scheme_args = (padding.PKCS1v15(), crl.signature_hash_algorithm)
        elif isinstance(key, ec.EllipticCurvePublicKey):
            scheme_args = (ec.ECDSA(crl.signature_hash_algorithm),)
        else:
            return None, "Unsupported key type"
        key.verify(crl.signature, crl.tbs_certlist_bytes, *scheme_args)
    except InvalidSignature:
        return False, "Sig INVALID"
    except Exception as e:
        return None, f"Verify error: {e}"
    return True, "Sig OK"
# ─── Tree Builder ────────────────────────────────────────────────────────────
class CANode:
    """One CA certificate in the hierarchy, plus its links to other nodes."""

    def __init__(self, cert, url):
        # Identity of the certificate and where it was obtained from.
        self.cert, self.url = cert, url
        self.tp = sha1(cert)
        self.name = get_cn(cert.subject)
        # Key identifiers used to link a child (AKI) to its parent (SKI).
        self.ski = ski_hex(cert)
        self.aki = aki_hex(cert)
        # True when issuer and subject names are identical.
        self.root = is_self_signed(cert)
        # Filled in by the tree builder: child nodes, and other certs that
        # share this node's SKI as (cert, url, thumbprint) tuples.
        self.children = []
        self.alternates = []
def build_tree(certs_dict):
    """Arrange the CA certificates in *certs_dict* into issuer hierarchies.

    Certificates sharing a Subject Key Identifier are treated as one CA:
    the one expiring last becomes the node, the rest its ``alternates``.
    A node is attached to the node whose SKI equals its AKI; self-issued
    nodes and nodes without a known parent become roots.

    Args:
        certs_dict: thumbprint -> ``(cert, source_url)``; non-CA certs are ignored.

    Returns:
        ``(roots, nodes, ski_primary)``: the list of root nodes, a dict of
        thumbprint -> node, and a dict of SKI -> thumbprint of the node
        representing that key.
    """
    # Group by SKI. A CA cert without an SKI extension used to be dropped
    # here and vanished from the report; give it a group of its own, keyed
    # by its thumbprint, so it is still shown and checked.
    groups = {}
    for tp, (cert, url) in certs_dict.items():
        if not is_ca(cert):
            continue
        groups.setdefault(ski_hex(cert) or f"tp:{tp}", []).append((tp, cert, url))

    nodes = {}
    ski_primary = {}
    for group in groups.values():
        # Newest expiry first: that cert represents the key.
        group.sort(key=lambda entry: entry[1].not_valid_after_utc, reverse=True)
        tp, cert, url = group[0]
        node = CANode(cert, url)
        node.alternates.extend(
            (alt_cert, alt_url, sha1(alt_cert)) for _, alt_cert, alt_url in group[1:]
        )
        nodes[tp] = node
        # Only real SKIs can be matched against a child's AKI.
        if node.ski:
            ski_primary[node.ski] = tp

    roots = []
    for tp, node in nodes.items():
        parent_tp = None
        if not node.root and node.aki:
            parent_tp = ski_primary.get(node.aki)
        if parent_tp is not None and parent_tp != tp:
            nodes[parent_tp].children.append(node)
        else:
            roots.append(node)
    return roots, nodes, ski_primary
# ─── CDP Check ───────────────────────────────────────────────────────────────
def check_cdp(url, issuer_ski, issuer_cert, session, p):
    """Download the CRL at a distribution point and print a one-line verdict.

    Checks expiry (``next_update``), AKI against *issuer_ski*, and the
    signature against *issuer_cert* when one is given. Non-HTTP URLs are
    reported as not checked.

    Args:
        url: CDP URL taken from a certificate.
        issuer_ski: Hex SKI of the issuing CA, or None if unknown.
        issuer_cert: Issuing CA certificate, or None if unknown.
        session: HTTP session used for the download.
        p: Prefix printed at the start of every output line.

    Returns:
        True when a problem was found (unreachable, unparsable, expired,
        AKI mismatch or invalid signature), otherwise False.
    """
    if not url.lower().startswith('http'):
        print(f"{p} {C.DM}LDAP — not checked{C.RS}")
        return False

    try:
        raw = fetch(url, session)
    except Exception:
        print(f"{p} {C.R}✘ Unreachable{C.RS}")
        return True

    crl = load_crl(raw)
    if crl is None:
        print(f"{p} {C.R}✘ Failed to parse ({len(raw)} bytes){C.RS}")
        return True

    status = []
    problem = False

    # Freshness.
    now = datetime.now(timezone.utc)
    expires = crl.next_update_utc
    if expires:
        if expires < now:
            overdue = (now - expires).days
            status.append(f"{C.R}EXPIRED {overdue}d ago{C.RS}")
            problem = True
        else:
            remaining = (expires - now).days
            tint = C.G if remaining > 7 else C.Y
            status.append(f"{tint}Valid ({remaining}d){C.RS}")

    # Key identifier linkage.
    crl_aki = aki_hex(crl)
    aki_mismatch = bool(crl_aki and issuer_ski and crl_aki != issuer_ski)
    if crl_aki and issuer_ski:
        if aki_mismatch:
            status.append(f"{C.R}AKI MISMATCH{C.RS}")
            problem = True
        else:
            status.append(f"{C.G}AKI match{C.RS}")

    # Signature.
    if issuer_cert is not None:
        valid, msg = verify_crl_sig(crl, issuer_cert)
        if valid is False:
            problem = True
        sig_tint = C.G if valid is True else (C.R if valid is False else C.Y)
        status.append(f"{sig_tint}{msg}{C.RS}")

    try:
        crl.extensions.get_extension_for_oid(ExtensionOID.DELTA_CRL_INDICATOR)
    except x509.ExtensionNotFound:
        pass
    else:
        status.append("Delta")

    status.append(f"{sum(1 for _ in crl)} revoked")
    print(f"{p} {' | '.join(status)}")

    # Spell out both identifiers so a mismatch can be diagnosed.
    if aki_mismatch:
        print(f"{p} {C.R}CRL AKI: {crl_aki}{C.RS}")
        print(f"{p} {C.R}Issuer SKI: {issuer_ski}{C.RS}")
    return problem
# ─── AIA Check ───────────────────────────────────────────────────────────────
def check_aia(url, expected_tp, session, p):
    """Download the certificate at an AIA URL and compare it with the expected issuer.

    Args:
        url: AIA "CA Issuers" URL taken from a certificate.
        expected_tp: SHA-1 thumbprint of the issuer we expect, or None/empty
            when the issuer is unknown (the download is then only reported).
        session: HTTP session used for the download.
        p: Prefix printed at the start of every output line.

    Returns:
        True when a problem was found (unreachable, unparsable, or a
        thumbprint different from *expected_tp*), otherwise False.
    """
    if not url.lower().startswith('http'):
        print(f"{p} {C.DM}LDAP — not checked{C.RS}")
        return False

    try:
        data = fetch(url, session)
    except Exception:
        print(f"{p} {C.R}✘ Unreachable{C.RS}")
        return True

    cert = load_cert(data)
    if cert is None:
        print(f"{p} {C.R}✘ Failed to parse{C.RS}")
        return True

    got_tp = sha1(cert)
    if not expected_tp:
        print(f"{p} Downloaded: {get_cn(cert.subject)} ({got_tp[:23]}...)")
        return False
    if got_tp == expected_tp:
        print(f"{p} {C.G}✔ Matches issuer{C.RS} ({got_tp[:23]}...)")
        return False

    # NOTE(review): a renewed issuer cert with the same key also lands here.
    # Telling that apart needs the expected issuer's SKI, which this function
    # is not given, so the unused SKI lookup that used to sit here was removed.
    print(f"{p} {C.Y}⚠ Different thumbprint{C.RS}")
    print(f"{p} Got: {get_cn(cert.subject)} ({got_tp[:23]}...)")
    print(f"{p} Expected: {expected_tp[:23]}...")
    return True
# ─── Display ─────────────────────────────────────────────────────────────────
def _expiry_badge(not_after, now):
    """Return the coloured ``[N days]`` / ``[EXPIRED]`` badge for an expiry time."""
    if not_after < now:
        return f"{C.R}[EXPIRED]{C.RS}"
    days = (not_after - now).days
    color = C.Y if days < 90 else C.G
    return f"{color}[{days} days]{C.RS}"


def show_cert(node, parent, session, p, issues):
    """Print the details of one CA node and run its CDP and AIA checks.

    Args:
        node: The CANode to display.
        parent: The issuing CANode, or None for a chain root.
        session: HTTP session used by the CDP/AIA checks.
        p: Prefix printed at the start of every output line.
        issues: List that receives one message per problem found.
    """
    cert = node.cert
    now = datetime.now(timezone.utc)
    nb = cert.not_valid_before_utc
    na = cert.not_valid_after_utc

    print(f"{p}Thumbprint: {node.tp}")
    print(f"{p}SKI: {short_ski(node.ski)}")
    # The AKI is only shown when it contradicts the parent's SKI.
    if node.aki and parent and parent.ski and node.aki != parent.ski:
        print(f"{p}AKI: {short_ski(node.aki)} "
              f"{C.R}✘ MISMATCH with parent SKI ({short_ski(parent.ski)}){C.RS}")
        issues.append(f"AKI mismatch: {node.name}")

    # The two dates used to be printed back to back with nothing between them.
    print(f"{p}Valid: {nb.strftime('%Y-%m-%d')} → {na.strftime('%Y-%m-%d')} "
          f"{_expiry_badge(na, now)}")
    if na < now:
        issues.append(f"EXPIRED: {node.name}")

    if node.alternates:
        print(f"{p}{C.Y}Also (renewed, same key):{C.RS}")
        for alt_cert, _, alt_tp in node.alternates:
            print(f"{p} {alt_tp} {_expiry_badge(alt_cert.not_valid_after_utc, now)}")

    issuer_cert = parent.cert if parent else None
    issuer_ski = parent.ski if parent else None
    issuer_tp = parent.tp if parent else None

    # CDP
    cdps = cdp_urls(cert)
    if cdps:
        for url in cdps:
            print(f"{p}CDP:")
            print(f"{p} {url}")
            if check_cdp(url, issuer_ski, issuer_cert, session, p):
                issues.append(f"CDP issue: {node.name}")
    elif node.root:
        print(f"{p}CDP: (none — root)")
    else:
        print(f"{p}{C.Y}CDP: (none — missing!){C.RS}")
        issues.append(f"No CDP: {node.name}")

    # AIA
    ca_urls = aia_ca_issuer_urls(cert)
    if ca_urls:
        for url in ca_urls:
            print(f"{p}AIA:")
            print(f"{p} {url}")
            if check_aia(url, issuer_tp, session, p):
                issues.append(f"AIA issue: {node.name}")
    elif node.root:
        print(f"{p}AIA: (none — root)")
    else:
        print(f"{p}{C.Y}AIA: (none — missing!){C.RS}")
        issues.append(f"No AIA: {node.name}")

    # OCSP responders are listed but not queried.
    for url in aia_ocsp_urls(cert):
        print(f"{p}OCSP: {url}")
def display_chain(root, num, session, issues):
    """Print one CA hierarchy, starting at *root*, and check every CA in it.

    Args:
        root: Root CANode of the chain.
        num: 1-based chain number shown in the heading.
        session: HTTP session passed on to the per-certificate checks.
        issues: List that receives one message per problem found.
    """
    print(f"\n{C.BD}{C.CN}Chain {num}{C.RS}")
    # The rule was built from an empty string and never rendered.
    print(f"{C.BD}{C.CN}{'─' * 78}{C.RS}")

    def walk(node, parent, depth):
        # Levels 0-2 keep their previous headings; deeper levels, which were
        # never displayed or checked before, continue the arrow pattern.
        lead = "" if depth == 0 else " "
        arrows = "→ " * depth if depth >= 2 else ""
        print(f"\n{lead}{C.BD}{C.B}{arrows}{node.name}{C.RS}")
        show_cert(node, parent, session, " ", issues)
        for child in node.children:
            walk(child, node, depth + 1)

    walk(root, None, 0)
# ─── Issuing CA CRLs ────────────────────────────────────────────────────────
def check_issuing_ca_crls(roots, all_crl_urls, session, issues):
    """Check site CRLs that no certificate in the tree references as a CDP.

    Each such CRL is matched to a CA in the tree by AKI, or failing that by
    issuer name. Matched CRLs are checked for expiry, AKI and signature;
    the rest are listed as orphans.

    Args:
        roots: Root CANodes of the hierarchies.
        all_crl_urls: CRL URLs discovered on the scanned sites.
        session: HTTP session used for the downloads.
        issues: List that receives one message per problem found.

    Returns:
        ``(issuing_crls, orphan_crls)``: lists of ``(url, crl, node)`` and
        ``(url, crl_or_None, reason_or_None)`` tuples.
    """
    def preorder(node):
        yield node
        for child in node.children:
            yield from preorder(child)

    tree_nodes = [n for root in roots for n in preorder(root)]
    # SKI -> node for every CA in the tree that has an SKI.
    ca_nodes = {n.ski: n for n in tree_nodes if n.ski}
    # CDP URLs already checked inline with their certificate.
    referenced_cdps = {
        unquote(u).lower() for n in tree_nodes for u in cdp_urls(n.cert)
    }

    issuing_crls = []
    orphan_crls = []
    for url in all_crl_urls:
        if unquote(url).lower() in referenced_cdps:
            continue
        try:
            crl = load_crl(fetch(url, session))
            if crl is None:
                orphan_crls.append((url, None, "Could not parse"))
                continue
            crl_aki = aki_hex(crl)
            if crl_aki and crl_aki in ca_nodes:
                owner = ca_nodes[crl_aki]
            else:
                # Fall back to matching on the issuer name.
                owner = next(
                    (n for n in ca_nodes.values() if crl.issuer == n.cert.subject),
                    None,
                )
            if owner is not None:
                issuing_crls.append((url, crl, owner))
            else:
                orphan_crls.append((url, crl, None))
        except Exception:
            orphan_crls.append((url, None, "Could not fetch"))

    # Display issuing CA CRLs
    if issuing_crls:
        header("Issuing CA CRLs (for leaf certificate validation)")
        for url, crl, node in issuing_crls:
            print(f"\n {C.BD}{node.name}{C.RS}")
            print(f" {url}")
            status = []
            problem = False

            now = datetime.now(timezone.utc)
            expires = crl.next_update_utc
            if expires:
                if expires < now:
                    overdue = (now - expires).days
                    status.append(f"{C.R}EXPIRED {overdue}d ago{C.RS}")
                    problem = True
                else:
                    remaining = (expires - now).days
                    tint = C.G if remaining > 7 else C.Y
                    status.append(f"{tint}Valid ({remaining}d){C.RS}")

            crl_aki = aki_hex(crl)
            if crl_aki and node.ski:
                if crl_aki != node.ski:
                    status.append(f"{C.R}AKI MISMATCH{C.RS}")
                    problem = True
                else:
                    status.append(f"{C.G}AKI match{C.RS}")

            valid, msg = verify_crl_sig(crl, node.cert)
            if valid is False:
                problem = True
            sig_tint = C.G if valid is True else (C.R if valid is False else C.Y)
            status.append(f"{sig_tint}{msg}{C.RS}")

            try:
                crl.extensions.get_extension_for_oid(ExtensionOID.DELTA_CRL_INDICATOR)
            except x509.ExtensionNotFound:
                pass
            else:
                status.append("Delta")

            status.append(f"{sum(1 for _ in crl)} revoked")
            print(f" {' | '.join(status)}")
            if problem:
                issues.append(f"Issuing CA CRL issue: {node.name}")

    # Display orphans
    if orphan_crls:
        header("Orphaned Files")
        print(f"\n {C.Y}{len(orphan_crls)} CRL(s) not matched to any CA:{C.RS}")
        for url, crl, reason in orphan_crls:
            name = unquote(url.split('/')[-1])
            if crl is None:
                print(f" 📋 {name} ({reason})")
                continue
            now = datetime.now(timezone.utc)
            nu = crl.next_update_utc
            if not nu:
                exp = "?"
            elif nu < now:
                exp = f"{C.R}EXPIRED{C.RS}"
            else:
                exp = f"{C.G}{(nu - now).days}d{C.RS}"
            print(f" 📋 {name} ({get_cn(crl.issuer)}, {exp})")
    elif all_crl_urls:
        # Only open an "Orphaned Files" section when none was printed above.
        if not issuing_crls:
            header("Orphaned Files")
        print(f"\n {C.G}{C.RS} No orphaned files")
    return issuing_crls, orphan_crls
# ─── Main ────────────────────────────────────────────────────────────────────
def main():
    """Command-line entry point: load every input URL, build the CA tree, report.

    Exits with status 1 when no URL is given or no CA hierarchy can be built.
    """
    inputs = sys.argv[1:]
    if not inputs:
        print(f"Usage: {sys.argv[0]} <url> [<url2> ...]")
        print("\nInput types:")
        print(" Directory: http://pki.kinda.se/")
        print(" Cert URL: https://r10.i.lencr.org/")
        print(" TLS server: https://www.google.com")
        sys.exit(1)

    session = make_session()
    all_certs = {}  # thumbprint -> (cert, source)
    all_crl_urls = []

    # ── Load from all inputs ─────────────────────────────────────────────
    for url in inputs:
        header(f"Input: {url}")
        certs, crls, _ = detect_and_load(url, session)
        for tp, (cert, src) in certs.items():
            # The first source seen for a thumbprint wins.
            all_certs.setdefault(tp, (cert, src))
        all_crl_urls.extend(crls)
    # De-duplicate while keeping discovery order.
    all_crl_urls = list(dict.fromkeys(all_crl_urls))

    # Count CA certs
    ca_count = sum(1 for cert, _ in all_certs.values() if is_ca(cert))
    non_ca = len(all_certs) - ca_count
    skipped = f", {non_ca} non-CA cert(s) (skipped)" if non_ca else ""
    print(f"\n Loaded: {ca_count} CA cert(s){skipped}")

    # ── Build tree ───────────────────────────────────────────────────────
    roots, _, _ = build_tree(all_certs)
    if not roots:
        print(f"\n {C.R}✘ Could not build CA hierarchy{C.RS}")
        sys.exit(1)

    # ── Display chains ───────────────────────────────────────────────────
    header("PKI Health Check")
    all_issues = []
    for number, root in enumerate(roots, 1):
        display_chain(root, number, session, all_issues)

    # ── Issuing CA CRLs & Orphans ────────────────────────────────────────
    if all_crl_urls:
        check_issuing_ca_crls(roots, all_crl_urls, session, all_issues)

    # ── Summary ──────────────────────────────────────────────────────────
    header("Summary")
    print(f" Chains: {len(roots)} | CA certs: {ca_count} | "
          f"Site CRLs: {len(all_crl_urls)}")
    if all_issues:
        print(f"\n {C.R}{C.BD}{len(all_issues)} issue(s):{C.RS}")
        # Each distinct issue is listed once, in first-seen order.
        for issue in dict.fromkeys(all_issues):
            print(f" {C.R}{issue}{C.RS}")
    else:
        print(f"\n {C.G}{C.BD}✔ All checks passed{C.RS}")


if __name__ == '__main__':
    main()