Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
D
deckard
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
31
Issues
31
List
Boards
Labels
Service Desk
Milestones
Merge Requests
4
Merge Requests
4
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Operations
Operations
Incidents
Environments
Packages & Registries
Packages & Registries
Container Registry
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Knot projects
deckard
Commits
723c84b1
Commit
723c84b1
authored
Jun 15, 2017
by
Štěpán Balážik
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Fix to pep8 conventions
parent
7a86af1f
Changes
4
Hide whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
24 additions
and
36 deletions
+24
-36
deckard.py
deckard.py
+0
-1
pydnstest/augwrap.py
pydnstest/augwrap.py
+1
-5
pydnstest/scenario.py
pydnstest/scenario.py
+21
-28
pydnstest/testserver.py
pydnstest/testserver.py
+2
-2
No files found.
deckard.py
View file @
723c84b1
#!/usr/bin/env python
import
argparse
from
datetime
import
datetime
import
fileinput
import
logging
import
logging.config
import
os
...
...
pydnstest/augwrap.py
View file @
723c84b1
...
...
@@ -10,14 +10,13 @@ import sys
from
augeas
import
Augeas
#from IPython.core.debugger import Tracer
AUGEAS_LOAD_PATH
=
'/augeas/load/'
AUGEAS_FILES_PATH
=
'/files/'
AUGEAS_ERROR_PATH
=
'//error'
log
=
logging
.
getLogger
(
'augeas'
)
def
join
(
*
paths
):
"""
join two Augeas tree paths
...
...
@@ -67,10 +66,8 @@ class AugeasWrapper(object):
aug_load_path
=
join
(
AUGEAS_LOAD_PATH
,
lens
)
# /augeas/load/{lens}/lens = {lens}.lns
self
.
_aug
.
set
(
join
(
aug_load_path
,
'lens'
),
'%s.lns'
%
lens
)
#print(join(aug_load_path, 'lens'), '%s.lns' % lens)
# /augeas/load/{lens}/incl[0] = {confpath}
self
.
_aug
.
set
(
join
(
aug_load_path
,
'incl[0]'
),
confpath
)
#print(join(aug_load_path, 'incl[0]'), confpath)
self
.
_aug
.
load
()
errors
=
self
.
_aug
.
match
(
AUGEAS_ERROR_PATH
)
...
...
@@ -143,7 +140,6 @@ class AugeasNode(collections.MutableMapping):
self
.
_aug
=
aug
self
.
_path
=
path
@
property
def
path
(
self
):
"""canonical path in Augeas tree, read-only"""
...
...
pydnstest/scenario.py
View file @
723c84b1
...
...
@@ -208,7 +208,7 @@ class Entry:
self
.
message
.
use_edns
(
edns
=
0
,
payload
=
4096
)
self
.
fired
=
0
#RAW
#
RAW
try
:
self
.
raw_data
=
binascii
.
unhexlify
(
node
[
"/raw"
].
value
)
self
.
is_raw_data_entry
=
True
...
...
@@ -217,20 +217,18 @@ class Entry:
self
.
raw_data
=
None
self
.
is_raw_data_entry
=
False
#MATCH
#
MATCH
self
.
match_fields
=
[
m
.
value
for
m
in
node
.
match
(
"/match"
)]
if
not
self
.
match_fields
:
self
.
match_fields
=
[
'opcode'
,
'qtype'
,
'qname'
]
#print("match_fields", self.match_fields)
#FLAGS
#
FLAGS
self
.
fields
=
[
f
.
value
for
f
in
node
.
match
(
"/reply"
)]
flags
=
[]
rcode
=
dns
.
rcode
.
from_text
(
self
.
default_rc
)
for
code
in
self
.
fields
:
if
code
==
'DO'
:
self
.
message
.
want_dnssec
(
True
)
#print("dnssec")
continue
try
:
rcode
=
dns
.
rcode
.
from_text
(
code
)
...
...
@@ -239,33 +237,29 @@ class Entry:
self
.
message
.
flags
=
dns
.
flags
.
from_text
(
' '
.
join
(
flags
))
self
.
message
.
set_rcode
(
rcode
)
#ADJUST
#
ADJUST
self
.
adjust_fields
=
[
m
.
value
for
m
in
node
.
match
(
"/adjust"
)]
if
not
self
.
adjust_fields
:
self
.
adjust_fields
=
[
'copy_id'
]
#print("adjust", self.adjust_fields)
#MANDATORY
#
MANDATORY
try
:
mandatory
=
list
(
node
.
match
(
"/mandatory"
))[
0
].
value
self
.
mandatory
=
True
except
(
KeyError
,
IndexError
):
self
.
mandatory
=
False
#print("mandatory", self.mandatory)
#TSIG
#
TSIG
try
:
tsig
=
list
(
node
.
match
(
"/tsig"
))[
0
]
tsig_keyname
=
tsig
[
"/keyname"
].
value
tsig_secret
=
tsig
[
"/secret"
].
value
keyring
=
dns
.
tsigkeyring
.
from_text
({
tsig_keyname
:
tsig_secret
})
self
.
message
.
use_tsig
(
keyring
=
keyring
,
keyname
=
tsig_keyname
)
#print("tsig:", tsig_keyname, tsig_secret)
except
(
KeyError
,
IndexError
):
pass
#SECTIONS & RECORDS
# SECTIONS & RECORDS
self
.
sections
=
[]
for
section
in
node
.
match
(
"/section/*"
):
section_name
=
posixpath
.
basename
(
section
.
path
)
...
...
@@ -273,7 +267,7 @@ class Entry:
for
record
in
section
.
match
(
"/record"
):
owner
=
record
[
'/domain'
].
value
if
not
owner
.
endswith
(
"."
):
owner
+=
self
.
origin
owner
+=
self
.
origin
try
:
ttl
=
dns
.
ttl
.
from_text
(
record
[
'/ttl'
].
value
)
except
KeyError
:
...
...
@@ -281,7 +275,7 @@ class Entry:
try
:
rdclass
=
dns
.
rdataclass
.
from_text
(
record
[
'/class'
].
value
)
except
KeyError
:
cls
=
self
.
default_cls
rdclass
=
dns
.
rdataclass
.
from_text
(
self
.
default_cls
)
rdtype
=
dns
.
rdatatype
.
from_text
(
record
[
'/type'
].
value
)
rr
=
dns
.
rrset
.
from_text
(
owner
,
ttl
,
rdclass
,
rdtype
)
if
section_name
!=
"question"
:
...
...
@@ -302,9 +296,6 @@ class Entry:
self
.
__rr_add
(
self
.
message
.
authority
,
rr
)
elif
section_name
==
'additional'
:
self
.
__rr_add
(
self
.
message
.
additional
,
rr
)
#print(rr)
#self.message.id = 0
#print(self.message)
def
__str__
(
self
):
txt
=
'ENTRY_BEGIN
\n
'
...
...
@@ -406,7 +397,7 @@ class Entry:
self
.
match_part
(
code
,
msg
)
except
Exception
as
e
:
errstr
=
'%s in the response:
\n
%s'
%
(
str
(
e
),
msg
.
to_text
())
raise
Exception
(
"line %d,
\"
%s
\"
: %s"
%
(
42
,
code
,
errstr
))
#
TODO: cisla radku
raise
Exception
(
"line %d,
\"
%s
\"
: %s"
%
(
42
,
code
,
errstr
))
#
TODO: cisla radku
def
cmp_raw
(
self
,
raw_value
):
assert
self
.
is_raw_data_entry
...
...
@@ -577,7 +568,7 @@ class Step:
self
.
pause_if_fail
=
0
self
.
next_if_fail
=
-
1
# TODO Parser currently can't parse CHECK_ANSWER args, player does
not understand them anyways
# TODO Parser currently can't parse CHECK_ANSWER args, player does
n't understand them anyway
# if type == 'CHECK_ANSWER':
# for arg in extra_args:
# param = arg.split('=')
...
...
@@ -814,7 +805,7 @@ class Scenario:
for
rng
in
self
.
ranges
:
if
rng
.
eligible
(
current_step_id
,
address
):
self
.
current_range
=
rng
return
(
rng
.
reply
(
query
),
False
)
return
rng
.
reply
(
query
),
False
# Find any prescripted one-shot replies
for
step
in
self
.
steps
:
if
step
.
id
<
current_step_id
or
step
.
type
!=
'REPLY'
:
...
...
@@ -825,13 +816,13 @@ class Scenario:
candidate
.
match
(
query
)
step
.
data
.
remove
(
candidate
)
answer
=
candidate
.
adjust_reply
(
query
)
return
(
answer
,
False
)
return
answer
,
False
else
:
answer
=
candidate
.
raw_data
return
(
answer
,
True
)
return
answer
,
True
except
:
pass
return
(
None
,
True
)
return
None
,
True
def
play
(
self
,
paddr
):
""" Play given scenario. """
...
...
@@ -872,7 +863,8 @@ class Scenario:
for
r
in
self
.
ranges
:
for
e
in
r
.
stored
:
if
e
.
mandatory
is
True
and
e
.
fired
==
0
:
raise
Exception
(
'Mandatory section at line %d is not fired'
%
42
)
#TODO: cisla radku
# TODO: cisla radku
raise
Exception
(
'Mandatory section at line %d is not fired'
%
42
)
def
get_next
(
file_in
,
skip_empty
=
True
):
...
...
@@ -887,7 +879,7 @@ def get_next(file_in, skip_empty=True):
escaped
=
not
escaped
if
not
escaped
and
line
[
i
]
==
'"'
:
quoted
=
not
quoted
if
line
[
i
]
in
(
';'
)
and
not
quoted
:
if
line
[
i
]
in
';'
and
not
quoted
:
line
=
line
[
0
:
i
]
break
if
line
[
i
]
!=
'
\\
'
:
...
...
@@ -998,7 +990,8 @@ def parse_config(scn_cfg, qmin, installdir):
def
parse_file
(
path
):
""" Parse scenario from a file. """
aug
=
pydnstest
.
augwrap
.
AugeasWrapper
(
confpath
=
path
,
lens
=
'Deckard'
,
loadpath
=
os
.
path
.
dirname
(
__file__
))
aug
=
pydnstest
.
augwrap
.
AugeasWrapper
(
confpath
=
path
,
lens
=
'Deckard'
,
loadpath
=
os
.
path
.
dirname
(
__file__
))
node
=
aug
.
tree
try
:
config
=
[]
...
...
@@ -1015,4 +1008,4 @@ def parse_file(path):
# print(scenario)
return
scenario
,
config
except
Exception
as
e
:
raise
#
Exception('%s#%d: %s' % (file_in.filename(), file_in.lineno(), str(e)))
raise
#
Exception('%s#%d: %s' % (file_in.filename(), file_in.lineno(), str(e)))
pydnstest/testserver.py
View file @
723c84b1
...
...
@@ -117,7 +117,7 @@ class TestServer:
self
.
undefined_answers
=
0
with
self
.
active_lock
:
if
not
self
.
active
:
raise
Exception
(
self
.
scenario
.
file
+
"
[query_io] Test server not active"
)
raise
Exception
(
"
[query_io] Test server not active"
)
while
True
:
with
self
.
condition
:
self
.
condition
.
notify
()
...
...
@@ -221,7 +221,7 @@ def empty_test_case():
empty_test_path
=
os
.
path
.
dirname
(
os
.
path
.
realpath
(
__file__
))
+
"/empty.rpl"
test_config
=
{
'ROOT_ADDR'
:
'127.0.0.10'
,
'_SOCKET_FAMILY'
:
socket
.
AF_INET
}
return
scenario
.
parse_file
(
empty_test_path
)[
0
],
test_config
return
scenario
.
parse_file
(
empty_test_path
)[
0
],
test_config
def
standalone_self_test
():
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment