Verified Commit 84a1cdb0 authored by Jan Miksik's avatar Jan Miksik
Browse files

fixup! fixup! fixup! fixup! fixup! tests/fwall:zones

parent f9ca1270
Pipeline #96938 passed with stages
in 1 minute and 2 seconds
import logging
from .. import cli
from .uci import UciShell
logger = logging.getLogger(__name__)
class Zone(UciShell):
"""Class to manage zone via uci."""
......@@ -21,7 +17,7 @@ class Zone(UciShell):
):
super().__init__(
shell_handler=shell_handler,
post=["/etc/init.d/network restart; /etc/init.d/dnsmasq restart"],
post=["/etc/init.d/network restart; /etc/init.d/dnsmasq restart; /etc/init.d/firewall reload"],
revert_on_exit=True,
)
self.name = name
......@@ -37,10 +33,12 @@ class Zone(UciShell):
def add_network(self, network):
self.networks.append(network)
self.add_list(f"{self.cfg_id}.networks", [network])
self.commit()
def del_network(self, network):
self.networks.remove(network)
self.del_list(f"{self.cfg_id}.networks", [network])
self.commit()
def update_zone(self):
"""Updates zone according to actual attributes"""
......@@ -54,6 +52,10 @@ class Zone(UciShell):
self.set(f"{self.cfg_id}.masq", "1")
self.commit()
def commit(self):
super().commit()
self._shell.run("/etc/init.d/firewall reload")
class Forwarding(UciShell):
def __init__(self, shell_handler, source, destination):
......@@ -63,6 +65,10 @@ class Forwarding(UciShell):
revert_on_exit=True,
)
self.add("firewall", "forwarding")
self.set(f"firewall.@forwarding[-1].src", source.name)
self.set(f"firewall.@forwarding[-1].dest", destination.name)
self.set("firewall.@forwarding[-1].src", source.name)
self.set("firewall.@forwarding[-1].dest", destination.name)
self.commit()
def commit(self):
super().commit()
self._shell.run("/etc/init.d/firewall reload")
......@@ -101,3 +101,4 @@ class Bridge(UciShell):
self._shell.run(f"ip link set br-{name} type bridge vlan_filtering 1")
for name in self._bridge_names:
self._shell.run(f"ip link set br-{name} type bridge vlan_filtering 0")
self.commit()
""" Tests of zone propagation and separation
This file contains zone tests. In main scope it is for INPUT, OUTPUT, FORWARDING (IOF) and zone forwarding.
The idea is that we have 6 preconfigured settings, which of 3 are set to the same value and three
combines all possible values, to throughtly check if the IOF values are properly set separately.
Main class contains tests for IOF and zone forwarding (also masquareding, but it needs traffic check)
There are generated two bridges using VLAN 42 and 73, which can contain up to 2 container as of now,
connected to DUT via lan0.XY and eth2.XY (now it is set to have 42 on lan0 and 73 on wan/eth2).
Zones are fixtures on function scope, see what is possible in `nsfarm/setup/firewall`
Zone
"""
import abc
import contextlib
import logging
import time
import typing
import pexpect
......@@ -113,23 +130,31 @@ def fixture_allow_port_8090(client_board):
uci.set("firewall.@rule[-1].dest_port", "8090")
uci.add_list("firewall.@rule[-1].proto", ["tcp", "udp", "icmp"])
uci.commit()
client_board.run("/etc/init.d/network restart; /etc/init.d/dnsmasq restart")
uci._shell.run("/etc/init.d/firewall reload")
yield cfg
def get_ip(target):
"""Returns IPv4 of target on lan port."""
return target.get_ip(["lan"], [4])
timeout = 5
start = time.time()
while not target.get_ip(["lan"], [4]):
time.sleep(0.5)
if time.time() - start > timeout:
raise RuntimeError("Cannot obtain IP of {target}")
return target.get_ip(["lan"], [4])[0].ip.exploded
class CommonIOF(abc.ABC):
"""CommonIOF abstract for tests
All attributes with prefix `res` are used in ts
Attributes with `do` triggers execution of specific tests.
"""
iof: tuple # configuration of INPUT, OUTPUT, FORWARDING
port: int = 8090
timeout: int = 30
results = {
"ACCEPT": 0,
......@@ -137,8 +162,8 @@ class CommonIOF(abc.ABC):
"DROP": 1,
}
do_zf: typing.Any = False
do_masq: typing.Any = False
do_zf: typing.Any = False # Runs tests_zone_forwarding
do_masq: typing.Any = False # Tuns tests_masqueradinf - Not Working yet
@pytest.fixture(name="zone_A", scope="function")
def fixture_zone_A(self, board_access_for_fixture):
......@@ -172,7 +197,7 @@ class CommonIOF(abc.ABC):
with Forwarding(board_access_for_fixture, zone_A, zone_B) as fwd:
yield fwd
def try_netcat(self, src_shell: cli.Shell, dst_shell: cli.Shell, target_ip: str):
def try_netcat(self, src_shell: cli.Shell, dst_shell: cli.Shell, target_ip: str, expected):
"""Tool to run netcat test
src_shell and dst_shell are due to inconsistency of parent objects (client_board, container)
......@@ -180,62 +205,76 @@ class CommonIOF(abc.ABC):
Returns:
0 on succesful connection
"TIMEOUT" on timeout
"Other??" TODO: Check what it does...
1 on reject
Other options are failures of different kind.
"""
dst_shell.command(f"nc -l -p {self.port}")
try:
ecode = src_shell.run(f"nc -vvz {target_ip} {self.port} -w 10", check=False)
except pexpect.exceptions.TIMEOUT:
# dropped packet
ecode = "TIMEOUT"
except Exception as err:
# rejected connection
ecode = "UNKNOWN ERROR", err
start = time.time()
if not isinstance(expected, (list, tuple)):
expected = [expected]
ecode = None
while ecode not in expected:
try:
ecode = src_shell.run(f"nc -vvz {target_ip} {self.port} -w 30", check=False)
except pexpect.exceptions.TIMEOUT:
# dropped packet
ecode = "TIMEOUT"
except Exception as err:
# rejected connection
ecode = "UNKNOWN ERROR", err
if time.time() - start > self.timeout:
import pdb
pdb.set_trace(header="TIMEOUT!!!")
break
dst_shell.ctrl_c()
return ecode
def test_input(self, cli42, zone_A, client_board):
ecode = self.try_netcat(cli42.shell, client_board, "192.168.42.1")
assert ecode == self.results[self.iof[0]], str(ecode)
ecode = self.try_netcat(cli42.shell, client_board, "192.168.42.1", self.results[self.iof[0]])
assert ecode == self.results[self.iof[0]], f"ecode = {ecode}, expected {self.iof[0]}"
def test_output(self, cli42, zone_A, client_board):
ecode = self.try_netcat(client_board, cli42.shell, get_ip(cli42))
assert ecode == self.results[self.iof[1]], str(ecode)
ecode = self.try_netcat(client_board, cli42.shell, get_ip(cli42), self.results[self.iof[1]])
assert ecode == self.results[self.iof[1]], f"ecode = {ecode}, expected {self.iof[1]}"
def test_forwarding(self, cli42, cli73, zone_AB):
ecode = self.try_netcat(cli42.shell, cli73.shell, get_ip(cli73))
assert ecode == self.results[self.iof[2]], str(ecode)
ecode = self.try_netcat(cli42.shell, cli73.shell, get_ip(cli73), self.results[self.iof[2]])
assert ecode == self.results[self.iof[2]], f"ecode = {ecode}, expected {self.iof[2]}"
def test_zone_forwarding(self, cli42, cli73, zone_A, zone_B, forwarding_AB):
"""This test should be always passing."""
if self.do_zf is None:
pytest.skip("Skipping - not expected to run.")
ecode = self.try_netcat(cli42.shell, cli73.shell, get_ip(cli73))
assert ecode == 0, str(ecode)
ecode = self.try_netcat(cli73.shell, cli42.shell, get_ip(cli42))
assert ecode != 0, str(ecode)
ecodeAB = self.try_netcat(cli42.shell, cli73.shell, get_ip(cli73), 0)
ecodeBA = self.try_netcat(cli73.shell, cli42.shell, get_ip(cli42), ["TIMEOUT", 1, 2])
assert ecodeAB == 0 and ecodeBA != 0, f"ecode1 = {ecodeAB}, ecode2 = {ecodeBA} (ecpected 0 and non 0)"
def test_zone_separation(self, cli42, cli73, zone_A, zone_B):
"""This connection should be always failing"""
if not self.do_zf:
pytest.skip("Skipping - not expected to run.")
ecode = self.try_netcat(cli42.shell, cli73.shell, get_ip(cli73))
assert ecode != 0, str(ecode)
ecode = self.try_netcat(cli73.shell, cli42.shell, get_ip(cli42))
assert ecode != 0, str(ecode)
ecodeAB = self.try_netcat(cli42.shell, cli73.shell, get_ip(cli73), ["TIMEOUT", 1, 2])
ecodeBA = self.try_netcat(cli73.shell, cli42.shell, get_ip(cli42), ["TIMEOUT", 1, 2])
assert ecodeAB != 0 and ecodeBA != 0, f"ecode1 = {ecodeAB}, ecode2 = {ecodeBA} (non 0 and non 0)"
@pytest.mark.skip("This test needs to be rewritten.")
def test_masqareding(self, cli42, cli73, zone_A, zone_B):
if not self.do_masq:
pytest.skip("Skipping - not expected to run.")
zone_B.masq = True
zone_B.update()
ecode = self.try_netcat(cli42.shell, cli73.shell, get_ip(cli73))
assert ecode != 0, str(ecode)
ecode = self.try_netcat(cli73.shell, cli42.shell, get_ip(cli42))
assert ecode != 0, str(ecode)
ecodeAB = self.try_netcat(cli42.shell, cli73.shell, get_ip(cli73), ["TIMEOUT", 1, 2])
ecodeBA = self.try_netcat(cli73.shell, cli42.shell, get_ip(cli42), ["TIMEOUT", 1, 2])
assert ecodeAB != 0 and ecodeBA != 0, f"ecode1 = {ecodeAB}, ecode2 = {ecodeBA} (non 0 and non 0)"
# this is probably missing proper masquarading check, that could be done using TCP dump.
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment