770 lines
27 KiB
Python
770 lines
27 KiB
Python
#!/usr/bin/env python3
|
|
# /// script
|
|
# requires-python = ">=3.10"
|
|
# dependencies = [
|
|
# "requests",
|
|
# "beautifulsoup4",
|
|
# "cryptography",
|
|
# ]
|
|
# ///
|
|
"""
|
|
PKI Site Analyzer v8
|
|
====================
|
|
Scrapes IIS PKI sites recursively, builds certificate chains,
|
|
fetches and validates CRLs referenced from each certificate's CDP,
|
|
and flags orphaned CRLs that may need cleanup.
|
|
|
|
Usage:
|
|
uv run pki_analyzer.py <url> [<url2> ...]
|
|
|
|
Examples:
|
|
uv run pki_analyzer.py http://pki.matas.dk/aia http://pki.matas.dk/cdp
|
|
uv run pki_analyzer.py http://pki.imy.se/
|
|
"""
|
|
|
|
import sys
|
|
from datetime import datetime, timezone
|
|
from urllib.parse import urljoin, unquote, urlparse
|
|
from collections import defaultdict
|
|
|
|
import requests
|
|
from bs4 import BeautifulSoup
|
|
from cryptography import x509
|
|
from cryptography.x509.oid import ExtensionOID, AuthorityInformationAccessOID
|
|
from cryptography.hazmat.primitives import hashes
|
|
from cryptography.hazmat.primitives.asymmetric import padding, rsa, ec
|
|
from cryptography.exceptions import InvalidSignature
|
|
|
|
|
|
# File extensions recognized while scraping IIS directory listings.
CERT_EXTENSIONS = ('.crt', '.cer', '.pem')
CRL_EXTENSIONS = ('.crl',)
# Per-request HTTP timeout, in seconds.
TIMEOUT = 30
# Maximum directory recursion depth for scrape_recursive().
MAX_DEPTH = 5
# Set True to trace every <a href> considered during scraping.
DEBUG_SCRAPE = False
|
|
|
|
|
|
class C:
    """ANSI escape sequences used for colored terminal output."""
    HEADER = '\033[95m'
    BLUE = '\033[94m'
    CYAN = '\033[96m'
    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    RED = '\033[91m'
    BOLD = '\033[1m'
    DIM = '\033[2m'
    RESET = '\033[0m'
|
|
|
|
|
|
def print_header(text):
    """Print *text* as a banner framed above and below by '=' rules."""
    rule = f"{C.BOLD}{C.HEADER}{'='*80}{C.RESET}"
    print(f"\n{rule}")
    print(f"{C.BOLD}{C.HEADER} {text}{C.RESET}")
    print(rule)
|
|
|
|
def print_section(text):
    """Print a cyan section divider with *text* embedded in the rule."""
    tail = '─' * (74 - len(text))
    print(f"\n{C.BOLD}{C.CYAN}── {text} {tail}{C.RESET}")
|
|
|
|
def print_ok(text):
    """Print a status line marked with a green check."""
    mark = C.GREEN + '✔' + C.RESET
    print(f" {mark} {text}")
|
|
|
|
def print_warn(text):
    """Print a status line marked with a yellow warning sign."""
    mark = C.YELLOW + '⚠' + C.RESET
    print(f" {mark} {text}")
|
|
|
|
def print_err(text):
    """Print a status line marked with a red cross."""
    mark = C.RED + '✘' + C.RESET
    print(f" {mark} {text}")
|
|
|
|
def print_info(label, value):
    """Print *label* dot-padded to 40 characters, followed by *value*."""
    padded = format(label, '.<40')
    print(f" {C.BOLD}{padded}{C.RESET} {value}")
|
|
|
|
def print_debug(text):
    """Emit a dim debug line, but only when DEBUG_SCRAPE is enabled."""
    if not DEBUG_SCRAPE:
        return
    print(f" {C.DIM}DBG: {text}{C.RESET}")
|
|
|
|
def sha1_fingerprint(cert):
    """Return the certificate's SHA-1 fingerprint as colon-separated hex."""
    digest = cert.fingerprint(hashes.SHA1())
    return digest.hex(':')
|
|
|
|
def subject_str(name):
    """Render an X.509 Name as 'attr=value, ...', or '(empty)' for no attrs.

    NOTE(review): relies on the private ``oid._name`` attribute of the
    cryptography library for the human-readable attribute label.
    """
    rendered = ', '.join(f"{attr.oid._name}={attr.value}" for attr in name)
    return rendered if rendered else '(empty)'
|
|
|
|
|
|
# ─── Scraping ────────────────────────────────────────────────────────────────
|
|
|
|
def scrape_recursive(base_url, session, origin_host=None, depth=0, visited=None):
    """Recursively scrape an HTTP directory listing for cert and CRL files.

    Returns a pair ``(cert_urls, crl_urls)`` of absolute URLs found under
    *base_url*. Recursion follows same-host subdirectory links only, is
    bounded by MAX_DEPTH, and *visited* (shared across the recursion)
    prevents re-scanning and link loops.
    """
    if visited is None:
        visited = set()
    # Directory URLs are normalized to end with '/' so urljoin resolves
    # relative hrefs against the directory, not its parent.
    if not base_url.endswith('/'):
        base_url += '/'
    if base_url in visited or depth > MAX_DEPTH:
        return [], []
    visited.add(base_url)
    # The first (top-level) call pins the host; all recursion stays on it.
    if origin_host is None:
        origin_host = urlparse(base_url).netloc

    indent = " " * depth
    print(f"{indent} 🔍 Scanning: {base_url}")

    try:
        resp = session.get(base_url, timeout=TIMEOUT)
        resp.raise_for_status()
    except Exception as e:
        # Best-effort scraping: an unreachable directory is reported,
        # not fatal.
        print_warn(f"{indent} Could not fetch {base_url}: {e}")
        return [], []

    soup = BeautifulSoup(resp.text, 'html.parser')
    cert_urls, crl_urls, subdirs = [], [], []

    for a_tag in soup.find_all('a', href=True):
        href = a_tag['href']
        full_url = urljoin(base_url, href)
        # Decoded/lowercased copy is used only for extension matching;
        # the original full_url is what gets collected.
        decoded_url = unquote(full_url).lower()
        decoded_href = unquote(href)

        print_debug(f" href='{href}' → '{full_url}'")

        # Skip off-host links, parent-directory links, and self-links.
        if urlparse(full_url).netloc != origin_host:
            continue
        if href in ('../', '..'):
            continue
        if full_url.rstrip('/') == base_url.rstrip('/'):
            continue

        if any(decoded_url.endswith(ext) for ext in CERT_EXTENSIONS):
            cert_urls.append(full_url)
            print(f"{indent} 📜 {decoded_href}")
        elif any(decoded_url.endswith(ext) for ext in CRL_EXTENSIONS):
            crl_urls.append(full_url)
            print(f"{indent} 📋 {decoded_href}")
        elif full_url.endswith('/') and full_url not in visited:
            # Trailing slash marks a subdirectory in IIS listings.
            subdirs.append(full_url)
            print(f"{indent} 📁 {decoded_href}")

    # Depth-first descent into discovered subdirectories.
    for subdir in subdirs:
        sc, sl = scrape_recursive(subdir, session, origin_host, depth + 1, visited)
        cert_urls.extend(sc)
        crl_urls.extend(sl)

    return cert_urls, crl_urls
|
|
|
|
|
|
def download_file(url, session):
    """GET *url* and return the raw response body; raises on HTTP errors."""
    response = session.get(url, timeout=TIMEOUT)
    response.raise_for_status()
    return response.content
|
|
|
|
|
|
# ─── Parsing ─────────────────────────────────────────────────────────────────
|
|
|
|
def load_certificate(data, url=""):
    """Parse certificate bytes as DER, then PEM; return None if neither fits."""
    loaders = (x509.load_der_x509_certificate, x509.load_pem_x509_certificate)
    for parse in loaders:
        try:
            return parse(data)
        except Exception:
            continue
    return None
|
|
|
|
|
|
def load_crl(data, url=""):
    """Parse CRL bytes as DER, then PEM; return None if neither fits."""
    loaders = (x509.load_der_x509_crl, x509.load_pem_x509_crl)
    for parse in loaders:
        try:
            return parse(data)
        except Exception:
            continue
    return None
|
|
|
|
|
|
# ─── Extensions ──────────────────────────────────────────────────────────────
|
|
|
|
def get_extension_value(obj, oid):
    """Return the extension value for *oid*, or None if not present."""
    try:
        ext = obj.extensions.get_extension_for_oid(oid)
    except x509.ExtensionNotFound:
        return None
    return ext.value
|
|
|
|
def get_cdp_urls(cert):
    """Collect all URI distribution points from the certificate's CDP extension."""
    cdp = get_extension_value(cert, ExtensionOID.CRL_DISTRIBUTION_POINTS)
    if not cdp:
        return []
    return [
        gname.value
        for dp in cdp
        if dp.full_name
        for gname in dp.full_name
        if isinstance(gname, x509.UniformResourceIdentifier)
    ]
|
|
|
|
def get_aia_info(cert):
    """Return ([CA-issuer URLs], [OCSP URLs]) from the AIA extension."""
    aia = get_extension_value(cert, ExtensionOID.AUTHORITY_INFORMATION_ACCESS)
    ocsp_urls, ca_urls = [], []
    for desc in (aia or []):
        location = desc.access_location
        if not isinstance(location, x509.UniformResourceIdentifier):
            continue
        if desc.access_method == AuthorityInformationAccessOID.OCSP:
            ocsp_urls.append(location.value)
        elif desc.access_method == AuthorityInformationAccessOID.CA_ISSUERS:
            ca_urls.append(location.value)
    return ca_urls, ocsp_urls
|
|
|
|
def get_subject_key_id(cert):
    """Return the SKI digest as hex, or None when the extension is absent."""
    ski = get_extension_value(cert, ExtensionOID.SUBJECT_KEY_IDENTIFIER)
    if not ski:
        return None
    return ski.digest.hex()
|
|
|
|
def get_authority_key_id(obj):
    """Return the AKI key identifier as hex, or None when absent/keyless."""
    aki = get_extension_value(obj, ExtensionOID.AUTHORITY_KEY_IDENTIFIER)
    if aki and aki.key_identifier:
        return aki.key_identifier.hex()
    return None
|
|
|
|
def is_self_signed(cert):
    """True when the subject DN equals the issuer DN (root-style cert)."""
    return cert.subject == cert.issuer
|
|
|
|
def is_ca(cert):
    """True when BasicConstraints marks this certificate as a CA."""
    bc = get_extension_value(cert, ExtensionOID.BASIC_CONSTRAINTS)
    if not bc:
        return False
    return bc.ca
|
|
|
|
def get_key_usage(cert):
    """Comma-joined names of asserted key-usage bits, or None if absent."""
    ku = get_extension_value(cert, ExtensionOID.KEY_USAGE)
    if not ku:
        return None
    flags = ('digital_signature', 'key_encipherment', 'key_cert_sign',
             'crl_sign', 'content_commitment', 'data_encipherment',
             'key_agreement')
    asserted = []
    for flag in flags:
        # Some KeyUsage attributes can raise depending on other bits;
        # treat any access failure as "not asserted".
        try:
            value = getattr(ku, flag)
        except Exception:
            continue
        if value:
            asserted.append(flag)
    return ', '.join(asserted)
|
|
|
|
def get_eku(cert):
    """Comma-joined dotted OIDs from the Extended Key Usage, or None."""
    eku = get_extension_value(cert, ExtensionOID.EXTENDED_KEY_USAGE)
    if not eku:
        return None
    return ', '.join(usage.dotted_string for usage in eku)
|
|
|
|
def get_san(cert):
    """Comma-joined Subject Alternative Name entries, or None if absent."""
    san = get_extension_value(cert, ExtensionOID.SUBJECT_ALTERNATIVE_NAME)
    if not san:
        return None
    return ', '.join(str(entry.value) for entry in san)
|
|
|
|
|
|
# ─── Chain Building ──────────────────────────────────────────────────────────
|
|
|
|
def build_chains(certs_dict):
    """Group certificates into issuance chains via SKI/AKI matching.

    *certs_dict* maps fingerprint -> (cert, url). Returns a list of
    chains, each a list of fingerprints ordered root → leaf. Certs that
    match no chain are appended as single-element chains.
    """
    # Index Subject Key Identifier -> fingerprint. When two certs share
    # an SKI (e.g. a renewed CA cert with the same key), keep the one
    # that expires latest.
    ski_index = {}
    for fp, (cert, url) in certs_dict.items():
        ski = get_subject_key_id(cert)
        if ski:
            if ski in ski_index:
                existing, _ = certs_dict[ski_index[ski]]
                if cert.not_valid_after_utc > existing.not_valid_after_utc:
                    ski_index[ski] = fp
            else:
                ski_index[ski] = fp

    # Link each non-self-signed cert to its issuer: the cert whose SKI
    # equals this cert's AKI (excluding itself).
    parent_of = {}
    for fp, (cert, url) in certs_dict.items():
        if is_self_signed(cert):
            continue
        aki = get_authority_key_id(cert)
        if aki and aki in ski_index and ski_index[aki] != fp:
            parent_of[fp] = ski_index[aki]

    # Roots are self-signed certs; invert parent links into a child map.
    roots = [fp for fp, (c, _) in certs_dict.items() if is_self_signed(c)]
    children_of = defaultdict(list)
    for child, par in parent_of.items():
        children_of[par].append(child)

    chains = []

    def walk(fp, chain):
        # Depth-first: each leaf of the issuance tree yields one full
        # root-to-leaf chain (snapshot the shared `chain` list).
        chain.append(fp)
        kids = children_of.get(fp, [])
        if not kids:
            chains.append(list(chain))
        else:
            for k in kids:
                walk(k, chain)
        chain.pop()

    for r in roots:
        walk(r, [])

    # Any cert not reached from a root (orphaned intermediate/leaf)
    # becomes its own one-element chain.
    seen = set()
    for c in chains:
        seen.update(c)
    for fp in certs_dict:
        if fp not in seen:
            chains.append([fp])

    return chains
|
|
|
|
|
|
# ─── CRL Operations ─────────────────────────────────────────────────────────
|
|
|
|
def verify_crl_signature(crl, issuer_cert):
    """Verify *crl*'s signature against *issuer_cert*'s public key.

    Returns a (status, message) pair:
      (True, msg)  — signature verified,
      (False, msg) — signature cryptographically invalid,
      (None, msg)  — could not verify (unsupported key type or error).
    """
    try:
        pub = issuer_cert.public_key()
        if isinstance(pub, rsa.RSAPublicKey):
            pub.verify(crl.signature, crl.tbs_certlist_bytes,
                       padding.PKCS1v15(), crl.signature_hash_algorithm)
        elif isinstance(pub, ec.EllipticCurvePublicKey):
            # `ec` is already imported at module level — the previous
            # local re-import (as ec_mod) was redundant and is removed.
            pub.verify(crl.signature, crl.tbs_certlist_bytes,
                       ec.ECDSA(crl.signature_hash_algorithm))
        else:
            return None, "Unsupported key type"
        return True, "Signature valid"
    except InvalidSignature:
        return False, "Signature INVALID"
    except Exception as e:
        return None, f"Verification error: {e}"
|
|
|
|
|
|
def fetch_and_validate_crl(cdp_url, issuer_cert, session):
    """Fetch a CRL from a CDP URL, validate it, and return results dict."""
    # Every key is present from the start so callers can read fields
    # without guarding, even after an early-return failure.
    result = {
        'url': cdp_url,
        'reachable': False,
        'parseable': False,
        'issuer': None,
        'last_update': None,
        'next_update': None,
        'expired': None,      # None = unknown (CRL has no nextUpdate)
        'is_delta': False,
        'sig_valid': None,    # None = not verified / not verifiable
        'sig_msg': None,
        'revoked_count': 0,
        'error': None,
        'crl': None,          # parsed CRL object, used for revocation checks
    }

    # Skip non-HTTP distribution points (e.g. ldap://).
    if not cdp_url.lower().startswith('http'):
        result['error'] = f"Non-HTTP CDP (skipped): {cdp_url}"
        return result

    # Download
    try:
        data = download_file(cdp_url, session)
        result['reachable'] = True
    except Exception as e:
        result['error'] = f"Download failed: {e}"
        return result

    # Parse (DER or PEM)
    crl = load_crl(data, cdp_url)
    if crl is None:
        result['error'] = f"Parse failed ({len(data)} bytes)"
        return result

    result['parseable'] = True
    result['crl'] = crl
    result['issuer'] = subject_str(crl.issuer)
    result['last_update'] = crl.last_update_utc
    result['next_update'] = crl.next_update_utc
    result['revoked_count'] = len(list(crl))

    # A CRL is expired when its nextUpdate lies in the past.
    now = datetime.now(timezone.utc)
    if crl.next_update_utc:
        result['expired'] = crl.next_update_utc < now

    # Delta check: presence of the Delta CRL Indicator extension.
    try:
        crl.extensions.get_extension_for_oid(ExtensionOID.DELTA_CRL_INDICATOR)
        result['is_delta'] = True
    except x509.ExtensionNotFound:
        pass

    # Signature verification against the issuing CA's public key,
    # when we have that certificate.
    if issuer_cert is not None:
        valid, msg = verify_crl_signature(crl, issuer_cert)
        result['sig_valid'] = valid
        result['sig_msg'] = msg
    else:
        result['sig_msg'] = "No issuer cert available"

    return result
|
|
|
|
|
|
def is_cert_revoked(cert, crl):
    """Return True if *cert*'s serial number appears on *crl*.

    Uses the CRL's indexed serial-number lookup instead of the previous
    linear scan over every revoked entry — noticeably faster on large
    CRLs, identical result.
    """
    revoked = crl.get_revoked_certificate_by_serial_number(cert.serial_number)
    return revoked is not None
|
|
|
|
|
|
# ─── Display ─────────────────────────────────────────────────────────────────
|
|
|
|
def display_cert(cert, url, indent=0):
    """Pretty-print one certificate's key fields, indented by chain depth."""
    p = " " * indent
    now = datetime.now(timezone.utc)
    nb = cert.not_valid_before_utc
    na = cert.not_valid_after_utc
    exp = na < now
    # Days until expiry (negative once expired).
    dl = (na - now).days

    print(f"{p}{C.BOLD}Subject:{C.RESET} {subject_str(cert.subject)}")
    print(f"{p}{C.BOLD}Issuer:{C.RESET} {subject_str(cert.issuer)}")
    print(f"{p}{C.BOLD}Serial:{C.RESET} {format(cert.serial_number, 'x')}")
    print(f"{p}{C.BOLD}Not Before:{C.RESET} {nb}")
    print(f"{p}{C.BOLD}Not After:{C.RESET} {na}", end="")
    # Validity verdict appended on the same line: red = expired,
    # yellow = expires within 90 days, green otherwise.
    if exp:
        print(f" {C.RED}[EXPIRED]{C.RESET}")
    elif dl < 90:
        print(f" {C.YELLOW}[{dl} days remaining]{C.RESET}")
    else:
        print(f" {C.GREEN}[{dl} days remaining]{C.RESET}")
    if nb > now:
        print(f"{p} {C.RED}[NOT YET VALID]{C.RESET}")

    print(f"{p}{C.BOLD}SHA1:{C.RESET} {sha1_fingerprint(cert)}")
    print(f"{p}{C.BOLD}Self-signed:{C.RESET} {'Yes' if is_self_signed(cert) else 'No'}")
    print(f"{p}{C.BOLD}CA:{C.RESET} {'Yes' if is_ca(cert) else 'No'}")

    # Optional extensions: only printed when present.
    ku = get_key_usage(cert)
    if ku:
        print(f"{p}{C.BOLD}Key Usage:{C.RESET} {ku}")
    eku = get_eku(cert)
    if eku:
        print(f"{p}{C.BOLD}EKU:{C.RESET} {eku}")
    san = get_san(cert)
    if san:
        print(f"{p}{C.BOLD}SAN:{C.RESET} {san}")

    ski = get_subject_key_id(cert)
    if ski:
        print(f"{p}{C.BOLD}SKI:{C.RESET} {ski}")
    aki = get_authority_key_id(cert)
    if aki:
        print(f"{p}{C.BOLD}AKI:{C.RESET} {aki}")

    # CRL Distribution Points
    cdps = get_cdp_urls(cert)
    if cdps:
        print(f"{p}{C.BOLD}CDP:{C.RESET}")
        for u in cdps:
            print(f"{p} → {u}")
    else:
        print(f"{p}{C.BOLD}CDP:{C.RESET} (none)")

    # Authority Information Access (CA issuers + OCSP responders)
    ca_issuers, ocsp_urls = get_aia_info(cert)
    if ca_issuers or ocsp_urls:
        print(f"{p}{C.BOLD}AIA:{C.RESET}")
        for u in ca_issuers:
            print(f"{p} → CA Issuer: {u}")
        for u in ocsp_urls:
            print(f"{p} → OCSP: {u}")
    else:
        print(f"{p}{C.BOLD}AIA:{C.RESET} (none)")

    # Certificate policies; string qualifiers are CPS URIs.
    policies = get_extension_value(cert, ExtensionOID.CERTIFICATE_POLICIES)
    if policies:
        print(f"{p}{C.BOLD}Policies:{C.RESET}")
        for pol in policies:
            print(f"{p} → {pol.policy_identifier.dotted_string}")
            if pol.policy_qualifiers:
                for q in pol.policy_qualifiers:
                    if isinstance(q, str):
                        print(f"{p} CPS: {q}")

    print(f"{p}{C.BOLD}Source:{C.RESET} {url}")
