pytests/test_random_close: add test

parent fdc09ced
......@@ -61,6 +61,8 @@ _obj
......@@ -303,9 +303,7 @@ pytests:run:
- master
- pushd tests/pytests/proxy
- make all
- popd
- pushd tests/pytests/proxy && make all && popd
- PATH="$PREFIX/sbin:$PATH" ./ci/pytests/ &> pytests.log.txt
- tail -1 pytests.log.txt
......@@ -48,7 +48,7 @@ Forward = namedtuple('Forward', ['proto', 'ip', 'port', 'hostname', 'ca_file'])
class Kresd(ContextDecorator):
def __init__(
self, workdir, port=None, tls_port=None, ip=None, ip6=None, certname=None,
verbose=True, hints=None, forward=None):
verbose=True, hints=None, forward=None, policy_test_pass=False):
if ip is None and ip6 is None:
raise ValueError("IPv4 or IPv6 must be specified!")
self.workdir = str(workdir)
......@@ -62,6 +62,7 @@ class Kresd(ContextDecorator):
self.verbose = verbose
self.hints = {} if hints is None else hints
self.forward = forward
self.policy_test_pass = policy_test_pass
if certname:
self.tls_cert_path = os.path.join(CERTS_DIR, certname + '.cert.pem')
......@@ -297,9 +298,7 @@ KRESD_LOG_IO_CLOSE = re.compile(r'^\[io\].*closed by peer.*')
def make_kresd(
workdir, certname=None, ip='', ip6='::1', forward=None, hints=None,
port=None, tls_port=None):
with Kresd(workdir, port, tls_port, ip, ip6, certname, forward=forward, hints=hints) as kresd:
def make_kresd(workdir, certname=None, ip='', ip6='::1', **kwargs):
with Kresd(workdir, ip=ip, ip6=ip6, certname=certname, **kwargs) as kresd:
yield kresd
\ No newline at end of file
\ No newline at end of file
\ No newline at end of file
\ No newline at end of file
\ No newline at end of file
This diff is collapsed.
\ No newline at end of file
#include <stdio.h>
#include "tls-proxy.h"
#include <gnutls/gnutls.h>
int main()
struct tls_proxy_ctx *proxy = tls_proxy_allocate();
if (!proxy) {
fprintf(stderr, "can't allocate tls_proxy structure\n");
return 1;
int res = tls_proxy_init(proxy,
"", 54021, /* Address to listen */
"", 54010, /* Upstream address */
if (res) {
fprintf(stderr, "can't initialize tls_proxy structure\n");
return res;
res = tls_proxy_start_listen(proxy);
if (res) {
fprintf(stderr, "error starting listen, error code: %i\n", res);
return res;
fprintf(stdout, "started...\n");
res = tls_proxy_run(proxy);
return res;
......@@ -37,6 +37,10 @@ policy.add(policy.all(
{% endif %}
{% if kresd.policy_test_pass %}
policy.add(policy.suffix(policy.PASS, {todname('test.')}))
{% endif %}
"""TLS test when forward target closes connection after one second
Test utilizes random_close/tls-proxy, which forwards queries to configured
resolver, but closes the connection 1s after establishing.
Kresd must stay alive and be able to answer queries.
Make sure to run `make all` in `random_close/` to compile the proxy.
import os
import random
import string
import time
import pytest
from kresd import Forward, make_kresd, PYTESTS_DIR
import proxyutils
import utils
PROXY_PATH = os.path.join(PYTESTS_DIR, 'random_close', 'tlsproxy')
QPS = 500
def random_string(size=32, chars=(string.ascii_lowercase + string.digits)):
return ''.join(random.choice(chars) for x in range(size))
def rsa_cannon(sock, duration, domain='test.', qps=QPS):
end_time = time.time() + duration
while time.time() < end_time:
next_time = time.time() + 1/qps
buff, _ = utils.get_msgbuff('{}.{}'.format(random_string(), domain))
time_left = next_time - time.time()
if time_left > 0:
@pytest.mark.skipif(not os.path.exists(PROXY_PATH),
reason="{} not found (did you compile it?)".format(PROXY_PATH))
def test_proxy_random_close(tmpdir):
# run forward target instance
workdir = os.path.join(str(tmpdir), 'kresd_fwd_target')
with make_kresd(workdir, hints=proxyutils.HINTS, port=54010,
verbose=False) as kresd_fwd_target:
sock = kresd_fwd_target.ip_tls_socket()
proxyutils.resolve_hint(sock, list(proxyutils.HINTS.keys())[0])
with proxyutils.proxy(PROXY_PATH):
# run test kresd instance
workdir2 = os.path.join(str(tmpdir), 'kresd')
forward = Forward(
proto='tls', ip='', port=54021,
hostname='', ca_file=proxyutils.PROXY_CA_FILE)
with make_kresd(workdir2, forward=forward, policy_test_pass=True,
verbose=False) as kresd:
sock2 = kresd.ip_tcp_socket()
rsa_cannon(sock2, 20)
sock3 = kresd.ip_tcp_socket()
for hint in proxyutils.HINTS:
proxyutils.resolve_hint(sock3, hint)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment