Skip to content

Commit

Permalink
add negative testing to check failure on minimum ssl version (#8)
Browse files Browse the repository at this point in the history
  • Loading branch information
epsilon-0 committed Apr 29, 2024
1 parent b33bb55 commit 8ac6eed
Show file tree
Hide file tree
Showing 3 changed files with 232 additions and 1 deletion.
4 changes: 4 additions & 0 deletions .github/workflows/test-python.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ jobs:
run: |
pipx install pipenv
pipenv install --dev --python ${{ steps.setup-python.outputs.python-path }}
- name: Show OpenSSL ciphers
run: |
openssl ciphers -v
pipenv run python -c "import ssl; import pprint; ctx = ssl.create_default_context(); ctx.set_ciphers('ALL:@SECLEVEL=0'); pprint.pprint(ctx.get_ciphers())"
- name: Run tests
run: |
pipenv run pytest -vvvv -s
2 changes: 1 addition & 1 deletion tests/test_regress_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ def test_e2e_INET6_STREAM_SECURE_VERIFY_FAIL_INCORRECT_CERT(self):
def test_e2e_INET6_STREAM_SECURE_VERIFY_FAIL_WRONG_HOSTNAME(self, mock_getaddrinfo):
# try listening on secure-logging.example.com (mocked to return address "::1")
mock_getaddrinfo.return_value = [
(socket.AF_INET6, socket.SOCK_STREAM, 6, "", ("::1", 56712, 0, 0))
(socket.AF_INET6, socket.SOCK_STREAM, 6, "", ("::1", SOCKET_PORT, 0, 0))
]

server_socket_addr = ("::1", SOCKET_PORT)
Expand Down
227 changes: 227 additions & 0 deletions tests/test_regress_syslogng.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
SOCKET_PORT6_TLS = SOCKET_PORT + 5
SOCKET_PORT4_MUTUAL_TLS = SOCKET_PORT + 6
SOCKET_PORT6_MUTUAL_TLS = SOCKET_PORT + 7
SOCKET_PORT4_TLS10 = SOCKET_PORT + 8
SOCKET_PORT4_TLS11 = SOCKET_PORT + 9
SOCKET_PORT4_TLS12 = SOCKET_PORT + 10
SOCKET_PORT4_TLS13 = SOCKET_PORT + 11


# check if syslog-ng is installed else skip tests
Expand Down Expand Up @@ -63,6 +67,7 @@ def _start_server(self):
key-file("{0}/syslog.key")
cert-file("{0}/syslog.pub")
peer-verify(optional-untrusted)
cipher-suite("ALL:@SECLEVEL=0")
)
);
}};
Expand Down Expand Up @@ -92,6 +97,7 @@ def _start_server(self):
key-file("{0}/syslog.key")
cert-file("{0}/syslog.pub")
peer-verify(optional-untrusted)
cipher-suite("ALL:@SECLEVEL=0")
)
);
}};
Expand Down Expand Up @@ -122,6 +128,59 @@ def _start_server(self):
)
);
}};
source net4_tls10 {{
network(
ip("127.0.0.1")
transport("tls")
port({9})
tls(
key-file("{0}/syslog.key")
cert-file("{0}/syslog.pub")
peer-verify(optional-untrusted)
ssl-options(no-tlsv11, no-tlsv12, no-tlsv13)
cipher-suite("ALL:@SECLEVEL=0")
)
);
}};
source net4_tls11 {{
network(
ip("127.0.0.1")
transport("tls")
port({10})
tls(
key-file("{0}/syslog.key")
cert-file("{0}/syslog.pub")
peer-verify(optional-untrusted)
ssl-options(no-tlsv1, no-tlsv12, no-tlsv13)
)
);
}};
source net4_tls12 {{
network(
ip("127.0.0.1")
transport("tls")
port({11})
tls(
key-file("{0}/syslog.key")
cert-file("{0}/syslog.pub")
peer-verify(optional-untrusted)
ssl-options(no-tlsv1, no-tlsv11, no-tlsv13)
)
);
}};
source net4_tls13 {{
network(
ip("127.0.0.1")
transport("tls")
port({12})
tls(
key-file("{0}/syslog.key")
cert-file("{0}/syslog.pub")
peer-verify(optional-untrusted)
ssl-options(no-tlsv1, no-tlsv11, no-tlsv12)
)
);
}};
destination all {{
Expand Down Expand Up @@ -179,6 +238,26 @@ def _start_server(self):
source(net6_mutual_tls);
filter(f_messages);
destination(all);
}};
log {{
source(net4_tls10);
filter(f_messages);
destination(all);
}};
log {{
source(net4_tls11);
filter(f_messages);
destination(all);
}};
log {{
source(net4_tls12);
filter(f_messages);
destination(all);
}};
log {{
source(net4_tls13);
filter(f_messages);
destination(all);
}};
"""

Expand All @@ -192,6 +271,10 @@ def _start_server(self):
SOCKET_PORT6_TLS,
SOCKET_PORT4_MUTUAL_TLS,
SOCKET_PORT6_MUTUAL_TLS,
SOCKET_PORT4_TLS10,
SOCKET_PORT4_TLS11,
SOCKET_PORT4_TLS12,
SOCKET_PORT4_TLS13,
)

config_path = os.path.join(self.tmpdir.name, "syslog-ng.conf")
Expand Down Expand Up @@ -294,6 +377,150 @@ def test_SYSLOGNG_INET4_TLS(self):
data = f.read()
self.assertTrue(uuid_message in data)

def test_SYSLOGNG_INET4_TLS10(self):
test_logger = self._build_logger()

context = ssl.create_default_context(
purpose=ssl.Purpose.SERVER_AUTH, cafile=self.tmpdir.name + "/syslog.pub"
)
context.set_ciphers("ALL:@SECLEVEL=0")
context.minimum_version = ssl.TLSVersion.TLSv1
context.maximum_version = ssl.TLSVersion.TLSv1

handler = TLSSysLogHandler(
address=("127.0.0.1", SOCKET_PORT4_TLS10),
socktype=socket.SOCK_STREAM,
secure=context,
)
test_logger.addHandler(handler)

uuid_message = uuid.uuid4().hex
test_logger.critical(uuid_message)

sleep(2)

with open(os.path.join(self.tmpdir.name, "syslog.log")) as f:
data = f.read()
self.assertTrue(uuid_message in data)

def test_SYSLOGNG_INET4_TLS12(self):
test_logger = self._build_logger()

context = ssl.create_default_context(
purpose=ssl.Purpose.SERVER_AUTH, cafile=self.tmpdir.name + "/syslog.pub"
)
context.minimum_version = ssl.TLSVersion.TLSv1_2
context.maximum_version = ssl.TLSVersion.TLSv1_2

handler = TLSSysLogHandler(
address=("127.0.0.1", SOCKET_PORT4_TLS12),
socktype=socket.SOCK_STREAM,
secure=context,
)
test_logger.addHandler(handler)

uuid_message = uuid.uuid4().hex
test_logger.critical(uuid_message)

sleep(2)

with open(os.path.join(self.tmpdir.name, "syslog.log")) as f:
data = f.read()
self.assertTrue(uuid_message in data)

def test_SYSLOGNG_INET4_TLS13(self):
test_logger = self._build_logger()

context = ssl.create_default_context(
purpose=ssl.Purpose.SERVER_AUTH, cafile=self.tmpdir.name + "/syslog.pub"
)
context.minimum_version = ssl.TLSVersion.TLSv1_3

handler = TLSSysLogHandler(
address=("127.0.0.1", SOCKET_PORT4_TLS13),
socktype=socket.SOCK_STREAM,
secure=context,
)
test_logger.addHandler(handler)

uuid_message = uuid.uuid4().hex
test_logger.critical(uuid_message)

sleep(2)

with open(os.path.join(self.tmpdir.name, "syslog.log")) as f:
data = f.read()
self.assertTrue(uuid_message in data)

def test_SYSLOGNG_INET4_TLS13_TO_TLS12_FAIL(self):
test_logger = self._build_logger()

context = ssl.create_default_context(
purpose=ssl.Purpose.SERVER_AUTH, cafile=self.tmpdir.name + "/syslog.pub"
)
context.minimum_version = ssl.TLSVersion.TLSv1_3

with self.assertRaises(ssl.SSLError):
handler = TLSSysLogHandler(
address=("127.0.0.1", SOCKET_PORT4_TLS12),
socktype=socket.SOCK_STREAM,
secure=context,
)

def test_SYSLOGNG_INET4_TLS10_TO_DEFAULT_LISTENER(self):
test_logger = self._build_logger()

context = ssl.create_default_context(
purpose=ssl.Purpose.SERVER_AUTH, cafile=self.tmpdir.name + "/syslog.pub"
)
context.set_ciphers("ALL:@SECLEVEL=0")
context.minimum_version = ssl.TLSVersion.TLSv1
context.maximum_version = ssl.TLSVersion.TLSv1

handler = TLSSysLogHandler(
address=("127.0.0.1", SOCKET_PORT4_TLS),
socktype=socket.SOCK_STREAM,
secure=context,
)
test_logger.addHandler(handler)

uuid_message = uuid.uuid4().hex
test_logger.critical(uuid_message)

sleep(2)

with open(os.path.join(self.tmpdir.name, "syslog.log")) as f:
data = f.read()
self.assertTrue(uuid_message in data)

def test_SYSLOGNG_INET4_TLS_ENFORCE_MINIMUM(self):
test_logger = self._build_logger()

context = ssl.create_default_context(
purpose=ssl.Purpose.SERVER_AUTH, cafile=self.tmpdir.name + "/syslog.pub"
)
context.minimum_version = ssl.TLSVersion.TLSv1_2

handler = TLSSysLogHandler(
address=("127.0.0.1", SOCKET_PORT4_TLS),
socktype=socket.SOCK_STREAM,
secure=context,
)
test_logger.addHandler(handler)

tls_version = handler.socket.version()
self.assertNotEqual(tls_version, ssl.TLSVersion.TLSv1)
self.assertNotEqual(tls_version, ssl.TLSVersion.TLSv1_1)

uuid_message = uuid.uuid4().hex
test_logger.critical(uuid_message)

sleep(2)

with open(os.path.join(self.tmpdir.name, "syslog.log")) as f:
data = f.read()
self.assertTrue(uuid_message in data)

def test_SYSLOGNG_unix_DGRAM(self):
test_logger = self._build_logger()

Expand Down

0 comments on commit 8ac6eed

Please sign in to comment.