|
|
|
|
|
|
def display_crl_inline(result, indent=0):
    """Display CRL validation results inline with its parent certificate."""
    p = " " * indent
    r = result

    # Error path: LDAP CDPs are dimmed (deliberately skipped), real
    # failures are printed in red.
    if r['error']:
        if 'Non-HTTP' in r['error']:
            print(f"{p}{C.DIM} ↳ CRL: {r['url']} (LDAP — not checked){C.RESET}")
        else:
            print(f"{p} {C.RED}↳ CRL: {r['url']}{C.RESET}")
            print(f"{p} {C.RED}✘ {r['error']}{C.RESET}")
        return

    crl_type = "Delta" if r['is_delta'] else "Base"

    print(f"{p} ↳ CRL: {r['url']}")
    print(f"{p} Type: {crl_type} | Revoked: {r['revoked_count']}", end="")

    # Freshness verdict on the same line; no nextUpdate → just end the line.
    if r['expired']:
        print(f" | {C.RED}EXPIRED (next update: {r['next_update']}){C.RESET}")
    elif r['next_update']:
        rem = (r['next_update'] - datetime.now(timezone.utc)).days
        print(f" | {C.GREEN}Valid ({rem} days){C.RESET}")
    else:
        print()

    print(f"{p} Published: {r['last_update']}")

    # Tri-state signature result: True/False/None (not attempted).
    if r['sig_valid'] is True:
        print(f"{p} {C.GREEN}✔ {r['sig_msg']}{C.RESET}")
    elif r['sig_valid'] is False:
        print(f"{p} {C.RED}✘ {r['sig_msg']}{C.RESET}")
    else:
        print(f"{p} {C.YELLOW}⚠ {r['sig_msg']}{C.RESET}")
