diff --git a/ci/junit-compare.py b/ci/junit-compare.py index c939449421e1c3843613e53a1bf8962eecf1b7a1..94e2641d5c425a79005bc96d2bffc6e9c420840e 100755 --- a/ci/junit-compare.py +++ b/ci/junit-compare.py @@ -25,7 +25,7 @@ def parse_junit_xml(filename): new = sys.argv[1] old = sys.argv[2] -with open(sys.argv[3]) as f: +with open(sys.argv[3], encoding="utf-8") as f: modified_tests = [line.strip() for line in f.readlines()] test_diffs = parse_junit_xml(old) ^ parse_junit_xml(new) diff --git a/ci/pylint-run.sh b/ci/pylint-run.sh index 788a4f5a41a2a51dda6161c61480e4e75f977224..1fe1d2c9210d197dfd33e52736f1f0554da87487 100755 --- a/ci/pylint-run.sh +++ b/ci/pylint-run.sh @@ -4,7 +4,7 @@ source "$(dirname "$0")/common.sh" PYFILES=$(find . \ -path ./.git -prune -o \ - -path ./contrib -o \ + -path ./contrib -prune -o \ -type d -exec test -e '{}/__init__.py' \; -print -prune -o \ -name '*.py' -print -o \ -type f -exec grep -qsm1 '^#!.*\bpython' '{}' \; -print) diff --git a/conftest.py b/conftest.py index 8cf9004c6622291aaba1d00ce2b1cff8625ef9d0..9705ba5d06918877c293a617aba2c5a6ce8472ef 100644 --- a/conftest.py +++ b/conftest.py @@ -16,24 +16,27 @@ def config_sanity_check(config_dict, config_name): mandatory_keys = {'name', 'binary', 'templates', 'configs', 'additional'} for cfg in config_dict['programs']: missing_keys = mandatory_keys - set(cfg.keys()) - assert not missing_keys, 'Mandatory fields in configuration are missing: %s' % missing_keys + assert not missing_keys, f'Mandatory fields in configuration are missing: {missing_keys}' # sanity check templates vs. configs - assert len(cfg['templates']) == len(cfg['configs']),\ - ('Number of jinja2 template files is not equal ' - 'to number of config files to be generated for ' - 'program "%s" (%s), i.e. len(templates) != len(configs)' - % (cfg['name'], config_name)) + assert len(cfg['templates']) == len(cfg['configs']), \ + ( + 'Number of jinja2 template files is not equal ' + 'to number of config files to be generated for ' + f'program "{cfg["name"]}" ({config_name}), i.e. len(templates) != len(configs)' + ) for additional in cfg["additional"]: - assert isinstance(additional, str),\ - "All additional arguments in yaml should be strings. (%s, %s)"\ - % (cfg['name'], config_name) + assert isinstance(additional, str), \ + ( + "All additional arguments in yaml should be strings. " + f"({cfg['name']}, {config_name})" + ) def get_qmin_config(path): """Reads configuration from the *.rpl file and determines query-minimization setting.""" - with open(path) as f: + with open(path, encoding='utf-8') as f: for line in f: if re.search(r"^CONFIG_END", line) or re.search(r"^SCENARIO_BEGIN", line): return None @@ -47,13 +50,13 @@ def get_qmin_config(path): def scenarios(paths, configs): """Returns list of *.rpl files from given path and packs them with their minimization setting""" - assert len(paths) == len(configs),\ + assert len(paths) == len(configs), \ "Number of --config has to be equal to number of --scenarios arguments." scenario_list = [] for path, config in zip(paths, configs): - with open(config) as f: + with open(config, encoding='utf-8') as f: config_dict = yaml.load(f, yaml.SafeLoader) config_sanity_check(config_dict, config) @@ -63,7 +66,7 @@ def scenarios(paths, configs): filelist = sorted(glob.glob(os.path.join(path, "*.rpl"))) if not filelist: - raise ValueError('no *.rpl files found in path "{}"'.format(path)) + raise ValueError(f'no *.rpl files found in path "{path}"') for file in filelist: scenario_list.append(Scenario(file, get_qmin_config(file), config_dict)) @@ -123,4 +126,4 @@ def pytest_collection_modifyitems(items): def pytest_runtest_setup(item): # pylint: disable=unused-argument - LinuxNamespace("user").__enter__() + LinuxNamespace("user").__enter__() # pylint: disable=unnecessary-dunder-call diff --git a/deckard.py b/deckard.py index 45fd04517086ef70b4ef8592cc8982929244fdd3..eae9becfb02888acfc0d232b36a407f9c1037131 100755 --- a/deckard.py +++ b/deckard.py @@ -33,7 +33,7 @@ def setup_internal_addresses(context): def write_timestamp_file(path, tst): - with open(path, 'w') as time_file: + with open(path, 'w', encoding='utf-8') as time_file: time_file.write(datetime.fromtimestamp(tst).strftime('@%Y-%m-%d %H:%M:%S')) @@ -73,7 +73,7 @@ def create_trust_anchor_files(ta_files, work_dir): """ full_paths = [] for domain, ta_lines in ta_files.items(): - file_name = u'{}.key'.format(domain) + file_name = f'{domain}.key' full_path = os.path.realpath( os.path.join(work_dir, TRUST_ANCHOR_SUBDIR, file_name)) full_paths.append(full_path) @@ -83,8 +83,8 @@ def create_trust_anchor_files(ta_files, work_dir): except OSError as ex: if ex.errno != errno.EEXIST: raise - with open(full_path, "w") as ta_file: - ta_file.writelines('{0}\n'.format(line) for line in ta_lines) + with open(full_path, "w", encoding="utf-8") as ta_file: + ta_file.writelines(f'{line}\n' for line in ta_lines) return full_paths @@ -104,7 +104,8 @@ def generate_from_templates(program_config, context): for template_name, config_name in zip(template_ctx['templates'], template_ctx['configs']): j2template = j2template_env.get_template(template_name) cfg_rendered = j2template.render(template_ctx) - with open(os.path.join(template_ctx['WORKING_DIR'], config_name), 'w') as output: + config_path = os.path.join(template_ctx['WORKING_DIR'], config_name) + with open(config_path, 'w', encoding='utf-8') as output: output.write(cfg_rendered) @@ -118,24 +119,24 @@ def run_daemon(program_config): + [program_config['binary']] + program_config['additional'] ) - logging.getLogger('deckard.daemon.%s.argv' % name).debug('%s', program_config['args']) - with open(program_config['log'], 'w') as daemon_log_file: + logging.getLogger(f'deckard.daemon.{name}.argv').debug('%s', program_config['args']) + with open(program_config['log'], 'w', encoding='utf-8') as daemon_log_file: try: # pylint: disable=consider-using-with proc = subprocess.Popen(program_config['args'], stdout=daemon_log_file, stderr=subprocess.STDOUT, cwd=program_config['WORKING_DIR']) except subprocess.CalledProcessError: - logger = logging.getLogger('deckard.daemon_log.%s' % name) + logger = logging.getLogger(f'deckard.daemon_log.{name}') logger.exception("Can't start '%s'", program_config['args']) raise return proc def log_fatal_daemon_error(cfg, msg): - logger = logging.getLogger('deckard.daemon_log.%s' % cfg['name']) + logger = logging.getLogger(f'deckard.daemon_log.{cfg["name"]}') logger.critical(msg) logger.critical('logs are in "%s"', cfg['WORKING_DIR']) - with open(cfg['log']) as logfile: + with open(cfg['log'], encoding='utf-8') as logfile: logger.error('daemon log follows:') logger.error(logfile.read()) @@ -147,8 +148,9 @@ def conncheck_daemon(process, cfg, sockfamily): with sock: while True: # Check if the process is running - if process.poll() is not None: - msg = 'process died, exit code %s' % process.poll() + ecode = process.poll() + if ecode is not None: + msg = f'process died, exit code {ecode}' log_fatal_daemon_error(cfg, msg) raise subprocess.CalledProcessError(process.returncode, cfg['args'], msg) try: @@ -206,14 +208,14 @@ def run_testcase(case, daemons, context, prog_under_test_ip): for daemon in daemons: daemon['proc'].terminate() daemon['proc'].wait() - daemon_logger_log = logging.getLogger('deckard.daemon_log.%s' % daemon['cfg']['name']) - with open(daemon['cfg']['log']) as logf: + daemon_logger_log = logging.getLogger(f'deckard.daemon_log.{daemon["cfg"]["name"]}') + with open(daemon['cfg']['log'], encoding='utf-8') as logf: for line in logf: daemon_logger_log.debug(line.strip()) ignore_exit = daemon["cfg"].get('ignore_exit_code', False) if daemon['proc'].returncode != 0 and not ignore_exit: - raise ValueError('process %s terminated with return code %s' - % (daemon['cfg']['name'], daemon['proc'].returncode)) + raise ValueError(f"process {daemon['cfg']['name']} terminated " + f"with return code {daemon['proc'].returncode}") if server.undefined_answers > 0: raise ValueError('the scenario does not define all necessary answers (see error log)') diff --git a/deckard_pytest.py b/deckard_pytest.py index b7b7a7e66fcca424bed142f00390d3e720fbf8c4..efe1d9b498731e3e417ebead81c78714d97b5edc 100755 --- a/deckard_pytest.py +++ b/deckard_pytest.py @@ -89,7 +89,7 @@ class TCPDump: if "DECKARD_DIR" in os.environ: tmpdir = os.environ["DECKARD_DIR"] if os.path.lexists(tmpdir): - raise ValueError('DECKARD_DIR "%s" must not exist' % tmpdir) + raise ValueError(f'DECKARD_DIR "{tmpdir}" must not exist') else: tmpdir = tempfile.mkdtemp(suffix='', prefix='tmpdeckard') @@ -132,7 +132,7 @@ class TCPDump: if unknown_addresses: raise RuntimeError("Binary under test queried an IP address not present" - " in scenario %s" % unknown_addresses) + f" in scenario {unknown_addresses}") def run_test(path, qmin, config, max_retries, retries=0): diff --git a/pydnstest/augwrap.py b/pydnstest/augwrap.py index 8f89e0be304b678301e500ae4fdd2611ede5a9fb..d20122e6e018134f91fa13d59ea3a2790209423c 100644 --- a/pydnstest/augwrap.py +++ b/pydnstest/augwrap.py @@ -63,7 +63,7 @@ class AugeasWrapper: # /augeas/load/{lens} aug_load_path = join(AUGEAS_LOAD_PATH, lens) # /augeas/load/{lens}/lens = {lens}.lns - self._aug.set(join(aug_load_path, 'lens'), '%s.lns' % lens) + self._aug.set(join(aug_load_path, 'lens'), f'{lens}.lns') # /augeas/load/{lens}/incl[0] = {confpath} self._aug.set(join(aug_load_path, 'incl[0]'), confpath) self._aug.load() @@ -71,14 +71,14 @@ class AugeasWrapper: errors = self._aug.match(AUGEAS_ERROR_PATH) if errors: err_msg = '\n'.join( - ["{}: {}".format(e, self._aug.get(e)) for e in errors] + [f"{e}: {self._aug.get(e)}" for e in errors] ) raise RuntimeError(err_msg) path = join(AUGEAS_FILES_PATH, confpath) paths = self._aug.match(path) if len(paths) != 1: - raise ValueError('path %s did not match exactly once' % path) + raise ValueError(f'path {path} did not match exactly once') self.tree = AugeasNode(self._aug, path) self._loaded = True @@ -164,7 +164,7 @@ class AugeasNode(collections.abc.MutableMapping): @property def span(self): if self._span is None: - self._span = "char position %s" % self._aug.span(self._path)[5] + self._span = f"char position {self.char}" return self._span @property @@ -183,13 +183,13 @@ class AugeasNode(collections.abc.MutableMapping): def __getitem__(self, key): if isinstance(key, int): # int is a shortcut to write [int] - target_path = '%s[%s]' % (self._path, key) + target_path = f'{self._path}[{key}]' else: target_path = self._path + key log.debug('tree getitem: target_path %s', target_path) paths = self._aug.match(target_path) if len(paths) != 1: - raise KeyError('path %s did not match exactly once' % target_path) + raise KeyError(f'path {target_path} did not match exactly once') return AugeasNode(self._aug, target_path) def __delitem__(self, key): @@ -217,10 +217,10 @@ class AugeasNode(collections.abc.MutableMapping): def match(self, subpath): """Yield AugeasNodes matching given sub-expression.""" assert subpath.startswith("/") - match_path = "%s%s" % (self._path, subpath) + match_path = f"{self._path}{subpath}" log.debug('tree match %s: %s', match_path, self._path) for matched_path in self._aug.match(match_path): yield AugeasNode(self._aug, matched_path) def __repr__(self): - return 'AugeasNode(%s)' % self._path + return f'AugeasNode({self._path})' diff --git a/pydnstest/matchpart.py b/pydnstest/matchpart.py index 4a9d8a0084fd9ec77223e7a87c686ff3c7bdc8be..fe663e0cec5cdeaba9bc9ae746bcfff2efc1f0a0 100644 --- a/pydnstest/matchpart.py +++ b/pydnstest/matchpart.py @@ -24,9 +24,10 @@ class DataMismatch(Exception): return str(value) def __str__(self) -> str: - return 'expected "{}" got "{}"'.format( - self.format_value(self.exp_val), - self.format_value(self.got_val)) + return ( + f'expected "{self.format_value(self.exp_val)}" ' + f'got "{self.format_value(self.got_val)}"' + ) def __eq__(self, other): return (isinstance(other, DataMismatch) @@ -82,7 +83,7 @@ def compare_rrs_types(exp_val, got_val, skip_rrsigs): if not rrsig: return dns.rdatatype.to_text(rrtype) else: - return 'RRSIG(%s)' % dns.rdatatype.to_text(rrtype) + return f'RRSIG({dns.rdatatype.to_text(rrtype)})' if skip_rrsigs: exp_val = (rrset for rrset in exp_val @@ -235,4 +236,4 @@ def match_part(exp, got, code): try: return MATCH[code](exp, got) except KeyError as ex: - raise NotImplementedError('unknown match request "%s"' % code) from ex + raise NotImplementedError(f'unknown match request "{code}"') from ex diff --git a/pydnstest/mock_client.py b/pydnstest/mock_client.py index 6089a21c3f26637396c6df406e44bd830f18bb45..9db8eb72e746f566b5f84d2d9a2e45e4dc6e053c 100644 --- a/pydnstest/mock_client.py +++ b/pydnstest/mock_client.py @@ -59,7 +59,7 @@ def recvfrom_blob(sock: socket.socket, data = recv_n_bytes_from_tcp(sock, msg_len, deadline) addr = sock.getpeername()[0] else: - raise NotImplementedError("[recvfrom_blob]: unknown socket type '%i'" % sock.type) + raise NotImplementedError(f"[recvfrom_blob]: unknown socket type '{sock.type}'") return data, addr except socket.timeout as ex: raise RuntimeError("Server took too long to respond") from ex @@ -89,7 +89,7 @@ def sendto_msg(sock: socket.socket, message: bytes, addr: Optional[str] = None) data = struct.pack("!H", len(message)) + message sock.sendall(data) else: - raise NotImplementedError("[sendto_msg]: unknown socket type '%i'" % sock.type) + raise NotImplementedError(f"[sendto_msg]: unknown socket type '{sock.type}'") except OSError as ex: # Reference: http://lkml.iu.edu/hypermail/linux/kernel/0002.3/0709.html if ex.errno != errno.ECONNREFUSED: @@ -99,7 +99,7 @@ def sendto_msg(sock: socket.socket, message: bytes, addr: Optional[str] = None) def setup_socket(address: str, port: int, tcp: bool = False, - src_address: str = None) -> socket.socket: + src_address: Optional[str] = None) -> socket.socket: family = dns.inet.af_for_address(address) sock = socket.socket(family, socket.SOCK_STREAM if tcp else socket.SOCK_DGRAM) if tcp: diff --git a/pydnstest/scenario.py b/pydnstest/scenario.py index 9ef318cfa36eb2ffa89f48877915de16d58ce598..7ef9e9456c6c3b56db4922e139875c6afd4fbde4 100644 --- a/pydnstest/scenario.py +++ b/pydnstest/scenario.py @@ -75,7 +75,7 @@ class DNSReply(DNSMessage): answer = dns.message.from_wire(self.message.to_wire(), xfr=self.message.xfr, one_rr_per_rrset=True) - answer.use_edns(query.edns, query.ednsflags, options=self.message.options) + answer.use_edns(query.edns, query.ednsflags, options=list(self.message.options)) if copy_id: answer.id = query.id # Copy letter-case if the template has QD @@ -248,7 +248,7 @@ class Entry: rd = record['/data'].value.split() if rd: if rdtype == dns.rdatatype.DS: - rd[1] = '{}'.format(dns.dnssec.algorithm_from_text(rd[1])) + rd[1] = f'{dns.dnssec.algorithm_from_text(rd[1])}' rd = dns.rdata.from_text(rr.rdclass, rr.rdtype, ' '.join( rd), origin=dns.name.from_text(self.origin), relativize=False) rr.add(rd) @@ -266,22 +266,30 @@ class Entry: def __str__(self): txt = 'ENTRY_BEGIN\n' + if self.raw_data is None: - txt += 'MATCH {0}\n'.format(' '.join(self.match_fields)) - txt += 'ADJUST {0}\n'.format(' '.join(self.adjust_fields)) - txt += 'REPLY {rcode} {flags}\n'.format( - rcode=dns.rcode.to_text(self.message.rcode()), - flags=' '.join([dns.flags.to_text(self.message.flags), - dns.flags.edns_to_text(self.message.ednsflags)]) - ) + match_fields = ' '.join(self.match_fields) + txt += f'MATCH {match_fields}\n' + + adjust_fields = ' '.join(self.adjust_fields) + txt += f'ADJUST {adjust_fields}\n' + + rcode = dns.rcode.to_text(self.message.rcode()) + flags = ' '.join([ + dns.flags.to_text(self.message.flags), + dns.flags.edns_to_text(self.message.ednsflags) + ]) + txt += f'REPLY {rcode} {flags}\n' + for sect_name in ['question', 'answer', 'authority', 'additional']: sect = getattr(self.message, sect_name) if not sect: continue - txt += 'SECTION {n}\n'.format(n=sect_name.upper()) + txt += f'SECTION {sect_name.upper()}\n' for rr in sect: txt += str(rr) txt += '\n' + if self.raw_data is not None: txt += 'RAW\n' if self.raw_data: @@ -289,6 +297,7 @@ class Entry: else: txt += 'NULL' txt += '\n' + txt += 'ENTRY_END\n' return txt @@ -346,9 +355,9 @@ class Entry: try: pydnstest.matchpart.match_part(self.message, msg, code) except pydnstest.matchpart.DataMismatch as ex: - errstr = '%s in the response:\n%s' % (str(ex), msg.to_text()) + errstr = f'{ex} in the response:\n{msg.to_text()}' # TODO: cisla radku - raise ValueError("%s, \"%s\": %s" % (self.node.span, code, errstr)) from None + raise ValueError(f"{self.node.span}, \"{code}\": {errstr}") from None def cmp_raw(self, raw_value): assert self.raw_data is not None @@ -359,7 +368,7 @@ class Entry: if raw_value is not None: got = binascii.hexlify(raw_value) if expected != got: - raise ValueError("raw message comparsion failed: expected %s got %s" % (expected, got)) + raise ValueError(f"raw message comparsion failed: expected {expected} got {got}") def reply(self, query) -> Optional[DNSBlob]: if 'do_not_answer' in self.adjust_fields: @@ -430,9 +439,9 @@ class Range: self.a, self.b, self.addresses, self.received, self.sent) def __str__(self): - txt = '\nRANGE_BEGIN {a} {b}\n'.format(a=self.a, b=self.b) + txt = f'\nRANGE_BEGIN {self.a} {self.b}\n' for addr in self.addresses: - txt += ' ADDRESS {0}\n'.format(addr) + txt += f' ADDRESS {addr}\n' for entry in self.stored: txt += '\n' @@ -472,7 +481,7 @@ class StepLogger(logging.LoggerAdapter): # pylint: disable=too-few-public-metho Prepent Step identification before each log message. """ def process(self, msg, kwargs): - return '[STEP %s %s] %s' % (self.extra['id'], self.extra['type'], msg), kwargs + return f'[STEP {self.extra["id"]} {self.extra["type"]}] {msg}', kwargs class Step: @@ -503,13 +512,13 @@ class Step: self.next_if_fail = -1 def __str__(self): - txt = '\nSTEP {i} {t}'.format(i=self.id, t=self.type) + txt = f'\nSTEP {self.id} {self.type}' if self.repeat_if_fail: - txt += ' REPEAT {v}'.format(v=self.repeat_if_fail) + txt += f' REPEAT {self.repeat_if_fail}' elif self.pause_if_fail: - txt += ' PAUSE {v}'.format(v=self.pause_if_fail) + txt += f' PAUSE {self.pause_if_fail}' elif self.next_if_fail != -1: - txt += ' NEXT {v}'.format(v=self.next_if_fail) + txt += f' NEXT {self.next_if_fail}' # if self.args: # txt += ' ' # txt += ' '.join(self.args) @@ -532,17 +541,17 @@ class Step: elif self.type == 'CHECK_OUT_QUERY': # ignore self.log.info('') return None - elif self.type == 'CHECK_ANSWER' or self.type == 'ANSWER': + elif self.type in ('CHECK_ANSWER', 'ANSWER'): self.log.info('') return self.__check_answer(ctx) elif self.type == 'TIME_PASSES ELAPSE': self.log.info('') return self.__time_passes() - elif self.type == 'REPLY' or self.type == 'MOCK': + elif self.type in ('REPLY', 'MOCK'): self.log.info('') return None else: - raise NotImplementedError('step %03d type %s unsupported' % (self.id, self.type)) + raise NotImplementedError(f'step {self.id:03} type {self.type} unsupported') def __check_answer(self, ctx): """ Compare answer from previously resolved query. """ @@ -574,7 +583,7 @@ class Step: if choice is None or not choice: choice = list(ctx.client.keys())[0] if choice not in ctx.client: - raise ValueError('step %03d invalid QUERY target: %s' % (self.id, choice)) + raise ValueError(f'step {self.id:03} invalid QUERY target: {choice}') tstart = datetime.now() @@ -608,11 +617,11 @@ class Step: """ Modify system time. """ file_old = os.environ["FAKETIME_TIMESTAMP_FILE"] file_next = os.environ["FAKETIME_TIMESTAMP_FILE"] + ".next" - with open(file_old, 'r') as time_file: + with open(file_old, 'r', encoding='utf-8') as time_file: line = time_file.readline().strip() t = time.mktime(datetime.strptime(line, '@%Y-%m-%d %H:%M:%S').timetuple()) t += self.delay - with open(file_next, 'w') as time_file: + with open(file_next, 'w', encoding='utf-8') as time_file: time_file.write(datetime.fromtimestamp(t).strftime('@%Y-%m-%d %H:%M:%S') + "\n") time_file.flush() os.replace(file_next, file_old) @@ -636,7 +645,7 @@ class Scenario: def __str__(self): txt = 'SCENARIO_BEGIN' if self.info: - txt += ' {0}'.format(self.info) + txt += f' {self.info}' txt += '\n' for range_ in self.ranges: txt += str(range_) @@ -696,24 +705,28 @@ class Scenario: next_steps = [j for j in range(len(self.steps)) if self.steps[ j].id == step.next_if_fail] if not next_steps: - raise ValueError('step %d: wrong NEXT value "%d"' % - (step.id, step.next_if_fail)) from ex + raise ValueError( + f'step {step.id}: ' + f'wrong NEXT value "{step.next_if_fail}"' + ) from ex next_step = next_steps[0] if next_step < len(self.steps): i = next_step else: - raise ValueError('step %d: Can''t branch to NEXT value "%d"' % - (step.id, step.next_if_fail)) from ex + raise ValueError( + f'step {step.id}: ' + f'Can\'t branch to NEXT value "{step.next_if_fail}"' + ) from ex continue ex_details = ex if self.log.isEnabledFor(logging.DEBUG) else None - raise ValueError('%s step %d %s' % (self.file, step.id, str(ex))) from ex_details + raise ValueError(f'{self.file} step {step.id} {ex}') from ex_details i += 1 for r in self.ranges: for e in r.stored: if e.mandatory and e.fired == 0: # TODO: cisla radku - raise ValueError('Mandatory section at %s not fired' % e.mandatory.span) + raise ValueError(f'Mandatory section at {e.mandatory.span} not fired') def get_next(file_in, skip_empty=True): @@ -794,8 +807,8 @@ def parse_config(scn_cfg, qmin, installdir): # FIXME: pylint: disable=too-many- ovr_hr = override_date_str[8:10] ovr_min = override_date_str[10:12] ovr_sec = override_date_str[12:] - override_date_str_arg = '{0} {1} {2} {3} {4} {5}'.format( - ovr_yr, ovr_mnt, ovr_day, ovr_hr, ovr_min, ovr_sec) + override_date_str_arg = \ + f'{ovr_yr} {ovr_mnt} {ovr_day} {ovr_hr} {ovr_min} {ovr_sec}' override_date = time.strptime(override_date_str_arg, "%Y %m %d %H %M %S") override_timestamp = calendar.timegm(override_date) elif k == 'stub-addr': @@ -815,8 +828,7 @@ def parse_config(scn_cfg, qmin, installdir): # FIXME: pylint: disable=too-many- f_value = "" features[f_key] = f_value except KeyError as ex: - raise KeyError("can't parse features (%s) in config section (%s)" - % (v, str(ex))) from ex + raise KeyError(f"can't parse features ({v}) in config section ({ex})") from ex elif k == 'feature-list': try: f_key, f_value = [x.strip() for x in v.split(feature_pair_delimiter, 1)] @@ -825,8 +837,7 @@ def parse_config(scn_cfg, qmin, installdir): # FIXME: pylint: disable=too-many- f_value = f_value.replace("{{INSTALL_DIR}}", installdir) features[f_key].append(f_value) except KeyError as ex: - raise KeyError("can't parse feature-list (%s) in config section (%s)" - % (v, str(ex))) from ex + raise KeyError(f"can't parse feature-list ({v}) in config section ({ex})") from ex elif k == 'force-ipv6' and v.upper() == 'TRUE': sockfamily = socket.AF_INET6 elif k == 'forward-addr': # currently forwards everything @@ -836,7 +847,7 @@ def parse_config(scn_cfg, qmin, installdir): # FIXME: pylint: disable=too-many- elif k == 'do-ip6': do_ip6 = str2bool(v) else: - raise NotImplementedError('unsupported CONFIG key "%s"' % k) + raise NotImplementedError(f'unsupported CONFIG key "{k}"') ctx = { "DO_NOT_QUERY_LOCALHOST": str(do_not_query_localhost).lower(), diff --git a/pydnstest/tests/test_parse_config.py b/pydnstest/tests/test_parse_config.py index d8cdea1697f4b366d432a0cf08f7a33c72614bac..44fa64d6a939ae6d93b60756eb06319c937dca5a 100644 --- a/pydnstest/tests/test_parse_config.py +++ b/pydnstest/tests/test_parse_config.py @@ -6,12 +6,12 @@ from pydnstest.scenario import parse_config def test_parse_config__trust_anchor(): """Checks if trust-anchors are separated into files according to domain.""" - anchor1 = u'domain1.com.\t3600\tIN\tDS\t11901 7 1 aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' - anchor2 = u'domain2.net.\t3600\tIN\tDS\t59835 7 1 cccccccccccccccccccccccccccccccccccccccc' - anchor3 = u'domain1.com.\t3600\tIN\tDS\t11902 7 1 1111111111111111111111111111111111111111' - anchors = [[u'trust-anchor', u'"{}"'.format(anchor1)], - [u'trust-anchor', u'"{}"'.format(anchor2)], - [u'trust-anchor', u'"{}"'.format(anchor3)]] + anchor1 = 'domain1.com.\t3600\tIN\tDS\t11901 7 1 aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' + anchor2 = 'domain2.net.\t3600\tIN\tDS\t59835 7 1 cccccccccccccccccccccccccccccccccccccccc' + anchor3 = 'domain1.com.\t3600\tIN\tDS\t11902 7 1 1111111111111111111111111111111111111111' + anchors = [['trust-anchor', f'"{anchor1}"'], + ['trust-anchor', f'"{anchor2}"'], + ['trust-anchor', f'"{anchor3}"']] args = (anchors, True, os.getcwd()) ta_files = parse_config(*args)["TRUST_ANCHOR_FILES"] assert sorted(ta_files.values()) == sorted([[anchor1, anchor3], [anchor2]]) diff --git a/pydnstest/tests/test_scenario.py b/pydnstest/tests/test_scenario.py index 454cb5c7a5f069d3dc9c51d2ddf461c141603aad..bbdcb8ccb4de660f5ae1bd40c808ad97a31607d9 100644 --- a/pydnstest/tests/test_scenario.py +++ b/pydnstest/tests/test_scenario.py @@ -16,7 +16,7 @@ def test_entry__get_flags(): for flag in RCODE_FLAGS + OPCODE_FLAGS: rcode_flags = Entry.get_flags(FLAGS + [flag]) assert rcode_flags == expected_flags, \ - 'Entry._get_flags does not filter out "{flag}"'.format(flag=flag) + f'Entry._get_flags does not filter out "{flag}"' def test_entry__get_rcode(): @@ -33,8 +33,7 @@ def test_entry__get_rcode(): for rcode in RCODE_FLAGS: given_rcode = Entry.get_rcode(FLAGS + OPCODE_FLAGS + [rcode]) - assert given_rcode is not None, 'Entry.get_rcode does not recognize {rcode}'.format( - rcode=rcode) + assert given_rcode is not None, f'Entry.get_rcode does not recognize {rcode}' def test_entry__get_opcode(): @@ -51,5 +50,4 @@ def test_entry__get_opcode(): for opcode in OPCODE_FLAGS: given_rcode = Entry.get_opcode(FLAGS + RCODE_FLAGS + [opcode]) - assert given_rcode is not None, 'Entry.get_opcode does not recognize {opcode}'.format( - opcode=opcode) + assert given_rcode is not None, f'Entry.get_opcode does not recognize {opcode}' diff --git a/pydnstest/testserver.py b/pydnstest/testserver.py index 7fa07288bed341ecf6014524b67baa350c521a2e..760480504bc49b839e3486d458eb923fe9d8f713 100644 --- a/pydnstest/testserver.py +++ b/pydnstest/testserver.py @@ -17,6 +17,14 @@ from pydnstest import scenario, mock_client from networking import InterfaceManager +class TestServerError(Exception): + pass + + +class QueryIoError(Exception): + pass + + class TestServer: """ This simulates UDP DNS server returning scripted or mirror DNS responses. """ @@ -52,7 +60,7 @@ class TestServer: """ Synchronous start """ with self.active_lock: if self.active: - raise Exception('TestServer already started') + raise TestServerError('TestServer already started') with self.active_lock: self.active = True @@ -126,7 +134,7 @@ class TestServer: self.undefined_answers = 0 with self.active_lock: if not self.active: - raise Exception("[query_io] Test server not active") + raise QueryIoError("Test server not active") while True: with self.condition: self.condition.notify() @@ -152,12 +160,11 @@ class TestServer: sock.close() self.connections.remove(sock) else: - raise Exception( - "[query_io] Socket IO internal error {}, exit" - .format(sock.getsockname())) + raise QueryIoError( + f"[query_io] Socket IO internal error {sock.getsockname()}, exit" + ) else: - raise Exception("[query_io] Socket IO error {}, exit" - .format(sock.getsockname())) + raise QueryIoError(f"[query_io] Socket IO error {sock.getsockname()}, exit") def start_srv(self, address, family, proto=socket.IPPROTO_UDP): """ Starts listening thread if necessary """ @@ -168,17 +175,16 @@ class TestServer: assert proto if family == socket.AF_INET6: if not socket.has_ipv6: - raise NotImplementedError("[start_srv] IPv6 is not supported by socket {0}" - .format(socket)) + raise NotImplementedError(f"[start_srv] IPv6 is not supported by socket {socket}") elif family != socket.AF_INET: - raise NotImplementedError("[start_srv] unsupported protocol family {0}".format(family)) + raise NotImplementedError(f"[start_srv] unsupported protocol family {family}") if proto == socket.IPPROTO_TCP: socktype = socket.SOCK_STREAM elif proto == socket.IPPROTO_UDP: socktype = socket.SOCK_DGRAM else: - raise NotImplementedError("[start_srv] unsupported protocol {0}".format(proto)) + raise NotImplementedError(f"[start_srv] unsupported protocol {proto}") if self.thread is None: self.thread = threading.Thread(target=self.query_io) @@ -290,7 +296,7 @@ def standalone_self_test(): if step.id == args.step: test_scenario.current_step = step if not test_scenario.current_step: - raise ValueError('step ID %s not found in scenario' % args.step) + raise ValueError(f'step ID {args.step} not found in scenario') else: test_scenario.current_step = test_scenario.steps[0] diff --git a/pylintrc b/pylintrc index 59eb6161341d8f3ca9f3c14b3aafabc63b8099c8..57349e3908be806391b606c4c35d2f624e2eff42 100644 --- a/pylintrc +++ b/pylintrc @@ -13,7 +13,6 @@ disable= invalid-name, global-statement, no-else-return, - bad-continuation, duplicate-code, diff --git a/rplint.py b/rplint.py index 558045e2edfa6803f7bb1dbbf1bbbdccdc568087..eb5c65c420c1113e54f811617258b37f7d487b9f 100755 --- a/rplint.py +++ b/rplint.py @@ -28,7 +28,7 @@ class RplintError(ValueError): def get_line_number(file: str, char_number: int) -> int: pos = 0 - with open(file) as f: + with open(file, encoding='utf-8') as f: for number, line in enumerate(f): pos += len(line) if pos >= char_number: @@ -77,11 +77,16 @@ class RplintFail: self.check = None # type: Optional[Callable[[RplintTest], List[RplintFail]]] def __str__(self): + base_path = os.path.basename(self.path) if self.etc: - return "{}:{} {}: {} ({})".format(os.path.basename(self.path), self.line, - self.check.__name__, self.check.__doc__, self.etc) - return "{}:{} {}: {}".format(os.path.basename(self.path), self.line, self.check.__name__, - self.check.__doc__) + return ( + f"{base_path}:{self.line} {self.check.__name__}: " + f"{self.check.__doc__} ({self.etc})" + ) + return ( + f"{base_path}:{self.line} {self.check.__name__}: " + f"{self.check.__doc__}" + ) class RplintTest: @@ -280,7 +285,7 @@ def range_overlapping_ips(test: RplintTest) -> List[RplintFail]: # If the ranges overlap if min(r1.b, r2.b) >= max(r1.a, r2.a): if r1.addresses & r2.addresses: - info = "previous range on line %d" % get_line_number(test.path, r1.node.char) + info = f"previous range on line {get_line_number(test.path, r1.node.char)}" fails.append(RplintFail(test, r2, info)) return fails @@ -295,7 +300,7 @@ def range_shadowing_match_rules(test: RplintTest) -> List[RplintFail]: except ValueError: pass else: - info = "previous entry on line %d" % get_line_number(test.path, e1.node.char) + info = f"previous entry on line {get_line_number(test.path, e1.node.char)}" if e1.match_fields > e2.match_fields: continue if "subdomain" not in e1.match_fields and "subdomain" in e2.match_fields: @@ -332,13 +337,13 @@ def main(): try: test_path = sys.argv[1] except IndexError: - print("usage: %s <path to rpl file>" % sys.argv[0]) + print(f"usage: {sys.argv[0]} <path to rpl file>") sys.exit(2) if not os.path.isfile(test_path): print("rplint.py works on single file only.") print("Use rplint.sh with --scenarios=<directory with rpls> to run on rpls.") sys.exit(2) - print("Linting %s" % test_path) + print(f"Linting {test_path}") t = RplintTest(test_path) passed = t.run_checks() t.print_fails() diff --git a/setup.py b/setup.py index 93e22b7f550430863fc207269621388fb2d73a91..c098071618e719b3eff73f2d657261efe233a8c0 100644 --- a/setup.py +++ b/setup.py @@ -1,5 +1,5 @@ #!/usr/bin/env python3 -from distutils.core import setup +from setuptools import setup version = '3.0' diff --git a/template/pdns_recursor.j2 b/template/pdns_recursor.j2 index 6bb99235f2c4466f8cacb78de3017d0561d9b2c4..874c757367b1d6101cffc4a88a8793f4f2f8a90c 100644 --- a/template/pdns_recursor.j2 +++ b/template/pdns_recursor.j2 @@ -21,7 +21,7 @@ allow-from= # auth-can-lower-ttl=off ################################# -# auth-zones Zones for which we have authoritative data, comma separated domain=file pairs +# auth-zones Zones for which we have authoritative data, comma separated domain=file pairs # # auth-zones= @@ -219,17 +219,7 @@ max-cache-entries=1000000 ################################# # query-local-address Source IP address for sending queries # -{% if ':' in SELF_ADDR %} -query-local-address=0.0.0.0 -query-local-address6={{SELF_ADDR}} -{% else %} query-local-address={{SELF_ADDR}} -query-local-address6=:: -{% endif %} - -################################# -# query-local-address6 Source IPv6 address for sending queries -# query-local-address6=:: ################################# # quiet Suppress logging of questions and answers diff --git a/tests/test_deckard.py b/tests/test_deckard.py index 50096eab99123b88ff0d199a2ed6c612ed559833..200c4f4ceaeead9fcfaed7e122f28f1a77fe0f9a 100644 --- a/tests/test_deckard.py +++ b/tests/test_deckard.py @@ -8,23 +8,22 @@ from deckard import create_trust_anchor_files def test_create_trust_anchor_files(): """Trust anchors must be into separate files grouped by domain.""" - anchor1a = u'domain1.com.\t3600\tIN\tDS\t11901 7 1 aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' - anchor1b = u'domain1.com.\t3600\tIN\tDS\t11902 7 1 1111111111111111111111111111111111111111' - anchor2a = u'domain2.net.\t3600\tIN\tDS\t59835 7 1 cccccccccccccccccccccccccccccccccccccccc' + anchor1a = 'domain1.com.\t3600\tIN\tDS\t11901 7 1 aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' + anchor1b = 'domain1.com.\t3600\tIN\tDS\t11902 7 1 1111111111111111111111111111111111111111' + anchor2a = 'domain2.net.\t3600\tIN\tDS\t59835 7 1 cccccccccccccccccccccccccccccccccccccccc' trust_anchors = {'domain1.com': [anchor1a, anchor1b], 'domain2.net': [anchor2a]} tmpdir = tempfile.mkdtemp() try: file_names = create_trust_anchor_files(trust_anchors, tmpdir) - assert sorted(file_names) == sorted('{wd}/ta/{f}'.format(wd=tmpdir, f=f) - for f in [u'domain1.com.key', u'domain2.net.key']) + assert sorted(file_names) == sorted(f'{tmpdir}/ta/{f}' + for f in ['domain1.com.key', 'domain2.net.key']) for path in file_names: - with open(path) as ta_file: + with open(path, encoding='utf-8') as ta_file: file_name = os.path.basename(path) assert file_name[-4:] == '.key' domain = file_name[:-4] - assert ta_file.read() == ''.join(u'{}\n'.format(ta) - for ta in trust_anchors[domain]) + assert ta_file.read() == ''.join(f'{ta}\n' for ta in trust_anchors[domain]) finally: shutil.rmtree(tmpdir) diff --git a/tools/answer_checker.py b/tools/answer_checker.py index 3754ef357ca7fa447c0193a6e827c202f73de7c1..f6cf735f170beadee1d6ac5e6309b4c975ee9d0c 100644 --- a/tools/answer_checker.py +++ b/tools/answer_checker.py @@ -31,14 +31,14 @@ def send_and_check(question: Union[dns.message.Message, bytes], # pylint: disab Returns True on success, raises an exceptions on failure. """ - print("Sending query:\n%s\n" % str(question)) + print(f"Sending query:\n{str(question)}\n") answer = get_answer(question, server, port, tcp, timeout=timeout) for flag in unset_flags: answer = unset_flag(answer, flag) - print("Got answer:\n%s\n" % answer) - print("Matching:\n%s\n%s\n" % (match_fields, expected)) + print(f"Got answer:\n{answer}\n") + print(f"Matching:\n{match_fields}\n{expected}\n") for field in match_fields: pydnstest.matchpart.match_part(expected, answer, field) diff --git a/tools/generate_answers.py b/tools/generate_answers.py index 399f9a75b1512006b22009daa515e7f2a15f7061..787456f358855d92e63b200be1f749ef355acc2f 100644 --- a/tools/generate_answers.py +++ b/tools/generate_answers.py @@ -23,4 +23,5 @@ d = {"SIMPLE_ANSWER" : answer_checker.make_random_case_query("good-a.test.knot-r "NONEXISTENT_TYPE_NSEC_ANSWER" : answer_checker.make_random_case_query("nsec.test.knot-resolver.cz", "TYPE65281", want_dnssec=True)} for k, v in d.items(): - print('%s = dns.message.from_text("""%s""")\n' % (k, answer_checker.string_answer(v, ipaddress.IPv4Address("127.0.0.1")))) + str_answer = answer_checker.string_answer(v, ipaddress.IPv4Address("127.0.0.1")) + print(f'{k} = dns.message.from_text("""{str_answer}""")\n')