feat(base): firewall catalog resolver filter plugin + tests

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
sjat 2026-06-06 18:51:10 +02:00
parent 390cd3b335
commit 4127f8bc6b
2 changed files with 145 additions and 0 deletions

View file

@ -0,0 +1,72 @@
"""Resolve the shared firewall catalog into concrete nftables ingress rules for one host.
Used by the base role's nftables template (ADR-020 / host-nftables design). Pure
functions unit-tested in tests/test_firewall_rules.py.
"""
def _placement_hosts(entry, groups):
"""Hostnames a catalog entry is placed on (exactly one of host/group/hosts)."""
if "host" in entry:
return [entry["host"]]
if "group" in entry:
return list(groups.get(entry["group"], []))
if "hosts" in entry:
return list(entry["hosts"])
raise ValueError(f"catalog entry has no placement (host/group/hosts): {entry!r}")
def _host_cidr(host, hostvars):
hv = hostvars.get(host) or {}
ip = hv.get("ansible_host")
if not ip:
raise ValueError(f"no ansible_host for '{host}' — cannot resolve firewall source")
return f"{ip}/32"
def _resolve_source(frm, catalog, zones, hostvars, groups):
"""Resolve a symbolic `from` to a sorted list of source CIDRs."""
if frm in zones:
return [zones[frm]]
if frm in catalog:
return sorted(_host_cidr(h, hostvars)
for h in _placement_hosts(catalog[frm], groups))
if frm in groups:
return sorted(_host_cidr(h, hostvars) for h in groups[frm])
if frm in hostvars:
return [_host_cidr(frm, hostvars)]
raise ValueError(f"unresolvable firewall source '{frm}'")
def resolve_firewall_rules(catalog, zones, inventory_hostname, hostvars, groups):
"""Return sorted, de-duped [{proto, port, sources:[cidr,...]}] for services on this host."""
catalog = catalog or {}
zones = zones or {}
groups = groups or {}
rules = []
for _name, entry in sorted(catalog.items()):
if inventory_hostname not in _placement_hosts(entry, groups):
continue
for ing in entry.get("ingress", []):
rules.append({
"proto": ing.get("proto", "tcp"),
"port": int(ing["port"]),
"sources": _resolve_source(ing["from"], catalog, zones, hostvars, groups),
})
seen = set()
out = []
for r in sorted(rules, key=lambda x: (x["port"], x["proto"], x["sources"])):
key = (r["proto"], r["port"], tuple(r["sources"]))
if key not in seen:
seen.add(key)
out.append(r)
return out
class FilterModule:
"""Ansible filter plugin entry point."""
def filters(self):
return {"resolve_firewall_rules": resolve_firewall_rules}

View file

@ -0,0 +1,73 @@
import importlib.util
import pathlib
import pytest
_PATH = (
pathlib.Path(__file__).resolve().parent.parent
/ "roles" / "base" / "filter_plugins" / "firewall_rules.py"
)
_spec = importlib.util.spec_from_file_location("firewall_rules", _PATH)
fr = importlib.util.module_from_spec(_spec)
_spec.loader.exec_module(fr)
ZONES = {"lan": "10.30.0.0/24", "srv": "10.20.0.0/24"}
HOSTVARS = {
"docker01": {"ansible_host": "10.20.0.50"},
"docker02": {"ansible_host": "10.20.0.51"},
}
GROUPS = {"docker_hosts": ["docker01", "docker02"]}
def test_zone_source():
cat = {"reverse_proxy": {"host": "docker01",
"ingress": [{"from": "lan", "port": 443, "proto": "tcp"}]}}
out = fr.resolve_firewall_rules(cat, ZONES, "docker01", HOSTVARS, GROUPS)
assert out == [{"proto": "tcp", "port": 443, "sources": ["10.30.0.0/24"]}]
def test_service_source_resolves_to_host_ip():
cat = {
"reverse_proxy": {"host": "docker01", "ingress": []},
"photoprism": {"host": "docker01",
"ingress": [{"from": "reverse_proxy", "port": 2342, "proto": "tcp"}]},
}
out = fr.resolve_firewall_rules(cat, ZONES, "docker01", HOSTVARS, GROUPS)
assert out == [{"proto": "tcp", "port": 2342, "sources": ["10.20.0.50/32"]}]
def test_group_placement_and_source_multi_host():
cat = {"dns": {"group": "docker_hosts",
"ingress": [{"from": "docker_hosts", "port": 53, "proto": "udp"}]}}
out = fr.resolve_firewall_rules(cat, ZONES, "docker01", HOSTVARS, GROUPS)
assert out == [{"proto": "udp", "port": 53,
"sources": ["10.20.0.50/32", "10.20.0.51/32"]}]
def test_host_with_no_services_returns_empty():
cat = {"photoprism": {"host": "docker02",
"ingress": [{"from": "lan", "port": 2342, "proto": "tcp"}]}}
assert fr.resolve_firewall_rules(cat, ZONES, "docker01", HOSTVARS, GROUPS) == []
def test_unresolvable_from_raises():
cat = {"x": {"host": "docker01",
"ingress": [{"from": "nope", "port": 80, "proto": "tcp"}]}}
with pytest.raises(ValueError):
fr.resolve_firewall_rules(cat, ZONES, "docker01", HOSTVARS, GROUPS)
def test_duplicate_rules_deduped():
cat = {"app": {"host": "docker01", "ingress": [
{"from": "lan", "port": 8080, "proto": "tcp"},
{"from": "lan", "port": 8080, "proto": "tcp"},
]}}
out = fr.resolve_firewall_rules(cat, ZONES, "docker01", HOSTVARS, GROUPS)
assert out == [{"proto": "tcp", "port": 8080, "sources": ["10.30.0.0/24"]}]
def test_missing_ansible_host_raises():
cat = {"x": {"host": "docker01",
"ingress": [{"from": "docker02", "port": 80, "proto": "tcp"}]}}
with pytest.raises(ValueError):
fr.resolve_firewall_rules(cat, ZONES, "docker01", {"docker01": {}, "docker02": {}}, GROUPS)