#!/usr/bin/env python3
"""boma local-VM integration test harness driver (ADR-025).

Stdlib-only by convention (TODO-14): never imports a YAML library. The transient
inventory is emitted via string templates; stubs/cert-tiers reach Ansible as
`-e @` extra-vars; profile metadata is JSON. Talks to libvirt via `virsh`.
"""

import argparse
import hashlib
import json
import os
import pathlib
import re
import shutil
import subprocess
import sys
import time
import urllib.request
import uuid

REPO_ROOT = pathlib.Path(__file__).resolve().parent.parent
CACHE_DIR = pathlib.Path(os.environ.get("BOMA_IT_CACHE", "/var/lib/boma-integration"))
IMAGE_URL = "https://cloud.debian.org/images/cloud/trixie/latest/debian-13-genericcloud-amd64.qcow2"
SHA_URL = "https://cloud.debian.org/images/cloud/trixie/latest/SHA512SUMS"
IMAGE_NAME = "debian-13-genericcloud-amd64.qcow2"
NET_NAME = "boma-it"
# NOTE(review): this template contains no XML markup, so `virsh net-define`
# cannot accept it as written. It looks as if the element tags were stripped
# somewhere upstream; restore the full libvirt <network> document from version
# control. wait_for_ip() reads libvirt DHCP leases, so the network presumably
# needs a DHCP range — confirm against the original definition.
NET_XML = """ boma-it """
NAME_PREFIX = "boma-it-"
RUN_DIR = REPO_ROOT / "tests" / "integration" / ".run"
DIAG_ROOT = pathlib.Path.home() / "integration-runs"
PROFILE_DIR = REPO_ROOT / "tests" / "integration" / "profiles"
INTEG_DIR = REPO_ROOT / "tests" / "integration"
CERT_DIR = REPO_ROOT / "tests" / "integration" / "certs"
DEFAULT_MEM_MIB = 3072
DEFAULT_VCPUS = 2
MIN_FREE_MIB = 4096
VALID_TIERS = ("internal", "le-staging", "le-prod-wildcard")


def vm_name(host, suffix=None):
    """Return the libvirt domain name ``boma-it-<host>-<suffix>``.

    When *suffix* is omitted (or empty) a random 8-hex-digit one is generated,
    so repeated runs for the same host get distinct names.
    """
    suffix = suffix or uuid.uuid4().hex[:8]
    return f"{NAME_PREFIX}{host}-{suffix}"


def free_mib(meminfo_text):
    """Return ``MemAvailable`` from /proc/meminfo text in MiB (0 if absent)."""
    m = re.search(r"^MemAvailable:\s+(\d+)\s+kB", meminfo_text, re.MULTILINE)
    return int(m.group(1)) // 1024 if m else 0


def parse_lease_ip(domifaddr_output):
    """Return the first IPv4 address in `virsh domifaddr` output, or None."""
    m = re.search(r"ipv4\s+(\d+\.\d+\.\d+\.\d+)", domifaddr_output)
    return m.group(1) if m else None


def parse_net_active(net_info_output):
    """Return True when `virsh net-info` output reports the network as active.

    virsh pads the value column, so the ``Active:`` label and its value are
    separated by a variable amount of whitespace; a fixed-substring test for
    "Active: yes" does not match that layout.
    """
    return re.search(r"^Active:\s+yes\s*$", net_info_output, re.MULTILINE) is not None


def render_meta_data(instance_id, hostname):
    """Return the cloud-init ``meta-data`` document for the seed image."""
    return f"instance-id: {instance_id}\nlocal-hostname: {hostname}\n"


def render_user_data(ssh_pubkey, ansible_user):
    """Return the cloud-init ``user-data`` (#cloud-config) document.

    Creates *ansible_user* with passwordless sudo and *ssh_pubkey* as its only
    authorized key; SSH password auth and package updates are disabled.
    """
    # The nested keys must be indented under the list item, otherwise they are
    # parsed as top-level keys and the user gets no sudo rule / SSH key.
    return (
        "#cloud-config\n"
        "users:\n"
        f"  - name: {ansible_user}\n"
        "    sudo: 'ALL=(ALL) NOPASSWD:ALL'\n"
        "    shell: /bin/bash\n"
        "    ssh_authorized_keys:\n"
        f"      - {ssh_pubkey}\n"
        "ssh_pwauth: false\n"
        "package_update: false\n"
    )


def cert_file(tier):
    """Return the extra-vars file for certificate *tier*.

    Raises:
        ValueError: if *tier* is not one of VALID_TIERS.
    """
    if tier not in VALID_TIERS:
        raise ValueError(f"unknown cert tier: {tier}")
    return CERT_DIR / f"{tier}.yml"


def profile_path(host):
    """Return the path of the JSON profile metadata file for *host*."""
    return PROFILE_DIR / f"{host}.json"


