Adds a public (0.0.0.0/0) zone and ingress rules for askari's Caddy (80/443) and NetBird STUN (3478/udp), so that the base nftables default-deny does not drop the live public services when applied to askari. Molecule and a filter unit test cover the public-zone rendering. Mesh-hardening 1/3 (ADR-020/024/016). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
108 lines
4.3 KiB
Python
108 lines
4.3 KiB
Python
import importlib.util
|
|
import pathlib
|
|
|
|
import pytest
|
|
|
|
# Load the filter plugin directly from its file path, two directories above
# this test file (presumably because roles/base/filter_plugins is not an
# importable package -- confirm against the repository layout).
_PATH = pathlib.Path(__file__).resolve().parents[1].joinpath(
    "roles", "base", "filter_plugins", "firewall_rules.py"
)
_spec = importlib.util.spec_from_file_location("firewall_rules", _PATH)
fr = importlib.util.module_from_spec(_spec)
# Executing the spec's loader runs the plugin source and populates `fr`.
_spec.loader.exec_module(fr)
|
|
|
|
# Shared inputs reused by most tests below.
# Zone name -> CIDR.
ZONES = {
    "lan": "10.30.0.0/24",
    "srv": "10.20.0.0/24",
}
# Inventory hostname -> host variables.
HOSTVARS = {
    "docker01": {"ansible_host": "10.20.0.50"},
    "docker02": {"ansible_host": "10.20.0.51"},
}
# Inventory group name -> member hostnames.
GROUPS = {
    "docker_hosts": ["docker01", "docker02"],
}
|
|
|
|
|
|
def test_zone_source():
    """An ingress `from` that names a zone resolves to that zone's CIDR."""
    ingress = [{"from": "lan", "port": 443, "proto": "tcp"}]
    catalog = {"reverse_proxy": {"host": "docker01", "ingress": ingress}}

    rules = fr.resolve_firewall_rules(catalog, ZONES, "docker01", HOSTVARS, GROUPS)

    expected = [{"proto": "tcp", "port": 443, "sources": ["10.30.0.0/24"]}]
    assert rules == expected
|
|
|
|
|
|
def test_service_source_resolves_to_host_ip():
    """An ingress `from` naming another catalog service yields that service's host IP as a /32."""
    proxy = {"host": "docker01", "ingress": []}
    photoprism = {
        "host": "docker01",
        "ingress": [{"from": "reverse_proxy", "port": 2342, "proto": "tcp"}],
    }
    catalog = {"reverse_proxy": proxy, "photoprism": photoprism}

    rules = fr.resolve_firewall_rules(catalog, ZONES, "docker01", HOSTVARS, GROUPS)

    expected = [{"proto": "tcp", "port": 2342, "sources": ["10.20.0.50/32"]}]
    assert rules == expected
|
|
|
|
|
|
def test_group_placement_and_source_multi_host():
    """A service placed by `group`, with a group as source, lists every member's /32."""
    ingress = [{"from": "docker_hosts", "port": 53, "proto": "udp"}]
    catalog = {"dns": {"group": "docker_hosts", "ingress": ingress}}

    rules = fr.resolve_firewall_rules(catalog, ZONES, "docker01", HOSTVARS, GROUPS)

    expected = [
        {
            "proto": "udp",
            "port": 53,
            "sources": ["10.20.0.50/32", "10.20.0.51/32"],
        }
    ]
    assert rules == expected
|
|
|
|
|
|
def test_host_with_no_services_returns_empty():
    """Resolving for a host that no catalog service is placed on gives an empty list."""
    ingress = [{"from": "lan", "port": 2342, "proto": "tcp"}]
    # The only service lives on docker02; rules are requested for docker01.
    catalog = {"photoprism": {"host": "docker02", "ingress": ingress}}

    rules = fr.resolve_firewall_rules(catalog, ZONES, "docker01", HOSTVARS, GROUPS)

    assert rules == []
|
|
|
|
|
|
def test_unresolvable_from_raises():
    """An ingress `from` that cannot be resolved ("nope") raises ValueError."""
    ingress = [{"from": "nope", "port": 80, "proto": "tcp"}]
    catalog = {"x": {"host": "docker01", "ingress": ingress}}

    with pytest.raises(ValueError):
        fr.resolve_firewall_rules(catalog, ZONES, "docker01", HOSTVARS, GROUPS)
|
|
|
|
|
|
def test_duplicate_rules_deduped():
    """Two identical ingress entries collapse into a single resolved rule."""
    ingress = [
        {"from": "lan", "port": 8080, "proto": "tcp"},
        {"from": "lan", "port": 8080, "proto": "tcp"},
    ]
    catalog = {"app": {"host": "docker01", "ingress": ingress}}

    rules = fr.resolve_firewall_rules(catalog, ZONES, "docker01", HOSTVARS, GROUPS)

    expected = [{"proto": "tcp", "port": 8080, "sources": ["10.30.0.0/24"]}]
    assert rules == expected
|
|
|
|
|
|
def test_missing_ansible_host_raises():
    """ValueError is raised when the hostvars entries carry no `ansible_host`."""
    ingress = [{"from": "docker02", "port": 80, "proto": "tcp"}]
    catalog = {"x": {"host": "docker01", "ingress": ingress}}
    # Both hosts are known, but neither has an address recorded.
    bare_hostvars = {"docker01": {}, "docker02": {}}

    with pytest.raises(ValueError):
        fr.resolve_firewall_rules(catalog, ZONES, "docker01", bare_hostvars, GROUPS)
|
|
|
|
|
|
def test_hosts_list_placement():
    """A service placed through a `hosts` list produces rules for a listed host."""
    ingress = [{"from": "lan", "port": 9090, "proto": "tcp"}]
    catalog = {"svc": {"hosts": ["docker01", "docker02"], "ingress": ingress}}

    rules = fr.resolve_firewall_rules(catalog, ZONES, "docker01", HOSTVARS, GROUPS)

    expected = [{"proto": "tcp", "port": 9090, "sources": ["10.30.0.0/24"]}]
    assert rules == expected
|
|
|
|
|
|
def test_proto_defaults_to_tcp():
    """An ingress entry without `proto` resolves with proto "tcp"."""
    ingress = [{"from": "lan", "port": 80}]
    catalog = {"svc": {"host": "docker01", "ingress": ingress}}

    rules = fr.resolve_firewall_rules(catalog, ZONES, "docker01", HOSTVARS, GROUPS)

    expected = [{"proto": "tcp", "port": 80, "sources": ["10.30.0.0/24"]}]
    assert rules == expected
|
|
|
|
|
|
def test_empty_group_source_raises():
    """Using a group with no members as an ingress source raises ValueError."""
    ingress = [{"from": "empty_grp", "port": 80, "proto": "tcp"}]
    catalog = {"svc": {"host": "docker01", "ingress": ingress}}
    memberless_groups = {"empty_grp": []}

    with pytest.raises(ValueError):
        fr.resolve_firewall_rules(
            catalog, ZONES, "docker01", HOSTVARS, memberless_groups
        )
|
|
|
|
|
|
def test_ingress_missing_port_raises():
    """An ingress entry that omits `port` raises ValueError."""
    ingress = [{"from": "lan"}]
    catalog = {"svc": {"host": "docker01", "ingress": ingress}}

    with pytest.raises(ValueError):
        fr.resolve_firewall_rules(catalog, ZONES, "docker01", HOSTVARS, GROUPS)
|
|
|
|
|
|
def test_public_zone_resolves_to_anywhere():
    """A `public` zone defined as 0.0.0.0/0 resolves to that CIDR as the source."""
    public_zones = {"public": "0.0.0.0/0"}
    askari_hostvars = {"askari": {"ansible_host": "100.99.226.39"}}
    web_ingress = [{"from": "public", "port": 443, "proto": "tcp"}]
    services = {"web": {"host": "askari", "ingress": web_ingress}}

    resolved = fr.resolve_firewall_rules(
        services, public_zones, "askari", askari_hostvars, {}
    )

    expected = [{"proto": "tcp", "port": 443, "sources": ["0.0.0.0/0"]}]
    assert resolved == expected
|