From 8ac6eeddb80781f9cd135e43c148b6a3a32dc086 Mon Sep 17 00:00:00 2001 From: "A. Tammy" Date: Sun, 28 Apr 2024 23:43:51 -0400 Subject: [PATCH] add negative testing to check failure on minimum ssl version (#8) --- .github/workflows/test-python.yml | 4 + tests/test_regress_basic.py | 2 +- tests/test_regress_syslogng.py | 227 ++++++++++++++++++++++++++++++ 3 files changed, 232 insertions(+), 1 deletion(-) diff --git a/.github/workflows/test-python.yml b/.github/workflows/test-python.yml index 8ccd03e..8737fea 100644 --- a/.github/workflows/test-python.yml +++ b/.github/workflows/test-python.yml @@ -28,6 +28,10 @@ jobs: run: | pipx install pipenv pipenv install --dev --python ${{ steps.setup-python.outputs.python-path }} + - name: Show OpenSSL ciphers + run: | + openssl ciphers -v + pipenv run python -c "import ssl; import pprint; ctx = ssl.create_default_context(); ctx.set_ciphers('ALL:@SECLEVEL=0'); pprint.pprint(ctx.get_ciphers())" - name: Run tests run: | pipenv run pytest -vvvv -s diff --git a/tests/test_regress_basic.py b/tests/test_regress_basic.py index 02d7a48..d9ab6ea 100644 --- a/tests/test_regress_basic.py +++ b/tests/test_regress_basic.py @@ -277,7 +277,7 @@ def test_e2e_INET6_STREAM_SECURE_VERIFY_FAIL_INCORRECT_CERT(self): def test_e2e_INET6_STREAM_SECURE_VERIFY_FAIL_WRONG_HOSTNAME(self, mock_getaddrinfo): # try listening on secure-logging.example.com (mocked to return address "::1") mock_getaddrinfo.return_value = [ - (socket.AF_INET6, socket.SOCK_STREAM, 6, "", ("::1", 56712, 0, 0)) + (socket.AF_INET6, socket.SOCK_STREAM, 6, "", ("::1", SOCKET_PORT, 0, 0)) ] server_socket_addr = ("::1", SOCKET_PORT) diff --git a/tests/test_regress_syslogng.py b/tests/test_regress_syslogng.py index 0ebe187..1c2458b 100644 --- a/tests/test_regress_syslogng.py +++ b/tests/test_regress_syslogng.py @@ -18,6 +18,10 @@ SOCKET_PORT6_TLS = SOCKET_PORT + 5 SOCKET_PORT4_MUTUAL_TLS = SOCKET_PORT + 6 SOCKET_PORT6_MUTUAL_TLS = SOCKET_PORT + 7 +SOCKET_PORT4_TLS10 = SOCKET_PORT + 8 +SOCKET_PORT4_TLS11 = SOCKET_PORT + 9 +SOCKET_PORT4_TLS12 = SOCKET_PORT + 10 +SOCKET_PORT4_TLS13 = SOCKET_PORT + 11 # check if syslog-ng is installed else skip tests @@ -63,6 +67,7 @@ def _start_server(self): key-file("{0}/syslog.key") cert-file("{0}/syslog.pub") peer-verify(optional-untrusted) + cipher-suite("ALL:@SECLEVEL=0") ) ); }}; @@ -92,6 +97,7 @@ def _start_server(self): key-file("{0}/syslog.key") cert-file("{0}/syslog.pub") peer-verify(optional-untrusted) + cipher-suite("ALL:@SECLEVEL=0") ) ); }}; @@ -122,6 +128,59 @@ def _start_server(self): ) ); }}; +source net4_tls10 {{ + network( + ip("127.0.0.1") + transport("tls") + port({9}) + tls( + key-file("{0}/syslog.key") + cert-file("{0}/syslog.pub") + peer-verify(optional-untrusted) + ssl-options(no-tlsv11, no-tlsv12, no-tlsv13) + cipher-suite("ALL:@SECLEVEL=0") + ) + ); +}}; +source net4_tls11 {{ + network( + ip("127.0.0.1") + transport("tls") + port({10}) + tls( + key-file("{0}/syslog.key") + cert-file("{0}/syslog.pub") + peer-verify(optional-untrusted) + ssl-options(no-tlsv1, no-tlsv12, no-tlsv13) + ) + ); +}}; +source net4_tls12 {{ + network( + ip("127.0.0.1") + transport("tls") + port({11}) + tls( + key-file("{0}/syslog.key") + cert-file("{0}/syslog.pub") + peer-verify(optional-untrusted) + ssl-options(no-tlsv1, no-tlsv11, no-tlsv13) + ) + ); +}}; +source net4_tls13 {{ + network( + ip("127.0.0.1") + transport("tls") + port({12}) + tls( + key-file("{0}/syslog.key") + cert-file("{0}/syslog.pub") + peer-verify(optional-untrusted) + ssl-options(no-tlsv1, no-tlsv11, no-tlsv12) + ) + ); +}}; destination all {{ @@ -179,6 +238,26 @@ def _start_server(self): source(net6_mutual_tls); filter(f_messages); destination(all); +}}; +log {{ + source(net4_tls10); + filter(f_messages); + destination(all); +}}; +log {{ + source(net4_tls11); + filter(f_messages); + destination(all); +}}; +log {{ + source(net4_tls12); + filter(f_messages); + destination(all); +}}; +log {{ + source(net4_tls13); + filter(f_messages); + destination(all); }}; """ @@ -192,6 +271,10 @@ def _start_server(self): SOCKET_PORT6_TLS, SOCKET_PORT4_MUTUAL_TLS, SOCKET_PORT6_MUTUAL_TLS, + SOCKET_PORT4_TLS10, + SOCKET_PORT4_TLS11, + SOCKET_PORT4_TLS12, + SOCKET_PORT4_TLS13, ) config_path = os.path.join(self.tmpdir.name, "syslog-ng.conf") @@ -294,6 +377,150 @@ def test_SYSLOGNG_INET4_TLS(self): data = f.read() self.assertTrue(uuid_message in data) + def test_SYSLOGNG_INET4_TLS10(self): + test_logger = self._build_logger() + + context = ssl.create_default_context( + purpose=ssl.Purpose.SERVER_AUTH, cafile=self.tmpdir.name + "/syslog.pub" + ) + context.set_ciphers("ALL:@SECLEVEL=0") + context.minimum_version = ssl.TLSVersion.TLSv1 + context.maximum_version = ssl.TLSVersion.TLSv1 + + handler = TLSSysLogHandler( + address=("127.0.0.1", SOCKET_PORT4_TLS10), + socktype=socket.SOCK_STREAM, + secure=context, + ) + test_logger.addHandler(handler) + + uuid_message = uuid.uuid4().hex + test_logger.critical(uuid_message) + + sleep(2) + + with open(os.path.join(self.tmpdir.name, "syslog.log")) as f: + data = f.read() + self.assertTrue(uuid_message in data) + + def test_SYSLOGNG_INET4_TLS12(self): + test_logger = self._build_logger() + + context = ssl.create_default_context( + purpose=ssl.Purpose.SERVER_AUTH, cafile=self.tmpdir.name + "/syslog.pub" + ) + context.minimum_version = ssl.TLSVersion.TLSv1_2 + context.maximum_version = ssl.TLSVersion.TLSv1_2 + + handler = TLSSysLogHandler( + address=("127.0.0.1", SOCKET_PORT4_TLS12), + socktype=socket.SOCK_STREAM, + secure=context, + ) + test_logger.addHandler(handler) + + uuid_message = uuid.uuid4().hex + test_logger.critical(uuid_message) + + sleep(2) + + with open(os.path.join(self.tmpdir.name, "syslog.log")) as f: + data = f.read() + self.assertTrue(uuid_message in data) + + def test_SYSLOGNG_INET4_TLS13(self): + test_logger = self._build_logger() + + context = ssl.create_default_context( + purpose=ssl.Purpose.SERVER_AUTH, cafile=self.tmpdir.name + "/syslog.pub" + ) + context.minimum_version = ssl.TLSVersion.TLSv1_3 + + handler = TLSSysLogHandler( + address=("127.0.0.1", SOCKET_PORT4_TLS13), + socktype=socket.SOCK_STREAM, + secure=context, + ) + test_logger.addHandler(handler) + + uuid_message = uuid.uuid4().hex + test_logger.critical(uuid_message) + + sleep(2) + + with open(os.path.join(self.tmpdir.name, "syslog.log")) as f: + data = f.read() + self.assertTrue(uuid_message in data) + + def test_SYSLOGNG_INET4_TLS13_TO_TLS12_FAIL(self): + test_logger = self._build_logger() + + context = ssl.create_default_context( + purpose=ssl.Purpose.SERVER_AUTH, cafile=self.tmpdir.name + "/syslog.pub" + ) + context.minimum_version = ssl.TLSVersion.TLSv1_3 + + with self.assertRaises(ssl.SSLError): + handler = TLSSysLogHandler( + address=("127.0.0.1", SOCKET_PORT4_TLS12), + socktype=socket.SOCK_STREAM, + secure=context, + ) + + def test_SYSLOGNG_INET4_TLS10_TO_DEFAULT_LISTENER(self): + test_logger = self._build_logger() + + context = ssl.create_default_context( + purpose=ssl.Purpose.SERVER_AUTH, cafile=self.tmpdir.name + "/syslog.pub" + ) + context.set_ciphers("ALL:@SECLEVEL=0") + context.minimum_version = ssl.TLSVersion.TLSv1 + context.maximum_version = ssl.TLSVersion.TLSv1 + + handler = TLSSysLogHandler( + address=("127.0.0.1", SOCKET_PORT4_TLS), + socktype=socket.SOCK_STREAM, + secure=context, + ) + test_logger.addHandler(handler) + + uuid_message = uuid.uuid4().hex + test_logger.critical(uuid_message) + + sleep(2) + + with open(os.path.join(self.tmpdir.name, "syslog.log")) as f: + data = f.read() + self.assertTrue(uuid_message in data) + + def test_SYSLOGNG_INET4_TLS_ENFORCE_MINIMUM(self): + test_logger = self._build_logger() + + context = ssl.create_default_context( + purpose=ssl.Purpose.SERVER_AUTH, cafile=self.tmpdir.name + "/syslog.pub" + ) + context.minimum_version = ssl.TLSVersion.TLSv1_2 + + handler = TLSSysLogHandler( + address=("127.0.0.1", SOCKET_PORT4_TLS), + socktype=socket.SOCK_STREAM, + secure=context, + ) + test_logger.addHandler(handler) + + tls_version = handler.socket.version() + self.assertNotEqual(tls_version, ssl.TLSVersion.TLSv1) + self.assertNotEqual(tls_version, ssl.TLSVersion.TLSv1_1) + + uuid_message = uuid.uuid4().hex + test_logger.critical(uuid_message) + + sleep(2) + + with open(os.path.join(self.tmpdir.name, "syslog.log")) as f: + data = f.read() + self.assertTrue(uuid_message in data) + def test_SYSLOGNG_unix_DGRAM(self): test_logger = self._build_logger()