#!/usr/bin/env python3
"""boma local-VM integration test harness driver (ADR-025).

Stdlib-only by convention (TODO-14): never imports a YAML library. The transient
inventory is emitted via string templates; stubs/cert-tiers reach Ansible as `-e @`
extra-vars; profile metadata is JSON. Talks to libvirt via `virsh`.
"""
import argparse
import hashlib
import json
import os
import pathlib
import re
import shutil
import subprocess
import sys
import time
import urllib.request
import uuid

REPO_ROOT = pathlib.Path(__file__).resolve().parent.parent
CACHE_DIR = pathlib.Path(os.environ.get("BOMA_IT_CACHE", "/var/lib/boma-integration"))
IMAGE_URL = "https://cloud.debian.org/images/cloud/trixie/latest/debian-13-genericcloud-amd64.qcow2"
SHA_URL = "https://cloud.debian.org/images/cloud/trixie/latest/SHA512SUMS"
IMAGE_NAME = "debian-13-genericcloud-amd64.qcow2"
NET_NAME = "boma-it"
# NOTE(review): this value is handed to `virsh net-define` by net_ensure() but it is
# not a libvirt <network> document — it looks like the XML markup was lost somewhere.
# Left untouched because the intended subnet/bridge cannot be recovered from this
# file; confirm against the real definition before relying on net_ensure().
NET_XML = """ boma-it """
NAME_PREFIX = "boma-it-"
RUN_DIR = REPO_ROOT / "tests" / "integration" / ".run"
DIAG_ROOT = pathlib.Path.home() / "integration-runs"
PROFILE_DIR = REPO_ROOT / "tests" / "integration" / "profiles"
INTEG_DIR = REPO_ROOT / "tests" / "integration"
CERT_DIR = REPO_ROOT / "tests" / "integration" / "certs"
DEFAULT_MEM_MIB = 3072
DEFAULT_VCPUS = 2
MIN_FREE_MIB = 4096
VALID_TIERS = ("internal", "le-staging", "le-prod-wildcard")

# Target the SYSTEM libvirtd — where the substrate, /dev/kvm, and the NAT network live.
# Without this, a non-root caller's bare virsh/virt-install default to qemu:///session.
os.environ.setdefault("LIBVIRT_DEFAULT_URI", "qemu:///system")


def vm_name(host, suffix=None):
    """Return the libvirt domain name for *host*.

    The name is ``NAME_PREFIX + host + "-" + suffix``; when *suffix* is falsy an
    8-hex-character random suffix is generated.
    """
    suffix = suffix or uuid.uuid4().hex[:8]
    return f"{NAME_PREFIX}{host}-{suffix}"


def free_mib(meminfo_text):
    """Return ``MemAvailable`` from /proc/meminfo-style text in MiB (0 if absent)."""
    m = re.search(r"^MemAvailable:\s+(\d+)\s+kB", meminfo_text, re.MULTILINE)
    return int(m.group(1)) // 1024 if m else 0


def parse_lease_ip(domifaddr_output):
    """Return the first IPv4 address in `virsh domifaddr` output, or None."""
    m = re.search(r"ipv4\s+(\d+\.\d+\.\d+\.\d+)", domifaddr_output)
    return m.group(1) if m else None


def render_meta_data(instance_id, hostname):
    """Render the cloud-init ``meta-data`` document."""
    return f"instance-id: {instance_id}\nlocal-hostname: {hostname}\n"


def render_user_data(ssh_pubkey, ansible_user):
    """Render the cloud-init ``user-data`` (#cloud-config) document.

    Creates *ansible_user* with passwordless sudo and *ssh_pubkey* as its only
    authorized key, disables SSH password auth and requests a package index update.
    """
    # Indentation is significant: the keys below `- name:` must align with `name`
    # (4 columns) so they belong to the same list item, and the key list nests deeper.
    return (
        "#cloud-config\n"
        "users:\n"
        f"  - name: {ansible_user}\n"
        "    sudo: 'ALL=(ALL) NOPASSWD:ALL'\n"
        "    shell: /bin/bash\n"
        "    ssh_authorized_keys:\n"
        f"      - {ssh_pubkey}\n"
        "ssh_pwauth: false\n"
        "package_update: true\n"
    )


