Skip to content
Snippets Groups Projects
Commit 57d8e1d6 authored by Libor Peltan's avatar Libor Peltan Committed by Daniel Salzman
Browse files

tests-extra: improve test for dnstap and extend it with quic

parent 1a681a6c
No related branches found
No related tags found
No related merge requests found
......@@ -4,18 +4,123 @@
import os
import re
import dns
from dnstest.test import Test
from dnstest.module import ModDnstap
from dnstest.utils import *
t = Test()
# --- Simple DNSTAP file parser ---
def read_byte(f, left):
if left[0] < 1:
return 0
left[0] -= 1
return f.read(1)[0]
def read_be32(f, left):
if left[0] < 4:
return 0
left[0] -= 4
be = f.read(4)
return (be[0] << 24) + (be[1] << 16) + (be[2] << 8) + be[3]
def pb_varint(f, left):
x = 0
shift = 0
while True:
b = read_byte(f, left)
x += ((b & 0x7f) << shift)
if (b & 0x80) == 0:
return x
shift += 7
def pb_skip2field(f, left, field):
while left[0] > 0:
b = read_byte(f, left)
if (b >> 3) == field:
return ((b & 0x2) != 0)
if (b & 0x1) != 0:
valen = 4 if (b & 0x7) == 5 else 8
left[0] -= valen
_ = f.read(valen)
else:
val = pb_varint(f, left)
if (b & 0x2) != 0: # val is length
left[0] -= val
_ = f.read(val)
def sd_file(f, msg, field, subfield):
left = [ 2**62 ]
# skip intro frame
zero = read_be32(f, left)
if zero != 0:
return None
intro = read_be32(f, left)
_ = f.read(intro)
# skip to n-th msg
for imsg in range(msg):
skip = read_be32(f, left)
_ = f.read(skip)
left[0] = read_be32(f, left)
# now continue with protobuf format
haslen = pb_skip2field(f, left, field)
if subfield is not None:
if not haslen:
return None
left_prev = left[0]
left[0] = pb_varint(f, left)
if left_prev < left[0]:
return None
haslen = pb_skip2field(f, left, subfield)
val = pb_varint(f, left)
if haslen:
return f.read(val) if left[0] >= val else None
else:
return val
def simple_dnstap(file, msg, field, subfield=None):
with open(file, "rb") as f:
return sd_file(f, msg, field, subfield)
# --- Simple DNSTAP file parser end ---
def sink_contains(sink, qname):
try:
for msg in range(1000): # in practice, while(!exception)
query = simple_dnstap(sink, msg, 14, 10)
if query is None or query == 0:
continue
qr = dns.message.from_wire(query)
if str(qr.question[0].name) == str(qname):
return True
except Exception as e:
return False
return False
def sink_contains_proto(sink, proto):
try:
for msg in range(1000): # in practice, while(!exception)
prot = simple_dnstap(sink, msg, 14, 3)
if prot == proto:
return True
except Exception as e:
return False
return False
t = Test(quic=True, stress=False)
ModDnstap.check()
# Initialize server configuration
knot = t.server("knot")
slave = t.server("knot")
zone = t.zone("flags.")
t.link(zone, knot)
t.link(zone, knot, slave)
# Configure 'dnstap' module for all queries (default).
dflt_sink = t.out_dir + "/all.tap"
......@@ -26,10 +131,12 @@ knot.add_module(zone, ModDnstap(flags_sink))
t.start()
dflt_qname = "dnstap_default_test"
resp = knot.dig(dflt_qname + ".example", "NS")
flags_qname = "dnstap_flags_test"
resp = knot.dig(flags_qname + ".flags", "NS")
slave.zone_wait(zone)
dflt_qname = "dnstap_default_test" + ".example."
resp = knot.dig(dflt_qname, "NS")
flags_qname = "dnstap_flags_test" + ".flags."
resp = knot.dig(flags_qname, "NS")
knot.stop()
......@@ -37,19 +144,13 @@ knot.stop()
isset(os.path.isfile(dflt_sink), "default sink")
isset(os.path.isfile(flags_sink), "zone sink")
def sink_contains(sink, qname):
'''Checks the sink if contains QNAME'''
f = open(sink, "rb")
s = str(f.read())
f.close()
return s.find(qname) != -1
# Check sink contents.
isset(sink_contains(dflt_sink, flags_qname), "qname '%s' in '%s'" % (flags_qname, dflt_sink))
isset(sink_contains(dflt_sink, dflt_qname), "qname '%s' in '%s'" % (dflt_qname, dflt_sink))
isset(sink_contains(flags_sink, flags_qname), "qname '%s' in '%s'" % (flags_qname, flags_sink))
isset(not sink_contains(flags_sink, dflt_qname), "qname '%s' in '%s'" % (dflt_qname, flags_sink))
isset(sink_contains_proto(dflt_sink, 7), "QUIC in '%s'" % dflt_sink)
isset(sink_contains_proto(flags_sink, 7), "QUIC in '%s'" % flags_sink)
t.end()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment