Skip to content

Commit

Permalink
add some failure tests (#5)
Browse files Browse the repository at this point in the history
  • Loading branch information
epsilon-0 committed Dec 13, 2023
1 parent 3d3a21a commit 44e9491
Showing 1 changed file with 62 additions and 1 deletion.
63 changes: 62 additions & 1 deletion tests/test_regress_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from unittest import TestCase, mock
import uuid

from pytest import fixture
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.backends import default_backend
Expand Down Expand Up @@ -62,8 +61,11 @@ def _start_server_worker(self, sock_family, sock_type, sock_addr, secure):
print("socket listening")
if secure:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
print("socket setsockopt")
context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
print("secure socket context created")
context.load_cert_chain(certfile=self.pub_key, keyfile=self.priv_key)
print("secure socket cert loaded")
oldsock = sock
sock = context.wrap_socket(sock, server_side=True)
print("secure socket listening")
Expand Down Expand Up @@ -361,3 +363,62 @@ def test_e2e_INET_STREAM_SECURE_VERIFY_CONTEXT(self):

print("done")
handler.close()

def test_e2e_INET_STREAM_SECURE_VERIFY_CAFILE(self):
socket_addr = ("localhost", SOCKET_PORT)
self._start_server(socket.AF_INET, socket.SOCK_STREAM, (socket_addr,), True)

test_logger = self._build_logger()

handler = TLSSysLogHandler(
address=socket_addr,
socktype=socket.SOCK_STREAM,
secure={"cafile": self.pub_key},
)
test_logger.addHandler(handler)

uuid_message = uuid.uuid4().hex
test_logger.critical(uuid_message)

sleep(1)

data = self.queue.get(timeout=1)
self.assertTrue(uuid_message in data.decode("utf-8"))

print("done")
handler.close()

def test_e2e_INET6_STREAM_SECURE_VERIFY_FAIL_INCORRECT_CERT(self):
socket_addr = ("::1", SOCKET_PORT)
self._start_server(socket.AF_INET6, socket.SOCK_STREAM, (socket_addr,), True)

# normal secure connect should not work
with self.assertRaises(ssl.SSLCertVerificationError):
handler = TLSSysLogHandler(
address=socket_addr, socktype=socket.SOCK_STREAM, secure=True
)

print("done")

@mock.patch("tlssysloghandler.handler.socket.getaddrinfo")
def test_e2e_INET6_STREAM_SECURE_VERIFY_FAIL_WRONG_HOSTNAME(self, mock_getaddrinfo):
# try listening on secure-logging.example.com (mocked to return address "::1")
mock_getaddrinfo.return_value = [
(socket.AF_INET6, socket.SOCK_STREAM, 6, "", ("::1", 56712, 0, 0))
]

server_socket_addr = ("::1", SOCKET_PORT)
self._start_server(
socket.AF_INET6, socket.SOCK_STREAM, (server_socket_addr,), True
)

# normal secure connect should not work
logger_socket_addr = ("secure-logging.example.com.", SOCKET_PORT)
with self.assertRaises(ssl.SSLCertVerificationError):
handler = TLSSysLogHandler(
address=logger_socket_addr,
socktype=socket.SOCK_STREAM,
secure={"cafile": self.pub_key},
)

print("done")

0 comments on commit 44e9491

Please sign in to comment.