def render_network_config():
    """Render the cloud-init network-config (version 2) document.

    Forces DHCPv4 on any interface whose name matches ``en*``.
    """
    return (
        "version: 2\n"
        "ethernets:\n"
        "  primary:\n"
        "    match:\n"
        '      name: "en*"\n'
        "    dhcp4: true\n"
    )


def cert_file(tier):
    """Return the extra-vars file for certificate *tier*.

    Raises:
        ValueError: if *tier* is not one of VALID_TIERS.
    """
    if tier not in VALID_TIERS:
        raise ValueError(f"unknown cert tier: {tier}")
    return CERT_DIR / f"{tier}.yml"


def profile_path(host):
    """Return the path of the JSON profile for *host*."""
    return PROFILE_DIR / f"{host}.json"


def render_run_hosts(name, ip, ansible_user, groups):
    """Render the transient Ansible YAML inventory containing only the test VM.

    The single host *name* is listed under every group in *groups* (duplicates are
    dropped, first-seen order kept), each nested under ``all.children``.
    """
    lines = [
        "---",
        "# Generated by scripts/integration-vm.py — transient, gitignored. Do not edit.",
        "# Single test host ONLY (safety invariant: no real host is ever in scope).",
        "all:",
        "  children:",
    ]
    # dict.fromkeys de-duplicates while preserving order.
    for g in dict.fromkeys(groups):
        lines += [
            f"    {g}:",
            "      hosts:",
            f"        {name}:",
            f"          ansible_host: {ip}",
            f"          ansible_user: {ansible_user}",
        ]
    return "\n".join(lines) + "\n"


def sh(cmd, check=True, capture=False, **kw):
    """Run a command (list form). Logs the command to stderr.

    Returns the ``subprocess.CompletedProcess``; with *check* true a non-zero exit
    raises ``subprocess.CalledProcessError``. Extra keyword arguments are passed
    through to ``subprocess.run``.
    """
    print("+ " + " ".join(str(c) for c in cmd), file=sys.stderr)
    return subprocess.run(cmd, check=check, capture_output=capture, text=True, **kw)


def _ssh_argv(user, ip, remote_cmd, connect_timeout=None):
    """Build the ssh argv used for every connection to the test VM.

    Host-key checking is disabled and no known_hosts file is written. When
    *connect_timeout* is given, a ``ConnectTimeout`` option is added.
    """
    argv = ["ssh", "-o", "StrictHostKeyChecking=no", "-o", "UserKnownHostsFile=/dev/null"]
    if connect_timeout is not None:
        argv += ["-o", f"ConnectTimeout={connect_timeout}"]
    return argv + [f"{user}@{ip}", remote_cmd]


def _expected_sha(sha_text, filename):
    """Return the digest listed for *filename* in SHA512SUMS-style text, or None."""
    for line in sha_text.splitlines():
        parts = line.split()
        # A leading "*" on the name marks binary mode in checksum files.
        if len(parts) == 2 and parts[1].lstrip("*") == filename:
            return parts[0]
    return None


def ensure_image():
    """Return the cached golden image path, downloading and verifying it if absent.

    The download goes to a ``.part`` file and is renamed into place only after its
    SHA512 matches the published checksum.

    Raises:
        SystemExit: if the checksum is not published or does not match.
    """
    CACHE_DIR.mkdir(parents=True, exist_ok=True)
    img = CACHE_DIR / IMAGE_NAME
    if img.exists():
        return img
    print(f"Downloading {IMAGE_URL} ...", file=sys.stderr)
    tmp = img.with_suffix(".part")
    urllib.request.urlretrieve(IMAGE_URL, tmp)
    # Close the HTTP response deterministically instead of leaking it.
    with urllib.request.urlopen(SHA_URL) as resp:
        sha_text = resp.read().decode()
    want = _expected_sha(sha_text, IMAGE_NAME)
    if not want:
        tmp.unlink(missing_ok=True)
        raise SystemExit(f"checksum for {IMAGE_NAME} not found at {SHA_URL}")
    h = hashlib.sha512()
    with open(tmp, "rb") as fh:
        for chunk in iter(lambda: fh.read(1 << 20), b""):
            h.update(chunk)
    if h.hexdigest() != want:
        tmp.unlink(missing_ok=True)
        raise SystemExit("golden image SHA512 mismatch — refusing to use it")
    tmp.rename(img)
    return img


def net_ensure():
    """Make sure the libvirt network NET_NAME is defined, autostarted and active."""
    r = sh(["virsh", "net-info", NET_NAME], check=False, capture=True)
    if r.returncode != 0:
        xml = RUN_DIR / "net.xml"
        RUN_DIR.mkdir(parents=True, exist_ok=True)
        xml.write_text(NET_XML)
        sh(["virsh", "net-define", str(xml)])
        sh(["virsh", "net-autostart", NET_NAME])
    active = sh(["virsh", "net-info", NET_NAME], capture=True).stdout
    if not re.search(r"Active:\s+yes", active):
        sh(["virsh", "net-start", NET_NAME])


def _ssh_pubkey():
    """Return the caller's SSH public key (ed25519 preferred, then RSA).

    Raises:
        SystemExit: if neither key file exists in ~/.ssh.
    """
    for cand in ("id_ed25519.pub", "id_rsa.pub"):
        p = pathlib.Path.home() / ".ssh" / cand
        if p.exists():
            return p.read_text().strip()
    raise SystemExit("no SSH public key found in ~/.ssh")


def up(host, name=None, mem_mib=DEFAULT_MEM_MIB, vcpus=DEFAULT_VCPUS):
    """Create and boot a fresh VM for *host*; return ``(name, ip)``.

    Refuses to start when available memory is below MIN_FREE_MIB or when a running
    domain already carries NAME_PREFIX. On success the VM's name, IP and host are
    recorded in ``RUN_DIR / "current"``.

    Raises:
        SystemExit: on the refusals above, or when the DHCP lease / SSH wait times out.
    """
    free = free_mib(pathlib.Path("/proc/meminfo").read_text())
    if free < MIN_FREE_MIB:
        raise SystemExit(f"refusing to start: only {free} MiB free (< {MIN_FREE_MIB})")
    running = sh(["virsh", "list", "--name"], capture=True).stdout.split()
    if any(n.startswith(NAME_PREFIX) for n in running):
        raise SystemExit("an integration VM is already running (one at a time); "
                         "run `integration-vm prune` first")
    name = name or vm_name(host)
    img = ensure_image()
    net_ensure()
    RUN_DIR.mkdir(parents=True, exist_ok=True)
    # VM disk/seed/console must live where the SYSTEM hypervisor (libvirt-qemu) can reach
    # them — NOT under the repo/home (qemu cannot traverse /home/claude). CACHE_DIR is
    # group-libvirt + world-traversable (created by the integration_test role).
    overlay = CACHE_DIR / f"{name}.qcow2"
    sh(["qemu-img", "create", "-f", "qcow2", "-F", "qcow2", "-b", str(img), str(overlay)])
    (RUN_DIR / "user-data").write_text(render_user_data(_ssh_pubkey(), "ansible"))
    (RUN_DIR / "meta-data").write_text(render_meta_data(f"iid-{name}", name))
    seed = CACHE_DIR / f"{name}-seed.img"
    # Force DHCP on the VM NIC — don't rely on the genericcloud image's network fallback.
    (RUN_DIR / "network-config").write_text(render_network_config())
    sh(["cloud-localds", "--network-config", str(RUN_DIR / "network-config"),
        str(seed), str(RUN_DIR / "user-data"), str(RUN_DIR / "meta-data")])
    console = CACHE_DIR / f"{name}-console.log"
    sh(["virt-install", "--name", name, "--memory", str(mem_mib), "--vcpus", str(vcpus),
        "--boot", "uefi",  # genericcloud triple-faults on legacy BIOS handoff; UEFI boots
        "--import", "--disk", f"path={overlay},format=qcow2",
        "--disk", f"path={seed},device=cdrom",
        "--network", f"network={NET_NAME}", "--osinfo", "debian13",
        "--graphics", "none", "--serial", f"file,path={console}",
        "--noautoconsole"])
    ip = wait_for_ip(name)
    wait_for_ssh(ip, "ansible")
    # Block until cloud-init finishes (incl. apt-get update) so apply sees a ready system.
    sh(_ssh_argv("ansible", ip, "sudo cloud-init status --wait"), check=False)
    (RUN_DIR / "current").write_text(f"{name}\n{ip}\n{host}\n")
    print(f"VM {name} up at {ip}")
    return name, ip


def wait_for_ip(name, timeout=120):
    """Poll libvirt's DHCP leases until domain *name* has an IPv4 address.

    Raises:
        SystemExit: if no lease appears within *timeout* seconds.
    """
    end = time.time() + timeout
    while time.time() < end:
        out = sh(["virsh", "domifaddr", name, "--source", "lease"],
                 check=False, capture=True).stdout
        ip = parse_lease_ip(out)
        if ip:
            return ip
        time.sleep(4)
    raise SystemExit(f"timed out waiting for {name} to get a DHCP lease — "
                     "VM left defined; run `integration-vm prune` to remove it")


def wait_for_ssh(ip, user, timeout=180):
    """Poll until ``ssh user@ip true`` succeeds.

    Raises:
        SystemExit: if SSH is not reachable within *timeout* seconds.
    """
    end = time.time() + timeout
    while time.time() < end:
        r = sh(_ssh_argv(user, ip, "true", connect_timeout=5), check=False, capture=True)
        if r.returncode == 0:
            return
        time.sleep(5)
    raise SystemExit(f"timed out waiting for SSH to {ip} — "
                     "VM left defined; run `integration-vm prune` to remove it")


def _read_current():
    """Return ``(name, ip, host)`` of the VM recorded by `up`.

    Raises:
        SystemExit: if the record is missing or does not hold three lines with a
            non-empty name, instead of leaking a raw FileNotFoundError/IndexError.
    """
    cur = RUN_DIR / "current"
    try:
        fields = cur.read_text().splitlines()
    except FileNotFoundError:
        raise SystemExit(f"no current VM recorded at {cur}; "
                         "run `integration-vm up` first") from None
    if len(fields) < 3 or not fields[0].strip():
        raise SystemExit(f"{cur} is malformed; run `integration-vm prune` and start over")
    return fields[0], fields[1], fields[2]  # name, ip, host


def write_run_inventory(name, ip, groups):
    """Write the transient inventory and (re)link ``group_vars`` next to it.

    Raises:
        SystemExit: if ``RUN_DIR / "group_vars"`` exists and is not a symlink.
    """
    RUN_DIR.mkdir(parents=True, exist_ok=True)
    # The rendered inventory contains a non-ASCII dash; pin the encoding so the
    # write does not depend on the caller's locale.
    (RUN_DIR / "hosts.yml").write_text(
        render_run_hosts(name, ip, "ansible", groups), encoding="utf-8")
    link = RUN_DIR / "group_vars"
    target = REPO_ROOT / "inventories" / "production" / "group_vars"
    if link.is_symlink():
        link.unlink()
    elif link.exists():
        raise SystemExit(f"{link} exists and is not a symlink; remove it manually")
    link.symlink_to(target)


def _extra_vars(prof, certs):
    """Build the ``-e @file`` arguments: the profile's files, then the cert tier."""
    extra = []
    for rel in prof.get("extra_vars_files", []):
        extra += ["-e", f"@{INTEG_DIR / rel}"]
    extra += ["-e", f"@{cert_file(certs)}"]
    return extra


def apply(host, certs):
    """Run every playbook step of *host*'s profile against the current VM.

    Each step is limited to the VM and restricted to the step's tags when it has
    any; a failing playbook raises ``subprocess.CalledProcessError`` (via `sh`).
    """
    name, ip, _ = _read_current()
    prof = json.loads(profile_path(host).read_text())
    write_run_inventory(name, ip, prof["groups"])
    extra = _extra_vars(prof, certs)
    for step in prof["applies"]:
        cmd = [".venv/bin/ansible-playbook", "-i", str(RUN_DIR / "hosts.yml"),
               f"playbooks/{step['playbook']}", "--limit", name]
        if step.get("tags"):
            cmd += ["--tags", ",".join(step["tags"])]
        cmd += extra
        sh(cmd, cwd=str(REPO_ROOT))
    print(f"applied {host} profile to {name}")


def _boot_id(ip, user):
    """Return the VM kernel's boot_id read over SSH, or None if SSH fails."""
    r = sh(_ssh_argv(user, ip, "cat /proc/sys/kernel/random/boot_id", connect_timeout=5),
           check=False, capture=True)
    return r.stdout.strip() if r.returncode == 0 else None


def wait_for_reboot(ip, user, before_boot_id, timeout=240):
    """Confirm a REAL reboot: SSH back up AND boot_id changed (not the pre-reboot sshd).

    Raises:
        SystemExit: if no different boot_id is observed within *timeout* seconds.
    """
    end = time.time() + timeout
    while time.time() < end:
        bid = _boot_id(ip, user)
        if bid and bid != before_boot_id:
            return
        time.sleep(5)
    raise SystemExit(f"timed out waiting for {ip} to reboot (boot_id unchanged) — "
                     "VM left defined; run `integration-vm prune` to remove it")


def reboot_vm():
    """Reboot the current VM through libvirt and wait for a changed boot_id.

    Raises:
        SystemExit: if the pre-reboot boot_id cannot be read, or the reboot is not
            confirmed in time.
    """
    name, ip, _ = _read_current()
    before = _boot_id(ip, "ansible")
    if not before:
        # Without a baseline, ANY boot_id would look "changed" and the still-running
        # pre-reboot sshd would be accepted as proof of a reboot.
        raise SystemExit(f"cannot read boot_id from {ip} before reboot — "
                         f"refusing to reboot {name} without a baseline")
    sh(["virsh", "reboot", name])
    wait_for_reboot(ip, "ansible", before)
    print(f"{name} rebooted (boot_id changed), SSH back at {ip}")


def run_assert(host, certs):
    """Run the verify playbook against the current VM.

    Raises:
        SystemExit: if verification fails (after diagnostics have been collected).
    """
    name, ip, _ = _read_current()
    prof = json.loads(profile_path(host).read_text())
    write_run_inventory(name, ip, prof["groups"])
    cmd = [".venv/bin/ansible-playbook", "-i", str(RUN_DIR / "hosts.yml"),
           "tests/integration/verify.yml", "--limit", name] + _extra_vars(prof, certs)
    r = sh(cmd, cwd=str(REPO_ROOT), check=False)
    if r.returncode != 0:
        dump_diagnostics(name, ip)
        raise SystemExit(f"VERIFY FAILED for {name} — diagnostics in {DIAG_ROOT}")
    print(f"VERIFY PASSED for {name}")


def dump_diagnostics(name, ip):
    """Collect best-effort diagnostics from the VM into ``DIAG_ROOT / name``.

    Each command's stdout and stderr are saved to ``<label>.txt``; failures of
    individual commands are not fatal. The serial console log is copied if present.
    """
    d = DIAG_ROOT / name
    d.mkdir(parents=True, exist_ok=True)
    for label, cmd in [
        ("nft", "nft list ruleset"),
        ("docker", "docker ps -a"),
        ("ss", "ss -tlnp"),
        ("journal", "journalctl -b --no-pager"),
        ("critical-chain", "systemd-analyze critical-chain"),
    ]:
        r = sh(_ssh_argv("ansible", ip, "sudo " + cmd), check=False, capture=True)
        (d / f"{label}.txt").write_text((r.stdout or "") + (r.stderr or ""))
    console = CACHE_DIR / f"{name}-console.log"
    if console.exists():
        # The serial log is root:0600 (libvirt-created); read it via sudo (ADR-015: the
        # claude worker has sudo) and write a worker-owned copy into the bundle.
        r = sh(["sudo", "cat", str(console)], check=False, capture=True)
        (d / "console.log").write_text(r.stdout or "")
    print(f"diagnostics written to {d}", file=sys.stderr)


def _destroy(name):
    """Stop and undefine domain *name* and delete its files in RUN_DIR/CACHE_DIR.

    Raises:
        ValueError: if *name* is empty — the ``{name}*`` glob would otherwise match
            every file in both directories, including the cached golden image.
    """
    if not name or not name.strip():
        raise ValueError("refusing to destroy: empty VM name")
    sh(["virsh", "destroy", name], check=False)
    sh(["virsh", "undefine", name, "--nvram"], check=False)
    for base in (RUN_DIR, CACHE_DIR):
        for f in base.glob(f"{name}*"):
            f.unlink(missing_ok=True)


def down(host=None, keep=False):
    """Destroy the current VM unless *keep* is set. *host* is accepted but unused."""
    if keep:
        print("--keep: leaving the VM running for inspection")
        return
    cur = RUN_DIR / "current"
    if cur.exists():
        lines = cur.read_text().splitlines()
        name = lines[0] if lines else ""
        # An empty record names no VM: just drop the stale file.
        if name.strip():
            _destroy(name)
        cur.unlink(missing_ok=True)
        if name.strip():
            print(f"destroyed {name}")


def prune():
    """Destroy every domain (running or not) whose name starts with NAME_PREFIX."""
    running = sh(["virsh", "list", "--all", "--name"], capture=True).stdout.split()
    for n in running:
        if n.startswith(NAME_PREFIX):
            _destroy(n)
            print(f"pruned {n}")
    (RUN_DIR / "current").unlink(missing_ok=True)


def console():
    """Print the current VM's serial console log (read via sudo)."""
    name, _, _ = _read_current()
    log = CACHE_DIR / f"{name}-console.log"
    if log.exists():
        print(sh(["sudo", "cat", str(log)], check=False, capture=True).stdout or "")
    else:
        print(f"no console log at {log}")


def cycle(host, certs, keep=False, no_reboot=False):
    """Run up → apply → (reboot) → assert, then tear down on success.

    On failure the VM is deliberately left up for inspection and the original
    exception propagates.
    """
    ok = False
    try:
        up(host)
        apply(host, certs)
        if not no_reboot:
            reboot_vm()
        run_assert(host, certs)
        ok = True
    finally:
        if ok and not keep:
            down(host)
        elif not ok:
            print("FAILED — VM left up for inspection; `integration-vm prune` to clean.",
                  file=sys.stderr)


# Sub-command name -> handler taking the parsed argparse namespace.
DISPATCH = {
    # `up` returns (name, ip); discard it so main() hands None (exit 0) to sys.exit.
    "up": lambda a: (up(a.host), None)[1],
    "apply": lambda a: apply(a.host, a.certs),
    "reboot": lambda a: reboot_vm(),
    "assert": lambda a: run_assert(a.host, a.certs),
    "down": lambda a: down(a.host, a.keep),
    "console": lambda a: console(),
    "prune": lambda a: prune(),
    "cycle": lambda a: cycle(a.host, a.certs, a.keep, a.no_reboot),
}


def main(argv=None):
    """Parse *argv* (default: sys.argv) and run the selected sub-command."""
    p = argparse.ArgumentParser(prog="integration-vm", description=__doc__)
    sub = p.add_subparsers(dest="cmd", required=True)
    for c in ("up", "apply", "reboot", "assert", "cycle", "down", "console"):
        sp = sub.add_parser(c)
        sp.add_argument("--host", required=True)
        sp.add_argument("--certs", choices=VALID_TIERS, default="internal")
        sp.add_argument("--keep", action="store_true")
        sp.add_argument("--no-reboot", action="store_true")
    sub.add_parser("prune")
    args = p.parse_args(argv)
    return DISPATCH[args.cmd](args)


if __name__ == "__main__":  # pragma: no cover
    sys.exit(main())