#!/usr/bin/env python3
# /// script
# requires-python = ">=3.10"
# dependencies = [
#     "requests",
#     "beautifulsoup4",
#     "cryptography",
# ]
# ///
"""
PKI Site Analyzer v6
====================

Scrapes IIS sites with directory browsing enabled (recursively),
downloads all .crt/.cer/.pem/.crl files, parses certificates, builds
chain relationships, and validates CRLs.

Usage:
    uv run pki_analyzer.py <url> [<url2> ...]

Examples:
    uv run pki_analyzer.py http://pki.matas.dk/cdp http://pki.matas.dk/aia
    uv run pki_analyzer.py http://pki.imy.se/
"""

import sys
import traceback
from datetime import datetime, timezone
from urllib.parse import urljoin, unquote, urlparse
from collections import defaultdict

import requests
from bs4 import BeautifulSoup
from cryptography import x509
from cryptography.x509.oid import ExtensionOID, AuthorityInformationAccessOID
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding, rsa, ec
from cryptography.exceptions import InvalidSignature


# ─── Configuration ───────────────────────────────────────────────────────────

CERT_EXTENSIONS = ('.crt', '.cer', '.pem')
CRL_EXTENSIONS = ('.crl',)
TIMEOUT = 30
MAX_DEPTH = 5
DEBUG_SCRAPE = False  # Set True to see all <a> tags during scraping


class C:
    HEADER = '\033[95m'
    BLUE = '\033[94m'
    CYAN = '\033[96m'
    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    RED = '\033[91m'
    BOLD = '\033[1m'
    DIM = '\033[2m'
    RESET = '\033[0m'


# ─── Utility ─────────────────────────────────────────────────────────────────

def print_header(text):
    print(f"\n{C.BOLD}{C.HEADER}{'='*80}{C.RESET}")
    print(f"{C.BOLD}{C.HEADER} {text}{C.RESET}")
    print(f"{C.BOLD}{C.HEADER}{'='*80}{C.RESET}")


def print_section(text):
    print(f"\n{C.BOLD}{C.CYAN}── {text} {'─'*(74-len(text))}{C.RESET}")


def print_ok(text):
    print(f" {C.GREEN}✔{C.RESET} {text}")


def print_warn(text):
    print(f" {C.YELLOW}⚠{C.RESET} {text}")


def print_err(text):
    print(f" {C.RED}✘{C.RESET} {text}")


def print_info(label, value):
    print(f" {C.BOLD}{label:.<40}{C.RESET} {value}")


def print_debug(text):
    if DEBUG_SCRAPE:
        print(f" {C.DIM}DBG: {text}{C.RESET}")


def sha1_fingerprint(cert):
    return cert.fingerprint(hashes.SHA1()).hex(':')


def subject_str(name):
    parts = []
    for attr in name:
        parts.append(f"{attr.oid._name}={attr.value}")
    return ', '.join(parts) if parts else '(empty)'


# ─── Recursive Scraping ─────────────────────────────────────────────────────

def scrape_recursive(base_url, session, origin_host=None, depth=0, visited=None):
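    """Walk an IIS directory listing recursively (same host only, at most
    MAX_DEPTH levels deep) and return two lists: certificate URLs and CRL URLs."""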
    if visited is None:
        visited = set()

    if not base_url.endswith('/'):
        base_url += '/'

    if base_url in visited or depth > MAX_DEPTH:
        return [], []

    visited.add(base_url)

    if origin_host is None:
        origin_host = urlparse(base_url).netloc

    indent = " " * depth
    print(f"{indent} 🔍 Scanning: {base_url}")

    try:
        resp = session.get(base_url, timeout=TIMEOUT)
        resp.raise_for_status()
    except Exception as e:
        print_warn(f"{indent} Could not fetch {base_url}: {e}")
        return [], []

    soup = BeautifulSoup(resp.text, 'html.parser')

    cert_urls = []
    crl_urls = []
    subdirs = []

    all_links = soup.find_all('a', href=True)
    print_debug(f"Found {len(all_links)} <a> tags in {base_url}")

    for a_tag in all_links:
        href = a_tag['href']
        full_url = urljoin(base_url, href)
        decoded_url = unquote(full_url).lower()
        decoded_href = unquote(href)

        print_debug(f" href='{href}' → '{full_url}'")

        if urlparse(full_url).netloc != origin_host:
            print_debug(" Skipped: different host")
            continue

        if href in ('../', '..'):
            print_debug(" Skipped: parent link")
            continue

        if full_url.rstrip('/') == base_url.rstrip('/'):
            print_debug(" Skipped: self link")
            continue

        if any(decoded_url.endswith(ext) for ext in CERT_EXTENSIONS):
            cert_urls.append(full_url)
            print(f"{indent} 📜 {decoded_href}")
        elif any(decoded_url.endswith(ext) for ext in CRL_EXTENSIONS):
            crl_urls.append(full_url)
            print(f"{indent} 📋 {decoded_href}")
        elif full_url.endswith('/') and full_url not in visited:
            subdirs.append(full_url)
            print(f"{indent} 📁 {decoded_href}")
        else:
            print_debug(" Skipped: not a cert/crl/dir")

    for subdir in subdirs:
        sub_certs, sub_crls = scrape_recursive(
            subdir, session, origin_host, depth + 1, visited
        )
        cert_urls.extend(sub_certs)
        crl_urls.extend(sub_crls)

    return cert_urls, crl_urls


def download_file(url, session):
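    """Download a single file and return its raw bytes, logging size and content type."""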
    resp = session.get(url, timeout=TIMEOUT)
    resp.raise_for_status()
    data = resp.content
    content_type = resp.headers.get('content-type', 'unknown')
    print(f" ↓ {len(data)} bytes | HTTP {resp.status_code} | {content_type}")
    return data


# ─── Certificate Parsing ────────────────────────────────────────────────────

def load_certificate(data, url=""):
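    """Parse certificate bytes as DER, then PEM, then retry both after stripping
    a BOM/leading whitespace. Returns an x509.Certificate or None on failure."""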
    if data[:50].lstrip().lower().startswith((b'<', b'<!', b'<html')):
        print_warn(f" Got HTML instead of certificate: {url}")
        return None

    # Try DER first
    try:
        return x509.load_der_x509_certificate(data)
    except Exception:
        pass

    # Try PEM
    try:
        return x509.load_pem_x509_certificate(data)
    except Exception:
        pass

    # Try stripping BOM/whitespace
    stripped = data.lstrip(b'\xef\xbb\xbf\x00\r\n ')
    if stripped != data:
        try:
            return x509.load_der_x509_certificate(stripped)
        except Exception:
            pass
        try:
            return x509.load_pem_x509_certificate(stripped)
        except Exception:
            pass

    print_warn(f" Could not parse certificate: {url}")
    return None


def load_crl(data, url=""):
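    """Parse CRL bytes as DER, then PEM, then retry both after stripping a BOM;
    on total failure print diagnostics about the payload and return None."""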
    if data[:50].lstrip().lower().startswith((b'<', b'<!', b'<html')):
        print_warn(f" Got HTML instead of CRL: {url}")
        return None

    # Try DER first — this is the most common format for CRLs
    try:
        crl = x509.load_der_x509_crl(data)
        return crl
    except BaseException as e:
        der_err = f"{type(e).__name__}: {e}"

    # Try PEM
    try:
        crl = x509.load_pem_x509_crl(data)
        return crl
    except BaseException as e:
        pem_err = f"{type(e).__name__}: {e}"

    # Try stripping BOM/whitespace
    stripped = data.lstrip(b'\xef\xbb\xbf\x00\r\n ')
    if stripped != data:
        try:
            return x509.load_der_x509_crl(stripped)
        except BaseException:
            pass
        try:
            return x509.load_pem_x509_crl(stripped)
        except BaseException:
            pass

    # All attempts failed — print diagnostics
    print_err(f" Failed to parse CRL: {url}")
    print_warn(f" DER: {der_err}")
    print_warn(f" PEM: {pem_err}")
    print_warn(f" Size: {len(data)} bytes")
    print_warn(f" Hex (first 40): {data[:40].hex(' ')}")
    if len(data) > 0:
        if data[0] == 0x30:
            print_warn(" Starts with 0x30 (ASN.1 SEQUENCE)")
        elif data[:5] == b'-----':
            print_warn(" Starts with PEM header")
        else:
            print_warn(f" First byte: 0x{data[0]:02x}")
    return None


# ─── Extension Helpers ───────────────────────────────────────────────────────

def get_extension_value(obj, oid):
    try:
        return obj.extensions.get_extension_for_oid(oid).value
    except x509.ExtensionNotFound:
        return None


def get_cdp_urls(cert):
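    """Return all URI entries from the certificate's CRL Distribution Points."""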
    cdp = get_extension_value(cert, ExtensionOID.CRL_DISTRIBUTION_POINTS)
    urls = []
    if cdp:
        for dp in cdp:
            if dp.full_name:
                for name in dp.full_name:
                    if isinstance(name, x509.UniformResourceIdentifier):
                        urls.append(name.value)
    return urls


def get_aia_info(cert):
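    """Return (ca_issuer_urls, ocsp_urls) taken from the certificate's
    Authority Information Access extension."""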
    aia = get_extension_value(cert, ExtensionOID.AUTHORITY_INFORMATION_ACCESS)
    ocsp_urls = []
    ca_issuer_urls = []
    if aia:
        for desc in aia:
            if isinstance(desc.access_location, x509.UniformResourceIdentifier):
                if desc.access_method == AuthorityInformationAccessOID.OCSP:
                    ocsp_urls.append(desc.access_location.value)
                elif desc.access_method == AuthorityInformationAccessOID.CA_ISSUERS:
                    ca_issuer_urls.append(desc.access_location.value)
    return ca_issuer_urls, ocsp_urls


def get_subject_key_id(cert):
    ski = get_extension_value(cert, ExtensionOID.SUBJECT_KEY_IDENTIFIER)
    return ski.digest.hex() if ski else None


def get_authority_key_id(obj):
    aki = get_extension_value(obj, ExtensionOID.AUTHORITY_KEY_IDENTIFIER)
    return aki.key_identifier.hex() if aki and aki.key_identifier else None


def is_self_signed(cert):
    return cert.issuer == cert.subject


def is_ca(cert):
    bc = get_extension_value(cert, ExtensionOID.BASIC_CONSTRAINTS)
    return bc.ca if bc else False


def get_key_usage(cert):
    ku = get_extension_value(cert, ExtensionOID.KEY_USAGE)
    if not ku:
        return None
    usages = []
    for attr in ['digital_signature', 'key_encipherment', 'key_cert_sign',
                 'crl_sign', 'content_commitment', 'data_encipherment',
                 'key_agreement']:
        try:
            if getattr(ku, attr):
                usages.append(attr)
        except Exception:
            pass
    return ', '.join(usages)


def get_eku(cert):
    eku = get_extension_value(cert, ExtensionOID.EXTENDED_KEY_USAGE)
    if not eku:
        return None
    return ', '.join(u.dotted_string for u in eku)


def get_san(cert):
    san = get_extension_value(cert, ExtensionOID.SUBJECT_ALTERNATIVE_NAME)
    if not san:
        return None
    return ', '.join(str(n.value) for n in san)


# ─── Chain Building ──────────────────────────────────────────────────────────

def build_chains(certs_dict):
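    """Link certificates into chains by matching each cert's Authority Key
    Identifier to another cert's Subject Key Identifier, then walk from the
    self-signed roots down to the leaves. Certificates that end up in no chain
    are returned as single-element chains. Returns a list of fingerprint lists
    (root first)."""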
    ski_index = {}
    for fp, (cert, url) in certs_dict.items():
        ski = get_subject_key_id(cert)
        if ski:
            if ski in ski_index:
                existing_cert, _ = certs_dict[ski_index[ski]]
                if cert.not_valid_after_utc > existing_cert.not_valid_after_utc:
                    ski_index[ski] = fp
            else:
                ski_index[ski] = fp

    parent_of = {}
    for fp, (cert, url) in certs_dict.items():
        if is_self_signed(cert):
            continue
        aki = get_authority_key_id(cert)
        if aki and aki in ski_index:
            parent_fp = ski_index[aki]
            if parent_fp != fp:
                parent_of[fp] = parent_fp

    roots = [fp for fp, (cert, _) in certs_dict.items() if is_self_signed(cert)]

    children_of = defaultdict(list)
    for child_fp, par_fp in parent_of.items():
        children_of[par_fp].append(child_fp)

    chains = []

    def walk(fp, current_chain):
        current_chain.append(fp)
        kids = children_of.get(fp, [])
        if not kids:
            chains.append(list(current_chain))
        else:
            for kid in kids:
                walk(kid, current_chain)
        current_chain.pop()

    for root_fp in roots:
        walk(root_fp, [])

    all_in_chains = set()
    for chain in chains:
        all_in_chains.update(chain)
    for fp in certs_dict:
        if fp not in all_in_chains:
            chains.append([fp])

    return chains


# ─── CRL Validation ─────────────────────────────────────────────────────────

def verify_crl_signature(crl, issuer_cert):
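    """Verify the CRL signature against the issuer certificate's public key
    (RSA PKCS#1 v1.5 or ECDSA). Returns (True/False/None, message); None means
    verification could not be performed."""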
    try:
        pub_key = issuer_cert.public_key()
        if isinstance(pub_key, rsa.RSAPublicKey):
            pub_key.verify(
                crl.signature,
                crl.tbs_certlist_bytes,
                padding.PKCS1v15(),
                crl.signature_hash_algorithm,
            )
        elif isinstance(pub_key, ec.EllipticCurvePublicKey):
            pub_key.verify(
                crl.signature,
                crl.tbs_certlist_bytes,
                ec.ECDSA(crl.signature_hash_algorithm),
            )
        else:
            return None, "Unsupported key type"
        return True, "Signature valid"
    except InvalidSignature:
        return False, "Signature INVALID"
    except Exception as e:
        return None, f"Verification error: {e}"


def validate_crl(crl, crl_url, certs_dict):
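    """Collect validity, delta-CRL status, revoked serials, and signature
    verification results for one CRL, matching its issuer among the downloaded
    certificates by AKI/SKI or by subject name."""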
    now = datetime.now(timezone.utc)
    results = {
        'url': crl_url,
        'issuer': subject_str(crl.issuer),
        'last_update': crl.last_update_utc,
        'next_update': crl.next_update_utc,
        'expired': crl.next_update_utc < now if crl.next_update_utc else None,
        'sig_valid': None,
        'sig_msg': 'No matching issuer found',
        'revoked_count': len(list(crl)),
        'revoked_serials': [],
        'is_delta': False,
    }

    try:
        crl.extensions.get_extension_for_oid(ExtensionOID.DELTA_CRL_INDICATOR)
        results['is_delta'] = True
    except x509.ExtensionNotFound:
        pass

    for revoked in crl:
        results['revoked_serials'].append(format(revoked.serial_number, 'x'))

    crl_aki = get_authority_key_id(crl)

    for fp, (cert, url) in certs_dict.items():
        ski = get_subject_key_id(cert)
        match = (crl_aki and ski == crl_aki) or (cert.subject == crl.issuer)
        if match:
            valid, msg = verify_crl_signature(crl, cert)
            results['sig_valid'] = valid
            results['sig_msg'] = msg
            results['verified_by'] = subject_str(cert.subject)
            break

    return results


# ─── Display Functions ───────────────────────────────────────────────────────

def display_cert(cert, url, indent=0):
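    """Pretty-print one certificate: subject/issuer, validity, fingerprint,
    key usage, SAN, SKI/AKI, CDP/AIA URLs, and certificate policies."""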
    prefix = " " * indent
    now = datetime.now(timezone.utc)
    not_before = cert.not_valid_before_utc
    not_after = cert.not_valid_after_utc
    expired = not_after < now
    days_left = (not_after - now).days

    print(f"{prefix}{C.BOLD}Subject:{C.RESET} {subject_str(cert.subject)}")
    print(f"{prefix}{C.BOLD}Issuer:{C.RESET} {subject_str(cert.issuer)}")
    print(f"{prefix}{C.BOLD}Serial:{C.RESET} {format(cert.serial_number, 'x')}")
    print(f"{prefix}{C.BOLD}Not Before:{C.RESET} {not_before}")
    print(f"{prefix}{C.BOLD}Not After:{C.RESET} {not_after}", end="")

    if expired:
        print(f" {C.RED}[EXPIRED]{C.RESET}")
    elif days_left < 90:
        print(f" {C.YELLOW}[{days_left} days remaining]{C.RESET}")
    else:
        print(f" {C.GREEN}[{days_left} days remaining]{C.RESET}")

    if not_before > now:
        print(f"{prefix} {C.RED}[NOT YET VALID]{C.RESET}")

    print(f"{prefix}{C.BOLD}SHA1:{C.RESET} {sha1_fingerprint(cert)}")
    print(f"{prefix}{C.BOLD}Self-signed:{C.RESET} {'Yes' if is_self_signed(cert) else 'No'}")
    print(f"{prefix}{C.BOLD}CA:{C.RESET} {'Yes' if is_ca(cert) else 'No'}")

    ku = get_key_usage(cert)
    if ku:
        print(f"{prefix}{C.BOLD}Key Usage:{C.RESET} {ku}")
    eku = get_eku(cert)
    if eku:
        print(f"{prefix}{C.BOLD}EKU:{C.RESET} {eku}")
    san = get_san(cert)
    if san:
        print(f"{prefix}{C.BOLD}SAN:{C.RESET} {san}")

    ski = get_subject_key_id(cert)
    if ski:
        print(f"{prefix}{C.BOLD}SKI:{C.RESET} {ski}")
    aki = get_authority_key_id(cert)
    if aki:
        print(f"{prefix}{C.BOLD}AKI:{C.RESET} {aki}")

    cdp_urls = get_cdp_urls(cert)
    if cdp_urls:
        print(f"{prefix}{C.BOLD}CDP:{C.RESET}")
        for u in cdp_urls:
            print(f"{prefix} → {u}")
    else:
        print(f"{prefix}{C.BOLD}CDP:{C.RESET} (none)")

    ca_issuers, ocsp_urls = get_aia_info(cert)
    if ca_issuers or ocsp_urls:
        print(f"{prefix}{C.BOLD}AIA:{C.RESET}")
        for u in ca_issuers:
            print(f"{prefix} → CA Issuer: {u}")
        for u in ocsp_urls:
            print(f"{prefix} → OCSP: {u}")
    else:
        print(f"{prefix}{C.BOLD}AIA:{C.RESET} (none)")

    policies = get_extension_value(cert, ExtensionOID.CERTIFICATE_POLICIES)
    if policies:
        print(f"{prefix}{C.BOLD}Policies:{C.RESET}")
        for pol in policies:
            oid = pol.policy_identifier.dotted_string
            print(f"{prefix} → {oid}")
            if pol.policy_qualifiers:
                for q in pol.policy_qualifiers:
                    if isinstance(q, str):
                        print(f"{prefix} CPS: {q}")

    print(f"{prefix}{C.BOLD}Source:{C.RESET} {url}")


def display_crl_results(results):
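    """Print a validation summary for one CRL: type, issuer, validity window,
    signature status, and up to 10 revoked serial numbers."""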
    r = results
    crl_type = "Delta CRL" if r['is_delta'] else "Base CRL"

    print(f"\n {C.BOLD}CRL:{C.RESET} {r['url']}")
    print(f" {C.BOLD}Type:{C.RESET} {crl_type}")
    print(f" {C.BOLD}Issuer:{C.RESET} {r['issuer']}")
    print(f" {C.BOLD}Last Update:{C.RESET} {r['last_update']}")
    print(f" {C.BOLD}Next Update:{C.RESET} {r['next_update']}", end="")

    if r['expired']:
        print(f" {C.RED}[EXPIRED]{C.RESET}")
    elif r['next_update'] is None:
        print(f" {C.YELLOW}[no next update]{C.RESET}")
    else:
        remaining = (r['next_update'] - datetime.now(timezone.utc)).days
        print(f" {C.GREEN}[valid, {remaining} days remaining]{C.RESET}")

    if r['sig_valid'] is True:
        print_ok(f"Signature: {r['sig_msg']} (by {r.get('verified_by', 'unknown')})")
    elif r['sig_valid'] is False:
        print_err(f"Signature: {r['sig_msg']}")
    else:
        print_warn(f"Signature: {r['sig_msg']}")

    count = r['revoked_count']
    print_info("Revoked certificates", str(count))
    if count > 0:
        for serial in r['revoked_serials'][:10]:
            print(f" Serial: {serial}")
        if count > 10:
            print(f" ... and {count - 10} more")


# ─── Main ────────────────────────────────────────────────────────────────────

def main():
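    """Scrape the given URLs, download certificates and CRLs (including ones
    referenced via CDP/AIA extensions), build chains, validate CRLs, and
    cross-check the downloaded certificates against the downloaded CRLs."""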
    if len(sys.argv) < 2:
        print(f"Usage: {sys.argv[0]} <url> [<url2> ...]")
        print(f"Example: {sys.argv[0]} http://pki.matas.dk/cdp http://pki.matas.dk/aia")
        print(f" {sys.argv[0]} http://pki.imy.se/")
        print(f"\nRecursively scans directories up to {MAX_DEPTH} levels deep.")
        sys.exit(1)

    base_urls = sys.argv[1:]

    session = requests.Session()
    session.verify = False
    session.headers.update({
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                      'AppleWebKit/537.36 (KHTML, like Gecko) '
                      'Chrome/125.0.0.0 Safari/537.36',
        'Accept': '*/*',
        'Accept-Encoding': 'identity',
    })
    requests.packages.urllib3.disable_warnings()

    all_cert_urls = []
    all_crl_urls = []

    # ── Step 1: Recursively scrape all provided URLs ─────────────────────
    for base_url in base_urls:
        print_header(f"Scanning: {base_url}")
        cert_urls, crl_urls = scrape_recursive(base_url, session)
        all_cert_urls.extend(cert_urls)
        all_crl_urls.extend(crl_urls)

    # Deduplicate
    all_cert_urls = list(dict.fromkeys(all_cert_urls))
    all_crl_urls = list(dict.fromkeys(all_crl_urls))

    print_section("Discovery Summary")
    print_ok(f"Total certificate files: {len(all_cert_urls)}")
    print_ok(f"Total CRL files: {len(all_crl_urls)}")

    # ── Step 2: Download and parse certificates ──────────────────────────
    print_header("Parsing Certificates")

    certs_dict = {}
    for url in all_cert_urls:
        try:
            data = download_file(url, session)
            cert = load_certificate(data, url)
            if cert:
                fp = sha1_fingerprint(cert)
                certs_dict[fp] = (cert, url)
                print_ok(f"Loaded: {subject_str(cert.subject)}")
            else:
                print_warn(f"Could not parse certificate: {url}")
        except Exception as e:
            print_err(f"Failed to download {url}: {e}")

    # ── Step 3: Download and parse CRLs ──────────────────────────────────
    print_header("Parsing CRLs")

    crls_list = []
    for url in all_crl_urls:
        try:
            data = download_file(url, session)
            crl = load_crl(data, url)
            if crl:
                crls_list.append((crl, url))
                print_ok(f"Loaded CRL: {subject_str(crl.issuer)}")
            else:
                print_warn(f"Could not parse CRL: {url}")
        except Exception as e:
            print_err(f"Failed to download CRL {url}: {e}")
            traceback.print_exc()

    # ── Step 4: Fetch CRLs from CDP extensions ───────────────────────────
    print_header("Fetching CRLs from CDP Extensions")

    referenced_crl_urls = set()
    for fp, (cert, url) in certs_dict.items():
        for cdp_url in get_cdp_urls(cert):
            if cdp_url.lower().startswith('http'):
                referenced_crl_urls.add(cdp_url)

    already_fetched = {u for _, u in crls_list}
    new_crl_urls = referenced_crl_urls - already_fetched

    if new_crl_urls:
        for url in new_crl_urls:
            try:
                data = download_file(url, session)
                crl = load_crl(data, url)
                if crl:
                    crls_list.append((crl, url))
                    print_ok(f"Fetched CDP CRL: {subject_str(crl.issuer)}")
                else:
                    print_warn(f"Could not parse CDP CRL: {url}")
            except Exception as e:
                print_err(f"Failed to fetch CDP CRL {url}: {e}")
    else:
        print_info("No additional CRLs", "All CDP CRLs already downloaded")

    # ── Step 5: Fetch CA certs from AIA extensions ───────────────────────
    print_header("Fetching CA Certs from AIA Extensions")

    referenced_aia_urls = set()
    for fp, (cert, url) in certs_dict.items():
        ca_issuers, _ = get_aia_info(cert)
        for aia_url in ca_issuers:
            if aia_url.lower().startswith('http'):
                referenced_aia_urls.add(aia_url)

    already_loaded_urls = {u for _, (c, u) in certs_dict.items()}
    new_aia_urls = referenced_aia_urls - already_loaded_urls

    if new_aia_urls:
        for url in new_aia_urls:
            try:
                data = download_file(url, session)
                cert = load_certificate(data, url)
                if cert:
                    fp = sha1_fingerprint(cert)
                    if fp not in certs_dict:
                        certs_dict[fp] = (cert, url)
                        print_ok(f"Fetched AIA cert: {subject_str(cert.subject)}")
                    else:
                        print_info("Already loaded", subject_str(cert.subject))
                else:
                    print_warn(f"Could not parse AIA cert: {url}")
            except Exception as e:
                print_err(f"Failed to fetch AIA cert {url}: {e}")
    else:
        print_info("No additional certs", "All AIA certs already loaded")

    # ── Step 6: Build and display chains ─────────────────────────────────
    print_header("Certificate Chains")

    chains = build_chains(certs_dict)

    if not chains:
        print_warn("No certificate chains could be built (no certificates loaded)")
    else:
        for i, chain in enumerate(chains, 1):
            print_section(f"Chain {i} ({len(chain)} certificate(s))")
            for depth, fp in enumerate(chain):
                cert, url = certs_dict[fp]
                if depth == 0 and is_self_signed(cert):
                    role = "Root CA"
                elif is_ca(cert):
                    role = "Intermediate CA"
                else:
                    role = "End Entity"

                print(f"\n{' ' * depth}{C.BOLD}{C.BLUE}[{depth}] {role}{C.RESET}")
                display_cert(cert, url, indent=depth)

    # ── Step 7: Validate CRLs ────────────────────────────────────────────
    print_header("CRL Validation")

    if not crls_list:
        print_warn("No CRLs to validate")
    else:
        for crl, crl_url in crls_list:
            results = validate_crl(crl, crl_url, certs_dict)
            display_crl_results(results)

    # ── Step 8: Revocation cross-check ───────────────────────────────────
    print_header("Revocation Check — Are Any Downloaded Certs Revoked?")

    revoked_serials_by_aki = defaultdict(set)
    for crl, crl_url in crls_list:
        aki = get_authority_key_id(crl)
        for revoked in crl:
            key = aki or subject_str(crl.issuer)
            revoked_serials_by_aki[key].add(revoked.serial_number)

    found_revoked = False
    for fp, (cert, url) in certs_dict.items():
        aki = get_authority_key_id(cert)
        serial = cert.serial_number
        if aki and aki in revoked_serials_by_aki:
            if serial in revoked_serials_by_aki[aki]:
                print_err(
                    f"REVOKED: {subject_str(cert.subject)} "
                    f"(serial {format(serial, 'x')})"
                )
                found_revoked = True

    if not found_revoked:
        print_ok("None of the downloaded certificates appear on the downloaded CRLs")

    # ── Summary ──────────────────────────────────────────────────────────
    print_header("Summary")
    print_info("Certificates parsed", str(len(certs_dict)))
    print_info("CRLs parsed", str(len(crls_list)))
    print_info("Chains found", str(len(chains)))

    now = datetime.now(timezone.utc)
    expired_certs = [fp for fp, (c, u) in certs_dict.items()
                     if c.not_valid_after_utc < now]
    expired_crls = [(crl, u) for crl, u in crls_list
                    if crl.next_update_utc and crl.next_update_utc < now]

    if expired_certs:
        print_err(f"Expired certificates: {len(expired_certs)}")
        for fp in expired_certs:
            cert, url = certs_dict[fp]
            print(f" → {subject_str(cert.subject)} (expired {cert.not_valid_after_utc})")
    else:
        print_ok("No expired certificates")

    if expired_crls:
        print_err(f"Expired CRLs: {len(expired_crls)}")
        for crl, url in expired_crls:
            print(f" → {url} (expired {crl.next_update_utc})")
    else:
        print_ok("No expired CRLs")


if __name__ == '__main__':
    main()