#!/usr/bin/env python3
"""capacity-scan.py — deterministic capacity facts for /capacity-review.

Python standard library only. Emits a JSON object to stdout. Reads physical
capacities and workload allocations from the machine-readable tables in
docs/hardware/reference.md, computes per-node allocated-vs-physical rollups,
and cross-checks workload hostnames against `terraform output -json` and
`ansible-inventory --list` to surface drift. Degrades gracefully when nothing
is provisioned. Live usage stats are a documented future hook.

Usage:
    python3 scripts/capacity-scan.py [--env staging] [--reference PATH]
"""

import argparse
import json
import os
import subprocess
import sys

REPO_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))


def parse_table(markdown, required_cols):
    """Return row dicts for the first markdown table whose header contains
    all required_cols.

    Keys are header names; values are raw (stripped) cell strings. Rows whose
    cell count does not match the header are skipped. The table ends at the
    first line that does not start with "|". Returns [] when no table header
    matches.
    """
    lines = markdown.splitlines()
    required = set(required_cols)
    for i, raw in enumerate(lines):
        line = raw.strip()
        if not line.startswith("|"):
            continue
        headers = [c.strip() for c in line.strip("|").split("|")]
        if not required.issubset(set(headers)):
            continue
        rows = []
        # i + 2 skips the header's GFM separator row (|---|---|)
        for body in lines[i + 2:]:
            if not body.strip().startswith("|"):
                break
            cells = [c.strip() for c in body.strip().strip("|").split("|")]
            if len(cells) == len(headers):
                rows.append(dict(zip(headers, cells)))
        return rows
    return []


def compute_rollup(node_rows, workload_rows):
    """Per node: physical totals, summed allocations, RAM headroom %, and an
    oversubscribed flag. Workloads on unknown nodes are ignored.

    node_rows need the keys node/cores/ram_gb/disk_gb; workload_rows need
    node/cores/ram_mb/disk_gb. Values are numeric strings; a non-numeric cell
    raises ValueError from int()/float().

    Returns a dict keyed by node name. Each value holds cores, ram_gb,
    disk_gb, alloc_cores, alloc_ram_gb (rounded to 0.1 GB for display),
    alloc_disk_gb, ram_headroom_pct and oversubscribed.
    """
    nodes = {}
    for r in node_rows:
        nodes[r["node"]] = {
            "cores": int(r["cores"]),
            "ram_gb": float(r["ram_gb"]),
            "disk_gb": float(r["disk_gb"]),
            "alloc_cores": 0,
            "alloc_ram_mb": 0,
            "alloc_disk_gb": 0.0,
        }
    for w in workload_rows:
        node = nodes.get(w["node"])
        if node is None:
            continue
        node["alloc_cores"] += int(w["cores"])
        node["alloc_ram_mb"] += int(w["ram_mb"])
        node["alloc_disk_gb"] += float(w["disk_gb"])
    for node in nodes.values():
        alloc_ram_mb = node.pop("alloc_ram_mb")
        node["alloc_ram_gb"] = round(alloc_ram_mb / 1024, 1)
        node["ram_headroom_pct"] = (
            round(100 * (node["ram_gb"] - node["alloc_ram_gb"]) / node["ram_gb"])
            if node["ram_gb"]
            else 0
        )
        # Compare RAM in exact MB: the 0.1 GB display rounding above would
        # otherwise hide an overcommit of up to ~51 MB.
        node["oversubscribed"] = (
            node["alloc_cores"] > node["cores"]
            or alloc_ram_mb > node["ram_gb"] * 1024
            or node["alloc_disk_gb"] > node["disk_gb"]
        )
    return nodes


def _dict_keys_at(data, *path):
    """Follow path through nested dicts and return the final dict's keys as a
    set. Returns an empty set if any level is missing or is not a dict."""
    current = data
    for key in path:
        if not isinstance(current, dict):
            return set()
        current = current.get(key)
    return set(current) if isinstance(current, dict) else set()


def parse_tf_hostnames(tf_json):
    """Hostnames from `terraform output -json` (the `vms` map keys).

    Raises ValueError on invalid JSON. Valid JSON of an unexpected shape
    (no `vms` output, non-dict `value`, top-level list, ...) yields an empty
    set rather than an AttributeError.
    """
    return _dict_keys_at(json.loads(tf_json), "vms", "value")


def parse_inventory_hostnames(inv_json):
    """Hostnames from `ansible-inventory --list` (_meta.hostvars keys).

    Raises ValueError on invalid JSON. Valid JSON of an unexpected shape
    yields an empty set rather than an AttributeError.
    """
    return _dict_keys_at(json.loads(inv_json), "_meta", "hostvars")


def find_drift(workload_rows, known_hostnames):
    """Warn when reference.md workloads and live hostnames disagree.

    Silent when no hostnames are known (pre-provisioning) — nothing to
    compare against. Returns a list of warning strings: names declared but
    not known first, then names known but not declared, each group sorted.
    """
    warnings = []
    declared = {w["workload"] for w in workload_rows}
    if not known_hostnames:
        return warnings
    for name in sorted(declared - known_hostnames):
        warnings.append(
            f"reference.md lists '{name}' but no Terraform/inventory host declares it"
        )
    for name in sorted(known_hostnames - declared):
        warnings.append(
            f"host '{name}' exists in Terraform/inventory but is absent from reference.md"
        )
    return warnings


def gather_usage():
    """FUTURE: live per-VM CPU/RAM/disk usage history.

    Requires the physical cluster online; source UNDECIDED (Proxmox RRD vs
    Prometheus/Loki/Grafana — see docs/TODO.md 8.4). Until then the evaluator
    reasons on declared intent.
    """
    return {"available": False, "reason": "cluster not provisioned (see STATUS.md)"}


def _run_json(cmd):
    """Run cmd (an argv list) and return its captured stdout as text.

    Raises OSError if the executable cannot be started and
    subprocess.CalledProcessError on a non-zero exit status.
    """
    return subprocess.run(cmd, capture_output=True, text=True, check=True).stdout


def known_hostnames(env):
    """Union of hostnames from Terraform output and Ansible inventory.

    Each source is best-effort: missing tool / no state / bad JSON yields
    nothing.
    """
    hosts = set()
    tf_dir = os.path.join(REPO_ROOT, "terraform", "environments", env)
    try:
        hosts |= parse_tf_hostnames(
            _run_json(["terraform", f"-chdir={tf_dir}", "output", "-json"])
        )
    except (OSError, subprocess.CalledProcessError, ValueError):
        pass
    # Point at the inventory DIRECTORY so every source file merges — hosts.yml AND
    # offsite.yml (offsite_hosts / askari), which a bare hosts.yml would miss.
    inv = os.path.join(REPO_ROOT, "inventories", env)
    try:
        hosts |= parse_inventory_hostnames(
            _run_json(["ansible-inventory", "-i", inv, "--list"])
        )
    except (OSError, subprocess.CalledProcessError, ValueError):
        pass
    return hosts


def main():
    """Parse CLI arguments, read the reference tables, and write the JSON
    report (nodes, workloads, usage, warnings) to stdout."""
    parser = argparse.ArgumentParser(
        description="Deterministic capacity facts for /capacity-review."
    )
    parser.add_argument("--env", default="staging")
    parser.add_argument(
        "--reference",
        default=os.path.join(REPO_ROOT, "docs", "hardware", "reference.md"),
    )
    args = parser.parse_args()

    with open(args.reference, encoding="utf-8") as fh:
        markdown = fh.read()

    node_rows = parse_table(markdown, ["node", "cores", "ram_gb", "disk_gb"])
    workload_rows = parse_table(
        markdown, ["workload", "node", "cores", "ram_mb", "disk_gb"]
    )
    nodes = compute_rollup(node_rows, workload_rows)
    warnings = find_drift(workload_rows, known_hostnames(args.env))

    json.dump(
        {
            "nodes": nodes,
            "workloads": workload_rows,
            "usage": gather_usage(),
            "warnings": warnings,
        },
        sys.stdout,
        indent=2,
        sort_keys=True,
    )
    sys.stdout.write("\n")


if __name__ == "__main__":
    main()