Skip to content
Snippets Groups Projects
deckard.py 8.88 KiB
#!/usr/bin/env python3
import errno
import logging
import logging.config
import os
import shlex
import shutil
import socket
import subprocess
import time
from datetime import datetime
from typing import Set  # noqa

import jinja2

from pydnstest import scenario, testserver

# path to Deckard files
INSTALLDIR = os.path.dirname(os.path.abspath(__file__))
# relative to working directory
TRUST_ANCHOR_SUBDIR = 'ta'


class DeckardUnderLoadError(Exception):
    pass


def setup_internal_addresses(context):
    context["DECKARD_IP"] = context["if_manager"].assign_internal_address(context["_SOCKET_FAMILY"])
    for program in context["programs"]:
        program["address"] = context["if_manager"].assign_internal_address(
            context["_SOCKET_FAMILY"])


def write_timestamp_file(path, tst):
    with open(path, 'w') as time_file:
        time_file.write(datetime.fromtimestamp(tst).strftime('@%Y-%m-%d %H:%M:%S'))


def setup_faketime(context):
    """
    Setup environment shared between Deckard and binaries under test.

    Environment for child processes must be based on on.environ as modified
    by this function.
    """
    # Set up libfaketime
    os.environ["FAKETIME_NO_CACHE"] = "1"
    os.environ["FAKETIME_TIMESTAMP_FILE"] = os.path.join(context["tmpdir"], ".time")
    os.unsetenv("FAKETIME")

    write_timestamp_file(os.environ["FAKETIME_TIMESTAMP_FILE"],
                         context.get('_OVERRIDE_TIMESTAMP', time.time()))


def setup_daemon_environment(program_config, context):
    program_config["WORKING_DIR"] = os.path.join(context["tmpdir"], program_config["name"])
    os.mkdir(program_config['WORKING_DIR'])
    program_config["DAEMON_NAME"] = program_config["name"]
    program_config['SELF_ADDR'] = program_config['address']
    program_config['TRUST_ANCHOR_FILES'] = create_trust_anchor_files(
        context["TRUST_ANCHOR_FILES"], program_config['WORKING_DIR'])


def create_trust_anchor_files(ta_files, work_dir):
    """
    Write trust anchor files in specified working directory.

    Params:
      ta_files Dict {domain name: [TA lines]}
    Returns:
      List of absolute filesystem paths to TA files.
    """
    full_paths = []
    for domain, ta_lines in ta_files.items():
        file_name = u'{}.key'.format(domain)
        full_path = os.path.realpath(
            os.path.join(work_dir, TRUST_ANCHOR_SUBDIR, file_name))
        full_paths.append(full_path)
        dir_path = os.path.dirname(full_path)
        try:
            os.makedirs(dir_path)
        except OSError as ex:
            if ex.errno != errno.EEXIST:
                raise
        with open(full_path, "w") as ta_file:
            ta_file.writelines('{0}\n'.format(line) for line in ta_lines)
    return full_paths


def generate_from_templates(program_config, context):
    """Generate configuration for the program"""
    template_ctx = context.copy()
    template_ctx.update(program_config)

    # public mapping program name -> program vars
    template_ctx['PROGRAMS'] = {}
    for cfg in template_ctx['programs']:
        template_ctx['PROGRAMS'][cfg['name']] = cfg

    j2template_loader = jinja2.FileSystemLoader(searchpath=os.getcwd())
    j2template_env = jinja2.Environment(loader=j2template_loader)

    for template_name, config_name in zip(template_ctx['templates'], template_ctx['configs']):
        j2template = j2template_env.get_template(template_name)
        cfg_rendered = j2template.render(template_ctx)
        with open(os.path.join(template_ctx['WORKING_DIR'], config_name), 'w') as output:
            output.write(cfg_rendered)


def run_daemon(program_config):
    """Start binary and return its process object"""
    name = program_config['DAEMON_NAME']
    proc = None
    program_config['log'] = os.path.join(program_config["WORKING_DIR"], 'server.log')
    program_config['args'] = (
        shlex.split(os.environ.get('DECKARD_WRAPPER', ''))
        + [program_config['binary']]
        + program_config['additional']
    )
    logging.getLogger('deckard.daemon.%s.argv' % name).debug('%s', program_config['args'])
    with open(program_config['log'], 'w') as daemon_log_file:
        try:
            # pylint: disable=consider-using-with
            proc = subprocess.Popen(program_config['args'], stdout=daemon_log_file,
                                    stderr=subprocess.STDOUT, cwd=program_config['WORKING_DIR'])
        except subprocess.CalledProcessError:
            logger = logging.getLogger('deckard.daemon_log.%s' % name)
            logger.exception("Can't start '%s'", program_config['args'])
            raise
    return proc


def log_fatal_daemon_error(cfg, msg):
    logger = logging.getLogger('deckard.daemon_log.%s' % cfg['name'])
    logger.critical(msg)
    logger.critical('logs are in "%s"', cfg['WORKING_DIR'])
    with open(cfg['log']) as logfile:
        logger.error('daemon log follows:')
        logger.error(logfile.read())


def conncheck_daemon(process, cfg, sockfamily):
    """Wait until the server accepts TCP clients"""
    sock = socket.socket(sockfamily, socket.SOCK_STREAM)
    deadline = time.monotonic() + 5
    with sock:
        while True:
            # Check if the process is running
            if process.poll() is not None:
                msg = 'process died, exit code %s' % process.poll()
                log_fatal_daemon_error(cfg, msg)
                raise subprocess.CalledProcessError(process.returncode, cfg['args'], msg)
            try:
                sock.connect((cfg['address'], 53))
                return  # success
            except socket.error as ex:
                if time.monotonic() > deadline:
                    msg = 'server does not accept connections on TCP port 53'
                    log_fatal_daemon_error(cfg, msg)
                    raise DeckardUnderLoadError(msg) from ex

            time.sleep(0.1)


def setup_daemons(context):
    """Configure daemons and start them"""
    # Setup daemon environment
    daemons = []

    for program_config in context['programs']:
        setup_daemon_environment(program_config, context)
        generate_from_templates(program_config, context)

        daemon_proc = run_daemon(program_config)
        daemons.append({'proc': daemon_proc, 'cfg': program_config})
        if program_config.get('conncheck', True):
            try:
                conncheck_daemon(daemon_proc, program_config, context['_SOCKET_FAMILY'])
            except:  # noqa  -- bare except might be valid here?
                daemon_proc.terminate()
                raise

    return daemons


def check_for_reply_steps(case: scenario.Scenario) -> bool:
    return any(s.type == "REPLY" for s in case.steps)


def run_testcase(case, daemons, context, prog_under_test_ip):
    """Run actual test and raise exception if the test failed"""
    server = testserver.TestServer(case, context["_SOCKET_FAMILY"],
                                   context["DECKARD_IP"], context["if_manager"])
    server.start()

    try:
        server.play(prog_under_test_ip)
    finally:
        server.stop()

        if check_for_reply_steps(case):
            logging.warning("%s has REPLY steps in it. These are known to fail randomly. "
                            "Errors might be false positives.", case.file)

        for daemon in daemons:
            daemon['proc'].terminate()
            daemon['proc'].wait()
            daemon_logger_log = logging.getLogger('deckard.daemon_log.%s' % daemon['cfg']['name'])
            with open(daemon['cfg']['log']) as logf:
                for line in logf:
                    daemon_logger_log.debug(line.strip())
            ignore_exit = daemon["cfg"].get('ignore_exit_code', False)
            if daemon['proc'].returncode != 0 and not ignore_exit:
                raise ValueError('process %s terminated with return code %s'
                                 % (daemon['cfg']['name'], daemon['proc'].returncode))

    if server.undefined_answers > 0:
        raise ValueError('the scenario does not define all necessary answers (see error log)')


def process_file(path, qmin, config):
    """Parse scenario from a file object and create workdir."""

    # Preserve original configuration
    context = config.copy()

    # Parse scenario
    case, case_config_text = scenario.parse_file(os.path.realpath(path))
    case_config = scenario.parse_config(case_config_text, qmin, INSTALLDIR)

    # Merge global and scenario configs
    context.update(case_config)

    # Asign addresses to the programs and Deckard itself
    setup_internal_addresses(context)

    # Deckard will communicate with first program
    prog_under_test_ip = context['programs'][0]['address']

    setup_faketime(context)

    # Copy the scenario to tmpdir for future reference
    shutil.copy2(path, os.path.join(context["tmpdir"]))

    try:
        daemons = setup_daemons(context)
        run_testcase(case, daemons, context, prog_under_test_ip)

    except Exception:
        logging.getLogger('deckard.hint').error(
            'test failed, inspect working directory %s', context["tmpdir"])
        raise