diff --git a/roles/base/filter_plugins/firewall_rules.py b/roles/base/filter_plugins/firewall_rules.py new file mode 100644 index 0000000..4bb2c9d --- /dev/null +++ b/roles/base/filter_plugins/firewall_rules.py @@ -0,0 +1,72 @@ +"""Resolve the shared firewall catalog into concrete nftables ingress rules for one host. + +Used by the base role's nftables template (ADR-020 / host-nftables design). Pure +functions — unit-tested in tests/test_firewall_rules.py. +""" + + +def _placement_hosts(entry, groups): + """Hostnames a catalog entry is placed on (exactly one of host/group/hosts).""" + if "host" in entry: + return [entry["host"]] + if "group" in entry: + return list(groups.get(entry["group"], [])) + if "hosts" in entry: + return list(entry["hosts"]) + raise ValueError(f"catalog entry has no placement (host/group/hosts): {entry!r}") + + +def _host_cidr(host, hostvars): + hv = hostvars.get(host) or {} + ip = hv.get("ansible_host") + if not ip: + raise ValueError(f"no ansible_host for '{host}' — cannot resolve firewall source") + return f"{ip}/32" + + +def _resolve_source(frm, catalog, zones, hostvars, groups): + """Resolve a symbolic `from` to a sorted list of source CIDRs.""" + if frm in zones: + return [zones[frm]] + if frm in catalog: + return sorted(_host_cidr(h, hostvars) + for h in _placement_hosts(catalog[frm], groups)) + if frm in groups: + return sorted(_host_cidr(h, hostvars) for h in groups[frm]) + if frm in hostvars: + return [_host_cidr(frm, hostvars)] + raise ValueError(f"unresolvable firewall source '{frm}'") + + +def resolve_firewall_rules(catalog, zones, inventory_hostname, hostvars, groups): + """Return sorted, de-duped [{proto, port, sources:[cidr,...]}] for services on this host.""" + catalog = catalog or {} + zones = zones or {} + groups = groups or {} + + rules = [] + for _name, entry in sorted(catalog.items()): + if inventory_hostname not in _placement_hosts(entry, groups): + continue + for ing in entry.get("ingress", []): + rules.append({ + "proto": ing.get("proto", "tcp"), + "port": int(ing["port"]), + "sources": _resolve_source(ing["from"], catalog, zones, hostvars, groups), + }) + + seen = set() + out = [] + for r in sorted(rules, key=lambda x: (x["port"], x["proto"], x["sources"])): + key = (r["proto"], r["port"], tuple(r["sources"])) + if key not in seen: + seen.add(key) + out.append(r) + return out + + +class FilterModule: + """Ansible filter plugin entry point.""" + + def filters(self): + return {"resolve_firewall_rules": resolve_firewall_rules} diff --git a/tests/test_firewall_rules.py b/tests/test_firewall_rules.py new file mode 100644 index 0000000..d45cdd6 --- /dev/null +++ b/tests/test_firewall_rules.py @@ -0,0 +1,73 @@ +import importlib.util +import pathlib + +import pytest + +_PATH = ( + pathlib.Path(__file__).resolve().parent.parent + / "roles" / "base" / "filter_plugins" / "firewall_rules.py" +) +_spec = importlib.util.spec_from_file_location("firewall_rules", _PATH) +fr = importlib.util.module_from_spec(_spec) +_spec.loader.exec_module(fr) + +ZONES = {"lan": "10.30.0.0/24", "srv": "10.20.0.0/24"} +HOSTVARS = { + "docker01": {"ansible_host": "10.20.0.50"}, + "docker02": {"ansible_host": "10.20.0.51"}, +} +GROUPS = {"docker_hosts": ["docker01", "docker02"]} + + +def test_zone_source(): + cat = {"reverse_proxy": {"host": "docker01", + "ingress": [{"from": "lan", "port": 443, "proto": "tcp"}]}} + out = fr.resolve_firewall_rules(cat, ZONES, "docker01", HOSTVARS, GROUPS) + assert out == [{"proto": "tcp", "port": 443, "sources": ["10.30.0.0/24"]}] + + +def test_service_source_resolves_to_host_ip(): + cat = { + "reverse_proxy": {"host": "docker01", "ingress": []}, + "photoprism": {"host": "docker01", + "ingress": [{"from": "reverse_proxy", "port": 2342, "proto": "tcp"}]}, + } + out = fr.resolve_firewall_rules(cat, ZONES, "docker01", HOSTVARS, GROUPS) + assert out == [{"proto": "tcp", "port": 2342, "sources": ["10.20.0.50/32"]}] + + +def test_group_placement_and_source_multi_host(): + cat = {"dns": {"group": "docker_hosts", + "ingress": [{"from": "docker_hosts", "port": 53, "proto": "udp"}]}} + out = fr.resolve_firewall_rules(cat, ZONES, "docker01", HOSTVARS, GROUPS) + assert out == [{"proto": "udp", "port": 53, + "sources": ["10.20.0.50/32", "10.20.0.51/32"]}] + + +def test_host_with_no_services_returns_empty(): + cat = {"photoprism": {"host": "docker02", + "ingress": [{"from": "lan", "port": 2342, "proto": "tcp"}]}} + assert fr.resolve_firewall_rules(cat, ZONES, "docker01", HOSTVARS, GROUPS) == [] + + +def test_unresolvable_from_raises(): + cat = {"x": {"host": "docker01", + "ingress": [{"from": "nope", "port": 80, "proto": "tcp"}]}} + with pytest.raises(ValueError): + fr.resolve_firewall_rules(cat, ZONES, "docker01", HOSTVARS, GROUPS) + + +def test_duplicate_rules_deduped(): + cat = {"app": {"host": "docker01", "ingress": [ + {"from": "lan", "port": 8080, "proto": "tcp"}, + {"from": "lan", "port": 8080, "proto": "tcp"}, + ]}} + out = fr.resolve_firewall_rules(cat, ZONES, "docker01", HOSTVARS, GROUPS) + assert out == [{"proto": "tcp", "port": 8080, "sources": ["10.30.0.0/24"]}] + + +def test_missing_ansible_host_raises(): + cat = {"x": {"host": "docker01", + "ingress": [{"from": "docker02", "port": 80, "proto": "tcp"}]}} + with pytest.raises(ValueError): + fr.resolve_firewall_rules(cat, ZONES, "docker01", {"docker01": {}, "docker02": {}}, GROUPS)