diff --git a/tests/test_regress_basic.py b/tests/test_regress_basic.py index fd70ad1..5cf7c2c 100644 --- a/tests/test_regress_basic.py +++ b/tests/test_regress_basic.py @@ -8,7 +8,6 @@ from unittest import TestCase, mock import uuid -from pytest import fixture from cryptography import x509 from cryptography.x509.oid import NameOID from cryptography.hazmat.backends import default_backend @@ -62,8 +61,11 @@ def _start_server_worker(self, sock_family, sock_type, sock_addr, secure): print("socket listening") if secure: sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + print("socket setsockopt") context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) + print("secure socket context created") context.load_cert_chain(certfile=self.pub_key, keyfile=self.priv_key) + print("secure socket cert loaded") oldsock = sock sock = context.wrap_socket(sock, server_side=True) print("secure socket listening") @@ -361,3 +363,62 @@ def test_e2e_INET_STREAM_SECURE_VERIFY_CONTEXT(self): print("done") handler.close() + + def test_e2e_INET_STREAM_SECURE_VERIFY_CAFILE(self): + socket_addr = ("localhost", SOCKET_PORT) + self._start_server(socket.AF_INET, socket.SOCK_STREAM, (socket_addr,), True) + + test_logger = self._build_logger() + + handler = TLSSysLogHandler( + address=socket_addr, + socktype=socket.SOCK_STREAM, + secure={"cafile": self.pub_key}, + ) + test_logger.addHandler(handler) + + uuid_message = uuid.uuid4().hex + test_logger.critical(uuid_message) + + sleep(1) + + data = self.queue.get(timeout=1) + self.assertTrue(uuid_message in data.decode("utf-8")) + + print("done") + handler.close() + + def test_e2e_INET6_STREAM_SECURE_VERIFY_FAIL_INCORRECT_CERT(self): + socket_addr = ("::1", SOCKET_PORT) + self._start_server(socket.AF_INET6, socket.SOCK_STREAM, (socket_addr,), True) + + # normal secure connect should not work + with self.assertRaises(ssl.SSLCertVerificationError): + handler = TLSSysLogHandler( + address=socket_addr, socktype=socket.SOCK_STREAM, secure=True + ) + + print("done") + + @mock.patch("tlssysloghandler.handler.socket.getaddrinfo") + def test_e2e_INET6_STREAM_SECURE_VERIFY_FAIL_WRONG_HOSTNAME(self, mock_getaddrinfo): + # try listening on secure-logging.example.com (mocked to return address "::1") + mock_getaddrinfo.return_value = [ + (socket.AF_INET6, socket.SOCK_STREAM, 6, "", ("::1", 56712, 0, 0)) + ] + + server_socket_addr = ("::1", SOCKET_PORT) + self._start_server( + socket.AF_INET6, socket.SOCK_STREAM, (server_socket_addr,), True + ) + + # normal secure connect should not work + logger_socket_addr = ("secure-logging.example.com.", SOCKET_PORT) + with self.assertRaises(ssl.SSLCertVerificationError): + handler = TLSSysLogHandler( + address=logger_socket_addr, + socktype=socket.SOCK_STREAM, + secure={"cafile": self.pub_key}, + ) + + print("done")