def render_run_hosts(name, ip, ansible_user, groups):
    """Return a transient YAML inventory containing only the test VM.

    The single host *name* (reachable at *ip* as *ansible_user*) is listed
    under every group in *groups*; duplicate group names are dropped while
    preserving first-seen order.
    """
    lines = [
        "# Generated by scripts/integration-vm.py — transient, gitignored. Do not edit.",
        "# Single test host ONLY (safety invariant: no real host is ever in scope).",
        "all:",
        "  children:",
    ]
    # dict.fromkeys de-duplicates while keeping order, so a group listed twice
    # does not produce a duplicate mapping key.
    for g in dict.fromkeys(groups):
        lines += [
            f"    {g}:",
            "      hosts:",
            f"        {name}:",
            f"          ansible_host: {ip}",
            f"          ansible_user: {ansible_user}",
        ]
    return "\n".join(lines) + "\n"


def sh(cmd, check=True, capture=False, **kw):
    """Run a command (list form). Logs the command to stderr."""
    print("+ " + " ".join(str(c) for c in cmd), file=sys.stderr)
    return subprocess.run(cmd, check=check, capture_output=capture, text=True, **kw)


def _expected_sha(sha_text, filename):
    """Return the digest listed for *filename* in a SHA*SUMS listing, or None."""
    for line in sha_text.splitlines():
        parts = line.split()
        # A leading "*" on the file name marks binary mode in coreutils output.
        if len(parts) == 2 and parts[1].lstrip("*") == filename:
            return parts[0]
    return None


def _sha512_hex(path):
    """Return the SHA-512 hex digest of the file at *path*, read in 1 MiB chunks."""
    h = hashlib.sha512()
    with open(path, "rb") as fh:
        for chunk in iter(lambda: fh.read(1 << 20), b""):
            h.update(chunk)
    return h.hexdigest()


def ensure_image():
    """Return the path of the cached golden image, downloading it if missing.

    An image already present in CACHE_DIR is returned as-is without being
    re-verified. A fresh download goes to a ``.part`` file and is renamed into
    place only after its SHA-512 matches the published checksum.

    Raises:
        SystemExit: if the checksum is not published or does not match.
    """
    CACHE_DIR.mkdir(parents=True, exist_ok=True)
    img = CACHE_DIR / IMAGE_NAME
    if img.exists():
        return img
    print(f"Downloading {IMAGE_URL} ...", file=sys.stderr)
    tmp = img.with_suffix(".part")
    urllib.request.urlretrieve(IMAGE_URL, tmp)
    # Close the HTTP response deterministically instead of leaking it.
    with urllib.request.urlopen(SHA_URL) as resp:
        sha_text = resp.read().decode()
    want = _expected_sha(sha_text, IMAGE_NAME)
    if not want:
        tmp.unlink(missing_ok=True)
        raise SystemExit(f"checksum for {IMAGE_NAME} not found at {SHA_URL}")
    if _sha512_hex(tmp) != want:
        tmp.unlink(missing_ok=True)
        raise SystemExit("golden image SHA512 mismatch — refusing to use it")
    tmp.rename(img)
    return img


def net_ensure():
    """Make sure the libvirt network NET_NAME is defined and started.

    Defines the network from NET_XML (and marks it autostart) when
    `virsh net-info` does not know it, then starts it unless it is already
    active.
    """
    r = sh(["virsh", "net-info", NET_NAME], check=False, capture=True)
    if r.returncode != 0:
        xml = RUN_DIR / "net.xml"
        RUN_DIR.mkdir(parents=True, exist_ok=True)
        xml.write_text(NET_XML)
        sh(["virsh", "net-define", str(xml)])
        sh(["virsh", "net-autostart", NET_NAME])
    info = sh(["virsh", "net-info", NET_NAME], capture=True).stdout
    # Starting an already-active network makes virsh fail, so only start it
    # when net-info does not report it as active.
    if not parse_net_active(info):
        sh(["virsh", "net-start", NET_NAME])


def _ssh_pubkey():
    """Return the caller's SSH public key (ed25519 preferred, then RSA).

    Raises:
        SystemExit: if neither key file exists in ~/.ssh.
    """
    for cand in ("id_ed25519.pub", "id_rsa.pub"):
        p = pathlib.Path.home() / ".ssh" / cand
        if p.exists():
            return p.read_text().strip()
    raise SystemExit("no SSH public key found in ~/.ssh")


def up(host, name=None, mem_mib=DEFAULT_MEM_MIB, vcpus=DEFAULT_VCPUS):
    """Boot a fresh test VM for *host* and wait until it answers SSH.

    Refuses to start when less than MIN_FREE_MIB is available or when another
    ``boma-it-`` domain is already running. Creates a qcow2 overlay on the
    golden image plus a cloud-init seed, imports them with virt-install, and
    records ``name``/``ip``/``host`` in ``RUN_DIR/current``.

    Returns:
        tuple: ``(name, ip)`` of the running VM.

    Raises:
        SystemExit: on the pre-flight refusals above or on a wait timeout.
    """
    free = free_mib(pathlib.Path("/proc/meminfo").read_text())
    if free < MIN_FREE_MIB:
        raise SystemExit(f"refusing to start: only {free} MiB free (< {MIN_FREE_MIB})")
    running = sh(["virsh", "list", "--name"], capture=True).stdout.split()
    if any(n.startswith(NAME_PREFIX) for n in running):
        raise SystemExit("an integration VM is already running (one at a time); "
                         "run `integration-vm prune` first")
    name = name or vm_name(host)
    img = ensure_image()
    net_ensure()
    RUN_DIR.mkdir(parents=True, exist_ok=True)

    # Copy-on-write overlay: the cached golden image itself is never written to.
    overlay = RUN_DIR / f"{name}.qcow2"
    sh(["qemu-img", "create", "-f", "qcow2", "-F", "qcow2", "-b", str(img), str(overlay)])

    # cloud-init seed carrying the SSH key and hostname.
    (RUN_DIR / "user-data").write_text(render_user_data(_ssh_pubkey(), "ansible"))
    (RUN_DIR / "meta-data").write_text(render_meta_data(f"iid-{name}", name))
    seed = RUN_DIR / f"{name}-seed.img"
    sh(["cloud-localds", str(seed), str(RUN_DIR / "user-data"), str(RUN_DIR / "meta-data")])

    # The serial console is logged outside the repo for post-mortem diagnosis.
    DIAG_ROOT.mkdir(parents=True, exist_ok=True)
    console = DIAG_ROOT / f"{name}-console.log"
    sh(["virt-install", "--name", name,
        "--memory", str(mem_mib), "--vcpus", str(vcpus),
        "--import",
        "--disk", f"path={overlay},format=qcow2",
        "--disk", f"path={seed},device=cdrom",
        "--network", f"network={NET_NAME}",
        "--osinfo", "debian13",
        "--graphics", "none",
        "--serial", f"file,path={console}",
        "--noautoconsole"])

    ip = wait_for_ip(name)
    wait_for_ssh(ip, "ansible")
    (RUN_DIR / "current").write_text(f"{name}\n{ip}\n{host}\n")
    print(f"VM {name} up at {ip}")
    return name, ip


def wait_for_ip(name, timeout=120):
    """Poll libvirt every 4 s until domain *name* has a DHCP lease.

    Returns:
        str: the leased IPv4 address.

    Raises:
        SystemExit: if no lease appears within *timeout* seconds.
    """
    end = time.time() + timeout
    while time.time() < end:
        out = sh(["virsh", "domifaddr", name, "--source", "lease"],
                 check=False, capture=True).stdout
        ip = parse_lease_ip(out)
        if ip:
            return ip
        time.sleep(4)
    raise SystemExit(f"timed out waiting for {name} to get a DHCP lease")


def wait_for_ssh(ip, user, timeout=180):
    """Poll every 5 s until ``ssh user@ip true`` succeeds.

    Host-key checking is disabled and no known_hosts file is written, since
    each throwaway VM presents a new host key.

    Raises:
        SystemExit: if SSH is not reachable within *timeout* seconds.
    """
    end = time.time() + timeout
    while time.time() < end:
        r = sh(["ssh",
                "-o", "StrictHostKeyChecking=no",
                "-o", "UserKnownHostsFile=/dev/null",
                "-o", "ConnectTimeout=5",
                f"{user}@{ip}", "true"],
               check=False, capture=True)
        if r.returncode == 0:
            return
        time.sleep(5)
    raise SystemExit(f"timed out waiting for SSH to {ip}")


def main(argv=None):
    """Parse the command line and dispatch to the selected subcommand.

    Every per-host subcommand accepts the same option set (``--host``,
    ``--certs``, ``--keep``, ``--no-reboot``); ``prune`` takes none.

    Returns:
        Whatever the selected handler returns (used as the exit status).
    """
    p = argparse.ArgumentParser(prog="integration-vm", description=__doc__)
    sub = p.add_subparsers(dest="cmd", required=True)
    for c in ("up", "apply", "reboot", "assert", "cycle", "down", "console"):
        sp = sub.add_parser(c)
        sp.add_argument("--host", required=True)
        sp.add_argument("--certs", choices=VALID_TIERS, default="internal")
        sp.add_argument("--keep", action="store_true")
        sp.add_argument("--no-reboot", action="store_true")
    sub.add_parser("prune")
    args = p.parse_args(argv)
    # NOTE(review): DISPATCH is not defined in the part of the file visible
    # here; presumably a {command: handler(args)} mapping — confirm it exists
    # and covers every subcommand registered above.
    return DISPATCH[args.cmd](args)


if __name__ == "__main__":  # pragma: no cover
    sys.exit(main())