Verified Commit 8c2ae7e0 authored by Vojtech Myslivec's avatar Vojtech Myslivec
Browse files

tests: Use fixtures instead of crypto functions

parent 75b07ff3
......@@ -4,6 +4,7 @@ PyTest init, mocks and fixtures
import sys
sys.path.append("..")
import itertools
from unittest.mock import Mock
import pytest
......@@ -12,9 +13,73 @@ from sentinel_ca.db import db_connection
from sentinel_ca.ca import CA
from .ca_helpers import build_ca_config
from .crypto_helpers import build_request
from .sn_helpers import checker_good_reply
# Request fixtures -----------------------------------------
@pytest.fixture
def good_request():
return build_request()
@pytest.fixture
def good_request_renew():
return build_request(renew=True)
@pytest.fixture
def bad_request_empty():
return None
@pytest.fixture
def bad_request_no_json():
return "This is not a dictionary"
@pytest.fixture
def bad_request_invalid_csr():
req = build_request()
req["csr_str"] = "foobar"
return req
@pytest.fixture(params=[
build_request(valid_subject_name=False),
build_request(valid_hash=False),
])
def bad_request_invalid_csr_params(request):
return request.param
def bad_request_missing_generator():
req = build_request()
for subreq in itertools.combinations(req, len(req)-1):
yield {i: req[i] for i in subreq}
@pytest.fixture(params=bad_request_missing_generator())
def bad_request_missing(request):
return request.param
# CA fixtures ----------------------------------------------
@pytest.fixture
def ca(tmpdir):
ca_config = build_ca_config(tmpdir)
with db_connection(ca_config) as db:
yield CA(ca_config, db)
@pytest.fixture
def ca_expire_soon(tmpdir):
ca_config = build_ca_config(tmpdir, expire_soon=True)
with db_connection(ca_config) as db:
yield CA(ca_config, db)
# Mocks ----------------------------------------------------
@pytest.fixture
def redis_mock():
redis_instance = Mock()
......@@ -39,17 +104,3 @@ def socket_mock():
def good_socket_mock(socket_mock):
socket_mock.recv_multipart.return_value = checker_good_reply()
return socket_mock
@pytest.fixture
def ca(tmpdir):
ca_config = build_ca_config(tmpdir)
with db_connection(ca_config) as db:
yield CA(ca_config, db)
@pytest.fixture
def ca_expire_soon(tmpdir):
ca_config = build_ca_config(tmpdir, expire_soon=True)
with db_connection(ca_config) as db:
yield CA(ca_config, db)
......@@ -4,7 +4,6 @@ Reusable functions for cryptography stuff
import datetime
import hashlib
import itertools
import os
# backend
......@@ -223,42 +222,6 @@ def build_request(renew=False, valid_subject_name=True, valid_hash=True):
return req
def good_request():
return build_request()
def good_request_renew():
return build_request(renew=True)
def bad_request_empty():
return None
def bad_request_no_json():
return "This is not a dictionary"
def bad_request_missing():
req = build_request()
for subreq in itertools.combinations(req, len(req)-1):
yield {i: req[i] for i in subreq}
def bad_request_invalid_csr():
req = build_request()
req["csr_str"] = "foobar"
return req
def bad_request_invalid_csr_name():
return build_request(valid_subject_name=False)
def bad_request_invalid_csr_hash():
return build_request(valid_hash=False)
def cert_from_bytes(cert_bytes):
return x509.load_pem_x509_certificate(
data=cert_bytes,
......
......@@ -10,18 +10,7 @@ import sn
from sentinel_ca.main import process
from sentinel_ca.exceptions import CAError
from .crypto_helpers import \
bad_request_empty, \
bad_request_missing, \
bad_request_no_json, \
bad_request_invalid_csr, \
bad_request_invalid_csr_name, \
bad_request_invalid_csr_hash, \
good_request, \
good_request_renew, \
cert_from_bytes, \
csr_from_str, \
get_cert_common_name
from .crypto_helpers import build_request, cert_from_bytes, csr_from_str, get_cert_common_name
from .sn_helpers import \
checker_bad_reply1, \
checker_bad_reply2, \
......@@ -37,9 +26,9 @@ def bytes_to_dict(b):
return json.loads(str(b), encoding='utf-8')
def test_process_good_request(redis_mock, good_socket_mock, ca):
def test_process_good_request(redis_mock, good_socket_mock, ca, good_request):
# prepare env
req = good_request()
req = good_request
redis_mock.brpop.return_value = (1, dict_to_bytes(req))
# test
......@@ -75,9 +64,9 @@ def test_process_good_request(redis_mock, good_socket_mock, ca):
assert ca.get_valid_cert_matching_csr(req["sn"], csr)
def test_process_repeated_request(redis_mock, good_socket_mock, ca):
def test_process_repeated_request(redis_mock, good_socket_mock, ca, good_request):
# prepare env
req = good_request()
req = good_request
redis_mock.brpop.return_value = (1, dict_to_bytes(req))
# issue the cert
......@@ -118,9 +107,9 @@ def test_process_repeated_request(redis_mock, good_socket_mock, ca):
assert second_cert_bytes == cert_bytes
def test_process_renew(redis_mock, good_socket_mock, ca):
def test_process_renew(redis_mock, good_socket_mock, ca, good_request_renew):
# prepare env
req = good_request_renew()
req = good_request_renew
redis_mock.brpop.return_value = (1, dict_to_bytes(req))
# issue the cert
......@@ -161,9 +150,9 @@ def test_process_renew(redis_mock, good_socket_mock, ca):
assert second_cert_bytes != cert_bytes
def test_process_cacert_expire_soon(redis_mock, good_socket_mock, ca_expire_soon):
def test_process_cacert_expire_soon(redis_mock, good_socket_mock, ca_expire_soon, good_request):
# prepare env
req = good_request()
req = good_request
redis_mock.brpop.return_value = (1, dict_to_bytes(req))
# test
......@@ -181,17 +170,27 @@ def test_process_cacert_expire_soon(redis_mock, good_socket_mock, ca_expire_soon
assert not ca_expire_soon.get_valid_cert_matching_csr(req["sn"], csr)
@pytest.mark.parametrize(
"param",
(
{"req": bad_request_invalid_csr(), "has_csr": False},
{"req": bad_request_invalid_csr_name(), "has_csr": True},
{"req": bad_request_invalid_csr_hash(), "has_csr": True},
)
)
def test_process_bad_request_invalid_csr(redis_mock, good_socket_mock, ca, param):
def test_process_bad_request_invalid_csr(redis_mock, good_socket_mock, ca, bad_request_invalid_csr):
# prepare env
req = param["req"]
req = bad_request_invalid_csr
redis_mock.brpop.return_value = (1, dict_to_bytes(req))
# test
process(redis_mock, good_socket_mock, ca)
# Check SN interaction
assert not good_socket_mock.send_multipart.called
# Check redis interaction
assert redis_mock.set.call_count == 1
# auth_state
auth_state = bytes_to_dict(redis_mock.set.call_args_list[0][0][1])
assert auth_state["status"] == "fail"
def test_process_bad_request_invalid_csr_params(redis_mock, good_socket_mock, ca, bad_request_invalid_csr_params):
# prepare env
req = bad_request_invalid_csr_params
redis_mock.brpop.return_value = (1, dict_to_bytes(req))
# test
......@@ -207,9 +206,8 @@ def test_process_bad_request_invalid_csr(redis_mock, good_socket_mock, ca, param
assert auth_state["status"] == "fail"
# Check certs in sqlite
if param["has_csr"]:
csr = csr_from_str(req["csr_str"])
assert not ca.get_valid_cert_matching_csr(req["sn"], csr)
csr = csr_from_str(req["csr_str"])
assert not ca.get_valid_cert_matching_csr(req["sn"], csr)
def test_process_empty_redis(redis_mock, good_socket_mock, ca):
......@@ -225,9 +223,9 @@ def test_process_empty_redis(redis_mock, good_socket_mock, ca):
assert not redis_mock.set.called
def test_process_bad_request_empty(redis_mock, good_socket_mock, ca):
def test_process_bad_request_empty(redis_mock, good_socket_mock, ca, bad_request_empty):
# prepare env
req = bad_request_empty()
req = bad_request_empty
redis_mock.brpop.return_value = (1, dict_to_bytes(req))
# test
......@@ -239,9 +237,9 @@ def test_process_bad_request_empty(redis_mock, good_socket_mock, ca):
assert not redis_mock.set.called
def test_process_bad_request_no_json(redis_mock, good_socket_mock, ca):
def test_process_bad_request_no_json(redis_mock, good_socket_mock, ca, bad_request_no_json):
# prepare env
req = bad_request_no_json()
req = bad_request_no_json
redis_mock.brpop.return_value = (1, bytes(req, encoding='utf-8'))
# test
......@@ -253,9 +251,9 @@ def test_process_bad_request_no_json(redis_mock, good_socket_mock, ca):
assert not redis_mock.set.called
def test_process_bad_request_encoding(redis_mock, good_socket_mock, ca):
def test_process_bad_request_encoding(redis_mock, good_socket_mock, ca, good_request):
# prepare env
req = good_request()
req = good_request
redis_mock.brpop.return_value = (1, bytes(json.dumps(req), encoding='utf-16'))
# test
......@@ -270,8 +268,8 @@ def test_process_bad_request_encoding(redis_mock, good_socket_mock, ca):
assert not ca.get_valid_cert_matching_csr(req["sn"], csr)
@pytest.mark.parametrize("req", bad_request_missing())
def test_process_bad_request_missing(redis_mock, good_socket_mock, ca, req):
def test_process_bad_request_missing(redis_mock, good_socket_mock, ca, bad_request_missing):
req = bad_request_missing
# prepare env
redis_mock.brpop.return_value = (1, dict_to_bytes(req))
......@@ -297,9 +295,9 @@ def test_process_bad_request_missing(redis_mock, good_socket_mock, ca, req):
{"reply": checker_bad_reply2(), "status": "error"},
)
)
def test_process_bad_reply(redis_mock, socket_mock, ca, param):
def test_process_bad_reply(redis_mock, socket_mock, ca, param, good_request):
# prepare env
req = good_request()
req = good_request
redis_mock.brpop.return_value = (1, dict_to_bytes(req))
socket_mock.recv_multipart.return_value = param["reply"]
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment