Complete capacity-scan.py: usage stub, subprocess glue, main()
Adds gather_usage() (stubbed, returns available:false), known_hostnames() with graceful degradation when terraform/ansible-inventory are absent, _run_json() helper, and main() that parses reference.md and emits JSON. Three new TDD tests (12 total, all passing). Script exits 0 with valid JSON even when no cluster is provisioned. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
8ed00c9206
commit
05694f6ea4
2 changed files with 85 additions and 0 deletions
|
|
@ -108,3 +108,61 @@ def find_drift(workload_rows, known_hostnames):
|
||||||
f"host '{name}' exists in Terraform/inventory but is absent from reference.md"
|
f"host '{name}' exists in Terraform/inventory but is absent from reference.md"
|
||||||
)
|
)
|
||||||
return warnings
|
return warnings
|
||||||
|
|
||||||
|
|
||||||
|
def gather_usage():
|
||||||
|
"""FUTURE: live per-VM CPU/RAM/disk usage history. Requires the physical
|
||||||
|
cluster online; source UNDECIDED (Proxmox RRD vs Prometheus/Loki/Grafana —
|
||||||
|
see docs/TODO.md 8.4). Until then the evaluator reasons on declared intent."""
|
||||||
|
return {"available": False, "reason": "cluster not provisioned (see STATUS.md)"}
|
||||||
|
|
||||||
|
|
||||||
|
def _run_json(cmd):
|
||||||
|
return subprocess.run(cmd, capture_output=True, text=True, check=True).stdout
|
||||||
|
|
||||||
|
|
||||||
|
def known_hostnames(env):
|
||||||
|
"""Union of hostnames from Terraform output and Ansible inventory. Each
|
||||||
|
source is best-effort: missing tool / no state / bad JSON yields nothing."""
|
||||||
|
hosts = set()
|
||||||
|
tf_dir = os.path.join(REPO_ROOT, "terraform", "environments", env)
|
||||||
|
try:
|
||||||
|
hosts |= parse_tf_hostnames(_run_json(["terraform", f"-chdir={tf_dir}", "output", "-json"]))
|
||||||
|
except (OSError, subprocess.CalledProcessError, ValueError):
|
||||||
|
pass
|
||||||
|
inv = os.path.join(REPO_ROOT, "inventories", env, "hosts.yml")
|
||||||
|
try:
|
||||||
|
hosts |= parse_inventory_hostnames(_run_json(["ansible-inventory", "-i", inv, "--list"]))
|
||||||
|
except (OSError, subprocess.CalledProcessError, ValueError):
|
||||||
|
pass
|
||||||
|
return hosts
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
parser = argparse.ArgumentParser(description="Deterministic capacity facts for /capacity-review.")
|
||||||
|
parser.add_argument("--env", default="staging")
|
||||||
|
parser.add_argument(
|
||||||
|
"--reference",
|
||||||
|
default=os.path.join(REPO_ROOT, "docs", "hardware", "reference.md"),
|
||||||
|
)
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
with open(args.reference, encoding="utf-8") as fh:
|
||||||
|
markdown = fh.read()
|
||||||
|
|
||||||
|
node_rows = parse_table(markdown, ["node", "cores", "ram_gb", "disk_gb"])
|
||||||
|
workload_rows = parse_table(markdown, ["workload", "node", "cores", "ram_mb", "disk_gb"])
|
||||||
|
nodes = compute_rollup(node_rows, workload_rows)
|
||||||
|
warnings = find_drift(workload_rows, known_hostnames(args.env))
|
||||||
|
|
||||||
|
json.dump(
|
||||||
|
{"nodes": nodes, "workloads": workload_rows, "usage": gather_usage(), "warnings": warnings},
|
||||||
|
sys.stdout,
|
||||||
|
indent=2,
|
||||||
|
sort_keys=True,
|
||||||
|
)
|
||||||
|
sys.stdout.write("\n")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,5 @@
|
||||||
import importlib.util
|
import importlib.util
|
||||||
|
import json as _json
|
||||||
import pathlib
|
import pathlib
|
||||||
|
|
||||||
_PATH = pathlib.Path(__file__).resolve().parent.parent / "scripts" / "capacity-scan.py"
|
_PATH = pathlib.Path(__file__).resolve().parent.parent / "scripts" / "capacity-scan.py"
|
||||||
|
|
@ -80,3 +81,29 @@ def test_find_drift_reports_both_directions():
|
||||||
def test_find_drift_silent_when_no_hostnames_known():
|
def test_find_drift_silent_when_no_hostnames_known():
|
||||||
workload_rows = [{"workload": "dns1", "node": "pve0", "cores": "1", "ram_mb": "512", "disk_gb": "10"}]
|
workload_rows = [{"workload": "dns1", "node": "pve0", "cores": "1", "ram_mb": "512", "disk_gb": "10"}]
|
||||||
assert cs.find_drift(workload_rows, set()) == []
|
assert cs.find_drift(workload_rows, set()) == []
|
||||||
|
|
||||||
|
|
||||||
|
def test_gather_usage_is_stubbed_unavailable():
|
||||||
|
usage = cs.gather_usage()
|
||||||
|
assert usage["available"] is False
|
||||||
|
assert "reason" in usage
|
||||||
|
|
||||||
|
|
||||||
|
def test_known_hostnames_degrades_to_empty(monkeypatch):
|
||||||
|
# Simulate terraform/ansible-inventory being absent or failing.
|
||||||
|
def boom(*a, **k):
|
||||||
|
raise FileNotFoundError("no such tool")
|
||||||
|
|
||||||
|
monkeypatch.setattr(cs.subprocess, "run", boom)
|
||||||
|
assert cs.known_hostnames("staging") == set()
|
||||||
|
|
||||||
|
|
||||||
|
def test_main_emits_valid_json_against_real_reference(monkeypatch, capsys):
|
||||||
|
# Isolate from the host: no real terraform/ansible needed.
|
||||||
|
monkeypatch.setattr(cs, "known_hostnames", lambda env: set())
|
||||||
|
monkeypatch.setattr("sys.argv", ["capacity-scan.py"])
|
||||||
|
cs.main()
|
||||||
|
out = _json.loads(capsys.readouterr().out)
|
||||||
|
assert set(out) == {"nodes", "workloads", "usage", "warnings"}
|
||||||
|
assert out["usage"]["available"] is False
|
||||||
|
assert "pve0" in out["nodes"] # from the skeleton reference.md
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue