boma/tests/test_firewall_rules.py

109 lines
4.3 KiB
Python
Raw Normal View History

import importlib.util
import pathlib
import pytest
_PATH = (
pathlib.Path(__file__).resolve().parent.parent
/ "roles" / "base" / "filter_plugins" / "firewall_rules.py"
)
_spec = importlib.util.spec_from_file_location("firewall_rules", _PATH)
fr = importlib.util.module_from_spec(_spec)
_spec.loader.exec_module(fr)
ZONES = {"lan": "10.30.0.0/24", "srv": "10.20.0.0/24"}
HOSTVARS = {
"docker01": {"ansible_host": "10.20.0.50"},
"docker02": {"ansible_host": "10.20.0.51"},
}
GROUPS = {"docker_hosts": ["docker01", "docker02"]}
def test_zone_source():
cat = {"reverse_proxy": {"host": "docker01",
"ingress": [{"from": "lan", "port": 443, "proto": "tcp"}]}}
out = fr.resolve_firewall_rules(cat, ZONES, "docker01", HOSTVARS, GROUPS)
assert out == [{"proto": "tcp", "port": 443, "sources": ["10.30.0.0/24"]}]
def test_service_source_resolves_to_host_ip():
cat = {
"reverse_proxy": {"host": "docker01", "ingress": []},
"photoprism": {"host": "docker01",
"ingress": [{"from": "reverse_proxy", "port": 2342, "proto": "tcp"}]},
}
out = fr.resolve_firewall_rules(cat, ZONES, "docker01", HOSTVARS, GROUPS)
assert out == [{"proto": "tcp", "port": 2342, "sources": ["10.20.0.50/32"]}]
def test_group_placement_and_source_multi_host():
cat = {"dns": {"group": "docker_hosts",
"ingress": [{"from": "docker_hosts", "port": 53, "proto": "udp"}]}}
out = fr.resolve_firewall_rules(cat, ZONES, "docker01", HOSTVARS, GROUPS)
assert out == [{"proto": "udp", "port": 53,
"sources": ["10.20.0.50/32", "10.20.0.51/32"]}]
def test_host_with_no_services_returns_empty():
cat = {"photoprism": {"host": "docker02",
"ingress": [{"from": "lan", "port": 2342, "proto": "tcp"}]}}
assert fr.resolve_firewall_rules(cat, ZONES, "docker01", HOSTVARS, GROUPS) == []
def test_unresolvable_from_raises():
cat = {"x": {"host": "docker01",
"ingress": [{"from": "nope", "port": 80, "proto": "tcp"}]}}
with pytest.raises(ValueError):
fr.resolve_firewall_rules(cat, ZONES, "docker01", HOSTVARS, GROUPS)
def test_duplicate_rules_deduped():
cat = {"app": {"host": "docker01", "ingress": [
{"from": "lan", "port": 8080, "proto": "tcp"},
{"from": "lan", "port": 8080, "proto": "tcp"},
]}}
out = fr.resolve_firewall_rules(cat, ZONES, "docker01", HOSTVARS, GROUPS)
assert out == [{"proto": "tcp", "port": 8080, "sources": ["10.30.0.0/24"]}]
def test_missing_ansible_host_raises():
cat = {"x": {"host": "docker01",
"ingress": [{"from": "docker02", "port": 80, "proto": "tcp"}]}}
with pytest.raises(ValueError):
fr.resolve_firewall_rules(cat, ZONES, "docker01", {"docker01": {}, "docker02": {}}, GROUPS)
def test_hosts_list_placement():
cat = {"svc": {"hosts": ["docker01", "docker02"],
"ingress": [{"from": "lan", "port": 9090, "proto": "tcp"}]}}
out = fr.resolve_firewall_rules(cat, ZONES, "docker01", HOSTVARS, GROUPS)
assert out == [{"proto": "tcp", "port": 9090, "sources": ["10.30.0.0/24"]}]
def test_proto_defaults_to_tcp():
cat = {"svc": {"host": "docker01", "ingress": [{"from": "lan", "port": 80}]}}
out = fr.resolve_firewall_rules(cat, ZONES, "docker01", HOSTVARS, GROUPS)
assert out == [{"proto": "tcp", "port": 80, "sources": ["10.30.0.0/24"]}]
def test_empty_group_source_raises():
cat = {"svc": {"host": "docker01",
"ingress": [{"from": "empty_grp", "port": 80, "proto": "tcp"}]}}
with pytest.raises(ValueError):
fr.resolve_firewall_rules(cat, ZONES, "docker01", HOSTVARS, {"empty_grp": []})
def test_ingress_missing_port_raises():
cat = {"svc": {"host": "docker01", "ingress": [{"from": "lan"}]}}
with pytest.raises(ValueError):
fr.resolve_firewall_rules(cat, ZONES, "docker01", HOSTVARS, GROUPS)
def test_public_zone_resolves_to_anywhere():
catalog = {"web": {"host": "askari",
"ingress": [{"from": "public", "port": 443, "proto": "tcp"}]}}
zones = {"public": "0.0.0.0/0"}
rules = fr.resolve_firewall_rules(catalog, zones, "askari",
{"askari": {"ansible_host": "100.99.226.39"}}, {})
assert rules == [{"proto": "tcp", "port": 443, "sources": ["0.0.0.0/0"]}]