import importlib.util import json as _json import pathlib _PATH = pathlib.Path(__file__).resolve().parent.parent / "scripts" / "capacity-scan.py" _spec = importlib.util.spec_from_file_location("capacity_scan", _PATH) cs = importlib.util.module_from_spec(_spec) _spec.loader.exec_module(cs) def test_parse_table_keys_on_header_and_ignores_extra_cols(): md = """ intro text | node | cores | ram_gb | disk_gb | notes | |------|-------|--------|---------|-------| | pve0 | 20 | 64 | 4000 | nvme | | pve1 | 20 | 64 | 4000 | nvme | trailing text """ rows = cs.parse_table(md, ["node", "cores", "ram_gb", "disk_gb"]) assert rows == [ {"node": "pve0", "cores": "20", "ram_gb": "64", "disk_gb": "4000", "notes": "nvme"}, {"node": "pve1", "cores": "20", "ram_gb": "64", "disk_gb": "4000", "notes": "nvme"}, ] def test_parse_table_returns_empty_when_header_absent(): assert cs.parse_table("no tables here", ["node", "cores"]) == [] def test_compute_rollup_sums_allocations_and_flags_headroom(): node_rows = [{"node": "pve0", "cores": "20", "ram_gb": "64", "disk_gb": "4000"}] workload_rows = [ {"workload": "dns1", "node": "pve0", "cores": "1", "ram_mb": "512", "disk_gb": "10"}, {"workload": "forgejo", "node": "pve0", "cores": "4", "ram_mb": "8192", "disk_gb": "100"}, ] nodes = cs.compute_rollup(node_rows, workload_rows) pve0 = nodes["pve0"] assert pve0["alloc_cores"] == 5 assert pve0["alloc_ram_gb"] == 8.5 # (512 + 8192) / 1024 assert pve0["alloc_disk_gb"] == 110.0 assert pve0["ram_headroom_pct"] == 87 # round(100 * (64 - 8.5) / 64) assert pve0["oversubscribed"] is False def test_compute_rollup_flags_oversubscription(): node_rows = [{"node": "tiny", "cores": "2", "ram_gb": "4", "disk_gb": "50"}] workload_rows = [ {"workload": "hog", "node": "tiny", "cores": "4", "ram_mb": "1024", "disk_gb": "10"}, ] nodes = cs.compute_rollup(node_rows, workload_rows) assert nodes["tiny"]["oversubscribed"] is True # 4 cores > 2 def test_compute_rollup_ignores_workloads_on_unknown_nodes(): nodes = cs.compute_rollup( [{"node": "pve0", "cores": "20", "ram_gb": "64", "disk_gb": "4000"}], [{"workload": "ghost", "node": "nope", "cores": "1", "ram_mb": "512", "disk_gb": "10"}], ) assert nodes["pve0"]["alloc_cores"] == 0 def test_parse_tf_hostnames_reads_vms_value_keys(): tf_json = '{"vms": {"value": {"dns1": {"ip": "10.20.0.10", "group": "docker_hosts"}}}}' assert cs.parse_tf_hostnames(tf_json) == {"dns1"} def test_parse_inventory_hostnames_reads_meta_hostvars(): inv_json = '{"_meta": {"hostvars": {"dns1": {}, "proxy": {}}}}' assert cs.parse_inventory_hostnames(inv_json) == {"dns1", "proxy"} def test_find_drift_reports_both_directions(): workload_rows = [{"workload": "dns1", "node": "pve0", "cores": "1", "ram_mb": "512", "disk_gb": "10"}] warnings = cs.find_drift(workload_rows, {"proxy"}) assert any("dns1" in w and "no Terraform" in w for w in warnings) assert any("proxy" in w and "absent from reference.md" in w for w in warnings) def test_find_drift_silent_when_no_hostnames_known(): workload_rows = [{"workload": "dns1", "node": "pve0", "cores": "1", "ram_mb": "512", "disk_gb": "10"}] assert cs.find_drift(workload_rows, set()) == [] def test_gather_usage_is_stubbed_unavailable(): usage = cs.gather_usage() assert usage["available"] is False assert "reason" in usage def test_known_hostnames_degrades_to_empty(monkeypatch): # Simulate terraform/ansible-inventory being absent or failing. def boom(*a, **k): raise FileNotFoundError("no such tool") monkeypatch.setattr(cs.subprocess, "run", boom) assert cs.known_hostnames("staging") == set() def test_main_emits_valid_json_against_real_reference(monkeypatch, capsys): # Isolate from the host: no real terraform/ansible needed. monkeypatch.setattr(cs, "known_hostnames", lambda env: set()) monkeypatch.setattr("sys.argv", ["capacity-scan.py"]) cs.main() out = _json.loads(capsys.readouterr().out) assert set(out) == {"nodes", "workloads", "usage", "warnings"} assert out["usage"]["available"] is False assert "pve0" in out["nodes"] # from the skeleton reference.md