Skip to content
Snippets Groups Projects
Commit b89adfb3 authored by Daniel Salzman's avatar Daniel Salzman
Browse files

func-test: add RCODE and NSID tests + improvements

parent ea42cf17
No related branches found
No related tags found
No related merge requests found
......@@ -14,23 +14,23 @@ t.link(zone, knot)
t.start()
# RD flag preservation.
resp = knot.dig("flags", "NS", use_recursion=True)
resp.check(flags="QR AA RD", no_flags="TC RA AD CD")
resp = knot.dig("flags", "NS", recursion=True)
resp.check(flags="QR AA RD", noflags="TC RA AD CD")
# NS record for delegated subdomain (not authoritative).
resp = knot.dig("sub.flags", "NS")
resp.check(flags="QR", no_flags="AA TC RD RA AD CD")
resp.check(flags="QR", noflags="AA TC RD RA AD CD")
# Glue record for delegated subdomain (not authoritative).
resp = knot.dig("ns.sub.flags", "A")
resp.check(flags="QR", no_flags="AA TC RD RA AD CD")
resp.check(flags="QR", noflags="AA TC RD RA AD CD")
# TC bit - UDP.
resp = knot.dig("text.flags", "TXT", use_udp=True)
resp.check(flags="QR AA TC", no_flags="RD RA AD CD")
resp = knot.dig("text.flags", "TXT", udp=True)
resp.check(flags="QR AA TC", noflags="RD RA AD CD")
# No TC bit - TCP.
resp = knot.dig("text.flags", "TXT", use_udp=False)
resp.check(flags="QR AA", no_flags="TC RD RA AD CD")
resp = knot.dig("text.flags", "TXT", udp=False)
resp.check(flags="QR AA", noflags="TC RD RA AD CD")
t.stop()
#!/usr/bin/env python3
'''Test for response rcodes'''
import dnstest
t = dnstest.DnsTest()
server = t.server("knot")
zone = t.zone("example.com.")
t.link(zone, server)
t.start()
# No error.
resp = server.dig("example.com", "SOA")
resp.check(rcode="NOERROR")
# Not existent subdomain.
resp = server.dig("unknown.example.com", "SOA")
resp.check(rcode="NXDOMAIN")
# Not provided domain.
resp = server.dig("example.cz", "SOA")
resp.check(rcode="REFUSED")
t.stop()
#!/usr/bin/env python3
'''Test for EDNS0/NSID identification'''
import dnstest
import socket
t = dnstest.DnsTest()
name = "Knot DNS server"
hex_name = "0x01020304"
server1 = t.server("knot", nsid=name)
server2 = t.server("knot", nsid=True)
server3 = t.server("knot", nsid=False)
server4 = t.server("knot")
server5 = t.server("knot", nsid=hex_name)
zone = t.zone("example.com.")
t.link(zone, server1)
t.link(zone, server2)
t.link(zone, server3)
t.link(zone, server4)
t.link(zone, server5)
t.start()
# 1) Custom identification string.
resp = server1.dig("example.com", "SOA", nsid=True)
resp.check_edns(nsid=name)
# 2) FQDN hostname.
resp = server2.dig("example.com", "SOA", nsid=True)
resp.check_edns(nsid=socket.getfqdn())
# 3) Explicitly disabled.
resp = server3.dig("example.com", "SOA", nsid=True)
resp.check_edns()
# 4) Disabled.
resp = server4.dig("example.com", "SOA", nsid=True)
resp.check_edns()
# 5) Hex string.
resp = server5.dig("example.com", "SOA", nsid=True)
resp.check_edns(nsid=hex_name)
t.stop()
#!/usr/bin/env python3
import base64
import binascii
import re
import os
import random
......@@ -211,22 +212,22 @@ class Response(object):
compare(question.rdclass, self.rclass, "question.class")
compare(question.rdtype, self.rtype, "question.type")
def _check_flags(self, flags, no_flags):
def _check_flags(self, flags, noflags):
flag_names = flags.split()
for flag in flag_names:
flag_val = dns.flags.from_text(flag)
isset(self.resp.flags & flag_val, "%s flag" % flag)
flag_names = no_flags.split()
flag_names = noflags.split()
for flag in flag_names:
flag_val = dns.flags.from_text(flag)
isset(not(self.resp.flags & flag_val), "no %s flag" % flag)
def check(self, rdata=None, ttl=None, rcode="NOERROR", flags="", \
no_flags=""):
noflags=""):
'''Flags are text strings separated by whitespace character'''
self._check_flags(flags, no_flags)
self._check_flags(flags, noflags)
self._check_question()
# Check rcode.
......@@ -260,8 +261,20 @@ class Response(object):
err("RDATA (" + str(rdata) + ") not in answer section")
params.err = True
def check_edns(self, nsid="", buff_size=None):
pass
def check_edns(self, nsid=None, buff_size=None):
compare(self.resp.edns, 0, "EDNS version")
options = 1 if nsid != None else 0
compare(len(self.resp.options), options, "number of EDNS0 options")
if options > 0:
option = list(self.resp.options)[0]
compare(option.otype, dns.edns.NSID, "option type")
if nsid[:2] == "0x":
compare(binascii.hexlify(option.data).decode('ascii'), \
nsid[2:], "hex NSID")
else:
compare(option.data.decode('ascii'), nsid, "txt NSID")
class Update(object):
'''DNS update context'''
......@@ -474,22 +487,21 @@ class DnsServer(object):
f.write(self.get_config())
f.close
def dig(self, rname, rtype, rclass="IN", use_udp=None, serial=None, \
timeout=DIG_TIMEOUT, tries=3, use_recursion=False):
def dig(self, rname, rtype, rclass="IN", udp=None, serial=None, \
timeout=DIG_TIMEOUT, tries=3, recursion=False, buffsize=None, \
nsid=False, dnssec=False):
key_params = self.tsig.key_params if self.tsig else dict()
for t in range(tries):
try:
if rtype.upper() == "AXFR":
# Use TCP if not specified. UDP is for testing.
use_udp = use_udp if use_udp != None else False
# Always use TCP.
resp = dns.query.xfr(self.addr, rname, rtype, rclass, \
port=self.port, lifetime=timeout, \
use_udp=use_udp, **key_params)
use_udp=False, **key_params)
elif rtype.upper() == "IXFR":
# Use TCP if not specified.
use_udp = use_udp if use_udp != None else False
use_udp = udp if udp != None else False
resp = dns.query.xfr(self.addr, rname, rtype, rclass, \
port=self.port, lifetime=timeout, \
......@@ -497,16 +509,31 @@ class DnsServer(object):
**key_params)
else:
# Use TCP or UDP at random if not specified.
use_udp = use_udp if use_udp != None else \
use_udp = udp if udp != None else \
random.choice([True, False])
query = dns.message.make_query(rname, rtype, rclass)
if not use_recursion:
# Set query.
if not recursion:
# Remove RD bit which is a default.
query.flags &= ~dns.flags.RD
if use_udp:
if nsid or buffsize:
class NsidFix(object):
'''Current pythondns doesn't implement this'''
def __init__(self):
self.otype = dns.edns.NSID
def to_wire(self, file=None):
pass
payload = int(buffsize) if buffsize else 1280
options = [NsidFix()] if nsid else None
query.use_edns(edns=0, payload=payload, options=options)
if dnssec:
query.want_dnssec()
# Send query.
if udp:
resp = dns.query.udp(query, self.addr, port=self.port, \
timeout=timeout)
else:
......@@ -981,7 +1008,7 @@ class DnsTest(object):
else:
sign = True if dnssec else False
serial = random.randint(1, 4294967295)
items = records if not records else random.randint(1, 1000)
items = records if records else random.randint(1, 1000)
filename = self.zones_dir + name + ".rndzone"
try:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment