"""Tests for the ``resolve_firewall_rules`` filter in ``firewall_rules.py``.

The filter plugin is loaded straight from its file path with ``importlib``
instead of a regular ``import`` statement.
"""

import importlib.util
import pathlib

import pytest

# Location of the plugin relative to this file: two directories up, then
# roles/base/filter_plugins/firewall_rules.py.
_PATH = pathlib.Path(__file__).resolve().parents[1].joinpath(
    "roles", "base", "filter_plugins", "firewall_rules.py"
)
_spec = importlib.util.spec_from_file_location("firewall_rules", _PATH)
fr = importlib.util.module_from_spec(_spec)
_spec.loader.exec_module(fr)

# Shared fixtures used by most tests.
ZONES = {"lan": "10.30.0.0/24", "srv": "10.20.0.0/24"}
HOSTVARS = {
    "docker01": {"ansible_host": "10.20.0.50"},
    "docker02": {"ansible_host": "10.20.0.51"},
}
GROUPS = {"docker_hosts": ["docker01", "docker02"]}


def _resolve(catalog, *, host="docker01", zones=ZONES, hostvars=HOSTVARS, groups=GROUPS):
    """Call the filter under test, defaulting every argument but the catalog
    to the shared fixtures above."""
    return fr.resolve_firewall_rules(catalog, zones, host, hostvars, groups)


def test_zone_source():
    """A ``from`` that names a zone resolves to that zone's CIDR."""
    catalog = {
        "reverse_proxy": {
            "host": "docker01",
            "ingress": [{"from": "lan", "port": 443, "proto": "tcp"}],
        },
    }
    rules = _resolve(catalog)
    assert rules == [{"proto": "tcp", "port": 443, "sources": ["10.30.0.0/24"]}]


def test_service_source_resolves_to_host_ip():
    """A ``from`` that names another service resolves to that service's host as a /32."""
    catalog = {
        "reverse_proxy": {"host": "docker01", "ingress": []},
        "photoprism": {
            "host": "docker01",
            "ingress": [{"from": "reverse_proxy", "port": 2342, "proto": "tcp"}],
        },
    }
    rules = _resolve(catalog)
    assert rules == [{"proto": "tcp", "port": 2342, "sources": ["10.20.0.50/32"]}]


def test_group_placement_and_source_multi_host():
    """A service placed by ``group`` with a group source yields one /32 per group member."""
    catalog = {
        "dns": {
            "group": "docker_hosts",
            "ingress": [{"from": "docker_hosts", "port": 53, "proto": "udp"}],
        },
    }
    rules = _resolve(catalog)
    expected_sources = ["10.20.0.50/32", "10.20.0.51/32"]
    assert rules == [{"proto": "udp", "port": 53, "sources": expected_sources}]


def test_host_with_no_services_returns_empty():
    """A host that carries none of the catalog's services gets an empty rule list."""
    catalog = {
        "photoprism": {
            "host": "docker02",
            "ingress": [{"from": "lan", "port": 2342, "proto": "tcp"}],
        },
    }
    assert _resolve(catalog) == []


def test_unresolvable_from_raises():
    """A ``from`` value that cannot be resolved raises ``ValueError``."""
    catalog = {
        "x": {
            "host": "docker01",
            "ingress": [{"from": "nope", "port": 80, "proto": "tcp"}],
        },
    }
    with pytest.raises(ValueError):
        _resolve(catalog)


def test_duplicate_rules_deduped():
    """Two identical ingress entries collapse into a single resolved rule."""
    ingress_entry = {"from": "lan", "port": 8080, "proto": "tcp"}
    catalog = {"app": {"host": "docker01", "ingress": [ingress_entry, dict(ingress_entry)]}}
    rules = _resolve(catalog)
    assert rules == [{"proto": "tcp", "port": 8080, "sources": ["10.30.0.0/24"]}]


def test_missing_ansible_host_raises():
    """A host-name source whose hostvars lack ``ansible_host`` raises ``ValueError``."""
    catalog = {
        "x": {
            "host": "docker01",
            "ingress": [{"from": "docker02", "port": 80, "proto": "tcp"}],
        },
    }
    bare_hostvars = {"docker01": {}, "docker02": {}}
    with pytest.raises(ValueError):
        _resolve(catalog, hostvars=bare_hostvars)


def test_hosts_list_placement():
    """A service placed with a ``hosts`` list applies to a host named in that list."""
    catalog = {
        "svc": {
            "hosts": ["docker01", "docker02"],
            "ingress": [{"from": "lan", "port": 9090, "proto": "tcp"}],
        },
    }
    rules = _resolve(catalog)
    assert rules == [{"proto": "tcp", "port": 9090, "sources": ["10.30.0.0/24"]}]


def test_proto_defaults_to_tcp():
    """An ingress entry with no ``proto`` key resolves with ``proto`` set to ``tcp``."""
    catalog = {"svc": {"host": "docker01", "ingress": [{"from": "lan", "port": 80}]}}
    rules = _resolve(catalog)
    assert rules == [{"proto": "tcp", "port": 80, "sources": ["10.30.0.0/24"]}]


def test_empty_group_source_raises():
    """A ``from`` that names a group with no members raises ``ValueError``."""
    catalog = {
        "svc": {
            "host": "docker01",
            "ingress": [{"from": "empty_grp", "port": 80, "proto": "tcp"}],
        },
    }
    with pytest.raises(ValueError):
        _resolve(catalog, groups={"empty_grp": []})


def test_ingress_missing_port_raises():
    """An ingress entry with no ``port`` key raises ``ValueError``."""
    catalog = {"svc": {"host": "docker01", "ingress": [{"from": "lan"}]}}
    with pytest.raises(ValueError):
        _resolve(catalog)


def test_public_zone_resolves_to_anywhere():
    """A ``public`` zone mapped to ``0.0.0.0/0`` is passed through as the rule's source."""
    services = {
        "web": {
            "host": "askari",
            "ingress": [{"from": "public", "port": 443, "proto": "tcp"}],
        },
    }
    resolved = _resolve(
        services,
        host="askari",
        zones={"public": "0.0.0.0/0"},
        hostvars={"askari": {"ansible_host": "100.99.226.39"}},
        groups={},
    )
    assert resolved == [{"proto": "tcp", "port": 443, "sources": ["0.0.0.0/0"]}]