cloud-init package_update:true + block on 'cloud-init status --wait' in up() so apply sees populated apt lists (fresh genericcloud images ship empty lists); dump_diagnostics()/console() read the root:0600 serial log via sudo instead of shutil.copy, which raised PermissionError mid-diagnostics. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
436 lines
16 KiB
Python
436 lines
16 KiB
Python
#!/usr/bin/env python3
|
|
"""boma local-VM integration test harness driver (ADR-025).
|
|
|
|
Stdlib-only by convention (TODO-14): never imports a YAML library. The transient
|
|
inventory is emitted via string templates; stubs/cert-tiers reach Ansible as
|
|
`-e @<file>` extra-vars; profile metadata is JSON. Talks to libvirt via `virsh`.
|
|
"""
|
|
import argparse
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import pathlib
|
|
import re
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
import urllib.request
|
|
import uuid
|
|
|
|
REPO_ROOT = pathlib.Path(__file__).resolve().parent.parent
|
|
CACHE_DIR = pathlib.Path(os.environ.get("BOMA_IT_CACHE", "/var/lib/boma-integration"))
|
|
IMAGE_URL = "https://cloud.debian.org/images/cloud/trixie/latest/debian-13-genericcloud-amd64.qcow2"
|
|
SHA_URL = "https://cloud.debian.org/images/cloud/trixie/latest/SHA512SUMS"
|
|
IMAGE_NAME = "debian-13-genericcloud-amd64.qcow2"
|
|
NET_NAME = "boma-it"
|
|
NET_XML = """<network>
|
|
<name>boma-it</name>
|
|
<forward mode='nat'/>
|
|
<bridge name='virbr-boma' stp='on' delay='0'/>
|
|
<ip address='192.168.150.1' netmask='255.255.255.0'>
|
|
<dhcp><range start='192.168.150.10' end='192.168.150.254'/></dhcp>
|
|
</ip>
|
|
</network>
|
|
"""
|
|
NAME_PREFIX = "boma-it-"
|
|
RUN_DIR = REPO_ROOT / "tests" / "integration" / ".run"
|
|
DIAG_ROOT = pathlib.Path.home() / "integration-runs"
|
|
PROFILE_DIR = REPO_ROOT / "tests" / "integration" / "profiles"
|
|
INTEG_DIR = REPO_ROOT / "tests" / "integration"
|
|
CERT_DIR = REPO_ROOT / "tests" / "integration" / "certs"
|
|
DEFAULT_MEM_MIB = 3072
|
|
DEFAULT_VCPUS = 2
|
|
MIN_FREE_MIB = 4096
|
|
VALID_TIERS = ("internal", "le-staging", "le-prod-wildcard")
|
|
|
|
# Target the SYSTEM libvirtd — where the substrate, /dev/kvm, and the NAT network live.
|
|
# Without this, a non-root caller's bare virsh/virt-install default to qemu:///session.
|
|
os.environ.setdefault("LIBVIRT_DEFAULT_URI", "qemu:///system")
|
|
|
|
|
|
def vm_name(host, suffix=None):
|
|
suffix = suffix or uuid.uuid4().hex[:8]
|
|
return f"{NAME_PREFIX}{host}-{suffix}"
|
|
|
|
|
|
def free_mib(meminfo_text):
|
|
m = re.search(r"^MemAvailable:\s+(\d+)\s+kB", meminfo_text, re.MULTILINE)
|
|
return int(m.group(1)) // 1024 if m else 0
|
|
|
|
|
|
def parse_lease_ip(domifaddr_output):
|
|
m = re.search(r"ipv4\s+(\d+\.\d+\.\d+\.\d+)", domifaddr_output)
|
|
return m.group(1) if m else None
|
|
|
|
|
|
def render_meta_data(instance_id, hostname):
|
|
return f"instance-id: {instance_id}\nlocal-hostname: {hostname}\n"
|
|
|
|
|
|
def render_user_data(ssh_pubkey, ansible_user):
|
|
return (
|
|
"#cloud-config\n"
|
|
"users:\n"
|
|
f" - name: {ansible_user}\n"
|
|
" sudo: 'ALL=(ALL) NOPASSWD:ALL'\n"
|
|
" shell: /bin/bash\n"
|
|
" ssh_authorized_keys:\n"
|
|
f" - {ssh_pubkey}\n"
|
|
"ssh_pwauth: false\n"
|
|
"package_update: true\n"
|
|
)
|
|
|
|
|
|
def cert_file(tier):
|
|
if tier not in VALID_TIERS:
|
|
raise ValueError(f"unknown cert tier: {tier}")
|
|
return CERT_DIR / f"{tier}.yml"
|
|
|
|
|
|
def profile_path(host):
|
|
return PROFILE_DIR / f"{host}.json"
|
|
|
|
|
|
def render_run_hosts(name, ip, ansible_user, groups):
|
|
lines = [
|
|
"# Generated by scripts/integration-vm.py — transient, gitignored. Do not edit.",
|
|
"# Single test host ONLY (safety invariant: no real host is ever in scope).",
|
|
"all:",
|
|
" children:",
|
|
]
|
|
for g in dict.fromkeys(groups):
|
|
lines += [
|
|
f" {g}:",
|
|
" hosts:",
|
|
f" {name}:",
|
|
f" ansible_host: {ip}",
|
|
f" ansible_user: {ansible_user}",
|
|
]
|
|
return "\n".join(lines) + "\n"
|
|
|
|
|
|
def sh(cmd, check=True, capture=False, **kw):
|
|
"""Run a command (list form). Logs the command to stderr."""
|
|
print("+ " + " ".join(str(c) for c in cmd), file=sys.stderr)
|
|
return subprocess.run(cmd, check=check,
|
|
capture_output=capture, text=True, **kw)
|
|
|
|
|
|
def _expected_sha(sha_text, filename):
|
|
for line in sha_text.splitlines():
|
|
parts = line.split()
|
|
if len(parts) == 2 and parts[1].lstrip("*") == filename:
|
|
return parts[0]
|
|
return None
|
|
|
|
|
|
def ensure_image():
|
|
CACHE_DIR.mkdir(parents=True, exist_ok=True)
|
|
img = CACHE_DIR / IMAGE_NAME
|
|
if img.exists():
|
|
return img
|
|
print(f"Downloading {IMAGE_URL} ...", file=sys.stderr)
|
|
tmp = img.with_suffix(".part")
|
|
urllib.request.urlretrieve(IMAGE_URL, tmp)
|
|
sha_text = urllib.request.urlopen(SHA_URL).read().decode()
|
|
want = _expected_sha(sha_text, IMAGE_NAME)
|
|
if not want:
|
|
tmp.unlink(missing_ok=True)
|
|
raise SystemExit(f"checksum for {IMAGE_NAME} not found at {SHA_URL}")
|
|
h = hashlib.sha512()
|
|
with open(tmp, "rb") as fh:
|
|
for chunk in iter(lambda: fh.read(1 << 20), b""):
|
|
h.update(chunk)
|
|
if h.hexdigest() != want:
|
|
tmp.unlink(missing_ok=True)
|
|
raise SystemExit("golden image SHA512 mismatch — refusing to use it")
|
|
tmp.rename(img)
|
|
return img
|
|
|
|
|
|
def net_ensure():
|
|
r = sh(["virsh", "net-info", NET_NAME], check=False, capture=True)
|
|
if r.returncode != 0:
|
|
xml = RUN_DIR / "net.xml"
|
|
RUN_DIR.mkdir(parents=True, exist_ok=True)
|
|
xml.write_text(NET_XML)
|
|
sh(["virsh", "net-define", str(xml)])
|
|
sh(["virsh", "net-autostart", NET_NAME])
|
|
active = sh(["virsh", "net-info", NET_NAME], capture=True).stdout
|
|
if not re.search(r"Active:\s+yes", active):
|
|
sh(["virsh", "net-start", NET_NAME])
|
|
|
|
|
|
def _ssh_pubkey():
|
|
for cand in ("id_ed25519.pub", "id_rsa.pub"):
|
|
p = pathlib.Path.home() / ".ssh" / cand
|
|
if p.exists():
|
|
return p.read_text().strip()
|
|
raise SystemExit("no SSH public key found in ~/.ssh")
|
|
|
|
|
|
def up(host, name=None, mem_mib=DEFAULT_MEM_MIB, vcpus=DEFAULT_VCPUS):
|
|
free = free_mib(pathlib.Path("/proc/meminfo").read_text())
|
|
if free < MIN_FREE_MIB:
|
|
raise SystemExit(f"refusing to start: only {free} MiB free (< {MIN_FREE_MIB})")
|
|
running = sh(["virsh", "list", "--name"], capture=True).stdout.split()
|
|
if any(n.startswith(NAME_PREFIX) for n in running):
|
|
raise SystemExit("an integration VM is already running (one at a time); "
|
|
"run `integration-vm prune` first")
|
|
name = name or vm_name(host)
|
|
img = ensure_image()
|
|
net_ensure()
|
|
RUN_DIR.mkdir(parents=True, exist_ok=True)
|
|
# VM disk/seed/console must live where the SYSTEM hypervisor (libvirt-qemu) can reach
|
|
# them — NOT under the repo/home (qemu cannot traverse /home/claude). CACHE_DIR is
|
|
# group-libvirt + world-traversable (created by the integration_test role).
|
|
overlay = CACHE_DIR / f"{name}.qcow2"
|
|
sh(["qemu-img", "create", "-f", "qcow2", "-F", "qcow2", "-b", str(img), str(overlay)])
|
|
(RUN_DIR / "user-data").write_text(render_user_data(_ssh_pubkey(), "ansible"))
|
|
(RUN_DIR / "meta-data").write_text(render_meta_data(f"iid-{name}", name))
|
|
seed = CACHE_DIR / f"{name}-seed.img"
|
|
# Force DHCP on the VM NIC — don't rely on the genericcloud image's network fallback.
|
|
(RUN_DIR / "network-config").write_text(
|
|
'version: 2\n'
|
|
'ethernets:\n'
|
|
' primary:\n'
|
|
' match:\n'
|
|
' name: "en*"\n'
|
|
' dhcp4: true\n')
|
|
sh(["cloud-localds", "--network-config", str(RUN_DIR / "network-config"),
|
|
str(seed), str(RUN_DIR / "user-data"), str(RUN_DIR / "meta-data")])
|
|
console = CACHE_DIR / f"{name}-console.log"
|
|
sh(["virt-install", "--name", name, "--memory", str(mem_mib), "--vcpus", str(vcpus),
|
|
"--boot", "uefi", # genericcloud triple-faults on legacy BIOS handoff; UEFI boots
|
|
"--import",
|
|
"--disk", f"path={overlay},format=qcow2",
|
|
"--disk", f"path={seed},device=cdrom",
|
|
"--network", f"network={NET_NAME}",
|
|
"--osinfo", "debian13",
|
|
"--graphics", "none",
|
|
"--serial", f"file,path={console}",
|
|
"--noautoconsole"])
|
|
ip = wait_for_ip(name)
|
|
wait_for_ssh(ip, "ansible")
|
|
# Block until cloud-init finishes (incl. apt-get update) so apply sees a ready system.
|
|
sh(["ssh", "-o", "StrictHostKeyChecking=no", "-o", "UserKnownHostsFile=/dev/null",
|
|
f"ansible@{ip}", "sudo cloud-init status --wait"], check=False)
|
|
(RUN_DIR / "current").write_text(f"{name}\n{ip}\n{host}\n")
|
|
print(f"VM {name} up at {ip}")
|
|
return name, ip
|
|
|
|
|
|
def wait_for_ip(name, timeout=120):
|
|
end = time.time() + timeout
|
|
while time.time() < end:
|
|
out = sh(["virsh", "domifaddr", name, "--source", "lease"],
|
|
check=False, capture=True).stdout
|
|
ip = parse_lease_ip(out)
|
|
if ip:
|
|
return ip
|
|
time.sleep(4)
|
|
raise SystemExit(f"timed out waiting for {name} to get a DHCP lease — "
|
|
"VM left defined; run `integration-vm prune` to remove it")
|
|
|
|
|
|
def wait_for_ssh(ip, user, timeout=180):
|
|
end = time.time() + timeout
|
|
while time.time() < end:
|
|
r = sh(["ssh", "-o", "StrictHostKeyChecking=no",
|
|
"-o", "UserKnownHostsFile=/dev/null", "-o", "ConnectTimeout=5",
|
|
f"{user}@{ip}", "true"], check=False, capture=True)
|
|
if r.returncode == 0:
|
|
return
|
|
time.sleep(5)
|
|
raise SystemExit(f"timed out waiting for SSH to {ip} — "
|
|
"VM left defined; run `integration-vm prune` to remove it")
|
|
|
|
|
|
def _read_current():
|
|
txt = (RUN_DIR / "current").read_text().splitlines()
|
|
return txt[0], txt[1], txt[2] # name, ip, host
|
|
|
|
|
|
def write_run_inventory(name, ip, groups):
|
|
RUN_DIR.mkdir(parents=True, exist_ok=True)
|
|
(RUN_DIR / "hosts.yml").write_text(
|
|
render_run_hosts(name, ip, "ansible", groups))
|
|
link = RUN_DIR / "group_vars"
|
|
target = REPO_ROOT / "inventories" / "production" / "group_vars"
|
|
if link.is_symlink():
|
|
link.unlink()
|
|
elif link.exists():
|
|
raise SystemExit(f"{link} exists and is not a symlink; remove it manually")
|
|
link.symlink_to(target)
|
|
|
|
|
|
def apply(host, certs):
|
|
name, ip, _ = _read_current()
|
|
prof = json.loads(profile_path(host).read_text())
|
|
write_run_inventory(name, ip, prof["groups"])
|
|
extra = []
|
|
for f in prof.get("extra_vars_files", []):
|
|
extra += ["-e", f"@{INTEG_DIR / f}"]
|
|
extra += ["-e", f"@{cert_file(certs)}"]
|
|
for step in prof["applies"]:
|
|
cmd = [".venv/bin/ansible-playbook", "-i", str(RUN_DIR / "hosts.yml"),
|
|
f"playbooks/{step['playbook']}", "--limit", name]
|
|
if step.get("tags"):
|
|
cmd += ["--tags", ",".join(step["tags"])]
|
|
cmd += extra
|
|
sh(cmd, cwd=str(REPO_ROOT))
|
|
print(f"applied {host} profile to {name}")
|
|
|
|
|
|
def _boot_id(ip, user):
|
|
r = sh(["ssh", "-o", "StrictHostKeyChecking=no",
|
|
"-o", "UserKnownHostsFile=/dev/null", "-o", "ConnectTimeout=5",
|
|
f"{user}@{ip}", "cat /proc/sys/kernel/random/boot_id"],
|
|
check=False, capture=True)
|
|
return r.stdout.strip() if r.returncode == 0 else None
|
|
|
|
|
|
def wait_for_reboot(ip, user, before_boot_id, timeout=240):
|
|
"""Confirm a REAL reboot: SSH back up AND boot_id changed (not the pre-reboot sshd)."""
|
|
end = time.time() + timeout
|
|
while time.time() < end:
|
|
bid = _boot_id(ip, user)
|
|
if bid and bid != before_boot_id:
|
|
return
|
|
time.sleep(5)
|
|
raise SystemExit(f"timed out waiting for {ip} to reboot (boot_id unchanged) — "
|
|
"VM left defined; run `integration-vm prune` to remove it")
|
|
|
|
|
|
def reboot_vm():
|
|
name, ip, _ = _read_current()
|
|
before = _boot_id(ip, "ansible")
|
|
sh(["virsh", "reboot", name])
|
|
wait_for_reboot(ip, "ansible", before)
|
|
print(f"{name} rebooted (boot_id changed), SSH back at {ip}")
|
|
|
|
|
|
def run_assert(host, certs):
|
|
name, ip, _ = _read_current()
|
|
prof = json.loads(profile_path(host).read_text())
|
|
write_run_inventory(name, ip, prof["groups"])
|
|
extra = []
|
|
for f in prof.get("extra_vars_files", []):
|
|
extra += ["-e", f"@{INTEG_DIR / f}"]
|
|
extra += ["-e", f"@{cert_file(certs)}"]
|
|
cmd = [".venv/bin/ansible-playbook", "-i", str(RUN_DIR / "hosts.yml"),
|
|
"tests/integration/verify.yml", "--limit", name] + extra
|
|
r = sh(cmd, cwd=str(REPO_ROOT), check=False)
|
|
if r.returncode != 0:
|
|
dump_diagnostics(name, ip)
|
|
raise SystemExit(f"VERIFY FAILED for {name} — diagnostics in {DIAG_ROOT}")
|
|
print(f"VERIFY PASSED for {name}")
|
|
|
|
|
|
def dump_diagnostics(name, ip):
|
|
d = DIAG_ROOT / name
|
|
d.mkdir(parents=True, exist_ok=True)
|
|
for label, cmd in [
|
|
("nft", "nft list ruleset"),
|
|
("docker", "docker ps -a"),
|
|
("ss", "ss -tlnp"),
|
|
("journal", "journalctl -b --no-pager"),
|
|
("critical-chain", "systemd-analyze critical-chain"),
|
|
]:
|
|
r = sh(["ssh", "-o", "StrictHostKeyChecking=no",
|
|
"-o", "UserKnownHostsFile=/dev/null",
|
|
f"ansible@{ip}", "sudo " + cmd], check=False, capture=True)
|
|
(d / f"{label}.txt").write_text((r.stdout or "") + (r.stderr or ""))
|
|
console = CACHE_DIR / f"{name}-console.log"
|
|
if console.exists():
|
|
# The serial log is root:0600 (libvirt-created); read it via sudo (ADR-015: the
|
|
# claude worker has sudo) and write a worker-owned copy into the bundle.
|
|
r = sh(["sudo", "cat", str(console)], check=False, capture=True)
|
|
(d / "console.log").write_text(r.stdout or "")
|
|
print(f"diagnostics written to {d}", file=sys.stderr)
|
|
|
|
|
|
def _destroy(name):
|
|
sh(["virsh", "destroy", name], check=False)
|
|
sh(["virsh", "undefine", name, "--nvram"], check=False)
|
|
for base in (RUN_DIR, CACHE_DIR):
|
|
for f in base.glob(f"{name}*"):
|
|
f.unlink(missing_ok=True)
|
|
|
|
|
|
def down(host=None, keep=False):
|
|
if keep:
|
|
print("--keep: leaving the VM running for inspection")
|
|
return
|
|
cur = RUN_DIR / "current"
|
|
if cur.exists():
|
|
name = cur.read_text().splitlines()[0]
|
|
_destroy(name)
|
|
cur.unlink(missing_ok=True)
|
|
print(f"destroyed {name}")
|
|
|
|
|
|
def prune():
|
|
running = sh(["virsh", "list", "--all", "--name"], capture=True).stdout.split()
|
|
for n in running:
|
|
if n.startswith(NAME_PREFIX):
|
|
_destroy(n)
|
|
print(f"pruned {n}")
|
|
(RUN_DIR / "current").unlink(missing_ok=True)
|
|
|
|
|
|
def console():
|
|
name = (RUN_DIR / "current").read_text().splitlines()[0]
|
|
log = CACHE_DIR / f"{name}-console.log"
|
|
if log.exists():
|
|
print(sh(["sudo", "cat", str(log)], check=False, capture=True).stdout or "")
|
|
else:
|
|
print(f"no console log at {log}")
|
|
|
|
|
|
def cycle(host, certs, keep=False, no_reboot=False):
|
|
ok = False
|
|
try:
|
|
up(host)
|
|
apply(host, certs)
|
|
if not no_reboot:
|
|
reboot_vm()
|
|
run_assert(host, certs)
|
|
ok = True
|
|
finally:
|
|
if ok and not keep:
|
|
down(host)
|
|
elif not ok:
|
|
print("FAILED — VM left up for inspection; `integration-vm prune` to clean.",
|
|
file=sys.stderr)
|
|
|
|
|
|
DISPATCH = {
|
|
"up": lambda a: (up(a.host), None)[1],
|
|
"apply": lambda a: apply(a.host, a.certs),
|
|
"reboot": lambda a: reboot_vm(),
|
|
"assert": lambda a: run_assert(a.host, a.certs),
|
|
"down": lambda a: down(a.host, a.keep),
|
|
"console": lambda a: console(),
|
|
"prune": lambda a: prune(),
|
|
"cycle": lambda a: cycle(a.host, a.certs, a.keep, a.no_reboot),
|
|
}
|
|
|
|
|
|
def main(argv=None):
|
|
p = argparse.ArgumentParser(prog="integration-vm", description=__doc__)
|
|
sub = p.add_subparsers(dest="cmd", required=True)
|
|
for c in ("up", "apply", "reboot", "assert", "cycle", "down", "console"):
|
|
sp = sub.add_parser(c)
|
|
sp.add_argument("--host", required=True)
|
|
sp.add_argument("--certs", choices=VALID_TIERS, default="internal")
|
|
sp.add_argument("--keep", action="store_true")
|
|
sp.add_argument("--no-reboot", action="store_true")
|
|
sub.add_parser("prune")
|
|
args = p.parse_args(argv)
|
|
return DISPATCH[args.cmd](args)
|
|
|
|
|
|
if __name__ == "__main__": # pragma: no cover
|
|
sys.exit(main())
|