From 625ed2f11820a8ce5ee79d0c73ef0fe43af5de93 Mon Sep 17 00:00:00 2001 From: Mathias Ertl Date: Sun, 14 Jul 2024 23:21:56 +0200 Subject: [PATCH] attempt to add HSM support directly --- .github/workflows/tests.yml | 2 +- ca/ca/test_settings.py | 20 + ca/django_ca/constants.py | 11 + ca/django_ca/key_backends/base.py | 5 +- ca/django_ca/key_backends/hsm/__init__.py | 18 + ca/django_ca/key_backends/hsm/backend.py | 434 ++++++++++++++++++ ca/django_ca/key_backends/hsm/keys.py | 231 ++++++++++ ca/django_ca/key_backends/hsm/models.py | 65 +++ ca/django_ca/key_backends/hsm/session.py | 118 +++++ ca/django_ca/key_backends/hsm/typehints.py | 18 + ca/django_ca/key_backends/storages.py | 4 +- ca/django_ca/management/commands/init_ca.py | 19 +- ca/django_ca/tests/base/fixtures.py | 21 + ca/django_ca/tests/base/utils.py | 4 +- ca/django_ca/tests/commands/test_init_ca.py | 187 +++++++- .../tests/key_backends/hsm/__init__.py | 0 .../tests/key_backends/hsm/conftest.py | 61 +++ .../tests/key_backends/hsm/test_backend.py | 131 ++++++ .../tests/key_backends/hsm/test_keys.py | 68 +++ .../tests/key_backends/hsm/test_models.py | 41 ++ .../tests/key_backends/hsm/test_session.py | 129 ++++++ ca/django_ca/tests/key_backends/test_base.py | 1 + pyproject.toml | 6 + requirements.txt | 2 +- 24 files changed, 1578 insertions(+), 18 deletions(-) create mode 100644 ca/django_ca/key_backends/hsm/__init__.py create mode 100644 ca/django_ca/key_backends/hsm/backend.py create mode 100644 ca/django_ca/key_backends/hsm/keys.py create mode 100644 ca/django_ca/key_backends/hsm/models.py create mode 100644 ca/django_ca/key_backends/hsm/session.py create mode 100644 ca/django_ca/key_backends/hsm/typehints.py create mode 100644 ca/django_ca/tests/key_backends/hsm/__init__.py create mode 100644 ca/django_ca/tests/key_backends/hsm/conftest.py create mode 100644 ca/django_ca/tests/key_backends/hsm/test_backend.py create mode 100644 ca/django_ca/tests/key_backends/hsm/test_keys.py create mode 100644 ca/django_ca/tests/key_backends/hsm/test_models.py create mode 100644 ca/django_ca/tests/key_backends/hsm/test_session.py diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 98524eb0b..da9bb4fa0 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -24,7 +24,7 @@ jobs: name: Python ${{ matrix.python-version }}, Django ${{ matrix.django-version }}, cryptography ${{ matrix.cryptography-version }}, pydantic ${{ matrix.pydantic-version }} steps: - name: Install APT dependencies - run: sudo apt-get install -y firefox + run: sudo apt-get install -y firefox softhsm2 - name: Acquire sources uses: actions/checkout@v4.1.1 diff --git a/ca/ca/test_settings.py b/ca/ca/test_settings.py index 6aac801ae..0ebf4ebe2 100644 --- a/ca/ca/test_settings.py +++ b/ca/ca/test_settings.py @@ -14,8 +14,12 @@ """Test settings for the django-ca project.""" import json +import os +from datetime import datetime, timezone from pathlib import Path +from django.utils.crypto import get_random_string + # Base paths in this project BASE_DIR = Path(__file__).resolve().parent.parent # ca/ @@ -186,6 +190,14 @@ _fixture_data = json.load(stream) +# PKCS11 settings +_timestamp = datetime.now(tz=timezone.utc).strftime("%Y%m%d%H%M%S") +PKCS11_PATH = os.environ.get("PKCS11_LIBRARY", "/usr/lib/softhsm/libsofthsm2.so") +PKCS11_TOKEN_LABEL = f"pytest.{_timestamp}.{get_random_string(8)}" +PKCS11_SO_PIN = "so-pin-1234" +PKCS11_USER_PIN = "user-pin-1234" + + CA_KEY_BACKENDS = { "default": { "BACKEND": "django_ca.key_backends.storages.StoragesBackend", @@ -195,6 +207,14 @@ "BACKEND": "django_ca.key_backends.storages.StoragesBackend", "OPTIONS": {"storage_alias": "secondary"}, }, + "hsm": { + "BACKEND": "django_ca.key_backends.hsm.HSMBackend", + "OPTIONS": { + "module": PKCS11_PATH, + "token": PKCS11_TOKEN_LABEL, + "user_pin": PKCS11_USER_PIN, + }, + }, } # Custom settings diff --git a/ca/django_ca/constants.py b/ca/django_ca/constants.py index 4385a8cc2..224498785 100644 --- a/ca/django_ca/constants.py +++ b/ca/django_ca/constants.py @@ -574,6 +574,17 @@ class ExtendedKeyUsageOID(_ExtendedKeyUsageOID): ed448.Ed448PublicKey, rsa.RSAPublicKey, ) +PUBLIC_KEY_TYPE_MAPPING: MappingProxyType[ParsableKeyType, type[CertificateIssuerPublicKeyTypes]] = ( + MappingProxyType( + { + "DSA": dsa.DSAPublicKey, + "EC": ec.EllipticCurvePublicKey, + "Ed25519": ed25519.Ed25519PublicKey, + "Ed448": ed448.Ed448PublicKey, + "RSA": rsa.RSAPublicKey, + } + ) +) #: Tuple of supported private key types. PRIVATE_KEY_TYPES: tuple[type[CertificateIssuerPrivateKeyTypes], ...] = ( diff --git a/ca/django_ca/key_backends/base.py b/ca/django_ca/key_backends/base.py index 8fc7aae82..3fad71330 100644 --- a/ca/django_ca/key_backends/base.py +++ b/ca/django_ca/key_backends/base.py @@ -213,13 +213,12 @@ def get_use_parent_private_key_options( @abc.abstractmethod def get_use_private_key_options( - self, ca: Optional["CertificateAuthority"], options: dict[str, Any] + self, ca: "CertificateAuthority", options: dict[str, Any] ) -> UsePrivateKeyOptionsTypeVar: """Get options to use the private key of a certificate authority. The returned model will be used for the certificate authority `ca`. You can pass it as extra context - to influence model validation. If `ca` is ``None``, it indicates that the CA is currently being - created via :command:`manage.py init_ca`. + to influence model validation. `options` is the dictionary of arguments to :command:`manage.py init_ca` (including default values). The key backend is expected to be able to sign certificates and CRLs using the options provided here. diff --git a/ca/django_ca/key_backends/hsm/__init__.py b/ca/django_ca/key_backends/hsm/__init__.py new file mode 100644 index 000000000..1025ae0e3 --- /dev/null +++ b/ca/django_ca/key_backends/hsm/__init__.py @@ -0,0 +1,18 @@ +# This file is part of django-ca (https://github.com/mathiasertl/django-ca). +# +# django-ca is free software: you can redistribute it and/or modify it under the terms of the GNU General +# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your +# option) any later version. +# +# django-ca is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the +# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License +# for more details. +# +# You should have received a copy of the GNU General Public License along with django-ca. If not, see +# . + +"""HSM backend module.""" + +from django_ca.key_backends.hsm.backend import HSMBackend + +__all__ = ("HSMBackend",) diff --git a/ca/django_ca/key_backends/hsm/backend.py b/ca/django_ca/key_backends/hsm/backend.py new file mode 100644 index 000000000..8139afcfe --- /dev/null +++ b/ca/django_ca/key_backends/hsm/backend.py @@ -0,0 +1,434 @@ +# This file is part of django-ca (https://github.com/mathiasertl/django-ca). +# +# django-ca is free software: you can redistribute it and/or modify it under the terms of the GNU General +# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your +# option) any later version. +# +# django-ca is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the +# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License +# for more details. +# +# You should have received a copy of the GNU General Public License along with django-ca. If not, see +# . + +"""Key storage backend for hardware security modules (HSMs).""" + +from collections.abc import Iterator, Sequence +from contextlib import contextmanager +from datetime import datetime +from typing import TYPE_CHECKING, Any, Final, Optional + +import pkcs11 +from pkcs11 import KeyType, ObjectClass, Session +from pkcs11.util.ec import encode_named_curve_parameters +from pkcs11.util.rsa import decode_rsa_private_key + +from asn1crypto.algos import SignedDigestAlgorithmId +from cryptography import x509 +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric import rsa +from cryptography.hazmat.primitives.asymmetric.types import ( + CertificateIssuerPrivateKeyTypes, + CertificateIssuerPublicKeyTypes, +) + +from django.core.management import CommandError + +from django_ca import constants +from django_ca.conf import model_settings +from django_ca.key_backends import KeyBackend +from django_ca.key_backends.hsm.keys import ( + PKCS11Ed448PrivateKey, + PKCS11Ed25519PrivateKey, + PKCS11EllipticCurvePrivateKey, + PKCS11PrivateKeyTypes, + PKCS11RSAPrivateKey, +) +from django_ca.key_backends.hsm.models import ( + CreatePrivateKeyOptions, + StorePrivateKeyOptions, + UsePrivateKeyOptions, +) +from django_ca.key_backends.hsm.session import SessionPool +from django_ca.key_backends.hsm.typehints import SupportedKeyType +from django_ca.typehints import ( + AllowedHashTypes, + ArgumentGroup, + CertificateExtension, + EllipticCurves, + ParsableKeyType, +) +from django_ca.utils import get_cert_builder, int_to_hex + +if TYPE_CHECKING: + from django_ca.models import CertificateAuthority + + +class HSMBackend(KeyBackend[CreatePrivateKeyOptions, StorePrivateKeyOptions, UsePrivateKeyOptions]): + """Key backend to create and use private keys in a hardware security module (HSM).""" + + name = "hsm" + title = "Store private keys using a hardware security module (HSM)" + description = ( + "The private key will be stored on the hardware security module (HSM). The HSM makes sure that the" + "private key can never be recovered and thus compromised." + ) + use_model = UsePrivateKeyOptions + + supported_key_types: tuple[SupportedKeyType, ...] = ("RSA", "EC", "Ed25519", "Ed448") + supported_elliptic_curves: tuple[EllipticCurves, ...] = tuple(constants.ELLIPTIC_CURVE_TYPES) + + module: str + token: str + so_pin: Optional[str] + user_pin: Optional[str] + _required_key_backend_options: Final[tuple[str, str, str]] = ("key_label", "key_id", "key_type") + + def __init__( + self, + alias: str, + module: str, + token: str, + so_pin: Optional[str] = None, + user_pin: Optional[str] = None, + ): + if so_pin is not None and user_pin is not None: + raise ValueError(f"{alias}: Set either so_pin or user_pin.") + + super().__init__(alias, module=module, token=token, so_pin=so_pin, user_pin=user_pin) + + @contextmanager + def session(self, so_pin: Optional[str], user_pin: Optional[str], rw: bool = False) -> Iterator[Session]: + """Shortcut to get a session from the pool.""" + with SessionPool(self.module, self.token, so_pin, user_pin, rw=rw) as session: + yield session + + def _add_key_label_argument(self, group: ArgumentGroup, prefix: str = "") -> None: + group.add_argument( + f"--{self.argparse_prefix}{prefix}key-label", + type=str, + metavar="LABEL", + help="%(metavar)s to use for the private key in the HSM.", + ) + + def _add_pin_arguments(self, group: ArgumentGroup, prefix: str = "") -> None: + group.add_argument( + f"--{self.argparse_prefix}{prefix}so-pin", + type=str, + metavar="PIN", + help="Security officer %(metavar)s to access the HSM.", + ) + group.add_argument( + f"--{self.argparse_prefix}{prefix}user-pin", + type=str, + metavar="PIN", + help="User %(metavar)s to access the HSM.", + ) + + def _get_pins(self, options: dict[str, Any], prefix: str = "") -> tuple[Optional[str], Optional[str]]: + options_prefix = f"{self.options_prefix}{prefix.replace('-', '_')}" + argparse_prefix = f"{self.argparse_prefix}{prefix}" + + so_pin: Optional[str] = options[f"{options_prefix}so_pin"] + if so_pin is None: + so_pin = self.so_pin + elif so_pin == "": + so_pin = None + + user_pin: Optional[str] = options[f"{options_prefix}user_pin"] + if user_pin is None: + user_pin = self.user_pin + elif user_pin == "": + user_pin = None + + if so_pin is None and user_pin is None: + raise CommandError( + f"No SO or user pin configured. Use --{argparse_prefix}so-pin or --{argparse_prefix}user-pin " + f"to specify a pin." + ) + if so_pin is not None and user_pin is not None: + raise CommandError( + "Both SO pin and user pin configured. To override a pin from settings, pass " + f'--{argparse_prefix}so-pin="" or --{argparse_prefix}user-pin="".' + ) + + return so_pin, user_pin + + def _get_private_key( + self, ca: "CertificateAuthority", session: Session + ) -> CertificateIssuerPrivateKeyTypes: + key_id: str = ca.key_backend_options["key_id"] + key_label: str = ca.key_backend_options["key_label"] + key_type: SupportedKeyType = ca.key_backend_options["key_type"] + + if key_type == "RSA": + return PKCS11RSAPrivateKey(session, key_id, key_label) + if key_type == "Ed448": + return PKCS11Ed448PrivateKey(session, key_id, key_label) + if key_type == "Ed25519": + return PKCS11Ed25519PrivateKey(session, key_id, key_label) + if key_type == "EC": + return PKCS11EllipticCurvePrivateKey(session, key_id, key_label) + + raise ValueError(f"{key_type}: Unsupported key type.") + + def add_create_private_key_arguments(self, group: ArgumentGroup) -> None: + self._add_key_label_argument(group) + self._add_pin_arguments(group) + + def add_use_parent_private_key_arguments(self, group: ArgumentGroup) -> None: + self._add_pin_arguments(group, "parent-") + + def add_store_private_key_arguments(self, group: ArgumentGroup) -> None: + self._add_key_label_argument(group) + self._add_pin_arguments(group) + + def add_use_private_key_arguments(self, group: ArgumentGroup) -> None: + self._add_pin_arguments(group) + + def get_create_private_key_options( + self, + key_type: ParsableKeyType, + key_size: Optional[int], + elliptic_curve: Optional[str], + options: dict[str, Any], + ) -> CreatePrivateKeyOptions: + key_label = options[f"{self.options_prefix}key_label"] + if not key_label: + raise CommandError( + f"--{self.argparse_prefix}key-label is a required option for this key backend." + ) + + so_pin, user_pin = self._get_pins(options) + + if key_type == "EC" and elliptic_curve is None: + # NOTE: Currently all curves supported by cryptography are also supported by this backend. + # If this changes, a check should be added here (if the default is not supported by the + # backend). + elliptic_curve = model_settings.CA_DEFAULT_ELLIPTIC_CURVE.name + + return CreatePrivateKeyOptions( + key_label=key_label, + key_type=key_type, + key_size=key_size, + elliptic_curve=elliptic_curve, + so_pin=so_pin, + user_pin=user_pin, + ) + + def get_use_parent_private_key_options( + self, ca: "CertificateAuthority", options: dict[str, Any] + ) -> UsePrivateKeyOptions: + so_pin, user_pin = self._get_pins(options, "parent-") + return UsePrivateKeyOptions.model_validate( + {"so_pin": so_pin, "user_pin": user_pin}, context={"ca": ca}, strict=True + ) + + def get_use_private_key_options( + self, ca: "CertificateAuthority", options: dict[str, Any] + ) -> UsePrivateKeyOptions: + so_pin, user_pin = self._get_pins(options) + return UsePrivateKeyOptions.model_validate( + {"so_pin": so_pin, "user_pin": user_pin}, context={"ca": ca}, strict=True + ) + + def get_store_private_key_options(self, options: dict[str, Any]) -> StorePrivateKeyOptions: + key_label = options[f"{self.options_prefix}key_label"] + so_pin, user_pin = self._get_pins(options) + return StorePrivateKeyOptions.model_validate( + {"key_label": key_label, "so_pin": so_pin, "user_pin": user_pin}, strict=True + ) + + def is_usable( + self, ca: "CertificateAuthority", use_private_key_options: Optional[UsePrivateKeyOptions] = None + ) -> bool: + if not ca.key_backend_options or not isinstance(ca.key_backend_options, dict): + return False + for option in self._required_key_backend_options: + if not ca.key_backend_options.get(option): + return False + if use_private_key_options is None: + return True + + try: + with self.session( + so_pin=use_private_key_options.so_pin, user_pin=use_private_key_options.user_pin + ) as session: + self._get_private_key(ca, session) + return True + except Exception: # pylint: disable=broad-exception-caught # want to always return bool + return False + + def check_usable(self, ca: "CertificateAuthority", use_private_key_options: UsePrivateKeyOptions) -> None: + if not ca.key_backend_options or not isinstance(ca.key_backend_options, dict): + raise ValueError("key backend options are not defined.") + for option in self._required_key_backend_options: + if not ca.key_backend_options.get(option): + raise ValueError(f"{option}: Required key option is not defined.") + + with self.session( + so_pin=use_private_key_options.so_pin, user_pin=use_private_key_options.user_pin + ) as session: + self._get_private_key(ca, session) + + def create_private_key( + self, ca: "CertificateAuthority", key_type: ParsableKeyType, options: CreatePrivateKeyOptions + ) -> tuple[CertificateIssuerPublicKeyTypes, UsePrivateKeyOptions]: + key_id = int_to_hex(x509.random_serial_number()) + key_label = options.key_label + + # Test that no private key with the given label exists. Some libraries (e.g. SoftHSM) don't treat the + # label as unique and will silently create a second key with the same label. + with self.session(options.so_pin, options.user_pin) as session: + try: + session.get_key(object_class=ObjectClass.PUBLIC_KEY, label=key_label) + except pkcs11.NoSuchKey: + pass # this is what we hope for + else: + raise ValueError(f"{key_label}: Private key with this label already exists.") + + if key_type == "RSA": + with self.session(so_pin=options.so_pin, user_pin=options.user_pin, rw=True) as session: + pkcs11_public_key, pkcs11_private_key = session.generate_keypair( + pkcs11.KeyType.RSA, options.key_size, id=key_id.encode(), label=key_label, store=True + ) + + private_key: PKCS11PrivateKeyTypes = PKCS11RSAPrivateKey( + session=session, + key_id=key_id, + key_label=key_label, + pkcs11_private_key=pkcs11_private_key, + pkcs11_public_key=pkcs11_public_key, + ) + public_key = private_key.public_key() + + elif key_type in ("Ed25519", "Ed448"): + named_curve_parameters = encode_named_curve_parameters( + SignedDigestAlgorithmId(key_type.lower()).dotted + ) + + with self.session(so_pin=options.so_pin, user_pin=options.user_pin, rw=True) as session: + parameters = session.create_domain_parameters( + KeyType.EC_EDWARDS, {pkcs11.Attribute.EC_PARAMS: named_curve_parameters}, local=True + ) + + pkcs11_public_key, pkcs11_private_key = parameters.generate_keypair( + mechanism=pkcs11.Mechanism.EC_EDWARDS_KEY_PAIR_GEN, + store=True, + id=key_id.encode(), + label=key_label, + ) + + if key_type == "Ed25519": + private_key = PKCS11Ed25519PrivateKey( + session=session, + key_id=key_id, + key_label=key_label, + pkcs11_private_key=pkcs11_private_key, + pkcs11_public_key=pkcs11_public_key, + ) + else: + private_key = PKCS11Ed448PrivateKey( + session=session, + key_id=key_id, + key_label=key_label, + pkcs11_private_key=pkcs11_private_key, + pkcs11_public_key=pkcs11_public_key, + ) + public_key = private_key.public_key() + + elif key_type == "EC": + with self.session(so_pin=options.so_pin, user_pin=options.user_pin, rw=True) as session: + # TYPEHINT NOTE: elliptic curve is always set if key_type is EC. + elliptic_curve_name = options.elliptic_curve.lower() # type: ignore[union-attr] + parameters = session.create_domain_parameters( + KeyType.EC, + {pkcs11.Attribute.EC_PARAMS: encode_named_curve_parameters(elliptic_curve_name)}, + local=True, + ) + + pkcs11_public_key, pkcs11_private_key = parameters.generate_keypair( + store=True, id=key_id.encode(), label=key_label + ) + + private_key = PKCS11EllipticCurvePrivateKey( + session=session, + key_id=key_id, + key_label=key_label, + pkcs11_private_key=pkcs11_private_key, + pkcs11_public_key=pkcs11_public_key, + ) + public_key = private_key.public_key() + else: + raise ValueError(f"{key_type}: unknown key type") + + ca.key_backend_options = {"key_id": key_id, "key_label": key_label, "key_type": key_type} + use_private_key_options = UsePrivateKeyOptions.model_validate( + {"so_pin": options.so_pin, "user_pin": options.user_pin}, context={"ca": ca} + ) + + return public_key, use_private_key_options + + def store_private_key( + self, + ca: "CertificateAuthority", + key: CertificateIssuerPrivateKeyTypes, + options: StorePrivateKeyOptions, + ) -> None: + key_id = int_to_hex(x509.random_serial_number()) + if isinstance(key, rsa.RSAPrivateKey): + key_type = "RSA" + key_der = key.private_bytes( + encoding=serialization.Encoding.DER, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=serialization.NoEncryption(), + ) + attrs = decode_rsa_private_key(key_der) + else: + raise ValueError(f"{key}: Importing key of this type is not supported.") + + attrs[pkcs11.Attribute.ID] = key_id.encode() + attrs[pkcs11.Attribute.LABEL] = options.key_label + + with self.session(so_pin=options.so_pin, user_pin=options.user_pin, rw=True) as session: + session.create_object(attrs) + + ca.key_backend_options = {"key_id": key_id, "key_label": options.key_label, "key_type": key_type} + + def sign_certificate( + self, + ca: "CertificateAuthority", + use_private_key_options: UsePrivateKeyOptions, + public_key: CertificateIssuerPublicKeyTypes, + serial: int, + algorithm: Optional[AllowedHashTypes], + issuer: x509.Name, + subject: x509.Name, + expires: datetime, + extensions: Sequence[CertificateExtension], + ) -> x509.Certificate: + builder = get_cert_builder(expires, serial=serial) + builder = builder.public_key(public_key) + builder = builder.issuer_name(issuer) + builder = builder.subject_name(subject) + for extension in extensions: + builder = builder.add_extension(extension.value, critical=extension.critical) + + with self.session( + so_pin=use_private_key_options.so_pin, user_pin=use_private_key_options.user_pin + ) as session: + private_key = self._get_private_key(ca, session) + return builder.sign(private_key=private_key, algorithm=algorithm) + + def sign_certificate_revocation_list( + self, + ca: "CertificateAuthority", + use_private_key_options: UsePrivateKeyOptions, + builder: x509.CertificateRevocationListBuilder, + algorithm: Optional[AllowedHashTypes], + ) -> x509.CertificateRevocationList: + with self.session( + so_pin=use_private_key_options.so_pin, user_pin=use_private_key_options.user_pin + ) as session: + private_key = self._get_private_key(ca, session) + return builder.sign(private_key=private_key, algorithm=algorithm) diff --git a/ca/django_ca/key_backends/hsm/keys.py b/ca/django_ca/key_backends/hsm/keys.py new file mode 100644 index 000000000..8029c8908 --- /dev/null +++ b/ca/django_ca/key_backends/hsm/keys.py @@ -0,0 +1,231 @@ +# This file is part of django-ca (https://github.com/mathiasertl/django-ca). +# +# django-ca is free software: you can redistribute it and/or modify it under the terms of the GNU General +# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your +# option) any later version. +# +# django-ca is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the +# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License +# for more details. +# +# You should have received a copy of the GNU General Public License along with django-ca. If not, see +# . + +"""Private key implementations that use an HSM in the background.""" + +from hashlib import sha256, sha384, sha512 +from typing import ClassVar, Generic, NoReturn, Optional, TypeVar, Union, cast + +import pkcs11 +from pkcs11 import Session +from pkcs11.constants import Attribute +from pkcs11.util.ec import encode_ec_public_key +from pkcs11.util.rsa import encode_rsa_public_key + +from asn1crypto.core import OctetString +from asn1crypto.keys import PublicKeyInfo +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric import ec, ed448, ed25519, rsa, utils as asym_utils +from cryptography.hazmat.primitives.asymmetric.padding import PSS, AsymmetricPadding, PKCS1v15 +from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicKey +from cryptography.hazmat.primitives.serialization import load_der_public_key + +EdwardsPublicKeyTypeVar = TypeVar("EdwardsPublicKeyTypeVar", ed448.Ed448PublicKey, ed25519.Ed25519PublicKey) + + +class PKCS11PrivateKeyMixin: + """Mixin providing common functionality to PKCS11 key implementations.""" + + # pylint: disable=missing-function-docstring # implements standard functions of the base class. + + key_type: pkcs11.KeyType + + def __init__( + self, + session: Session, + key_id: str, + key_label: str, + pkcs11_private_key: Optional[pkcs11.PrivateKey] = None, + pkcs11_public_key: Optional[pkcs11.PublicKey] = None, + ) -> None: + self._session = session + self._key_id = key_id.encode() + self._key_label = key_label + self._pkcs11_private_key = pkcs11_private_key + self._pkcs11_public_key = pkcs11_public_key + + super().__init__() + + def decrypt(self, ciphertext: bytes, padding: AsymmetricPadding) -> bytes: + raise NotImplementedError( + "Decryption is not implemented for keys stored in a hardware security module (HSM)." + ) + + def private_bytes( + self, + encoding: serialization.Encoding, + format: serialization.PrivateFormat, # pylint: disable=redefined-builtin # given by cryptography + encryption_algorithm: serialization.KeySerializationEncryption, + ) -> bytes: + raise NotImplementedError( + "Private bytes cannot be retrieved for keys stored in a hardware security module (HSM)." + ) + + def private_numbers(self) -> NoReturn: + raise NotImplementedError( + "Private numbers cannot be retrieved for keys stored in a hardware security module (HSM)." + ) + + @property + def pkcs11_private_key(self) -> pkcs11.PrivateKey: + if self._pkcs11_private_key is None: + self._pkcs11_private_key = self._session.get_key( + key_type=self.key_type, + object_class=pkcs11.ObjectClass.PRIVATE_KEY, + id=self._key_id, + label=self._key_label, + ) + return self._pkcs11_private_key + + @property + def pkcs11_public_key(self) -> pkcs11.PublicKey: + if self._pkcs11_public_key is None: + self._pkcs11_public_key = self._session.get_key( + key_type=self.key_type, + object_class=pkcs11.ObjectClass.PUBLIC_KEY, + id=self._key_id, + label=self._key_label, + ) + return self._pkcs11_public_key + + +class PKCS11EdwardsPrivateKeyMixin(PKCS11PrivateKeyMixin, Generic[EdwardsPublicKeyTypeVar]): + """Specialized mixin for Ed448 and Ed5519 keys (which handle identical).""" + + # pylint: disable=missing-function-docstring # implements standard functions of the base class. + + public_key_algorithm: ClassVar[str] + key_type = pkcs11.KeyType.EC_EDWARDS + cryptograph_key_type: EdwardsPublicKeyTypeVar + + def private_bytes_raw(self) -> bytes: + raise NotImplementedError( + "Private bytes cannot be retrieved for keys stored in a hardware security module (HSM)." + ) + + def public_key(self) -> EdwardsPublicKeyTypeVar: + ec_point = bytes(OctetString.load(self.pkcs11_public_key[Attribute.EC_POINT])) + value = {"algorithm": {"algorithm": self.public_key_algorithm}, "public_key": ec_point} + public_key: bytes = PublicKeyInfo(value).dump() + + return cast(EdwardsPublicKeyTypeVar, load_der_public_key(public_key)) + + def sign(self, data: bytes) -> bytes: + return self.pkcs11_private_key.sign(data, mechanism=pkcs11.Mechanism.EDDSA) # type: ignore[no-any-return] + + +# pylint: disable-next=abstract-method # private key functions are deliberately not implemented in base. +class PKCS11RSAPrivateKey(PKCS11PrivateKeyMixin, rsa.RSAPrivateKey): + """Private key implementation for RSA keys stored in a HSM.""" + + key_type = pkcs11.KeyType.RSA + + def public_key(self) -> RSAPublicKey: + der_public_key = encode_rsa_public_key(self.pkcs11_public_key) + return cast(RSAPublicKey, load_der_public_key(der_public_key)) + + @property + def key_size(self) -> int: + return self.public_key().key_size + + def sign( + self, + data: bytes, + padding: AsymmetricPadding, + algorithm: Union[asym_utils.Prehashed, hashes.HashAlgorithm], + ) -> bytes: + if isinstance(algorithm, asym_utils.Prehashed): + raise ValueError("Prehashed operations are not supported.") + + if isinstance(algorithm, hashes.SHA224) and isinstance(padding, PSS): + mechanism: hashes.HashAlgorithm = pkcs11.Mechanism.SHA224_RSA_PKCS_PSS + elif isinstance(algorithm, hashes.SHA224) and isinstance(padding, PKCS1v15): + mechanism = pkcs11.Mechanism.SHA224_RSA_PKCS + elif isinstance(algorithm, hashes.SHA256) and isinstance(padding, PSS): + mechanism = pkcs11.Mechanism.SHA256_RSA_PKCS_PSS + elif isinstance(algorithm, hashes.SHA256) and isinstance(padding, PKCS1v15): + mechanism = pkcs11.Mechanism.SHA256_RSA_PKCS + elif isinstance(algorithm, hashes.SHA384) and isinstance(padding, PSS): + mechanism = pkcs11.Mechanism.SHA384_RSA_PKCS_PSS + elif isinstance(algorithm, hashes.SHA384) and isinstance(padding, PKCS1v15): + mechanism = pkcs11.Mechanism.SHA384_RSA_PKCS + elif isinstance(algorithm, hashes.SHA512) and isinstance(padding, PSS): + mechanism = pkcs11.Mechanism.SHA512_RSA_PKCS_PSS + elif isinstance(algorithm, hashes.SHA512) and isinstance(padding, PKCS1v15): + mechanism = pkcs11.Mechanism.SHA512_RSA_PKCS + elif isinstance(algorithm, (hashes.SHA3_224, hashes.SHA3_384, hashes.SHA3_256, hashes.SHA3_512)): + raise ValueError("SHA3 is not support by the HSM backend.") + else: + raise ValueError( + f"{algorithm.name} with {padding.name} padding: Unknown signing algorithm and padding" + ) + + # TYPEHINT NOTE: library is not type-hinted. + return self.pkcs11_private_key.sign(data, mechanism=mechanism) # type: ignore[no-any-return] + + +class PKCS11EllipticCurvePrivateKey(PKCS11PrivateKeyMixin, ec.EllipticCurvePrivateKey): + """Private key implementation for EC keys stored in a HSM.""" + + key_type = pkcs11.KeyType.EC + + @property + def curve(self) -> ec.EllipticCurve: + return self.public_key().curve + + def exchange(self, algorithm: ec.ECDH, peer_public_key: ec.EllipticCurvePublicKey) -> bytes: + raise NotImplementedError("Operation not implemented for a key stored in an HSM.") + + @property + def key_size(self) -> int: + return self.public_key().key_size + + def public_key(self) -> ec.EllipticCurvePublicKey: + public_key = encode_ec_public_key(self.pkcs11_public_key) + return cast(ec.EllipticCurvePublicKey, load_der_public_key(public_key)) + + def sign(self, data: bytes, signature_algorithm: ec.EllipticCurveSignatureAlgorithm) -> bytes: + if isinstance(signature_algorithm.algorithm, hashes.SHA256): + hasher = sha256() + elif isinstance(signature_algorithm.algorithm, hashes.SHA384): + hasher = sha384() + elif isinstance(signature_algorithm.algorithm, hashes.SHA512): + hasher = sha512() + else: + raise ValueError(f"{signature_algorithm.algorithm}: Signature algorithm is not supported.") + + hasher.update(data) + data = hasher.digest() + + return self.pkcs11_private_key.sign(data, mechanism=pkcs11.Mechanism.ECDSA) # type: ignore[no-any-return] + + +# pylint: disable-next=abstract-method # private key functions are deliberately not implemented in base. +class PKCS11Ed25519PrivateKey( + PKCS11EdwardsPrivateKeyMixin[ed25519.Ed25519PublicKey], ed25519.Ed25519PrivateKey +): + """Private key implementation for Ed25519 keys stored in a HSM.""" + + public_key_algorithm = "ed25519" + + +# pylint: disable-next=abstract-method # private key functions are deliberately not implemented in base. +class PKCS11Ed448PrivateKey(PKCS11EdwardsPrivateKeyMixin[ed448.Ed448PublicKey], ed448.Ed448PrivateKey): + """Private key implementation for Ed448 keys stored in a HSM.""" + + public_key_algorithm = "ed448" + + +PKCS11PrivateKeyTypes = Union[ + PKCS11RSAPrivateKey, PKCS11Ed25519PrivateKey, PKCS11Ed448PrivateKey, PKCS11EllipticCurvePrivateKey +] diff --git a/ca/django_ca/key_backends/hsm/models.py b/ca/django_ca/key_backends/hsm/models.py new file mode 100644 index 000000000..2e393b4a5 --- /dev/null +++ b/ca/django_ca/key_backends/hsm/models.py @@ -0,0 +1,65 @@ +# This file is part of django-ca (https://github.com/mathiasertl/django-ca). +# +# django-ca is free software: you can redistribute it and/or modify it under the terms of the GNU General +# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your +# option) any later version. +# +# django-ca is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the +# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License +# for more details. +# +# You should have received a copy of the GNU General Public License along with django-ca. If not, see +# . + +"""Models used by the HSM backend.""" + +import typing +from typing import Optional + +from pydantic import BaseModel, ConfigDict, model_validator + +from django_ca.key_backends.base import CreatePrivateKeyOptionsBaseModel +from django_ca.key_backends.hsm.typehints import SupportedKeyType +from django_ca.typehints import EllipticCurves + + +class PinModelMixin: + """Mixin providing so/user pin and validation.""" + + so_pin: Optional[str] = None + user_pin: Optional[str] = None + + @model_validator(mode="after") + def validate_pins(self) -> "typing.Self": + """Validate that exactly one of `so_pin` and `user_pin` is set.""" + if self.so_pin is None and self.user_pin is None: + raise ValueError("Provide one of so_pin or user_pin.") + if self.so_pin is not None and self.user_pin is not None: + raise ValueError("Provide either so_pin or user_pin.") + return self + + +class CreatePrivateKeyOptions(PinModelMixin, CreatePrivateKeyOptionsBaseModel): + """Options for initializing private keys.""" + + # NOTE: we set frozen here to prevent accidental coding mistakes. Models should be immutable. + model_config = ConfigDict(arbitrary_types_allowed=True) + + key_label: str + key_type: SupportedKeyType # overwrites field from the base model + elliptic_curve: Optional[EllipticCurves] + + +class StorePrivateKeyOptions(PinModelMixin, BaseModel): + """Options for storing a private key.""" + + # NOTE: we set frozen here to prevent accidental coding mistakes. Models should be immutable. + model_config = ConfigDict(frozen=True) + + key_label: str + + +class UsePrivateKeyOptions(PinModelMixin, BaseModel): + """Options for using the private key.""" + + model_config = ConfigDict(frozen=True) diff --git a/ca/django_ca/key_backends/hsm/session.py b/ca/django_ca/key_backends/hsm/session.py new file mode 100644 index 000000000..17b170662 --- /dev/null +++ b/ca/django_ca/key_backends/hsm/session.py @@ -0,0 +1,118 @@ +# This file is part of django-ca (https://github.com/mathiasertl/django-ca). +# +# django-ca is free software: you can redistribute it and/or modify it under the terms of the GNU General +# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your +# option) any later version. +# +# django-ca is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the +# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License +# for more details. +# +# You should have received a copy of the GNU General Public License along with django-ca. If not, see +# . + +"""Code for handling sessions for hardware security modules.""" + +import threading +from types import TracebackType +from typing import Final, Optional + +import pkcs11 +from pkcs11 import Session +from pkcs11._pkcs11 import lib as pkcs11_lib + +PoolKeyType = tuple[str, str, Optional[str], Optional[str]] + + +class SessionPool: + """Thread-safe session pool for PKCS11 sessions.""" + + _lib_lock: Final[threading.Lock] = threading.Lock() + _lib_pool: Final[dict[str, pkcs11_lib]] = {} + + _session_lock: Final[threading.Lock] = threading.Lock() + _session_pool: Final[dict[PoolKeyType, Session]] = {} + _session_refcount: Final[dict[PoolKeyType, int]] = {} + + path: Final[str] + token_label: Final[str] + so_pin: Final[Optional[str]] + user_pin: Final[Optional[str]] + rw: Final[bool] + + def __init__( + self, path: str, token_label: str, so_pin: Optional[str], user_pin: Optional[str], rw: bool = False + ) -> None: + if so_pin is None and user_pin is None: + raise ValueError("so_pin and user_pin cannot both be None.") + if so_pin is not None and user_pin is not None: + raise ValueError("Either so_pin and user_pin must be set.") + + self.path = path + self.token_label = token_label + self.so_pin = so_pin + self.user_pin = user_pin + self.rw = rw + + @classmethod + def acquire( + cls, + path: str, + token_label: str, + so_pin: Optional[str] = None, + user_pin: Optional[str] = None, + rw: bool = False, + ) -> Session: + """Open a new session with the given parameters.""" + with cls._lib_lock: + if path not in cls._lib_pool: + cls._lib_pool[path] = pkcs11.lib(path) + + with cls._session_lock: + pool_key = (path, token_label, so_pin, user_pin) + if pool_key not in cls._session_pool: + token = cls._lib_pool[path].get_token(token_label=token_label) + cls._session_pool[pool_key] = token.open(rw=rw, so_pin=so_pin, user_pin=user_pin) + cls._session_refcount[pool_key] = 1 + else: + # Request a read/write session, but a read-only session is already present. According to the + # PKCS11 documentation, some libraries don't allow multiple sessions for the same token per + # process: + # + # https://python-pkcs11.readthedocs.io/en/latest/concurrency.html + # + # Note that this does not happen in practice. A read/write session is only requested when + # generating or importing keys, and this always runs on the command-line, where no other + # session is ever present. + if rw is True and cls._session_pool[pool_key].rw is False: + raise ValueError("Requested R/W session, but R/O session is already initialized.") + + # Session is already open, just increase the refcount + cls._session_refcount[pool_key] += 1 + + return cls._session_pool[pool_key] + + @classmethod + def release(cls, path: str, token_label: str, so_pin: Optional[str], user_pin: Optional[str]) -> None: + """Close session if no reference is known.""" + with cls._session_lock: + pool_key = (path, token_label, so_pin, user_pin) + cls._session_refcount[pool_key] -= 1 + + if cls._session_refcount[pool_key] == 0: + cls._session_pool[pool_key].close() + del cls._session_pool[pool_key] + del cls._session_refcount[pool_key] + + def __enter__(self) -> Session: + return self.acquire( + self.path, self.token_label, so_pin=self.so_pin, user_pin=self.user_pin, rw=self.rw + ) + + def __exit__( + self, + exc_type: Optional[type[BaseException]], + value: Optional[BaseException], + traceback: Optional[TracebackType], + ) -> None: + self.release(self.path, self.token_label, so_pin=self.so_pin, user_pin=self.user_pin) diff --git a/ca/django_ca/key_backends/hsm/typehints.py b/ca/django_ca/key_backends/hsm/typehints.py new file mode 100644 index 000000000..208d6ce0d --- /dev/null +++ b/ca/django_ca/key_backends/hsm/typehints.py @@ -0,0 +1,18 @@ +# This file is part of django-ca (https://github.com/mathiasertl/django-ca). +# +# django-ca is free software: you can redistribute it and/or modify it under the terms of the GNU General +# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your +# option) any later version. +# +# django-ca is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the +# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License +# for more details. +# +# You should have received a copy of the GNU General Public License along with django-ca. If not, see +# . + +"""Type aliases used in HSM code.""" + +from typing import Literal + +SupportedKeyType = Literal["RSA", "EC", "Ed25519", "Ed448"] diff --git a/ca/django_ca/key_backends/storages.py b/ca/django_ca/key_backends/storages.py index f962f438f..2a1ed45e3 100644 --- a/ca/django_ca/key_backends/storages.py +++ b/ca/django_ca/key_backends/storages.py @@ -102,7 +102,7 @@ def load_default_password(cls, password: Optional[bytes], info: ValidationInfo) """Validator to load the password from CA_PASSWORDS if not given.""" if info.context and password is None: ca: CertificateAuthority = info.context.get("ca") - if ca is not None: + if ca is not None: # pragma: no branch # ca is always set, this is just a precaution. if settings_password := model_settings.CA_PASSWORDS.get(ca.serial): return settings_password @@ -225,7 +225,7 @@ def get_store_private_key_options(self, options: dict[str, Any]) -> StorePrivate ) def get_use_private_key_options( - self, ca: Optional["CertificateAuthority"], options: dict[str, Any] + self, ca: "CertificateAuthority", options: dict[str, Any] ) -> UsePrivateKeyOptions: return UsePrivateKeyOptions.model_validate( {"password": options.get(f"{self.options_prefix}password")}, context={"ca": ca}, strict=True diff --git a/ca/django_ca/management/commands/init_ca.py b/ca/django_ca/management/commands/init_ca.py index 1c838244c..492ac0f25 100644 --- a/ca/django_ca/management/commands/init_ca.py +++ b/ca/django_ca/management/commands/init_ca.py @@ -16,6 +16,7 @@ .. seealso:: https://docs.djangoproject.com/en/dev/howto/custom-management-commands/ """ +import logging from collections.abc import Iterable from datetime import datetime, timedelta, timezone as tz from typing import Any, Optional @@ -359,14 +360,12 @@ def handle( # pylint: disable=too-many-locals # noqa: PLR0912,PLR0913,PLR0915 key_backend_options = key_backend.get_create_private_key_options( key_type, key_size, elliptic_curve=elliptic_curve, options=options ) - load_key_backend_options = key_backend.get_use_private_key_options(None, options) # If there is a parent CA, test if we can use it (here) to sign certificates. The most common case # where this happens is if the key is stored on the filesystem, but only accessible to the Celery # worker and the current process is on the webserver side. - if parent is None: - signer_key_backend_options = load_key_backend_options - else: + signer_key_backend_options = None + if parent is not None: signer_key_backend_options = parent.key_backend.get_use_parent_private_key_options( parent, options ) @@ -374,10 +373,13 @@ def handle( # pylint: disable=too-many-locals # noqa: PLR0912,PLR0913,PLR0915 # Check if the parent key is usable parent.check_usable(signer_key_backend_options) except ValidationError as ex: + logging.exception(ex) self.validation_error_to_command_error(ex) - except CommandError: # pragma: no cover + except CommandError as ex: # pragma: no cover + logging.exception(ex) raise except Exception as ex: # pragma: no cover + logging.exception(ex) raise CommandError(*ex.args) from ex # Get/validate signature hash algorithm @@ -569,7 +571,14 @@ def handle( # pylint: disable=too-many-locals # noqa: PLR0912,PLR0913,PLR0915 ocsp_responder_key_validity=ocsp_responder_key_validity, **kwargs, ) + + load_key_backend_options = key_backend.get_use_private_key_options(ca, options) + except ValidationError as ex: + self.validation_error_to_command_error(ex) + except CommandError: # pragma: no cover + raise except Exception as ex: # pragma: no cover + logging.exception(ex) raise CommandError(ex) from ex # Generate OCSP keys and cache CRLs diff --git a/ca/django_ca/tests/base/fixtures.py b/ca/django_ca/tests/base/fixtures.py index acb9fea6e..3ea1653d2 100644 --- a/ca/django_ca/tests/base/fixtures.py +++ b/ca/django_ca/tests/base/fixtures.py @@ -17,12 +17,15 @@ import copy import os +import subprocess +import time from collections.abc import Iterator from pathlib import Path from cryptography import x509 from cryptography.x509.oid import CertificatePoliciesOID, ExtensionOID, NameOID +from django.conf import settings from django.core.cache import cache from django.core.files.storage import storages @@ -250,6 +253,24 @@ def signed_certificate_timestamps_pub( yield request.getfixturevalue(f"{name}_pub") +@pytest.fixture(scope="session") +def softhsm_token() -> Iterator[str]: + """Fixture to create a new token in SoftHSM.""" + token = settings.PKCS11_TOKEN_LABEL + so_pin = settings.PKCS11_SO_PIN + pin = settings.PKCS11_USER_PIN + subprocess.run( + ["softhsm2-util", "--init-token", "--free", "--label", token, "--so-pin", so_pin, "--pin", pin], + check=True, + ) + # There might be a synchronization issue with the softhsm token. + time.sleep(0.1) + + yield token + + subprocess.run(["softhsm2-util", "--delete-token", "--token", token], check=True) + + @pytest.fixture() def subject(hostname: str) -> Iterator[x509.Name]: """Fixture for a :py:class:`~cg:cryptography.x509.Name` to use for a subject. diff --git a/ca/django_ca/tests/base/utils.py b/ca/django_ca/tests/base/utils.py index d52ea2ca7..0cfd8a4f8 100644 --- a/ca/django_ca/tests/base/utils.py +++ b/ca/django_ca/tests/base/utils.py @@ -98,9 +98,7 @@ def create_private_key( ) -> tuple[CertificateIssuerPublicKeyTypes, DummyModel]: return None, DummyModel() # type: ignore[return-value] - def get_use_private_key_options( - self, ca: Optional[CertificateAuthority], options: dict[str, Any] - ) -> DummyModel: + def get_use_private_key_options(self, ca: CertificateAuthority, options: dict[str, Any]) -> DummyModel: return DummyModel() def is_usable( diff --git a/ca/django_ca/tests/commands/test_init_ca.py b/ca/django_ca/tests/commands/test_init_ca.py index e3e61e843..71a025b93 100644 --- a/ca/django_ca/tests/commands/test_init_ca.py +++ b/ca/django_ca/tests/commands/test_init_ca.py @@ -22,10 +22,12 @@ from cryptography import x509 from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.asymmetric import dsa, ec -from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey +from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePublicKey +from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey, RSAPublicKey from cryptography.hazmat.primitives.serialization import Encoding from cryptography.x509.oid import AuthorityInformationAccessOID, ExtensionOID, NameOID +from django.conf import settings from django.core.cache import cache from django.urls import reverse from django.utils import timezone @@ -33,9 +35,12 @@ import pytest from pytest_django.fixtures import SettingsWrapper +from django_ca import constants from django_ca.conf import model_settings from django_ca.constants import ExtendedKeyUsageOID from django_ca.key_backends import key_backends +from django_ca.key_backends.hsm import HSMBackend +from django_ca.key_backends.hsm.models import UsePrivateKeyOptions as HSMUsePrivateKeyOptions from django_ca.key_backends.storages import StoragesBackend, UsePrivateKeyOptions from django_ca.models import Certificate, CertificateAuthority from django_ca.signals import post_create_ca @@ -72,10 +77,13 @@ subject_alternative_name, uri, ) +from django_ca.typehints import EllipticCurves, ParsableKeyType from django_ca.utils import get_crl_cache_key use_options = UsePrivateKeyOptions(password=None) +# pylint: disable=redefined-outer-name # disabled because of fixtures + def assert_post_create_ca(post: mock.Mock, ca: CertificateAuthority) -> None: """Assert that the post_create_ca signal was called.""" @@ -155,8 +163,8 @@ def init_ca_e2e( with assert_create_ca_signals() as (_pre, post): out, err = cmd_e2e(["init_ca", name, subject, *args]) - assert out == "" - assert err == "" + assert out == "" + assert err == "" ca = CertificateAuthority.objects.get(name=name) assert_post_create_ca(post, ca) @@ -1058,6 +1066,179 @@ def test_non_default_key_backend_with_ec_key( assert isinstance(key.curve, ec.SECT571R1) +@pytest.mark.django_db +@pytest.mark.usefixtures("tmpcadir") +@pytest.mark.usefixtures("softhsm_token") +@pytest.mark.parametrize("key_type", HSMBackend.supported_key_types) +def test_hsm_backend(ca_name: str, rfc4514_subject: str, key_type: ParsableKeyType) -> None: + """Basic test for creating a key in the HSM.""" + ca = init_ca_e2e( + ca_name, + rfc4514_subject, + "--subject-format=rfc4514", + f"--key-type={key_type}", + "--key-backend=hsm", + f"--hsm-key-label={ca_name}", + ) + + assert ca.key_backend_alias == "hsm" + assert ca.key_backend.is_usable(ca, HSMUsePrivateKeyOptions(user_pin=settings.PKCS11_USER_PIN)) + assert ca.key_type == key_type + assert ca.key_backend_options["key_label"] == ca_name + assert ca.key_backend_options["key_type"] == key_type + assert isinstance(ca.pub.loaded, x509.Certificate) + assert isinstance(ca.pub.loaded.public_key(), constants.PUBLIC_KEY_TYPE_MAPPING[key_type]) + + +@pytest.mark.django_db +@pytest.mark.usefixtures("tmpcadir") +@pytest.mark.usefixtures("softhsm_token") +def test_hsm_backend_with_rsa_options(ca_name: str, rfc4514_subject: str) -> None: + """Basic test for creating a key in the HSM.""" + assert settings.CA_MIN_KEY_SIZE == 1024 # assert initial state + ca = init_ca_e2e( + ca_name, + rfc4514_subject, + "--subject-format=rfc4514", + "--key-type=RSA", + "--key-backend=hsm", + f"--hsm-key-label={ca_name}", + "--key-size=2048", # RSA specific option + ) + + assert ca.key_backend_alias == "hsm" + assert ca.key_backend.is_usable(ca, HSMUsePrivateKeyOptions(user_pin=settings.PKCS11_USER_PIN)) + assert ca.key_type == "RSA" + assert ca.key_backend_options["key_label"] == ca_name + assert ca.key_backend_options["key_type"] == "RSA" + assert isinstance(ca.pub.loaded, x509.Certificate) + public_key = ca.pub.loaded.public_key() + assert isinstance(public_key, RSAPublicKey) + assert public_key.key_size == 2048 + + +@pytest.mark.django_db +@pytest.mark.usefixtures("tmpcadir") +@pytest.mark.usefixtures("softhsm_token") +@pytest.mark.parametrize("ec_curve", HSMBackend.supported_elliptic_curves) +def test_hsm_backend_with_ec_options(ca_name: str, ec_curve: EllipticCurves) -> None: + """Basic test for creating a key in the HSM.""" + ca = init_ca( + ca_name, + key_type="EC", + key_backend=key_backends["hsm"], + hsm_key_label=ca_name, + elliptic_curve=ec_curve, + ) + + assert ca.key_backend_alias == "hsm" + assert ca.key_backend.is_usable(ca, HSMUsePrivateKeyOptions(user_pin=settings.PKCS11_USER_PIN)) + assert ca.key_type == "EC" + assert ca.key_backend_options["key_label"] == ca_name + assert ca.key_backend_options["key_type"] == "EC" + assert isinstance(ca.pub.loaded, x509.Certificate) + public_key = ca.pub.loaded.public_key() + assert isinstance(public_key, EllipticCurvePublicKey) + + # TYPEHINT NOTE: seems to be a false positive + assert public_key.key_size == constants.ELLIPTIC_CURVE_TYPES[ec_curve].key_size # type: ignore[comparison-overlap] + assert public_key.curve.name == ec_curve + + +@pytest.mark.django_db +@pytest.mark.usefixtures("tmpcadir") +@pytest.mark.usefixtures("softhsm_token") +def test_hsm_backend_with_child_ca(ca_name: str) -> None: + """Basic test for creating a key in the HSM.""" + parent_name = f"{ca_name}_parent" + parent = init_ca(parent_name, key_backend=key_backends["hsm"], hsm_key_label=parent_name, path_length=1) + + assert parent.key_backend_alias == "hsm" + assert parent.key_backend.is_usable(parent, HSMUsePrivateKeyOptions(user_pin=settings.PKCS11_USER_PIN)) + assert parent.key_type == "RSA" + assert parent.key_backend_options["key_label"] == parent_name + assert parent.key_backend_options["key_type"] == "RSA" + assert isinstance(parent.pub.loaded, x509.Certificate) + assert isinstance(parent.pub.loaded.public_key(), RSAPublicKey) + + # Create a child CA in the HSM + child_name = f"{ca_name}_child" + child = init_ca(child_name, parent=parent, key_backend=key_backends["hsm"], hsm_key_label=child_name) + assert child.key_backend_alias == "hsm" + assert child.key_backend.is_usable(child, HSMUsePrivateKeyOptions(user_pin=settings.PKCS11_USER_PIN)) + assert child.key_type == "RSA" + assert child.key_backend_options["key_label"] == child_name + assert child.key_backend_options["key_type"] == "RSA" + assert isinstance(child.pub.loaded, x509.Certificate) + assert isinstance(child.pub.loaded.public_key(), RSAPublicKey) + assert child.parent == parent + + +def test_hsm_backend_without_key_label(ca_name: str) -> None: + """Test error when --key-label option is missing.""" + with assert_command_error(r"^--hsm-key-label is a required option for this key backend\.$"): + init_ca(name=ca_name, key_backend=key_backends["hsm"]) + + +def test_hsm_backend_with_both_pins(ca_name: str) -> None: + """Test error when --key-label option is missing.""" + with assert_command_error( + r'^Both SO pin and user pin configured\. To override a pin from settings, pass --hsm-so-pin="" or ' + r'--hsm-user-pin=""\.$' + ): + init_ca( + name=ca_name, + key_backend=key_backends["hsm"], + hsm_key_label=ca_name, + hsm_so_pin="so-pin", + hsm_user_pin="user-pin", + ) + + +@pytest.mark.django_db +@pytest.mark.usefixtures("tmpcadir") +@pytest.mark.usefixtures("softhsm_token") +def test_hsm_backend_with_both_parent_pins(ca_name: str) -> None: + """Test error when --key-label option is missing.""" + parent_name = f"{ca_name}_parent" + parent = init_ca(parent_name, key_backend=key_backends["hsm"], hsm_key_label=parent_name, path_length=1) + + with assert_command_error( + r"^Both SO pin and user pin configured\. To override a pin from settings, pass " + r'--hsm-parent-so-pin="" or --hsm-parent-user-pin=""\.$' + ): + init_ca( + name=ca_name, + parent=parent, + key_backend=key_backends["hsm"], + hsm_key_label=ca_name, + hsm_parent_so_pin="so-pin", + hsm_parent_user_pin="user-pin", + ) + + +@pytest.mark.django_db +@pytest.mark.usefixtures("tmpcadir") +@pytest.mark.usefixtures("softhsm_token") +def test_hsm_backend_with_no_parent_pins(ca_name: str) -> None: + """Test error when --key-label option is missing.""" + parent_name = f"{ca_name}_parent" + parent = init_ca(parent_name, key_backend=key_backends["hsm"], hsm_key_label=parent_name, path_length=1) + + with assert_command_error( + r"^No SO or user pin configured\. Use --hsm-parent-so-pin or --hsm-parent-user-pin to specify " + r"a pin\.$" + ): + init_ca( + name=ca_name, + parent=parent, + key_backend=key_backends["hsm"], + hsm_key_label=ca_name, + hsm_parent_so_pin="", + hsm_parent_user_pin="", + ) + + def test_invalid_public_key_parameters(ca_name: str) -> None: """Test passing invalid public key parameters.""" msg = r"^Ed25519 keys do not allow an algorithm for signing\.$" diff --git a/ca/django_ca/tests/key_backends/hsm/__init__.py b/ca/django_ca/tests/key_backends/hsm/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/ca/django_ca/tests/key_backends/hsm/conftest.py b/ca/django_ca/tests/key_backends/hsm/conftest.py new file mode 100644 index 000000000..f574b0710 --- /dev/null +++ b/ca/django_ca/tests/key_backends/hsm/conftest.py @@ -0,0 +1,61 @@ +# This file is part of django-ca (https://github.com/mathiasertl/django-ca). +# +# django-ca is free software: you can redistribute it and/or modify it under the terms of the GNU General +# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your +# option) any later version. +# +# django-ca is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the +# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License +# for more details. +# +# You should have received a copy of the GNU General Public License along with django-ca. If not, see +# . + +"""Fixtures for HSM testing.""" + +from collections.abc import Iterator + +from pkcs11._pkcs11 import Session + +from django.conf import settings + +import pytest + +from django_ca.key_backends.hsm import HSMBackend +from django_ca.key_backends.hsm.session import SessionPool + +# pylint: disable=redefined-outer-name # usefixtures does not work on fixtures. + + +@pytest.fixture +def session_pool() -> Iterator[None]: + """Get a clean session pool.""" + # Reinitialize the library, so that any token created before is also visible (SoftHSM only sees token + # present at initialization time). + # pylint: disable=use-implicit-booleaness-not-comparison + # pylint: disable=protected-access # deliberate access in this entire function + for lib in SessionPool._lib_pool.values(): + lib.reinitialize() + + # Assert that the session pool *is* clean. Any test leaving sessions intact would show up here. + assert SessionPool._session_pool == {} + assert SessionPool._session_refcount == {} + + yield + + # Make sure that we leave no open sessions. + assert SessionPool._session_pool == {} + assert SessionPool._session_refcount == {} + + +@pytest.fixture +def session(softhsm_token: str, session_pool: None) -> Session: # pylint: disable=unused-argument + """Fixture providing a fresh (read-only) session.""" + with SessionPool(settings.PKCS11_PATH, softhsm_token, None, settings.PKCS11_USER_PIN) as session: + yield session + + +@pytest.fixture +def hsm_backend(softhsm_token: str) -> Iterator[HSMBackend]: + """Fixture to retrievea HSM backend instance.""" + yield HSMBackend("hsm", settings.PKCS11_PATH, softhsm_token, user_pin=settings.PKCS11_USER_PIN) diff --git a/ca/django_ca/tests/key_backends/hsm/test_backend.py b/ca/django_ca/tests/key_backends/hsm/test_backend.py new file mode 100644 index 000000000..9d1ab9282 --- /dev/null +++ b/ca/django_ca/tests/key_backends/hsm/test_backend.py @@ -0,0 +1,131 @@ +# This file is part of django-ca (https://github.com/mathiasertl/django-ca). +# +# django-ca is free software: you can redistribute it and/or modify it under the terms of the GNU General +# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your +# option) any later version. +# +# django-ca is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the +# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License +# for more details. +# +# You should have received a copy of the GNU General Public License along with django-ca. If not, see +# . + +"""Main tests for the HSM backend.""" + +from django.conf import settings + +import pytest + +from django_ca.key_backends import key_backends +from django_ca.key_backends.hsm import HSMBackend +from django_ca.key_backends.hsm.models import CreatePrivateKeyOptions, UsePrivateKeyOptions +from django_ca.models import CertificateAuthority + +use_options = UsePrivateKeyOptions(user_pin=settings.PKCS11_USER_PIN) + + +def test_invalid_pin_configuration() -> None: + """Test validation of so_pin/user_pin.""" + with pytest.raises(ValueError, match=r"^test: Set either so_pin or user_pin\.$"): + HSMBackend("test", "/path", "token", so_pin="so-pin", user_pin="user-pin") + + +@pytest.mark.usefixtures("softhsm_token") +def test_invalid_private_key_type(root: CertificateAuthority) -> None: + """Test an invalid private key.""" + root.key_backend_alias = "hsm" + root.key_backend_options = {"key_id": "123", "key_label": "label", "key_type": "WRONG"} + root.save() + with pytest.raises(ValueError, match=r"^WRONG: Unsupported key type\.$"): + root.check_usable(use_options) + + +def test_is_usable_with_no_options(root: CertificateAuthority) -> None: + """Test that is_usable() returns True if no options are passed.""" + root.key_backend_alias = "hsm" + root.key_backend_options = {"key_id": "123", "key_label": "label", "key_type": "RSA"} + root.save() + assert root.is_usable() is True + assert root.is_usable(None) is True + + +@pytest.mark.usefixtures("softhsm_token") +def test_is_usable_with_wrong_user_pin(root: CertificateAuthority) -> None: + """Test that is_usable() returns False if the wrong pin is passed.""" + root.key_backend_alias = "hsm" + root.key_backend_options = {"key_id": "123", "key_label": "label", "key_type": "RSA"} + root.save() + assert root.is_usable(UsePrivateKeyOptions(user_pin="wrong")) is False + + +def test_no_private_key_options(root: CertificateAuthority) -> None: + """Test ...usable() with empty private key options.""" + root.key_backend_alias = "hsm" + root.key_backend_options = {} + root.save() + assert root.is_usable(use_options) is False + with pytest.raises(ValueError, match=r"^key backend options are not defined\.$"): + root.check_usable(use_options) + + +def test_private_key_options_not_a_dict(root: CertificateAuthority) -> None: + """Test ...usable() with private key options that are not a dict.""" + root.key_backend_alias = "hsm" + root.key_backend_options = [] + root.save() + assert root.is_usable(use_options) is False + with pytest.raises(ValueError, match=r"^key backend options are not defined\.$"): + root.check_usable(use_options) + + +# pylint: disable-next=protected-access # okay in test cases +@pytest.mark.parametrize("parameter", HSMBackend._required_key_backend_options) +def test_private_key_options_missing_parameter(root: CertificateAuthority, parameter: str) -> None: + """Test ...usable() with private key options that are missing a value.""" + root.key_backend_alias = "hsm" + root.key_backend_options = {"key_id": "123", "key_label": "label", "key_type": "RSA"} + del root.key_backend_options[parameter] + root.save() + assert root.is_usable(use_options) is False + + with pytest.raises(ValueError, match=rf"^{parameter}: Required key option is not defined\.$"): + root.check_usable(use_options) + + +@pytest.mark.usefixtures("softhsm_token") +@pytest.mark.parametrize("key_type", HSMBackend.supported_key_types) +def test_create_private_key_with_read_only_session( + root: CertificateAuthority, ca_name: str, key_type: str +) -> None: + """Test creating a private key if a read-only session is already open.""" + root.key_backend_alias = "hsm" + root.key_backend_options = {"key_id": "123", "key_label": "label", "key_type": "RSA"} + root.save() + + options = CreatePrivateKeyOptions( + key_type=key_type, key_label=ca_name, elliptic_curve=None, user_pin=settings.PKCS11_USER_PIN + ) + + backend: HSMBackend = key_backends["hsm"] # type: ignore[assignment] + with pytest.raises( + ValueError, match=r"^Requested R/W session, but R/O session is already initialized\.$" + ): + with backend.session(so_pin=None, user_pin=settings.PKCS11_USER_PIN): + backend.create_private_key(root, options.key_type, options) + + +@pytest.mark.usefixtures("softhsm_token") +def test_create_private_key_with_unknown_key_type(root: CertificateAuthority, ca_name: str) -> None: + """Test creating a private key if a read-only session is already open.""" + root.key_backend_alias = "hsm" + root.key_backend_options = {"key_id": "123", "key_label": "label", "key_type": "WRONG"} + root.save() + + options = CreatePrivateKeyOptions( + key_type="RSA", key_label=ca_name, elliptic_curve=None, user_pin=settings.PKCS11_USER_PIN + ) + + backend: HSMBackend = key_backends["hsm"] # type: ignore[assignment] + with pytest.raises(ValueError, match=r"^WRONG: unknown key type$"): + backend.create_private_key(root, "WRONG", options) # type: ignore[arg-type] # what we test diff --git a/ca/django_ca/tests/key_backends/hsm/test_keys.py b/ca/django_ca/tests/key_backends/hsm/test_keys.py new file mode 100644 index 000000000..d711f2cd3 --- /dev/null +++ b/ca/django_ca/tests/key_backends/hsm/test_keys.py @@ -0,0 +1,68 @@ +# This file is part of django-ca (https://github.com/mathiasertl/django-ca). +# +# django-ca is free software: you can redistribute it and/or modify it under the terms of the GNU General +# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your +# option) any later version. +# +# django-ca is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the +# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License +# for more details. +# +# You should have received a copy of the GNU General Public License along with django-ca. If not, see +# . + +"""Tests for models used in the HSM key backend.""" + +from typing import Union + +from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15 +from cryptography.hazmat.primitives.serialization import Encoding, NoEncryption, PrivateFormat + +import pytest + +from django_ca.key_backends.hsm.keys import ( + PKCS11Ed448PrivateKey, + PKCS11Ed25519PrivateKey, + PKCS11EllipticCurvePrivateKey, + PKCS11PrivateKeyTypes, + PKCS11RSAPrivateKey, +) + + +@pytest.mark.parametrize( + "key_class", + (PKCS11RSAPrivateKey, PKCS11EllipticCurvePrivateKey, PKCS11Ed25519PrivateKey, PKCS11Ed448PrivateKey), +) +def test_not_implemented_error(key_class: type[PKCS11PrivateKeyTypes]) -> None: + """Test methods that are not implemented.""" + key: PKCS11PrivateKeyTypes = key_class(None, "key_id", "key_label") + + error = r"^Private numbers cannot be retrieved for keys stored in a hardware security module \(HSM\)\.$" + with pytest.raises(NotImplementedError, match=error): + key.private_numbers() + + with pytest.raises( + NotImplementedError, + match=r"^Private bytes cannot be retrieved for keys stored in a hardware security module \(HSM\)\.$", + ): + key.private_bytes(Encoding.PEM, PrivateFormat.TraditionalOpenSSL, NoEncryption()) + + with pytest.raises( + NotImplementedError, + match=r"^Decryption is not implemented for keys stored in a hardware security module \(HSM\)\.$", + ): + key.decrypt(b"foo", PKCS1v15()) + + +@pytest.mark.parametrize("key_class", (PKCS11Ed25519PrivateKey, PKCS11Ed448PrivateKey)) +def test_ed_key_not_implemented_error( + key_class: type[Union[PKCS11Ed25519PrivateKey, PKCS11Ed448PrivateKey]], +) -> None: + """Test methods that are not implemented.""" + key: Union[PKCS11Ed25519PrivateKey, PKCS11Ed448PrivateKey] = key_class(None, "key_id", "key_label") + + with pytest.raises( + NotImplementedError, + match=r"^Private bytes cannot be retrieved for keys stored in a hardware security module \(HSM\)\.$", + ): + key.private_bytes_raw() diff --git a/ca/django_ca/tests/key_backends/hsm/test_models.py b/ca/django_ca/tests/key_backends/hsm/test_models.py new file mode 100644 index 000000000..b226f7a34 --- /dev/null +++ b/ca/django_ca/tests/key_backends/hsm/test_models.py @@ -0,0 +1,41 @@ +# This file is part of django-ca (https://github.com/mathiasertl/django-ca). +# +# django-ca is free software: you can redistribute it and/or modify it under the terms of the GNU General +# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your +# option) any later version. +# +# django-ca is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the +# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License +# for more details. +# +# You should have received a copy of the GNU General Public License along with django-ca. If not, see +# . + +"""Tests for models used in the HSM key backend.""" + +from typing import Optional + +import pytest + +from django_ca.key_backends.hsm.models import UsePrivateKeyOptions + + +@pytest.mark.parametrize("so_pin,user_pin", (("so-pin-value", None), (None, "user-pin-value"))) +def test_pins(so_pin: Optional[str], user_pin: Optional[str]) -> None: + """Test valid pin configurations.""" + model = UsePrivateKeyOptions(so_pin=so_pin, user_pin=user_pin) + assert model.so_pin == so_pin + assert model.user_pin == user_pin + + +@pytest.mark.parametrize( + "so_pin,user_pin,error", + ( + (None, None, r"Provide one of so_pin or user_pin\."), + ("so-pin-value", "user-pin-value", r"Provide either so_pin or user_pin\."), + ), +) +def test_invalid_pins(so_pin: Optional[str], user_pin: Optional[str], error: str) -> None: + """Test invalid pin configurations.""" + with pytest.raises(ValueError, match=error): + UsePrivateKeyOptions(so_pin=so_pin, user_pin=user_pin) diff --git a/ca/django_ca/tests/key_backends/hsm/test_session.py b/ca/django_ca/tests/key_backends/hsm/test_session.py new file mode 100644 index 000000000..f8c4e79bd --- /dev/null +++ b/ca/django_ca/tests/key_backends/hsm/test_session.py @@ -0,0 +1,129 @@ +# This file is part of django-ca (https://github.com/mathiasertl/django-ca). +# +# django-ca is free software: you can redistribute it and/or modify it under the terms of the GNU General +# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your +# option) any later version. +# +# django-ca is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the +# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License +# for more details. +# +# You should have received a copy of the GNU General Public License along with django-ca. If not, see +# . + + +"""Test HSM related code.""" + +import subprocess +from collections.abc import Iterator +from typing import Optional + +from pkcs11._pkcs11 import Session + +from django.conf import settings +from django.utils.crypto import get_random_string + +import pytest + +from django_ca.key_backends.hsm.session import SessionPool + +PoolKeyType = tuple[str, str, Optional[str], Optional[str]] + +# pylint: disable=redefined-outer-name # several fixtures are defined here +# pylint: disable=protected-access # we test class internals throughout this module + + +@pytest.fixture +def pool_key(softhsm_token: str) -> Iterator[PoolKeyType]: + """Minor fixture to return the pool key for the default settings.""" + yield settings.PKCS11_PATH, softhsm_token, None, settings.PKCS11_USER_PIN + + +@pytest.fixture +def second_softhsm_token() -> Iterator[str]: + """Fixture to create a second softhsm token.""" + label = f"pytest.{get_random_string(8)}.dual" + + so_pin = settings.PKCS11_SO_PIN + pin = settings.PKCS11_USER_PIN + + subprocess.run( + ["softhsm2-util", "--init-token", "--free", "--label", label, "--so-pin", so_pin, "--pin", pin], + check=True, + ) + + yield label + + subprocess.run(["softhsm2-util", "--delete-token", "--token", label], check=True) + + +def test_duplicate_session(softhsm_token: str, session: Session, pool_key: PoolKeyType) -> None: + """Test that a second session request does not open a new session.""" + assert isinstance(session, Session) + assert SessionPool._session_pool == {pool_key: session} + assert SessionPool._session_refcount == {pool_key: 1} + + # Get another session, even though it's in the same thread, refcount still goes up + with SessionPool(settings.PKCS11_PATH, softhsm_token, None, settings.PKCS11_USER_PIN) as second_session: + assert session is second_session + assert isinstance(second_session, Session) + assert SessionPool._session_pool == {pool_key: session} + assert SessionPool._session_refcount == {pool_key: 2} + + # Test that second session was cleaned up + assert SessionPool._session_pool == {pool_key: session} + assert SessionPool._session_refcount == {pool_key: 1} + + +def test_duplicate_rw_session(softhsm_token: str, session: Session, pool_key: PoolKeyType) -> None: + """Test that requesting a read/write session when it is already open read-only is an error.""" + assert SessionPool._session_pool == {pool_key: session} + assert SessionPool._session_refcount == {pool_key: 1} + with pytest.raises( + ValueError, match=r"^Requested R/W session, but R/O session is already initialized\.$" + ): + with SessionPool(settings.PKCS11_PATH, softhsm_token, None, settings.PKCS11_USER_PIN, rw=True): + pass + + # Test that the ref count has not increased + assert SessionPool._session_pool == {pool_key: session} + assert SessionPool._session_refcount == {pool_key: 1} + + +def test_two_sessions(second_softhsm_token: str, session: Session, pool_key: PoolKeyType) -> None: + """Test creating a second token with dual sessions.""" + # assert that we have one session in the beginning + assert SessionPool._session_pool == {pool_key: session} + assert SessionPool._session_refcount == {pool_key: 1} + + pin = settings.PKCS11_USER_PIN + second_pool_key = (settings.PKCS11_PATH, second_softhsm_token, None, pin) + + # Create a second session and observe dual session pool + with SessionPool(settings.PKCS11_PATH, second_softhsm_token, None, pin) as second_session: + assert SessionPool._session_pool == {pool_key: session, second_pool_key: second_session} + assert SessionPool._session_refcount == {pool_key: 1, second_pool_key: 1} + + # Try to get a nested session... + with SessionPool(settings.PKCS11_PATH, second_softhsm_token, None, pin) as third_session: + assert second_session is third_session + assert SessionPool._session_pool == {pool_key: session, second_pool_key: second_session} + assert SessionPool._session_refcount == {pool_key: 1, second_pool_key: 2} + + # session was cleared from pool + assert SessionPool._session_pool == {pool_key: session} + assert SessionPool._session_refcount == {pool_key: 1} + + +def test_both_pins_empty() -> None: + """Test error when both so_pin and user_pin are not set.""" + with pytest.raises(ValueError, match=r"^so_pin and user_pin cannot both be None\.$"): + with SessionPool(settings.PKCS11_PATH, "any", None, None): + pass + + +def test_both_pins_set() -> None: + """Test error when both so_pin and user_pin are not set.""" + with pytest.raises(ValueError, match=r"^Either so_pin and user_pin must be set\.$"): + with SessionPool(settings.PKCS11_PATH, "any", "foo", "bar"): + pass diff --git a/ca/django_ca/tests/key_backends/test_base.py b/ca/django_ca/tests/key_backends/test_base.py index 038821db0..fc767521e 100644 --- a/ca/django_ca/tests/key_backends/test_base.py +++ b/ca/django_ca/tests/key_backends/test_base.py @@ -56,6 +56,7 @@ def test_key_backends_iter(settings: SettingsWrapper) -> None: assert list(key_backends) == [ key_backends[model_settings.CA_DEFAULT_KEY_BACKEND], key_backends["secondary"], + key_backends["hsm"], ] settings.CA_KEY_BACKENDS = { diff --git a/pyproject.toml b/pyproject.toml index 4f5569e8b..6bd4a7529 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -123,6 +123,7 @@ dependencies = [ [project.optional-dependencies] api = ["django-ninja>=1.1"] celery = ["celery>=5.4"] +hsm = ["python-pkcs11>=0.7"] mysql = ["mysqlclient"] postgres = ["psycopg[c,pool]>=3.1"] # remove psycopg3 in django-ca==2.0, also update docs/source/include/pip-extras.rst @@ -193,6 +194,8 @@ module = [ "docker.*", "enchant.tokenize", "httpcore.*", + "pkcs11.*", + "pkcs11.util.*", # psycopg and psycopg_c are not installed in isolated mypy envs (tox, ...) "psycopg", "psycopg_c", @@ -391,6 +394,9 @@ filterwarnings = [ "error:::josepy", "error:::pydantic", + # PyOpenSSL==24.2.1 deprecates CSR support, which is still used by josepy==1.14.0. + "ignore:CSR support in pyOpenSSL is deprecated. You should use the APIs in cryptography.", + # Disabled because cryptography calls the function directly and the warning is issued with stacklevel=2, # so it's shown for our code. See: https://github.com/pyca/cryptography/issues/9580 "ignore:datetime.datetime.utcfromtimestamp\\(\\) is deprecated::django_ca", # pragma: only cg<42 diff --git a/requirements.txt b/requirements.txt index b8746b1a1..12f87e382 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1 @@ --e .[celery,redis,yaml,api,postgres] +-e .[celery,hsm,redis,yaml,api,postgres]