boma/scripts/integration-vm.py
sjat 26bb7e442d fix(integration): pin system python for virt-install (venv PATH hijack)
The Makefile prepends .venv/bin to PATH (so the venv's ansible tools resolve),
but virt-install's `#!/usr/bin/env python3` shebang then resolved to the
isolated venv, which lacks system PyGObject (gi) -> ModuleNotFoundError. Strip
.venv/bin from PATH for the virt-install call so its shebang finds
/usr/bin/python3 (which has gi); ansible runs via its absolute .venv path and is
unaffected. Surfaced running `make test-integration HOST=ubongo`.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-19 10:32:09 +02:00

444 lines
16 KiB
Python

#!/usr/bin/env python3
"""boma local-VM integration test harness driver (ADR-025).
Stdlib-only by convention (TODO-14): never imports a YAML library. The transient
inventory is emitted via string templates; stubs/cert-tiers reach Ansible as
`-e @<file>` extra-vars; profile metadata is JSON. Talks to libvirt via `virsh`.
"""
import argparse
import hashlib
import json
import os
import pathlib
import re
import subprocess
import sys
import time
import urllib.request
import uuid
# --- Filesystem layout ------------------------------------------------------
# Repository root: two directory levels above this script's resolved path.
REPO_ROOT = pathlib.Path(__file__).resolve().parents[1]
# Golden image + per-VM artifacts; the BOMA_IT_CACHE environment variable overrides it.
CACHE_DIR = pathlib.Path(os.environ.get("BOMA_IT_CACHE", "/var/lib/boma-integration"))
# Everything test-related hangs off tests/integration inside the repository.
INTEG_DIR = REPO_ROOT / "tests" / "integration"
RUN_DIR = INTEG_DIR / ".run"
PROFILE_DIR = INTEG_DIR / "profiles"
CERT_DIR = INTEG_DIR / "certs"
# Diagnostics bundles are written below the invoking user's home directory.
DIAG_ROOT = pathlib.Path.home() / "integration-runs"

# --- Golden image -----------------------------------------------------------
IMAGE_NAME = "debian-13-genericcloud-amd64.qcow2"
IMAGE_URL = "https://cloud.debian.org/images/cloud/trixie/latest/debian-13-genericcloud-amd64.qcow2"
SHA_URL = "https://cloud.debian.org/images/cloud/trixie/latest/SHA512SUMS"

# --- libvirt network and domain naming --------------------------------------
NET_NAME = "boma-it"
NAME_PREFIX = "boma-it-"
NET_XML = """<network>
<name>boma-it</name>
<forward mode='nat'/>
<bridge name='virbr-boma' stp='on' delay='0'/>
<ip address='192.168.150.1' netmask='255.255.255.0'>
<dhcp><range start='192.168.150.10' end='192.168.150.254'/></dhcp>
</ip>
</network>
"""

# --- VM sizing and admission control ----------------------------------------
DEFAULT_MEM_MIB = 3072
DEFAULT_VCPUS = 2
MIN_FREE_MIB = 4096

# --- Certificate tiers accepted by --certs ----------------------------------
VALID_TIERS = ("internal", "le-staging", "le-prod-wildcard")

# Target the SYSTEM libvirtd — where the substrate, /dev/kvm, and the NAT network live.
# Without this, a non-root caller's bare virsh/virt-install default to qemu:///session.
# setdefault() keeps any value the caller already exported.
os.environ.setdefault("LIBVIRT_DEFAULT_URI", "qemu:///system")
def vm_name(host, suffix=None):
    """Return the libvirt domain name for *host*: NAME_PREFIX + host + "-" + suffix.

    A falsy *suffix* (``None`` or ``""``) is replaced by the first eight hex
    digits of a freshly generated UUID4.
    """
    if not suffix:
        suffix = uuid.uuid4().hex[:8]
    return NAME_PREFIX + f"{host}-{suffix}"
def free_mib(meminfo_text):
    """Return the ``MemAvailable`` figure from /proc/meminfo-style text, in MiB.

    The value is read in kB and floor-divided by 1024. Returns 0 when no
    ``MemAvailable:`` line is present.
    """
    match = re.search(r"^MemAvailable:\s+(\d+)\s+kB", meminfo_text, re.MULTILINE)
    if match is None:
        return 0
    available_kib = int(match.group(1))
    return available_kib // 1024
def parse_lease_ip(domifaddr_output):
    """Extract the first IPv4 address from ``virsh domifaddr`` output.

    Looks for the token ``ipv4`` followed by whitespace and a dotted quad; any
    trailing ``/prefix`` is not part of the result. Returns ``None`` when no
    such address is present.
    """
    found = re.search(r"ipv4\s+(\d+\.\d+\.\d+\.\d+)", domifaddr_output)
    if not found:
        return None
    return found.group(1)
def render_meta_data(instance_id, hostname):
    """Return the cloud-init meta-data text: instance-id and local-hostname, one per line."""
    entries = (
        f"instance-id: {instance_id}",
        f"local-hostname: {hostname}",
    )
    return "".join(f"{entry}\n" for entry in entries)
def render_user_data(ssh_pubkey, ansible_user):
    """Return the cloud-init ``#cloud-config`` user-data document as a string.

    The document creates one user named *ansible_user* with passwordless sudo,
    a bash login shell and *ssh_pubkey* as its only authorized key, disables
    SSH password authentication and enables ``package_update``.

    Indentation is structural in YAML: the user's attributes must be nested
    under the ``- name:`` list item and the key under ``ssh_authorized_keys:``.
    With every line at the same indent the document does not parse.
    """
    lines = [
        "#cloud-config",
        "users:",
        f"  - name: {ansible_user}",
        "    sudo: 'ALL=(ALL) NOPASSWD:ALL'",
        "    shell: /bin/bash",
        "    ssh_authorized_keys:",
        f"      - {ssh_pubkey}",
        "ssh_pwauth: false",
        "package_update: true",
    ]
    return "\n".join(lines) + "\n"
def cert_file(tier):
    """Return the path of the extra-vars file for certificate tier *tier*.

    Raises:
        ValueError: if *tier* is not one of VALID_TIERS.
    """
    if tier in VALID_TIERS:
        return CERT_DIR / f"{tier}.yml"
    raise ValueError(f"unknown cert tier: {tier}")
def profile_path(host):
    """Return the path of the JSON profile for *host* (``<PROFILE_DIR>/<host>.json``)."""
    filename = f"{host}.json"
    return PROFILE_DIR / filename
def render_run_hosts(name, ip, ansible_user, groups):
    """Render the transient single-host Ansible YAML inventory as a string.

    The host *name* is listed under every group in *groups* (duplicates are
    dropped, first-seen order is kept), each time with ``ansible_host`` set to
    *ip* and ``ansible_user`` set to *ansible_user*.

    The nesting ``all -> children -> <group> -> hosts -> <host> -> vars`` is
    expressed purely through indentation, so each level is indented two spaces
    deeper than its parent. With all levels at the same indent the inventory
    collapses into one flat mapping.
    """
    lines = [
        "---",
        "# Generated by scripts/integration-vm.py — transient, gitignored. Do not edit.",
        "# Single test host ONLY (safety invariant: no real host is ever in scope).",
        "all:",
        "  children:",
    ]
    # dict.fromkeys() de-duplicates while preserving the caller's group order.
    for g in dict.fromkeys(groups):
        lines += [
            f"    {g}:",
            "      hosts:",
            f"        {name}:",
            f"          ansible_host: {ip}",
            f"          ansible_user: {ansible_user}",
        ]
    return "\n".join(lines) + "\n"
def sh(cmd, check=True, capture=False, **kw):
    """Run *cmd* (an argv list) and return the ``subprocess.CompletedProcess``.

    The command is echoed to stderr first, prefixed with ``+ ``. Output is
    handled in text mode; *capture* collects stdout/stderr, *check* makes a
    non-zero exit raise ``subprocess.CalledProcessError``. Remaining keyword
    arguments are forwarded to ``subprocess.run``.
    """
    rendered = " ".join(map(str, cmd))
    print("+ " + rendered, file=sys.stderr)
    return subprocess.run(
        cmd,
        check=check,
        capture_output=capture,
        text=True,
        **kw,
    )
def _expected_sha(sha_text, filename):
    """Return the digest listed for *filename* in checksum-file text, or ``None``.

    Only lines with exactly two whitespace-separated fields are considered;
    leading ``*`` characters on the file-name field are ignored. The first
    matching line wins.
    """
    digests = (
        fields[0]
        for fields in map(str.split, sha_text.splitlines())
        if len(fields) == 2 and fields[1].lstrip("*") == filename
    )
    return next(digests, None)
def ensure_image():
    """Return the path of the cached golden image, downloading it on first use.

    If ``CACHE_DIR / IMAGE_NAME`` already exists it is returned as-is. Otherwise
    the image is downloaded to a ``.part`` file next to it, its SHA512 is
    compared with the entry for IMAGE_NAME published at SHA_URL, and only a
    verified download is renamed into place.

    Raises:
        SystemExit: if no checksum for IMAGE_NAME is published, or the digest of
            the downloaded file does not match it. The ``.part`` file is
            removed in both cases.
    """
    CACHE_DIR.mkdir(parents=True, exist_ok=True)
    img = CACHE_DIR / IMAGE_NAME
    if img.exists():
        return img
    print(f"Downloading {IMAGE_URL} ...", file=sys.stderr)
    # Download under a temporary name so an unverified file never sits at the final path.
    tmp = img.with_suffix(".part")
    urllib.request.urlretrieve(IMAGE_URL, tmp)
    # Close the HTTP response deterministically instead of leaking the connection.
    with urllib.request.urlopen(SHA_URL) as resp:
        sha_text = resp.read().decode()
    want = _expected_sha(sha_text, IMAGE_NAME)
    if not want:
        tmp.unlink(missing_ok=True)
        raise SystemExit(f"checksum for {IMAGE_NAME} not found at {SHA_URL}")
    h = hashlib.sha512()
    with open(tmp, "rb") as fh:
        # Hash in 1 MiB chunks so the whole image is never held in memory.
        for chunk in iter(lambda: fh.read(1 << 20), b""):
            h.update(chunk)
    if h.hexdigest() != want:
        tmp.unlink(missing_ok=True)
        raise SystemExit("golden image SHA512 mismatch — refusing to use it")
    tmp.rename(img)
    return img
def net_ensure():
    """Make sure the libvirt network NET_NAME is defined and active.

    When ``virsh net-info`` fails for the network, NET_XML is written to
    ``RUN_DIR/net.xml``, defined, and marked autostart. Afterwards the network
    is started unless ``net-info`` already reports it as active.
    """
    probe = sh(["virsh", "net-info", NET_NAME], check=False, capture=True)
    network_missing = probe.returncode != 0
    if network_missing:
        xml_path = RUN_DIR / "net.xml"
        RUN_DIR.mkdir(parents=True, exist_ok=True)
        xml_path.write_text(NET_XML)
        for verb, target in (("net-define", str(xml_path)), ("net-autostart", NET_NAME)):
            sh(["virsh", verb, target])
    info = sh(["virsh", "net-info", NET_NAME], capture=True).stdout
    if re.search(r"Active:\s+yes", info) is None:
        sh(["virsh", "net-start", NET_NAME])
def _ssh_pubkey():
    """Return the caller's SSH public key text with surrounding whitespace stripped.

    ``~/.ssh/id_ed25519.pub`` is preferred over ``~/.ssh/id_rsa.pub``.

    Raises:
        SystemExit: if neither file exists.
    """
    ssh_dir = pathlib.Path.home() / ".ssh"
    for filename in ("id_ed25519.pub", "id_rsa.pub"):
        keyfile = ssh_dir / filename
        if keyfile.exists():
            return keyfile.read_text().strip()
    raise SystemExit("no SSH public key found in ~/.ssh")
def up(host, name=None, mem_mib=DEFAULT_MEM_MIB, vcpus=DEFAULT_VCPUS):
    """Create and boot one integration VM for *host*; return ``(name, ip)``.

    Steps, in order: admission checks (available memory, no other running
    ``NAME_PREFIX`` domain), golden image and network setup, qcow2 overlay
    creation, cloud-init seed generation, ``virt-install``, then waiting for a
    DHCP lease, SSH, and cloud-init completion. On success the VM's name, IP and
    *host* are recorded one per line in ``RUN_DIR/current``.

    Args:
        host: profile/host label; used to build the VM name and stored in ``current``.
        name: explicit domain name; defaults to ``vm_name(host)``.
        mem_mib: guest memory in MiB passed to ``virt-install --memory``.
        vcpus: guest vCPU count passed to ``virt-install --vcpus``.

    Raises:
        SystemExit: if less than MIN_FREE_MIB is available, if an integration VM
            is already running, or if one of the wait helpers times out.
        subprocess.CalledProcessError: if a checked external command fails.
    """
    free = free_mib(pathlib.Path("/proc/meminfo").read_text())
    if free < MIN_FREE_MIB:
        raise SystemExit(f"refusing to start: only {free} MiB free (< {MIN_FREE_MIB})")
    running = sh(["virsh", "list", "--name"], capture=True).stdout.split()
    if any(n.startswith(NAME_PREFIX) for n in running):
        raise SystemExit("an integration VM is already running (one at a time); "
                         "run `integration-vm prune` first")
    name = name or vm_name(host)
    img = ensure_image()
    net_ensure()
    RUN_DIR.mkdir(parents=True, exist_ok=True)
    # VM disk/seed/console must live where the SYSTEM hypervisor (libvirt-qemu) can reach
    # them — NOT under the repo/home (qemu cannot traverse /home/claude). CACHE_DIR is
    # group-libvirt + world-traversable (created by the integration_test role).
    overlay = CACHE_DIR / f"{name}.qcow2"
    sh(["qemu-img", "create", "-f", "qcow2", "-F", "qcow2", "-b", str(img), str(overlay)])
    (RUN_DIR / "user-data").write_text(render_user_data(_ssh_pubkey(), "ansible"))
    (RUN_DIR / "meta-data").write_text(render_meta_data(f"iid-{name}", name))
    seed = CACHE_DIR / f"{name}-seed.img"
    # Force DHCP on the VM NIC — don't rely on the genericcloud image's network fallback.
    # Nesting is expressed by indentation: match/dhcp4 belong to `primary`, and `name`
    # belongs to `match`; a flat layout would not describe an ethernet device at all.
    (RUN_DIR / "network-config").write_text(
        'version: 2\n'
        'ethernets:\n'
        '  primary:\n'
        '    match:\n'
        '      name: "en*"\n'
        '    dhcp4: true\n')
    sh(["cloud-localds", "--network-config", str(RUN_DIR / "network-config"),
        str(seed), str(RUN_DIR / "user-data"), str(RUN_DIR / "meta-data")])
    console = CACHE_DIR / f"{name}-console.log"
    # virt-install has a `#!/usr/bin/env python3` shebang; the Makefile prepends .venv/bin to
    # PATH (so the venv's ansible tools resolve), which would hijack virt-install into the
    # isolated venv — it lacks system PyGObject (`gi`) and crashes. Strip the venv from PATH
    # for this system tool so its shebang finds /usr/bin/python3 (which has gi). Ansible is
    # invoked via its absolute .venv path elsewhere, so it is unaffected.
    sys_path = ":".join(p for p in os.environ.get("PATH", "").split(":")
                        if "/.venv/bin" not in p)
    sh(["virt-install", "--name", name, "--memory", str(mem_mib), "--vcpus", str(vcpus),
        "--boot", "uefi",  # genericcloud triple-faults on legacy BIOS handoff; UEFI boots
        "--import",
        "--disk", f"path={overlay},format=qcow2",
        "--disk", f"path={seed},device=cdrom",
        "--network", f"network={NET_NAME}",
        "--osinfo", "debian13",
        "--graphics", "none",
        "--serial", f"file,path={console}",
        "--noautoconsole"],
       env=dict(os.environ, PATH=sys_path))
    ip = wait_for_ip(name)
    wait_for_ssh(ip, "ansible")
    # Block until cloud-init finishes (incl. apt-get update) so apply sees a ready system.
    # check=False: a non-zero cloud-init status does not abort bring-up here.
    sh(["ssh", "-o", "StrictHostKeyChecking=no", "-o", "UserKnownHostsFile=/dev/null",
        f"ansible@{ip}", "sudo cloud-init status --wait"], check=False)
    # Record the VM for the later subcommands: name, ip, host — one per line.
    (RUN_DIR / "current").write_text(f"{name}\n{ip}\n{host}\n")
    print(f"VM {name} up at {ip}")
    return name, ip
def wait_for_ip(name, timeout=120):
    """Poll libvirt's DHCP leases until domain *name* has an IPv4 address; return it.

    ``virsh domifaddr --source lease`` is queried every 4 seconds for up to
    *timeout* seconds.

    Raises:
        SystemExit: if no lease shows up before the deadline.
    """
    deadline = time.time() + timeout
    while time.time() < deadline:
        probe = sh(["virsh", "domifaddr", name, "--source", "lease"],
                   check=False, capture=True)
        lease_ip = parse_lease_ip(probe.stdout)
        if lease_ip:
            return lease_ip
        time.sleep(4)
    raise SystemExit(f"timed out waiting for {name} to get a DHCP lease — "
                     "VM left defined; run `integration-vm prune` to remove it")
def wait_for_ssh(ip, user, timeout=180):
    """Block until ``ssh <user>@<ip> true`` succeeds.

    One attempt is made every 5 seconds (each with a 5-second connect timeout)
    for up to *timeout* seconds. Host-key checking is disabled and nothing is
    written to known_hosts.

    Raises:
        SystemExit: if SSH never succeeds before the deadline.
    """
    end = time.time() + timeout
    while time.time() < end:
        r = sh(["ssh", "-o", "StrictHostKeyChecking=no",
                "-o", "UserKnownHostsFile=/dev/null", "-o", "ConnectTimeout=5",
                f"{user}@{ip}", "true"], check=False, capture=True)
        if r.returncode == 0:
            return
        time.sleep(5)
    # The " — " separator keeps the IP and the follow-up hint from running together.
    raise SystemExit(f"timed out waiting for SSH to {ip} — "
                     "VM left defined; run `integration-vm prune` to remove it")
def _read_current():
    """Return ``(name, ip, host)`` from the first three lines of ``RUN_DIR/current``."""
    lines = (RUN_DIR / "current").read_text().splitlines()
    name, ip, host = lines[0], lines[1], lines[2]
    return name, ip, host
def write_run_inventory(name, ip, groups):
    """Write the transient inventory for the test VM into RUN_DIR.

    Creates ``RUN_DIR/hosts.yml`` (rendered with user ``ansible``) and
    (re)creates the ``RUN_DIR/group_vars`` symlink pointing at the production
    inventory's ``group_vars`` directory.

    Raises:
        SystemExit: if ``RUN_DIR/group_vars`` exists but is not a symlink.
    """
    RUN_DIR.mkdir(parents=True, exist_ok=True)
    inventory_text = render_run_hosts(name, ip, "ansible", groups)
    (RUN_DIR / "hosts.yml").write_text(inventory_text)
    link = RUN_DIR / "group_vars"
    target = REPO_ROOT / "inventories" / "production" / "group_vars"
    if not link.is_symlink():
        # Never clobber a real file or directory sitting at the link path.
        if link.exists():
            raise SystemExit(f"{link} exists and is not a symlink; remove it manually")
    else:
        # A symlink from an earlier run is replaced.
        link.unlink()
    link.symlink_to(target)
def apply(host, certs):
    """Run every playbook step of *host*'s profile against the current VM.

    The profile JSON supplies ``groups`` (for the transient inventory),
    optional ``extra_vars_files`` (paths relative to INTEG_DIR) and ``applies``
    (a list of steps with a ``playbook`` and optional ``tags``). The cert-tier
    file for *certs* is appended as the last ``-e @file`` argument. Each step
    runs ``.venv/bin/ansible-playbook`` from REPO_ROOT, limited to the VM.
    """
    vm, address, _ = _read_current()
    profile = json.loads(profile_path(host).read_text())
    write_run_inventory(vm, address, profile["groups"])
    extra_vars = [
        token
        for rel in profile.get("extra_vars_files", [])
        for token in ("-e", f"@{INTEG_DIR / rel}")
    ]
    extra_vars.extend(["-e", f"@{cert_file(certs)}"])
    inventory = str(RUN_DIR / "hosts.yml")
    for step in profile["applies"]:
        argv = [".venv/bin/ansible-playbook", "-i", inventory,
                f"playbooks/{step['playbook']}", "--limit", vm]
        tags = step.get("tags")
        if tags:
            argv.extend(["--tags", ",".join(tags)])
        sh(argv + extra_vars, cwd=str(REPO_ROOT))
    print(f"applied {host} profile to {vm}")
def _boot_id(ip, user):
    """Return the guest's kernel boot_id read over SSH, or ``None`` if SSH fails.

    The value is the stripped content of ``/proc/sys/kernel/random/boot_id``.
    """
    argv = [
        "ssh",
        "-o", "StrictHostKeyChecking=no",
        "-o", "UserKnownHostsFile=/dev/null",
        "-o", "ConnectTimeout=5",
        f"{user}@{ip}",
        "cat /proc/sys/kernel/random/boot_id",
    ]
    result = sh(argv, check=False, capture=True)
    if result.returncode != 0:
        return None
    return result.stdout.strip()
def wait_for_reboot(ip, user, before_boot_id, timeout=240):
    """Confirm a REAL reboot: SSH back up AND boot_id changed (not the pre-reboot sshd).

    The guest's boot_id is polled every 5 seconds for up to *timeout* seconds;
    the function returns once a non-empty value different from
    *before_boot_id* is seen.

    Raises:
        SystemExit: if no changed boot_id is observed before the deadline.
    """
    deadline = time.time() + timeout
    while time.time() < deadline:
        current = _boot_id(ip, user)
        rebooted = bool(current) and current != before_boot_id
        if rebooted:
            return
        time.sleep(5)
    raise SystemExit(f"timed out waiting for {ip} to reboot (boot_id unchanged) — "
                     "VM left defined; run `integration-vm prune` to remove it")
def reboot_vm():
    """Reboot the current VM via ``virsh reboot`` and wait until the reboot is confirmed.

    The guest's boot_id is sampled before the reboot and must differ afterwards.

    Raises:
        SystemExit: if the pre-reboot boot_id cannot be read, or if the changed
            boot_id is not observed before ``wait_for_reboot`` times out.
    """
    name, ip, _ = _read_current()
    before = _boot_id(ip, "ansible")
    if not before:
        # Without a baseline, wait_for_reboot() would accept ANY boot_id — including
        # the pre-reboot one — and report a reboot that may never have happened.
        raise SystemExit(f"cannot read boot_id from {ip} before rebooting {name} — "
                         "refusing to reboot because it could not be verified")
    sh(["virsh", "reboot", name])
    wait_for_reboot(ip, "ansible", before)
    print(f"{name} rebooted (boot_id changed), SSH back at {ip}")
def run_assert(host, certs):
    """Run ``tests/integration/verify.yml`` against the current VM.

    The transient inventory and extra-vars are rebuilt from *host*'s profile and
    the *certs* tier, in the same way ``apply`` builds them. On failure a
    diagnostics bundle is collected before exiting.

    Raises:
        SystemExit: if the verify playbook exits non-zero.
    """
    vm, address, _ = _read_current()
    profile = json.loads(profile_path(host).read_text())
    write_run_inventory(vm, address, profile["groups"])
    extra_vars = [
        token
        for rel in profile.get("extra_vars_files", [])
        for token in ("-e", f"@{INTEG_DIR / rel}")
    ]
    extra_vars.extend(["-e", f"@{cert_file(certs)}"])
    argv = [
        ".venv/bin/ansible-playbook",
        "-i", str(RUN_DIR / "hosts.yml"),
        "tests/integration/verify.yml",
        "--limit", vm,
    ]
    outcome = sh(argv + extra_vars, cwd=str(REPO_ROOT), check=False)
    if outcome.returncode == 0:
        print(f"VERIFY PASSED for {vm}")
        return
    dump_diagnostics(vm, address)
    raise SystemExit(f"VERIFY FAILED for {vm} — diagnostics in {DIAG_ROOT}")
def dump_diagnostics(name, ip):
    """Collect a diagnostics bundle for VM *name* into ``DIAG_ROOT/<name>``.

    Each probe command is run on the guest over SSH under sudo; its stdout and
    stderr are concatenated into ``<label>.txt``. Probe failures are ignored.
    If the VM's serial console log exists in CACHE_DIR, a copy is added as
    ``console.log``.
    """
    bundle = DIAG_ROOT / name
    bundle.mkdir(parents=True, exist_ok=True)
    # label -> command executed on the guest (insertion order = execution order).
    probes = {
        "nft": "nft list ruleset",
        "docker": "docker ps -a",
        "ss": "ss -tlnp",
        "journal": "journalctl -b --no-pager",
        "critical-chain": "systemd-analyze critical-chain",
    }
    ssh_prefix = [
        "ssh",
        "-o", "StrictHostKeyChecking=no",
        "-o", "UserKnownHostsFile=/dev/null",
        f"ansible@{ip}",
    ]
    for label, remote_cmd in probes.items():
        result = sh(ssh_prefix + ["sudo " + remote_cmd], check=False, capture=True)
        combined = (result.stdout or "") + (result.stderr or "")
        (bundle / f"{label}.txt").write_text(combined)
    serial_log = CACHE_DIR / f"{name}-console.log"
    if serial_log.exists():
        # The serial log is root:0600 (libvirt-created); read it via sudo (ADR-015: the
        # claude worker has sudo) and write a worker-owned copy into the bundle.
        dumped = sh(["sudo", "cat", str(serial_log)], check=False, capture=True)
        (bundle / "console.log").write_text(dumped.stdout or "")
    print(f"diagnostics written to {bundle}", file=sys.stderr)
def _destroy(name):
    """Tear down domain *name* and delete its files.

    ``virsh destroy`` and ``virsh undefine --nvram`` are both best-effort
    (failures are ignored). Afterwards every entry in RUN_DIR and CACHE_DIR
    whose file name starts with *name* is unlinked.
    """
    for virsh_args in (["destroy", name], ["undefine", name, "--nvram"]):
        sh(["virsh", *virsh_args], check=False)
    pattern = f"{name}*"
    leftovers = (
        path
        for directory in (RUN_DIR, CACHE_DIR)
        for path in directory.glob(pattern)
    )
    for path in leftovers:
        path.unlink(missing_ok=True)
def down(host=None, keep=False):
    """Destroy the VM recorded in ``RUN_DIR/current`` unless *keep* is set.

    *host* is accepted for a uniform command signature but is not used. When no
    ``current`` file exists there is nothing to do.
    """
    if keep:
        print("--keep: leaving the VM running for inspection")
        return
    marker = RUN_DIR / "current"
    if not marker.exists():
        return
    vm = marker.read_text().splitlines()[0]
    _destroy(vm)
    marker.unlink(missing_ok=True)
    print(f"destroyed {vm}")
def prune():
    """Destroy every libvirt domain whose name starts with NAME_PREFIX.

    Inactive domains are included (``virsh list --all``). The
    ``RUN_DIR/current`` marker is removed afterwards, if present.
    """
    listing = sh(["virsh", "list", "--all", "--name"], capture=True).stdout
    stale = [domain for domain in listing.split() if domain.startswith(NAME_PREFIX)]
    for domain in stale:
        _destroy(domain)
        print(f"pruned {domain}")
    (RUN_DIR / "current").unlink(missing_ok=True)
def console():
    """Print the serial console log of the VM recorded in ``RUN_DIR/current``.

    The log is read through ``sudo cat``; if no log file exists a short notice
    is printed instead.
    """
    vm = (RUN_DIR / "current").read_text().splitlines()[0]
    log = CACHE_DIR / f"{vm}-console.log"
    if not log.exists():
        print(f"no console log at {log}")
        return
    dumped = sh(["sudo", "cat", str(log)], check=False, capture=True)
    print(dumped.stdout or "")
def cycle(host, certs, keep=False, no_reboot=False):
    """Run the full flow: up -> apply -> (reboot) -> assert -> down.

    The reboot is skipped when *no_reboot* is set. After a successful run the VM
    is destroyed unless *keep* is set. If any step raises, the VM is left up for
    inspection, a notice is printed to stderr, and the exception propagates.
    """
    try:
        up(host)
        apply(host, certs)
        if not no_reboot:
            reboot_vm()
        run_assert(host, certs)
    except BaseException:
        # Covers SystemExit and KeyboardInterrupt too, like the `finally` it mirrors.
        print("FAILED — VM left up for inspection; `integration-vm prune` to clean.",
              file=sys.stderr)
        raise
    if not keep:
        down(host)
# Subcommand name -> handler taking the parsed argparse namespace. Whatever the
# handler returns becomes main()'s return value and thus the process exit status.
DISPATCH = {
    # up() returns (name, ip); discard it so the command yields None.
    "up": lambda ns: [up(ns.host), None][-1],
    "apply": lambda ns: apply(ns.host, ns.certs),
    "reboot": lambda ns: reboot_vm(),
    "assert": lambda ns: run_assert(ns.host, ns.certs),
    "down": lambda ns: down(ns.host, ns.keep),
    "console": lambda ns: console(),
    "prune": lambda ns: prune(),
    "cycle": lambda ns: cycle(ns.host, ns.certs, ns.keep, ns.no_reboot),
}
def main(argv=None):
    """Parse the command line and run the selected subcommand.

    Every subcommand except ``prune`` shares the same options: ``--host``
    (required), ``--certs`` (one of VALID_TIERS, default ``internal``),
    ``--keep`` and ``--no-reboot``. ``prune`` takes no options.

    Args:
        argv: argument list; ``None`` lets argparse read ``sys.argv[1:]``.

    Returns:
        Whatever the dispatched handler returns.
    """
    parser = argparse.ArgumentParser(prog="integration-vm", description=__doc__)
    commands = parser.add_subparsers(dest="cmd", required=True)
    host_commands = ("up", "apply", "reboot", "assert", "cycle", "down", "console")
    for command in host_commands:
        cmd_parser = commands.add_parser(command)
        cmd_parser.add_argument("--host", required=True)
        cmd_parser.add_argument("--certs", choices=VALID_TIERS, default="internal")
        cmd_parser.add_argument("--keep", action="store_true")
        cmd_parser.add_argument("--no-reboot", action="store_true")
    commands.add_parser("prune")
    ns = parser.parse_args(argv)
    handler = DISPATCH[ns.cmd]
    return handler(ns)


if __name__ == "__main__":  # pragma: no cover
    sys.exit(main())