# test_msgloop.py
1
import pytest
2
from unittest.mock import Mock, patch
3 4 5 6

import sn


7
def test_context_data_passed(in_out_args_mock, recv_multipart_mock, send_multipart_mock, good_msg):
    """Check that the box hooks run and that setup data is exposed on ``tb.ctx``.

    ``good_msg`` is queued twice on the receive mock. The test then expects
    one ``setup`` call, one ``teardown`` call, two ``process`` calls and two
    sends, and that the dict returned by ``setup`` is readable as attributes
    of ``tb.ctx``.
    """
    # A list side_effect makes the mock raise StopIteration by itself once the
    # queued items run out. Raising StopIteration by hand inside a generator
    # is converted to RuntimeError on Python 3.7+ (PEP 479).
    # NOTE(review): presumably StopIteration is what ends tb.run() here -
    # confirm against the sn message loop.
    recv_multipart_mock.side_effect = [good_msg, good_msg]

    class TestBox(sn.SNPipelineBox):
        pass

    tb = TestBox("test")
    # Replace the hooks on the instance so their invocations can be counted.
    tb.setup = mock_setup = Mock(return_value={"foo": "bar"})
    tb.teardown = mock_teardown = Mock()
    tb.process = mock_process = Mock(return_value=("sentinel/test", {"foo": "bar"}))

    tb.run()

    assert mock_setup.called
    assert mock_setup.call_count == 1

    assert mock_teardown.called
    assert mock_teardown.call_count == 1

    assert mock_process.called
    assert mock_process.call_count == 2

    assert send_multipart_mock.called
    assert send_multipart_mock.call_count == 2

    assert tb.name == "test"
    assert isinstance(tb.logger.getEffectiveLevel(), int)
    assert tb.ctx.foo == "bar"
40

41

42 43 44 45 46
def test_regulraly_processed(in_out_args_mock, recv_multipart_mock, send_multipart_mock, good_msg):
    """Check that each received message is processed and sent on as ``processed``.

    ``good_msg`` is queued twice on the receive mock. The box's ``process``
    returns the payload unchanged under the message type ``"processed"``.
    The test expects two sends, the last one starting with ``b"processed"``.
    """
    # A list side_effect makes the mock raise StopIteration by itself once the
    # queued items run out. Raising StopIteration by hand inside a generator
    # is converted to RuntimeError on Python 3.7+ (PEP 479).
    # NOTE(review): presumably StopIteration is what ends run() here -
    # confirm against the sn message loop.
    recv_multipart_mock.side_effect = [good_msg, good_msg]

    class TestBox(sn.SNPipelineBox):
        def process(self, msg_type, payload):
            return "processed", payload

    TestBox("test").run()

    assert send_multipart_mock.called
    assert send_multipart_mock.call_count == 2
    # First positional argument of the last send, first frame of it.
    assert send_multipart_mock.call_args[0][0][0] == b"processed"


def test_processed_from_generator(out_only_args_mock, send_multipart_mock):
    """Check that every message yielded by a generator box is sent.

    ``process`` yields ``msg_num`` identical messages. The test expects
    exactly that many sends, the last one starting with ``b"sentinel/test"``.
    """
    msg_num = 5

    class TestBox(sn.SNGeneratorBox):
        def process(self):
            for _ in range(msg_num):
                yield "sentinel/test", {"foo": "bar"}

    TestBox("test").run()

    assert send_multipart_mock.called
    assert send_multipart_mock.call_count == msg_num
    # First positional argument of the last send, first frame of it.
    assert send_multipart_mock.call_args[0][0][0] == b"sentinel/test"
74

75

76
def test_many_errors_in_row(out_only_args_mock, send_multipart_mock):
    """Check that an endless run of bad messages ends the box with exit code 1.

    ``process`` yields, without end, a message whose type starts with the
    non-ASCII character "š". The test expects ``run()`` to raise
    ``SystemExit`` with code 1 and nothing to have been sent.
    """
    # NOTE(review): presumably the non-ASCII message type is rejected by sn
    # and counted as an error - confirm against the sn implementation.
    class TestBox(sn.SNGeneratorBox):
        def process(self):
            while True:
                yield "šentinel/test", {"foo": "bar"}

    tb = TestBox("test")

    with pytest.raises(SystemExit) as se:
        tb.run()

    assert se.type is SystemExit
    assert se.value.code == 1

    assert not send_multipart_mock.called


def test_resetable_error_counter(out_only_args_mock, send_multipart_mock):
    """Check that a bounded run of bad messages does not stop the box.

    ``process`` yields ten messages whose type starts with the non-ASCII
    character "š", followed by one message with a plain ASCII type. The test
    expects ``run()`` to return normally and at least one send to happen.
    """
    # NOTE(review): presumably ten consecutive errors stay below the limit
    # that makes the box exit - confirm against the sn implementation.
    class TestBox(sn.SNGeneratorBox):
        def process(self):
            for _ in range(10):
                yield "šentinel/test", {"foo": "bar"}
            yield "sentinel/test", {"foo": "bar"}

    TestBox("test").run()

    assert send_multipart_mock.called
103 104 105 106 107


def test_before_first_request_processed(out_only_args_mock, send_multipart_mock):
    """Check that the ``before_first_request`` message is sent before ``process`` output.

    The box returns one message from ``before_first_request`` and yields one
    from ``process``. The test expects two sends in that order.
    """
    class TestBox(sn.SNGeneratorBox):
        def before_first_request(self):
            return "sentinel/test/bfr", {"foo": "bar"}

        def process(self):
            yield "sentinel/test", {"foo": "bar"}

    tb = TestBox("test")
    tb.run()

    assert send_multipart_mock.called
    assert send_multipart_mock.call_count == 2

    # call_args_list[n][0][0][0]: n-th send, positional args, first argument,
    # first frame.
    assert send_multipart_mock.call_args_list[0][0][0][0] == b"sentinel/test/bfr"
    assert send_multipart_mock.call_args_list[1][0][0][0] == b"sentinel/test"


def test_set_signal_handlers(out_only_args_mock, send_multipart_mock):
    """Check that creating and running a box calls ``signal.signal``."""
    class TestBox(sn.SNGeneratorBox):
        def process(self):
            yield "sentinel/test", {"foo": "bar"}

    # Patch the stdlib function so the call is recorded instead of installing
    # a real handler. The mock has its own name so it is not mistaken for the
    # ``signal`` module.
    with patch("signal.signal") as signal_mock:
        tb = TestBox("test")
        tb.run()

        assert signal_mock.called


def test_signal_handler_stops_loop(in_out_args_mock, recv_multipart_mock, send_multipart_mock):
    """Check that invoking the SIGTERM handler while processing stops the loop.

    ``process`` is replaced by a mock that calls the currently registered
    SIGTERM handler and then returns its input unchanged. The test expects
    exactly one message to be sent.
    """
    # Imported locally: the signal module is needed only by this test.
    import signal

    class TestBox(sn.SNPipelineBox):
        pass

    def call_sigterm_handler(msg_type, payload):
        # Calls the handler directly instead of delivering a real signal.
        # A cleaner way to trigger it would be welcome.
        # NOTE(review): assumes a callable SIGTERM handler has been installed
        # by the time process() runs - confirm against sn.
        handler = signal.getsignal(signal.SIGTERM)
        handler(None, None)
        return msg_type, payload

    tb = TestBox("test")
    tb.process = Mock(side_effect=call_sigterm_handler)
    tb.run()

    assert send_multipart_mock.called
    assert send_multipart_mock.call_count == 1