Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add some failure tests #5

Merged
merged 1 commit into from
Dec 13, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 62 additions & 1 deletion tests/test_regress_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from unittest import TestCase, mock
import uuid

from pytest import fixture
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.backends import default_backend
Expand Down Expand Up @@ -62,8 +61,11 @@ def _start_server_worker(self, sock_family, sock_type, sock_addr, secure):
print("socket listening")
if secure:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
print("socket setsockopt")
context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
print("secure socket context created")
context.load_cert_chain(certfile=self.pub_key, keyfile=self.priv_key)
print("secure socket cert loaded")
oldsock = sock
sock = context.wrap_socket(sock, server_side=True)
print("secure socket listening")
Expand Down Expand Up @@ -361,3 +363,62 @@ def test_e2e_INET_STREAM_SECURE_VERIFY_CONTEXT(self):

print("done")
handler.close()

def test_e2e_INET_STREAM_SECURE_VERIFY_CAFILE(self):
socket_addr = ("localhost", SOCKET_PORT)
self._start_server(socket.AF_INET, socket.SOCK_STREAM, (socket_addr,), True)

test_logger = self._build_logger()

handler = TLSSysLogHandler(
address=socket_addr,
socktype=socket.SOCK_STREAM,
secure={"cafile": self.pub_key},
)
test_logger.addHandler(handler)

uuid_message = uuid.uuid4().hex
test_logger.critical(uuid_message)

sleep(1)

data = self.queue.get(timeout=1)
self.assertTrue(uuid_message in data.decode("utf-8"))

print("done")
handler.close()

def test_e2e_INET6_STREAM_SECURE_VERIFY_FAIL_INCORRECT_CERT(self):
socket_addr = ("::1", SOCKET_PORT)
self._start_server(socket.AF_INET6, socket.SOCK_STREAM, (socket_addr,), True)

# normal secure connect should not work
with self.assertRaises(ssl.SSLCertVerificationError):
handler = TLSSysLogHandler(
address=socket_addr, socktype=socket.SOCK_STREAM, secure=True
)

print("done")

@mock.patch("tlssysloghandler.handler.socket.getaddrinfo")
def test_e2e_INET6_STREAM_SECURE_VERIFY_FAIL_WRONG_HOSTNAME(self, mock_getaddrinfo):
# try listening on secure-logging.example.com (mocked to return address "::1")
mock_getaddrinfo.return_value = [
(socket.AF_INET6, socket.SOCK_STREAM, 6, "", ("::1", 56712, 0, 0))
]

server_socket_addr = ("::1", SOCKET_PORT)
self._start_server(
socket.AF_INET6, socket.SOCK_STREAM, (server_socket_addr,), True
)

# normal secure connect should not work
logger_socket_addr = ("secure-logging.example.com.", SOCKET_PORT)
with self.assertRaises(ssl.SSLCertVerificationError):
handler = TLSSysLogHandler(
address=logger_socket_addr,
socktype=socket.SOCK_STREAM,
secure={"cafile": self.pub_key},
)

print("done")