Commit 8b0bc194 authored by Petr Špaček's avatar Petr Špaček
Browse files

Merge branch 'rplint-refactor' into 'master'

`rplint` refactor

See merge request !120
parents 9d6cdec8 574e1407
Pipeline #38822 passed with stage
in 2 minutes and 35 seconds
......@@ -89,13 +89,26 @@ def scenarios(paths, configs):
return scenario_list
def rpls(paths):
for path in paths:
if os.path.isfile(path):
filelist = [path] # path to single file, accept it
else:
filelist = sorted(glob.glob(os.path.join(path, "*.rpl")))
return filelist
def pytest_addoption(parser):
parser.addoption("--config", action="append", help="path to Deckard configuration .yaml file")
parser.addoption("--scenarios", action="append", help="directory with .rpl files")
def pytest_generate_tests(metafunc):
"""This is pytest weirdness to parametrize the test over all the *.rpl files."""
"""This is pytest weirdness to parametrize the test over all the *.rpl files.
See https://docs.pytest.org/en/latest/parametrize.html#basic-pytest-generate-tests-example
for more info."""
if 'scenario' in metafunc.fixturenames:
if metafunc.config.option.config is None:
configs = []
......@@ -108,3 +121,6 @@ def pytest_generate_tests(metafunc):
paths = metafunc.config.option.scenarios
metafunc.parametrize("scenario", scenarios(paths, configs), ids=str)
if 'rpl_path' in metafunc.fixturenames:
paths = metafunc.config.option.scenarios
metafunc.parametrize("rpl_path", rpls(paths), ids=str)
......@@ -5,6 +5,7 @@ import glob
import itertools
import os
import sys
from typing import Any, Callable, Iterable, Iterator, Optional, List, Union, Set
import dns.name
......@@ -12,6 +13,8 @@ import pydnstest.augwrap
import pydnstest.matchpart
import pydnstest.scenario
Element = Union["Entry", "Step", pydnstest.scenario.Range]
RCODES = {"NOERROR", "FORMERR", "SERVFAIL", "NXDOMAIN", "NOTIMP", "REFUSED", "YXDOMAIN", "YXRRSET",
"NXRRSET", "NOTAUTH", "NOTZONE", "BADVERS", "BADSIG", "BADKEY", "BADTIME", "BADMODE",
"BADNAME", "BADALG", "BADTRUNC", "BADCOOKIE"}
......@@ -19,15 +22,24 @@ FLAGS = {"QR", "AA", "TC", "RD", "RA", "AD", "CD"}
SECTIONS = {"question", "answer", "authority", "additional"}
def get_line_number(file, char_number):
class RplintError(ValueError):
def __init__(self, fails):
msg = ""
for fail in fails:
msg += str(fail) + "\n"
super().__init__(msg)
def get_line_number(file: str, char_number: int) -> int:
pos = 0
for number, line in enumerate(open(file)):
pos += len(line)
if pos >= char_number:
return number + 2
return 0
def is_empty(iterable):
def is_empty(iterable: Iterator[Any]) -> bool:
try:
next(iterable)
except StopIteration:
......@@ -36,7 +48,7 @@ def is_empty(iterable):
class Entry:
def __init__(self, node):
def __init__(self, node: pydnstest.augwrap.AugeasNode) -> None:
self.match = {m.value for m in node.match("/match")}
self.adjust = {a.value for a in node.match("/adjust")}
self.authority = list(node.match("/section/authority/record"))
......@@ -46,17 +58,35 @@ class Entry:
class Step:
def __init__(self, node):
def __init__(self, node: pydnstest.augwrap.AugeasNode) -> None:
self.node = node
self.type = node["/type"].value
try:
self.entry = Entry(node["/entry"])
self.entry = Entry(node["/entry"]) # type: Optional[Entry]
except KeyError:
self.entry = None
class Test:
def __init__(self, path):
class RplintFail:
def __init__(self, test: "RplintTest",
element: Optional[Element] = None,
etc: str = "") -> None:
self.path = test.path
self.element = element # type: Optional[Element]
self.line = get_line_number(self.path, element.node.char if element is not None else 0)
self.etc = etc
self.check = None # type: Optional[Callable[[RplintTest], List[RplintFail]]]
def __str__(self):
if self.etc:
return "{}:{} {}: {} ({})".format(os.path.basename(self.path), self.line,
self.check.__name__, self.check.__doc__, self.etc)
return "{}:{} {}: {}".format(os.path.basename(self.path), self.line, self.check.__name__,
self.check.__doc__)
class RplintTest:
def __init__(self, path: str) -> None:
aug = pydnstest.augwrap.AugeasWrapper(confpath=os.path.realpath(path),
lens='Deckard',
loadpath=os.path.join(os.path.dirname(__file__),
......@@ -73,41 +103,61 @@ class Test:
self.ranges = [pydnstest.scenario.Range(n) for n in self.node.match("/scenario/range")]
self.checks = [entry_more_than_one_rcode, entry_no_qname_qtype_copy_query,
entry_ns_in_authority, range_overlapping_ips, range_shadowing_match_rules,
step_check_answer_no_match, step_query_match, step_section_unchecked,
step_unchecked_match, step_unchecked_rcode, test_ad_or_rrsig_no_ta,
test_timestamp, test_trust_anchor_trailing_period_missing,
step_duplicate_id]
def print_results(self):
failed = False
self.fails = None # type: Optional[List[RplintFail]]
self.checks = [
entry_more_than_one_rcode,
entry_no_qname_qtype_copy_query,
# Commented out for now until we implement selective turning off of checks
# entry_ns_in_authority,
range_overlapping_ips,
range_shadowing_match_rules,
step_check_answer_no_match,
step_query_match,
step_section_unchecked,
step_unchecked_match,
step_unchecked_rcode,
scenario_ad_or_rrsig_no_ta,
scenario_timestamp,
config_trust_anchor_trailing_period_missing,
step_duplicate_id,
]
def run_checks(self) -> bool:
"""returns True iff all tests passed"""
self.fails = []
for check in self.checks:
fails = check(self)
if fails and not failed:
print(self.path)
failed = True
for fail in fails:
pos = get_line_number(self.path, fail)
print("\t line " + str(pos), check.__doc__)
fail.check = check
self.fails += fails
if self.fails == []:
return True
return False
def test_trust_anchor_trailing_period_missing(test):
def print_fails(self) -> None:
if self.fails is None:
raise RuntimeError("Maybe you should run some test first…")
for fail in self.fails:
print(fail)
def config_trust_anchor_trailing_period_missing(test: RplintTest) -> List[RplintFail]:
"""Trust-anchor option in configuration contains domain without trailing period"""
for conf in test.config:
if conf[0] == "trust-anchor":
if conf[1].split()[0][-1] != ".":
return [0]
return [RplintFail(test, etc=conf[1])]
return []
def test_timestamp(test):
def scenario_timestamp(test: RplintTest) -> List[RplintFail]:
"""RRSSIG record present in test but no val-override-date or val-override-timestamp in config"""
rrsigs = []
for entry in test.entries:
for record in entry.records:
if record["/type"].value == "RRSIG":
rrsigs.append(record.char)
rrsigs.append(RplintFail(test, entry))
if rrsigs:
for k in test.config:
if k[0] == "val-override-date" or k[0] == "val-override-timestamp":
......@@ -115,46 +165,47 @@ def test_timestamp(test):
return rrsigs
def entry_no_qname_qtype_copy_query(test):
def entry_no_qname_qtype_copy_query(test: RplintTest) -> List[RplintFail]:
"""ENTRY without qname and qtype in MATCH and without copy_query in ADJUST"""
fails = []
for entry in test.range_entries:
if "qname" not in entry.match or "qtype" not in entry.match:
if "question" not in entry.match and ("qname" not in entry.match or
"qtype" not in entry.match):
if "copy_query" not in entry.adjust:
fails.append(entry.node.char)
fails.append(RplintFail(test, entry))
return fails
def entry_ns_in_authority(test):
def entry_ns_in_authority(test: RplintTest) -> List[RplintFail]:
"""ENTRY has authority section with NS records, consider using MATCH subdomain"""
fails = []
for entry in test.range_entries:
if entry.authority and "subdomain" not in entry.match:
for record in entry.authority:
if record["/type"].value == "NS":
fails.append(entry.node.char)
fails.append(RplintFail(test, entry))
return fails
def entry_more_than_one_rcode(test):
def entry_more_than_one_rcode(test: RplintTest) -> List[RplintFail]:
"""ENTRY has more than one rcode in MATCH"""
fails = []
for entry in test.entries:
if len(RCODES & entry.reply) > 1:
fails.append(entry.node.char)
fails.append(RplintFail(test, entry))
return fails
def test_ad_or_rrsig_no_ta(test):
def scenario_ad_or_rrsig_no_ta(test: RplintTest) -> List[RplintFail]:
"""AD or RRSIG present in test but no trust-anchor present in config"""
dnssec = []
for entry in test.entries:
if "AD" in entry.reply or "AD" in entry.match:
dnssec.append(entry.node.char)
dnssec.append(RplintFail(test, entry))
else:
for record in entry.records:
if record["/type"].value == "RRSIG":
dnssec.append(entry.node.char)
dnssec.append(RplintFail(test, entry))
if dnssec:
for k in test.config:
......@@ -163,93 +214,92 @@ def test_ad_or_rrsig_no_ta(test):
return dnssec
def step_query_match(test):
def step_query_match(test: RplintTest) -> List[RplintFail]:
"""STEP QUERY has a MATCH rule"""
return [step.node.char for step in test.steps if step.type == "QUERY" and step.entry.match]
return [RplintFail(test, step) for step in test.steps if step.type == "QUERY" and
step.entry and step.entry.match]
def step_check_answer_no_match(test):
def step_check_answer_no_match(test: RplintTest) -> List[RplintFail]:
"""ENTRY in STEP CHECK_ANSWER has no MATCH rule"""
return [step.entry.node.char for step in test.steps if step.type == "CHECK_ANSWER"
and not step.entry.match]
return [RplintFail(test, step) for step in test.steps if step.type == "CHECK_ANSWER" and
step.entry and not step.entry.match]
def step_unchecked_rcode(test):
def step_unchecked_rcode(test: RplintTest) -> List[RplintFail]:
"""ENTRY specifies rcode but STEP MATCH does not check for it."""
fails = []
for step in test.steps:
if step.type == "CHECK_ANSWER" and "all" not in step.entry.match:
if step.type == "CHECK_ANSWER" and step.entry and "all" not in step.entry.match:
if step.entry.reply & RCODES and "rcode" not in step.entry.match:
fails.append(step.entry.node.char)
fails.append(RplintFail(test, step.entry))
return fails
def step_unchecked_match(test):
def step_unchecked_match(test: RplintTest) -> List[RplintFail]:
"""ENTRY specifies flags but MATCH does not check for them"""
fails = []
for step in test.steps:
if step.type == "CHECK_ANSWER":
entry = step.entry
if "all" not in entry.match and entry.reply - RCODES and "flags" not in entry.match:
fails.append(entry.node.char)
if entry and "all" not in entry.match and entry.reply - RCODES and \
"flags" not in entry.match:
fails.append(RplintFail(test, entry, str(entry.reply - RCODES)))
return fails
def step_section_unchecked(test):
def step_section_unchecked(test: RplintTest) -> List[RplintFail]:
"""ENTRY has non-empty sections but MATCH does not check for all of them"""
fails = []
for step in test.steps:
if step.type == "CHECK_ANSWER" and "all" not in step.entry.match:
if step.type == "CHECK_ANSWER" and step.entry and "all" not in step.entry.match:
for section in SECTIONS:
if not is_empty(step.node.match("/entry/section/" + section + "/*")):
if section not in step.entry.match:
fails.append(step.entry.node.char)
fails.append(RplintFail(test, step.entry, section))
return fails
def range_overlapping_ips(test):
def range_overlapping_ips(test: RplintTest) -> List[RplintFail]:
"""RANGE has common IPs with some previous overlapping RANGE"""
fails = []
for r1, r2 in itertools.combinations(test.ranges, 2):
# If the ranges overlap
if min(r1.b, r2.b) >= max(r1.a, r2.a):
if r1.addresses & r2.addresses:
fails.append(r2.node.char)
info = "previous range on line %d" % get_line_number(test.path, r1.node.char)
fails.append(RplintFail(test, r2, info))
return fails
def range_shadowing_match_rules(test):
"""ENTRY has no effect since one of previous entries has broader match rules"""
def range_shadowing_match_rules(test: RplintTest) -> List[RplintFail]:
"""ENTRY has no effect since one of previous entries has the same or broader match rules"""
fails = []
for r in test.ranges:
for e1, e2 in itertools.combinations(r.stored, 2):
match1 = set(e1.match_fields)
match2 = set(e2.match_fields)
msg1 = e1.message
msg2 = e2.message
if match1 >= match2:
with suppress(pydnstest.matchpart.DataMismatch):
if pydnstest.matchpart.compare_rrs(msg1.question, msg2.question):
fails.append(e2.node.char)
if "subdomain" in match1:
if msg1.question[0].name.is_superdomain(msg2.question[0].name):
match1.discard("subdomain")
match2.discard("subdomain")
if match1 >= match2:
msg1.question[0].name = dns.name.Name("")
msg2.question[0].name = dns.name.Name("")
with suppress(pydnstest.matchpart.DataMismatch):
if pydnstest.matchpart.compare_rrs(msg1.question, msg2.question):
fails.append(e2.node.char)
try:
e1.match(e2.message)
# IndexError is here especially because of how we handle empty question section
# in matchpart
except (ValueError, IndexError):
pass
else:
info = "previous entry on line %d" % get_line_number(test.path, e1.node.char)
if e1.match_fields > e2.match_fields:
continue
fails.append(RplintFail(test, e2, info))
return fails
def step_duplicate_id(test):
def step_duplicate_id(test: RplintTest) -> List[RplintFail]:
"""STEP has the same ID as one of previous ones"""
fails = []
for step1, step2 in itertools.combinations(test.steps, 2):
if step1.node.value == step2.node.value:
fails.append(step2.node.char)
step_numbers = set() # type: Set[int]
for step in test.steps:
if step.node.value in step_numbers:
fails.append(RplintFail(test, step))
else:
step_numbers.add(step.node.value)
return fails
......@@ -258,12 +308,28 @@ def step_duplicate_id(test):
# if "copy_id" not in adjust:
# entry_error(test, entry, "copy_id should be in ADJUST")
def test_run_rplint(rpl_path: str) -> None:
t = RplintTest(rpl_path)
passed = t.run_checks()
if not passed:
raise RplintError(t.fails)
if __name__ == '__main__':
tests_path = sys.argv[1]
if tests_path.endswith(".rpl"):
t = Test(tests_path)
t.print_results()
else:
for file_path in sorted(glob.glob(os.path.join(tests_path, "*.rpl"))):
t = Test(file_path)
t.print_results()
try:
test_path = sys.argv[1]
except IndexError:
print("usage: %s <path to rpl file>" % sys.argv[0])
sys.exit(2)
if not os.path.isfile(test_path):
print("rplint.py works on single file only.")
print("Use rplint.sh with --scenarios=<directory with rpls> to run on rpls.")
sys.exit(2)
print("Linting %s" % test_path)
t = RplintTest(test_path)
passed = t.run_checks()
t.print_fails()
if passed:
sys.exit(0)
sys.exit(1)
#!/bin/bash
set -x
set -o errexit -o nounset
MAKEDIR="$(dirname "$0")"
python3 -m pytest -c "${MAKEDIR}/rplint_pytest.ini" ${TESTS:+"--scenarios=${TESTS}"} "$@"
[pytest]
log_print = False
python_files=rplint.py
norecursedirs=*
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment