boma/scripts/capacity-scan.py
sjat 8ed00c9206 Add hostname parsers + find_drift() to capacity-scan.py
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-01 10:24:11 +02:00

110 lines
4 KiB
Python

#!/usr/bin/env python3
"""capacity-scan.py — deterministic capacity facts for /capacity-review.
Python standard library only. Emits a JSON object to stdout.
Reads physical capacities and workload allocations from the machine-readable
tables in docs/hardware/reference.md, computes per-node allocated-vs-physical
rollups, and cross-checks workload hostnames against `terraform output -json`
and `ansible-inventory --list` to surface drift. Degrades gracefully when
nothing is provisioned. Live usage stats are a documented future hook.
Usage: python3 scripts/capacity-scan.py [--env staging] [--reference PATH]
"""
import argparse
import json
import os
import subprocess
import sys
REPO_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
def parse_table(markdown, required_cols):
"""Return row dicts for the first markdown table whose header contains all
required_cols. Keys are header names; values are raw cell strings.
Rows whose cell count does not match the header are skipped."""
lines = markdown.splitlines()
required = set(required_cols)
for i, raw in enumerate(lines):
line = raw.strip()
if not line.startswith("|"):
continue
headers = [c.strip() for c in line.strip("|").split("|")]
if not required.issubset(set(headers)):
continue
rows = []
# i + 2 skips the header's GFM separator row (|---|---|)
for body in lines[i + 2:]:
if not body.strip().startswith("|"):
break
cells = [c.strip() for c in body.strip().strip("|").split("|")]
if len(cells) == len(headers):
rows.append(dict(zip(headers, cells)))
return rows
return []
def compute_rollup(node_rows, workload_rows):
"""Per node: physical totals, summed allocations, RAM headroom %, and an
oversubscribed flag. Workloads on unknown nodes are ignored."""
nodes = {}
for r in node_rows:
nodes[r["node"]] = {
"cores": int(r["cores"]),
"ram_gb": float(r["ram_gb"]),
"disk_gb": float(r["disk_gb"]),
"alloc_cores": 0,
"alloc_ram_mb": 0,
"alloc_disk_gb": 0.0,
}
for w in workload_rows:
node = nodes.get(w["node"])
if node is None:
continue
node["alloc_cores"] += int(w["cores"])
node["alloc_ram_mb"] += int(w["ram_mb"])
node["alloc_disk_gb"] += float(w["disk_gb"])
for node in nodes.values():
node["alloc_ram_gb"] = round(node.pop("alloc_ram_mb") / 1024, 1)
node["ram_headroom_pct"] = (
round(100 * (node["ram_gb"] - node["alloc_ram_gb"]) / node["ram_gb"])
if node["ram_gb"]
else 0
)
node["oversubscribed"] = (
node["alloc_cores"] > node["cores"]
or node["alloc_ram_gb"] > node["ram_gb"]
or node["alloc_disk_gb"] > node["disk_gb"]
)
return nodes
def parse_tf_hostnames(tf_json):
"""Hostnames from `terraform output -json` (the `vms` map keys)."""
data = json.loads(tf_json)
return set(data.get("vms", {}).get("value", {}).keys())
def parse_inventory_hostnames(inv_json):
"""Hostnames from `ansible-inventory --list` (_meta.hostvars keys)."""
data = json.loads(inv_json)
return set(data.get("_meta", {}).get("hostvars", {}).keys())
def find_drift(workload_rows, known_hostnames):
"""Warn when reference.md workloads and live hostnames disagree. Silent when
no hostnames are known (pre-provisioning) — nothing to compare against."""
warnings = []
declared = {w["workload"] for w in workload_rows}
if not known_hostnames:
return warnings
for name in sorted(declared - known_hostnames):
warnings.append(
f"reference.md lists '{name}' but no Terraform/inventory host declares it"
)
for name in sorted(known_hostnames - declared):
warnings.append(
f"host '{name}' exists in Terraform/inventory but is absent from reference.md"
)
return warnings