|
|
|
|
|
|
# ─── Main ────────────────────────────────────────────────────────────────────
|
|
|
|
def main():
    """Entry point: scrape, chain, validate CRLs, then print a report."""
    if len(sys.argv) < 2:
        print(f"Usage: {sys.argv[0]} <url> [<url2> ...]")
        print(f"Example: {sys.argv[0]} http://pki.imy.se/")
        sys.exit(1)

    base_urls = sys.argv[1:]

    session = requests.Session()
    # NOTE(review): TLS verification is disabled for all requests —
    # presumably because PKI sites often serve over plain HTTP or with
    # self-referential certs; confirm this is intentional.
    session.verify = False
    session.headers.update({
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                      'AppleWebKit/537.36 (KHTML, like Gecko) '
                      'Chrome/125.0.0.0 Safari/537.36',
        'Accept': '*/*',
        'Accept-Encoding': 'identity',
    })
    # Suppress the InsecureRequestWarning spam caused by verify=False.
    requests.packages.urllib3.disable_warnings()

    all_cert_urls, all_crl_urls = [], []

    # ── Step 1: Discover files ───────────────────────────────────────────
    for base_url in base_urls:
        print_header(f"Scanning: {base_url}")
        cu, cl = scrape_recursive(base_url, session)
        all_cert_urls.extend(cu)
        all_crl_urls.extend(cl)

    # De-duplicate while preserving discovery order.
    all_cert_urls = list(dict.fromkeys(all_cert_urls))
    all_crl_urls = list(dict.fromkeys(all_crl_urls))

    print_section("Discovery Summary")
    print_ok(f"Certificate files: {len(all_cert_urls)}")
    print_ok(f"CRL files on site: {len(all_crl_urls)}")

    # ── Step 2: Download and parse certificates ──────────────────────────
    print_header("Loading Certificates")

    certs_dict = {}  # fingerprint -> (cert, url)
    for url in all_cert_urls:
        try:
            data = download_file(url, session)
            cert = load_certificate(data, url)
            if cert is not None:
                fp = sha1_fingerprint(cert)
                certs_dict[fp] = (cert, url)
                print_ok(f"{subject_str(cert.subject)}")
            else:
                print_warn(f"Could not parse: {url}")
        except Exception as e:
            print_err(f"Download failed: {url} — {e}")

    # ── Step 3: Fetch additional certs from AIA ──────────────────────────
    # Issuer certs referenced by AIA but not hosted on the scanned site
    # are fetched so chains and CRL signatures can still be checked.
    print_header("Fetching CA Certs from AIA Extensions")
    aia_urls = set()
    for fp, (cert, url) in certs_dict.items():
        ca_issuers, _ = get_aia_info(cert)
        for u in ca_issuers:
            if u.lower().startswith('http'):
                aia_urls.add(u)

    loaded_urls = set(u for _, (c, u) in certs_dict.items())
    new_aia = aia_urls - loaded_urls

    if new_aia:
        for url in new_aia:
            try:
                data = download_file(url, session)
                cert = load_certificate(data, url)
                if cert is not None:
                    fp = sha1_fingerprint(cert)
                    if fp not in certs_dict:
                        certs_dict[fp] = (cert, url)
                        print_ok(f"Fetched: {subject_str(cert.subject)}")
            except Exception as e:
                print_err(f"Failed: {url} — {e}")
    else:
        print_info("No additional certs needed", "All issuers already loaded")

    # ── Step 4: Build chains ─────────────────────────────────────────────
    chains = build_chains(certs_dict)

    # Build issuer lookup: SKI -> issuer cert (latest-expiring wins on
    # duplicate SKIs, matching build_chains' tie-break).
    ski_to_cert = {}
    for fp, (cert, url) in certs_dict.items():
        ski = get_subject_key_id(cert)
        if ski:
            if ski not in ski_to_cert:
                ski_to_cert[ski] = cert
            else:
                existing = ski_to_cert[ski]
                if cert.not_valid_after_utc > existing.not_valid_after_utc:
                    ski_to_cert[ski] = cert

    # ── Step 5: Display chains with inline CRL validation ────────────────
    print_header("Certificate Chains & CRL Status")

    # Track all CDP URLs we validate (to find orphans later)
    validated_cdp_urls = set()
    # Track all CRL results for summary
    all_crl_results = []
    # Track revocation status
    revocation_issues = []

    for i, chain in enumerate(chains, 1):
        print_section(f"Chain {i} ({len(chain)} certificate(s))")

        for depth, fp in enumerate(chain):
            cert, url = certs_dict[fp]

            if depth == 0 and is_self_signed(cert):
                role = "Root CA"
            elif is_ca(cert):
                role = "Intermediate CA"
            else:
                role = "End Entity"

            print(f"\n{' '*depth}{C.BOLD}{C.BLUE}[{depth}] {role}{C.RESET}")
            display_cert(cert, url, indent=depth)

            # Fetch and validate each CDP for this certificate
            cdp_urls = get_cdp_urls(cert)
            if cdp_urls:
                # Find issuer cert (via AKI->SKI) for signature verification
                aki = get_authority_key_id(cert)
                issuer_cert = ski_to_cert.get(aki) if aki else None

                for cdp_url in cdp_urls:
                    validated_cdp_urls.add(cdp_url)
                    result = fetch_and_validate_crl(cdp_url, issuer_cert, session)
                    all_crl_results.append(result)
                    display_crl_inline(result, indent=depth)

                    # Check if THIS certificate is revoked
                    if result['crl'] is not None:
                        if is_cert_revoked(cert, result['crl']):
                            revocation_issues.append(
                                (cert, url, cdp_url)
                            )
                            print(f"{' '*depth} {C.RED}{C.BOLD}"
                                  f"⚠ THIS CERTIFICATE IS REVOKED!{C.RESET}")
            elif not is_self_signed(cert):
                # Roots legitimately have no CDP; anything else without
                # one can't be revocation-checked.
                print(f"{' '*depth} {C.YELLOW}⚠ No CDP — cannot check revocation{C.RESET}")

    # ── Step 6: Find orphaned CRLs on the site ──────────────────────────
    print_header("Orphaned CRL Files (on site but not referenced by any certificate)")

    # Normalize URLs (decode, lowercase, strip trailing slash) so CDP
    # references and scraped links compare equal despite encoding/case.
    def normalize_url(u):
        return unquote(u).lower().rstrip('/')

    referenced_normalized = set(normalize_url(u) for u in validated_cdp_urls)
    orphans = []

    for crl_url in all_crl_urls:
        if normalize_url(crl_url) not in referenced_normalized:
            orphans.append(crl_url)

    if orphans:
        print_warn(f"Found {len(orphans)} CRL file(s) not referenced by any certificate CDP:")
        for url in orphans:
            # Try to load and show basic info; failures are non-fatal.
            try:
                data = download_file(url, session)
                crl = load_crl(data, url)
                if crl is not None:
                    now = datetime.now(timezone.utc)
                    expired = ""
                    if crl.next_update_utc and crl.next_update_utc < now:
                        expired = f" {C.RED}[EXPIRED]{C.RESET}"
                    print(f" 📋 {unquote(url)}")
                    print(f" Issuer: {subject_str(crl.issuer)}")
                    print(f" Next update: {crl.next_update_utc}{expired}")
                    print(f" Revoked: {len(list(crl))}")
                else:
                    print(f" 📋 {unquote(url)} (could not parse)")
            except Exception:
                print(f" 📋 {unquote(url)} (could not download)")
            print()
        print_warn("These may be leftover files that should be reviewed/removed.")
    else:
        print_ok("No orphaned CRL files — all CRLs on site are referenced by certificates")

    # ── Step 7: Summary ──────────────────────────────────────────────────
    print_header("Summary")
    print_info("Certificates parsed", str(len(certs_dict)))
    print_info("Chains found", str(len(chains)))

    # CRL stats
    total_cdps = len(all_crl_results)
    reachable = sum(1 for r in all_crl_results if r['reachable'])
    parsed = sum(1 for r in all_crl_results if r['parseable'])
    expired = sum(1 for r in all_crl_results if r['expired'])
    sig_ok = sum(1 for r in all_crl_results if r['sig_valid'] is True)
    sig_fail = sum(1 for r in all_crl_results if r['sig_valid'] is False)
    unreachable = sum(1 for r in all_crl_results
                      if not r['reachable'] and r['error']
                      and 'Non-HTTP' not in r['error'])
    ldap_skipped = sum(1 for r in all_crl_results
                       if r['error'] and 'Non-HTTP' in r['error'])

    print_info("CDP URLs checked", str(total_cdps))
    if ldap_skipped:
        print_info(" LDAP (skipped)", str(ldap_skipped))
    if reachable:
        print_info(" Reachable", str(reachable))
    if unreachable:
        print_info(" Unreachable", str(unreachable))
    if parsed:
        print_info(" Parsed OK", str(parsed))
    if sig_ok:
        print_info(" Signature valid", str(sig_ok))
    if sig_fail:
        print_info(" Signature INVALID", str(sig_fail))

    # Expiry
    now = datetime.now(timezone.utc)
    exp_certs = [fp for fp, (c, _) in certs_dict.items()
                 if c.not_valid_after_utc < now]
    exp_crls = [r for r in all_crl_results if r['expired']]

    if exp_certs:
        print_err(f"Expired certificates: {len(exp_certs)}")
        for fp in exp_certs:
            c, u = certs_dict[fp]
            print(f" → {subject_str(c.subject)} (expired {c.not_valid_after_utc})")
    else:
        print_ok("No expired certificates")

    if exp_crls:
        print_err(f"Expired CRLs: {len(exp_crls)}")
        for r in exp_crls:
            print(f" → {r['url']} (expired {r['next_update']})")
    else:
        print_ok("No expired CRLs")

    if unreachable:
        print_err(f"Unreachable CDPs: {unreachable}")
        for r in all_crl_results:
            if not r['reachable'] and r['error'] and 'Non-HTTP' not in r['error']:
                print(f" → {r['url']}")
                print(f" {r['error']}")

    if revocation_issues:
        print_err(f"REVOKED CERTIFICATES: {len(revocation_issues)}")
        for cert, url, cdp in revocation_issues:
            print(f" → {subject_str(cert.subject)}")
            print(f" Serial: {format(cert.serial_number, 'x')}")
            print(f" CRL: {cdp}")
    else:
        print_ok("No downloaded certificates are revoked")

    if orphans:
        print_warn(f"Orphaned CRL files: {len(orphans)}")
    else:
        print_ok("No orphaned CRL files")


if __name__ == '__main__':
    main()