From 17f595f2b38df5336a582faa50f266ad32bbb2c4 Mon Sep 17 00:00:00 2001 From: Mathias Ertl Date: Sun, 14 Jul 2024 23:21:56 +0200 Subject: [PATCH] attempt to add HSM support directly --- ca/ca/test_settings.py | 16 + ca/django_ca/key_backends/base.py | 5 +- ca/django_ca/key_backends/hsm/__init__.py | 18 + ca/django_ca/key_backends/hsm/backend.py | 422 ++++++++++++++++++ ca/django_ca/key_backends/hsm/keys.py | 213 +++++++++ ca/django_ca/key_backends/hsm/models.py | 64 +++ ca/django_ca/key_backends/hsm/session.py | 118 +++++ ca/django_ca/key_backends/hsm/typehints.py | 19 + ca/django_ca/key_backends/storages.py | 2 +- ca/django_ca/management/commands/init_ca.py | 19 +- ca/django_ca/tests/base/utils.py | 4 +- .../tests/key_backends/hsm/__init__.py | 0 .../tests/key_backends/hsm/conftest.py | 74 +++ .../tests/key_backends/hsm/test_session.py | 129 ++++++ pyproject.toml | 2 + 15 files changed, 1093 insertions(+), 12 deletions(-) create mode 100644 ca/django_ca/key_backends/hsm/__init__.py create mode 100644 ca/django_ca/key_backends/hsm/backend.py create mode 100644 ca/django_ca/key_backends/hsm/keys.py create mode 100644 ca/django_ca/key_backends/hsm/models.py create mode 100644 ca/django_ca/key_backends/hsm/session.py create mode 100644 ca/django_ca/key_backends/hsm/typehints.py create mode 100644 ca/django_ca/tests/key_backends/hsm/__init__.py create mode 100644 ca/django_ca/tests/key_backends/hsm/conftest.py create mode 100644 ca/django_ca/tests/key_backends/hsm/test_session.py diff --git a/ca/ca/test_settings.py b/ca/ca/test_settings.py index 6aac801ae..f511b6687 100644 --- a/ca/ca/test_settings.py +++ b/ca/ca/test_settings.py @@ -14,6 +14,7 @@ """Test settings for the django-ca project.""" import json +import os from pathlib import Path # Base paths in this project @@ -195,6 +196,14 @@ "BACKEND": "django_ca.key_backends.storages.StoragesBackend", "OPTIONS": {"storage_alias": "secondary"}, }, + # "softhsm": { + # "BACKEND": "django_ca.key_backends.hsm.HSMBackend", + # "OPTIONS": { + # "module": "/usr/lib/softhsm/libsofthsm2.so", + # "token": "my_test_token_1", + # "pin": "1234", + # }, + # }, } # Custom settings @@ -253,3 +262,10 @@ CA_PASSWORDS = { _fixture_data["certs"]["pwd"]["serial"]: _fixture_data["certs"]["pwd"]["password"].encode("utf-8"), } + + +# PKCS11 settings +PKCS11_PATH = os.environ.get("PKCS11_LIBRARY", "/usr/lib/softhsm/libsofthsm2.so") +PKCS11_TOKEN_LABEL = "my_test_token_1" +PKCS11_SO_PIN = "so-pin-1234" +PKCS11_USER_PIN = "user-pin-1234" diff --git a/ca/django_ca/key_backends/base.py b/ca/django_ca/key_backends/base.py index 8fc7aae82..3fad71330 100644 --- a/ca/django_ca/key_backends/base.py +++ b/ca/django_ca/key_backends/base.py @@ -213,13 +213,12 @@ def get_use_parent_private_key_options( @abc.abstractmethod def get_use_private_key_options( - self, ca: Optional["CertificateAuthority"], options: dict[str, Any] + self, ca: "CertificateAuthority", options: dict[str, Any] ) -> UsePrivateKeyOptionsTypeVar: """Get options to use the private key of a certificate authority. The returned model will be used for the certificate authority `ca`. You can pass it as extra context - to influence model validation. If `ca` is ``None``, it indicates that the CA is currently being - created via :command:`manage.py init_ca`. + to influence model validation. `options` is the dictionary of arguments to :command:`manage.py init_ca` (including default values). The key backend is expected to be able to sign certificates and CRLs using the options provided here. diff --git a/ca/django_ca/key_backends/hsm/__init__.py b/ca/django_ca/key_backends/hsm/__init__.py new file mode 100644 index 000000000..1025ae0e3 --- /dev/null +++ b/ca/django_ca/key_backends/hsm/__init__.py @@ -0,0 +1,18 @@ +# This file is part of django-ca (https://github.com/mathiasertl/django-ca). +# +# django-ca is free software: you can redistribute it and/or modify it under the terms of the GNU General +# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your +# option) any later version. +# +# django-ca is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the +# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License +# for more details. +# +# You should have received a copy of the GNU General Public License along with django-ca. If not, see +# . + +"""HSM backend module.""" + +from django_ca.key_backends.hsm.backend import HSMBackend + +__all__ = ("HSMBackend",) diff --git a/ca/django_ca/key_backends/hsm/backend.py b/ca/django_ca/key_backends/hsm/backend.py new file mode 100644 index 000000000..db0d6758d --- /dev/null +++ b/ca/django_ca/key_backends/hsm/backend.py @@ -0,0 +1,422 @@ +# This file is part of django-ca (https://github.com/mathiasertl/django-ca). +# +# django-ca is free software: you can redistribute it and/or modify it under the terms of the GNU General +# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your +# option) any later version. +# +# django-ca is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the +# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License +# for more details. +# +# You should have received a copy of the GNU General Public License along with django-ca. If not, see +# . + +"""Key storage backend for hardware security modules (HSMs).""" + +from collections.abc import Iterator, Sequence +from contextlib import contextmanager +from datetime import datetime +from typing import TYPE_CHECKING, Any, Optional + +import pkcs11 +from pkcs11 import KeyType, ObjectClass, Session +from pkcs11.util.ec import encode_named_curve_parameters +from pkcs11.util.rsa import decode_rsa_private_key + +from asn1crypto.algos import SignedDigestAlgorithmId +from cryptography import x509 +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric import rsa +from cryptography.hazmat.primitives.asymmetric.types import ( + CertificateIssuerPrivateKeyTypes, + CertificateIssuerPublicKeyTypes, +) + +from django.core.management import CommandError + +from django_ca.conf import model_settings +from django_ca.key_backends import KeyBackend +from django_ca.key_backends.hsm.keys import ( + PKCS11Ed448PrivateKey, + PKCS11Ed25519PrivateKey, + PKCS11EllipticCurvePrivateKey, + PKCS11PrivateKeyTypes, + PKCS11RSAPrivateKey, +) +from django_ca.key_backends.hsm.models import ( + CreatePrivateKeyOptions, + StorePrivateKeyOptions, + UsePrivateKeyOptions, +) +from django_ca.key_backends.hsm.session import SessionPool +from django_ca.key_backends.hsm.typehints import EllipticCurves, SupportedKeyType +from django_ca.typehints import AllowedHashTypes, ArgumentGroup, CertificateExtension, ParsableKeyType +from django_ca.utils import get_cert_builder, int_to_hex + +if TYPE_CHECKING: + from django_ca.models import CertificateAuthority + + +class HSMBackend(KeyBackend[CreatePrivateKeyOptions, StorePrivateKeyOptions, UsePrivateKeyOptions]): + """Key backend to create and use private keys in a hardware security module (HSM).""" + + name = "hsm" + title = "Store private keys using a hardware security module (HSM)" + description = ( + "The private key will be stored on the hardware security module (HSM). The HSM makes sure that the" + "private key can never be recovered and thus compromised." + ) + use_model = UsePrivateKeyOptions + + supported_key_types: tuple[SupportedKeyType, ...] = ("RSA", "EC", "Ed25519", "Ed448") + supported_elliptic_curves: tuple[EllipticCurves, ...] = ("secp256r1", "secp384r1", "secp521r1") + + module: str + token: str + so_pin: Optional[str] + user_pin: Optional[str] + + def __init__( + self, + alias: str, + module: str, + token: str, + so_pin: Optional[str] = None, + user_pin: Optional[str] = None, + ): + if so_pin is not None and user_pin is not None: + raise ValueError(f"{alias}: Set either so_pin or user_pin.") + + super().__init__(alias, module=module, token=token, so_pin=so_pin, user_pin=user_pin) + + @contextmanager + def session(self, so_pin: Optional[str], user_pin: Optional[str], rw: bool = False) -> Iterator[Session]: + """Shortcut to get a session from the pool.""" + with SessionPool(self.module, self.token, so_pin, user_pin, rw=rw) as session: + yield session + + def _add_key_label_argument(self, group: ArgumentGroup, prefix: str = "") -> None: + group.add_argument( + f"--{self.argparse_prefix}{prefix}key-label", + type=str, + metavar="LABEL", + help="%(metavar)s to use for the private key in the HSM.", + ) + + def _add_pin_arguments(self, group: ArgumentGroup, prefix: str = "") -> None: + group.add_argument( + f"--{self.argparse_prefix}{prefix}so-pin", + type=str, + metavar="PIN", + help="Security officer %(metavar)s to access the HSM.", + ) + group.add_argument( + f"--{self.argparse_prefix}{prefix}user-pin", + type=str, + metavar="PIN", + help="User %(metavar)s to access the HSM.", + ) + + def _get_pins(self, options: dict[str, Any], prefix: str = "") -> tuple[Optional[str], Optional[str]]: + so_pin: Optional[str] = options[f"{self.options_prefix}{prefix}so_pin"] + if so_pin is None: + so_pin = self.so_pin + elif so_pin == "": + so_pin = None + + user_pin: Optional[str] = options[f"{self.options_prefix}{prefix}user_pin"] + if user_pin is None: + user_pin = self.user_pin + elif user_pin == "": + user_pin = None + + return so_pin, user_pin + + def _get_private_key( + self, ca: "CertificateAuthority", session: Session + ) -> CertificateIssuerPrivateKeyTypes: + key_id: str = ca.key_backend_options["key_id"] + key_label: str = ca.key_backend_options["key_label"] + key_type: SupportedKeyType = ca.key_backend_options["key_type"] + + if key_type == "RSA": + return PKCS11RSAPrivateKey(session, key_id, key_label) + if key_type == "Ed448": + return PKCS11Ed448PrivateKey(session, key_id, key_label) + if key_type == "Ed25519": + return PKCS11Ed25519PrivateKey(session, key_id, key_label) + if key_type == "EC": + return PKCS11EllipticCurvePrivateKey(session, key_id, key_label) + + raise ValueError(f"{key_type}: Unsupported key type.") + + def add_create_private_key_arguments(self, group: ArgumentGroup) -> None: + self._add_key_label_argument(group) + self._add_pin_arguments(group) + + def add_use_parent_private_key_arguments(self, group: ArgumentGroup) -> None: + self._add_pin_arguments(group, "parent-") + + def add_store_private_key_arguments(self, group: ArgumentGroup) -> None: + self._add_key_label_argument(group) + self._add_pin_arguments(group) + + def add_use_private_key_arguments(self, group: ArgumentGroup) -> None: + self._add_pin_arguments(group) + + def get_create_private_key_options( + self, + key_type: ParsableKeyType, + key_size: Optional[int], + elliptic_curve: Optional[EllipticCurves], # type: ignore[override] # base is just str + options: dict[str, Any], + ) -> CreatePrivateKeyOptions: + key_label = options[f"{self.options_prefix}key_label"] + if not key_label: + raise CommandError( + f"--{self.argparse_prefix}key-label is a required option for this key backend." + ) + + so_pin, user_pin = self._get_pins(options) + + if so_pin is None and user_pin is None: + raise CommandError( + f"Backend does not configure SO or user pin. Specify with --{self.argparse_prefix}so-pin or " + f"--{self.argparse_prefix}user-pin" + ) + if so_pin is not None and user_pin is not None: + raise CommandError( + "Both SO pin and user pin configured. To override a pin from settings, pass " + f'--{self.argparse_prefix}so-pin="" or --{self.argparse_prefix}user-pin=""' + ) + + if elliptic_curve is None: + # TYPEHINT NOTE: validated in the line after this statement. + elliptic_curve = model_settings.CA_DEFAULT_ELLIPTIC_CURVE.name # type: ignore[assignment] + if elliptic_curve not in self.supported_elliptic_curves: + raise CommandError( + f"{elliptic_curve}: Default elliptic curve is not supported by this backend." + ) + + return CreatePrivateKeyOptions( + key_label=key_label, + key_type=key_type, + key_size=key_size, + elliptic_curve=elliptic_curve, + so_pin=so_pin, + user_pin=user_pin, + ) + + def get_use_parent_private_key_options( + self, ca: "CertificateAuthority", options: dict[str, Any] + ) -> UsePrivateKeyOptions: + so_pin, user_pin = self._get_pins(options, "parent_") + return UsePrivateKeyOptions.model_validate( + {"so_pin": so_pin, "user_pin": user_pin}, context={"ca": ca}, strict=True + ) + + def get_use_private_key_options( + self, ca: "CertificateAuthority", options: dict[str, Any] + ) -> UsePrivateKeyOptions: + so_pin, user_pin = self._get_pins(options) + return UsePrivateKeyOptions.model_validate( + {"so_pin": so_pin, "user_pin": user_pin}, context={"ca": ca}, strict=True + ) + + def get_store_private_key_options(self, options: dict[str, Any]) -> StorePrivateKeyOptions: + key_label = options[f"{self.options_prefix}key_label"] + so_pin, user_pin = self._get_pins(options) + return StorePrivateKeyOptions.model_validate( + {"key_label": key_label, "so_pin": so_pin, "user_pin": user_pin}, strict=True + ) + + def is_usable( + self, ca: "CertificateAuthority", use_private_key_options: Optional[UsePrivateKeyOptions] = None + ) -> bool: + if not ca.key_backend_options or not ca.key_backend_options.get("key_label"): + return False + if use_private_key_options is None: + return True + + try: + with self.session( + so_pin=use_private_key_options.so_pin, user_pin=use_private_key_options.user_pin + ) as session: + self._get_private_key(ca, session) + return True + except Exception: # pylint: disable=broad-exception-caught # want to always return bool + return False + + def check_usable(self, ca: "CertificateAuthority", use_private_key_options: UsePrivateKeyOptions) -> None: + if not ca.key_backend_options or not ca.key_backend_options.get("key_label"): + raise ValueError("key_label not configured in database.") + + with self.session( + so_pin=use_private_key_options.so_pin, user_pin=use_private_key_options.user_pin + ) as session: + self._get_private_key(ca, session) + + def create_private_key( + self, ca: "CertificateAuthority", key_type: ParsableKeyType, options: CreatePrivateKeyOptions + ) -> tuple[CertificateIssuerPublicKeyTypes, UsePrivateKeyOptions]: + key_id = int_to_hex(x509.random_serial_number()) + key_label = options.key_label + + # Test that no private key with the given label exists. Some libraries (e.g. SoftHSM) don't treat the + # label as unique and will silently create a second key with the same label. + with self.session(options.so_pin, options.user_pin) as session: + try: + session.get_key(object_class=ObjectClass.PUBLIC_KEY, label=key_label) + except pkcs11.NoSuchKey: + pass # this is what we hope for + else: + raise ValueError(f"{key_label}: Private key with this label already exists.") + + if key_type == "RSA": + with self.session(so_pin=options.so_pin, user_pin=options.user_pin, rw=True) as session: + try: + pkcs11_public_key, pkcs11_private_key = session.generate_keypair( + pkcs11.KeyType.RSA, options.key_size, id=key_id.encode(), label=key_label, store=True + ) + except pkcs11.SessionReadOnly as ex: + raise RuntimeError("HSM session is read-only.") from ex + + private_key: PKCS11PrivateKeyTypes = PKCS11RSAPrivateKey( + session=session, + key_id=key_id, + key_label=key_label, + pkcs11_private_key=pkcs11_private_key, + pkcs11_public_key=pkcs11_public_key, + ) + public_key = private_key.public_key() + + elif key_type in ("Ed25519", "Ed448"): + named_curve_parameters = encode_named_curve_parameters( + SignedDigestAlgorithmId(key_type.lower()).dotted + ) + + with self.session(so_pin=options.so_pin, user_pin=options.user_pin, rw=True) as session: + parameters = session.create_domain_parameters( + KeyType.EC_EDWARDS, {pkcs11.Attribute.EC_PARAMS: named_curve_parameters}, local=True + ) + + try: + pkcs11_public_key, pkcs11_private_key = parameters.generate_keypair( + mechanism=pkcs11.Mechanism.EC_EDWARDS_KEY_PAIR_GEN, + store=True, + id=key_id.encode(), + label=key_label, + ) + except pkcs11.SessionReadOnly as ex: + raise RuntimeError("HSM session is read-only.") from ex + + if key_type == "Ed25519": + private_key = PKCS11Ed25519PrivateKey( + session=session, + key_id=key_id, + key_label=key_label, + pkcs11_private_key=pkcs11_private_key, + pkcs11_public_key=pkcs11_public_key, + ) + else: + private_key = PKCS11Ed448PrivateKey( + session=session, + key_id=key_id, + key_label=key_label, + pkcs11_private_key=pkcs11_private_key, + pkcs11_public_key=pkcs11_public_key, + ) + public_key = private_key.public_key() + + elif key_type == "EC": + with self.session(so_pin=options.so_pin, user_pin=options.user_pin, rw=True) as session: + parameters = session.create_domain_parameters( + KeyType.EC, + {pkcs11.Attribute.EC_PARAMS: encode_named_curve_parameters(options.elliptic_curve)}, + local=True, + ) + pkcs11_public_key, pkcs11_private_key = parameters.generate_keypair( + store=True, id=key_id.encode(), label=key_label + ) + + private_key = PKCS11EllipticCurvePrivateKey( + session=session, + key_id=key_id, + key_label=key_label, + pkcs11_private_key=pkcs11_private_key, + pkcs11_public_key=pkcs11_public_key, + ) + public_key = private_key.public_key() + else: # pragma: no cover # caller catches this already + raise ValueError(f"{key_type}: unknown key type") + + ca.key_backend_options = {"key_id": key_id, "key_label": key_label, "key_type": key_type} + use_private_key_options = UsePrivateKeyOptions.model_validate( + {"so_pin": options.so_pin, "user_pin": options.user_pin}, context={"ca": ca} + ) + + return public_key, use_private_key_options + + def store_private_key( + self, + ca: "CertificateAuthority", + key: CertificateIssuerPrivateKeyTypes, + options: StorePrivateKeyOptions, + ) -> None: + key_id = int_to_hex(x509.random_serial_number()) + if isinstance(key, rsa.RSAPrivateKey): + key_type = "RSA" + key_der = key.private_bytes( + encoding=serialization.Encoding.DER, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=serialization.NoEncryption(), + ) + attrs = decode_rsa_private_key(key_der) + else: + raise ValueError(f"{key}: Importing key of this type is not supported.") + + attrs[pkcs11.Attribute.ID] = key_id.encode() + attrs[pkcs11.Attribute.LABEL] = options.key_label + + with self.session(so_pin=options.so_pin, user_pin=options.user_pin, rw=True) as session: + session.create_object(attrs) + + ca.key_backend_options = {"key_id": key_id, "key_label": options.key_label, "key_type": key_type} + + def sign_certificate( + self, + ca: "CertificateAuthority", + use_private_key_options: UsePrivateKeyOptions, + public_key: CertificateIssuerPublicKeyTypes, + serial: int, + algorithm: Optional[AllowedHashTypes], + issuer: x509.Name, + subject: x509.Name, + expires: datetime, + extensions: Sequence[CertificateExtension], + ) -> x509.Certificate: + builder = get_cert_builder(expires, serial=serial) + builder = builder.public_key(public_key) + builder = builder.issuer_name(issuer) + builder = builder.subject_name(subject) + for extension in extensions: + builder = builder.add_extension(extension.value, critical=extension.critical) + + with self.session( + so_pin=use_private_key_options.so_pin, user_pin=use_private_key_options.user_pin + ) as session: + private_key = self._get_private_key(ca, session) + return builder.sign(private_key=private_key, algorithm=algorithm) + + def sign_certificate_revocation_list( + self, + ca: "CertificateAuthority", + use_private_key_options: UsePrivateKeyOptions, + builder: x509.CertificateRevocationListBuilder, + algorithm: Optional[AllowedHashTypes], + ) -> x509.CertificateRevocationList: + with self.session( + so_pin=use_private_key_options.so_pin, user_pin=use_private_key_options.user_pin + ) as session: + private_key = self._get_private_key(ca, session) + return builder.sign(private_key=private_key, algorithm=algorithm) diff --git a/ca/django_ca/key_backends/hsm/keys.py b/ca/django_ca/key_backends/hsm/keys.py new file mode 100644 index 000000000..ccbbeb310 --- /dev/null +++ b/ca/django_ca/key_backends/hsm/keys.py @@ -0,0 +1,213 @@ +# This file is part of django-ca (https://github.com/mathiasertl/django-ca). +# +# django-ca is free software: you can redistribute it and/or modify it under the terms of the GNU General +# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your +# option) any later version. +# +# django-ca is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the +# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License +# for more details. +# +# You should have received a copy of the GNU General Public License along with django-ca. If not, see +# . + +"""Private key implementations that use an HSM in the background.""" + +from hashlib import sha256, sha384, sha512 +from typing import Generic, NoReturn, Optional, TypeVar, Union, cast + +import pkcs11 +from pkcs11 import Session +from pkcs11.util.ec import encode_ec_public_key +from pkcs11.util.rsa import encode_rsa_public_key +from python_x509_pkcs11.crypto import encode_eddsa_public_key + +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric import ec, ed448, ed25519, rsa, utils as asym_utils +from cryptography.hazmat.primitives.asymmetric.padding import PSS, AsymmetricPadding, PKCS1v15 +from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicKey +from cryptography.hazmat.primitives.serialization import load_der_public_key + +EdwardsPublicKeyTypeVar = TypeVar("EdwardsPublicKeyTypeVar", ed448.Ed448PublicKey, ed25519.Ed25519PublicKey) + + +class PKCS11PrivateKeyMixin: + """Mixin providing common functionality to PKCS11 key implementations.""" + + # pylint: disable=missing-function-docstring # implements standard functions of the base class. + + key_type: pkcs11.KeyType + + def __init__( + self, + session: Session, + key_id: str, + key_label: str, + pkcs11_private_key: Optional[pkcs11.PrivateKey] = None, + pkcs11_public_key: Optional[pkcs11.PublicKey] = None, + ) -> None: + self._session = session + self._key_id = key_id.encode() + self._key_label = key_label + self._pkcs11_private_key = pkcs11_private_key + self._pkcs11_public_key = pkcs11_public_key + + super().__init__() + + def decrypt(self, ciphertext: bytes, padding: AsymmetricPadding) -> bytes: + raise NotImplementedError("Decryption is not implemented for keys stored in an HSM.") + + def private_bytes( + self, + encoding: serialization.Encoding, + format: serialization.PrivateFormat, # pylint: disable=redefined-builtin # given by cryptography + encryption_algorithm: serialization.KeySerializationEncryption, + ) -> bytes: + raise NotImplementedError("Cannot retrieve private bytes for a key stored in an HSM.") + + def private_numbers(self) -> NoReturn: + raise NotImplementedError("Cannot retrieve private numbers for a key stored in an HSM.") + + @property + def pkcs11_private_key(self) -> pkcs11.PrivateKey: + if self._pkcs11_private_key is None: + self._pkcs11_private_key = self._session.get_key( + key_type=self.key_type, + object_class=pkcs11.ObjectClass.PRIVATE_KEY, + id=self._key_id, + label=self._key_label, + ) + return self._pkcs11_private_key + + @property + def pkcs11_public_key(self) -> pkcs11.PublicKey: + if self._pkcs11_public_key is None: + self._pkcs11_public_key = self._session.get_key( + key_type=self.key_type, + object_class=pkcs11.ObjectClass.PUBLIC_KEY, + id=self._key_id, + label=self._key_label, + ) + return self._pkcs11_public_key + + +class PKCS11EdwardsPrivateKeyMixin(PKCS11PrivateKeyMixin, Generic[EdwardsPublicKeyTypeVar]): + """Specialized mixin for Ed448 and Ed5519 keys (which handle identical).""" + + # pylint: disable=missing-function-docstring # implements standard functions of the base class. + + key_type = pkcs11.KeyType.EC_EDWARDS + cryptograph_key_type: EdwardsPublicKeyTypeVar + + def private_bytes_raw(self) -> bytes: + raise NotImplementedError("Cannot retrieve private bytes for a key stored in an HSM.") + + def public_key(self) -> EdwardsPublicKeyTypeVar: + public_key = encode_eddsa_public_key(self.pkcs11_public_key) + return cast(EdwardsPublicKeyTypeVar, load_der_public_key(public_key)) + + def sign(self, data: bytes) -> bytes: + return self.pkcs11_private_key.sign(data, mechanism=pkcs11.Mechanism.EDDSA) # type: ignore[no-any-return] + + +# pylint: disable-next=abstract-method # private key functions are deliberately not implemented in base. +class PKCS11RSAPrivateKey(PKCS11PrivateKeyMixin, rsa.RSAPrivateKey): + """Private key implementation for RSA keys stored in a HSM.""" + + key_type: pkcs11.KeyType.RSA + + def public_key(self) -> RSAPublicKey: + der_public_key = encode_rsa_public_key(self.pkcs11_public_key) + return cast(RSAPublicKey, load_der_public_key(der_public_key)) + + @property + def key_size(self) -> int: + return self.public_key().key_size + + def sign( + self, + data: bytes, + padding: AsymmetricPadding, + algorithm: asym_utils.Prehashed | hashes.HashAlgorithm, + ) -> bytes: + if isinstance(algorithm, asym_utils.Prehashed): + raise ValueError("Prehashed operations are not supported.") + + if isinstance(algorithm, hashes.SHA224) and isinstance(padding, PSS): + mechanism: hashes.HashAlgorithm = pkcs11.Mechanism.SHA224_RSA_PKCS_PSS + elif isinstance(algorithm, hashes.SHA224) and isinstance(padding, PKCS1v15): + mechanism = pkcs11.Mechanism.SHA224_RSA_PKCS + elif isinstance(algorithm, hashes.SHA256) and isinstance(padding, PSS): + mechanism = pkcs11.Mechanism.SHA256_RSA_PKCS_PSS + elif isinstance(algorithm, hashes.SHA256) and isinstance(padding, PKCS1v15): + mechanism = pkcs11.Mechanism.SHA256_RSA_PKCS + elif isinstance(algorithm, hashes.SHA384) and isinstance(padding, PSS): + mechanism = pkcs11.Mechanism.SHA384_RSA_PKCS_PSS + elif isinstance(algorithm, hashes.SHA384) and isinstance(padding, PKCS1v15): + mechanism = pkcs11.Mechanism.SHA384_RSA_PKCS + elif isinstance(algorithm, hashes.SHA512) and isinstance(padding, PSS): + mechanism = pkcs11.Mechanism.SHA512_RSA_PKCS_PSS + elif isinstance(algorithm, hashes.SHA512) and isinstance(padding, PKCS1v15): + mechanism = pkcs11.Mechanism.SHA512_RSA_PKCS + elif isinstance(algorithm, (hashes.SHA3_224, hashes.SHA3_384, hashes.SHA3_256, hashes.SHA3_512)): + raise ValueError("SHA3 is not support by the HSM backend.") + else: + raise ValueError( + f"{algorithm.name} with {padding.name} padding: Unknown signing algorithm and padding" + ) + + # TYPEHINT NOTE: library is not type-hinted. + return self.pkcs11_private_key.sign(data, mechanism=mechanism) # type: ignore[no-any-return] + + +class PKCS11EllipticCurvePrivateKey(PKCS11PrivateKeyMixin, ec.EllipticCurvePrivateKey): + """Private key implementation for EC keys stored in a HSM.""" + + key_type = pkcs11.KeyType.EC + + @property + def curve(self) -> ec.EllipticCurve: + return self.public_key().curve + + def exchange(self, algorithm: ec.ECDH, peer_public_key: ec.EllipticCurvePublicKey) -> bytes: + raise NotImplementedError("Operation not implemented for a key stored in an HSM.") + + @property + def key_size(self) -> int: + return self.public_key().key_size + + def public_key(self) -> ec.EllipticCurvePublicKey: + public_key = encode_ec_public_key(self.pkcs11_public_key) + return cast(ec.EllipticCurvePublicKey, load_der_public_key(public_key)) + + def sign(self, data: bytes, signature_algorithm: ec.EllipticCurveSignatureAlgorithm) -> bytes: + if isinstance(signature_algorithm.algorithm, hashes.SHA256): + hasher = sha256() + elif isinstance(signature_algorithm.algorithm, hashes.SHA384): + hasher = sha384() + elif isinstance(signature_algorithm.algorithm, hashes.SHA512): + hasher = sha512() + else: + raise ValueError(f"{signature_algorithm.algorithm}: Signature algorithm is not supported.") + + hasher.update(data) + data = hasher.digest() + + return self.pkcs11_private_key.sign(data, mechanism=pkcs11.Mechanism.ECDSA) # type: ignore[no-any-return] + + +# pylint: disable-next=abstract-method # private key functions are deliberately not implemented in base. +class PKCS11Ed25519PrivateKey( + PKCS11EdwardsPrivateKeyMixin[ed25519.Ed25519PublicKey], ed25519.Ed25519PrivateKey +): + """Private key implementation for Ed25519 keys stored in a HSM.""" + + +# pylint: disable-next=abstract-method # private key functions are deliberately not implemented in base. +class PKCS11Ed448PrivateKey(PKCS11EdwardsPrivateKeyMixin[ed448.Ed448PublicKey], ed448.Ed448PrivateKey): + """Private key implementation for Ed448 keys stored in a HSM.""" + + +PKCS11PrivateKeyTypes = Union[ + PKCS11RSAPrivateKey, PKCS11Ed25519PrivateKey, PKCS11Ed448PrivateKey, PKCS11EllipticCurvePrivateKey +] diff --git a/ca/django_ca/key_backends/hsm/models.py b/ca/django_ca/key_backends/hsm/models.py new file mode 100644 index 000000000..d8c33afb1 --- /dev/null +++ b/ca/django_ca/key_backends/hsm/models.py @@ -0,0 +1,64 @@ +# This file is part of django-ca (https://github.com/mathiasertl/django-ca). +# +# django-ca is free software: you can redistribute it and/or modify it under the terms of the GNU General +# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your +# option) any later version. +# +# django-ca is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the +# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License +# for more details. +# +# You should have received a copy of the GNU General Public License along with django-ca. If not, see +# . + +"""Models used by the HSM backend.""" + +import typing +from typing import Optional + +from pydantic import BaseModel, ConfigDict, model_validator + +from django_ca.key_backends.base import CreatePrivateKeyOptionsBaseModel +from django_ca.key_backends.hsm.typehints import EllipticCurves, SupportedKeyType + + +class PinModelMixin: + """Mixin providing so/user pin and validation.""" + + so_pin: Optional[str] = None + user_pin: Optional[str] = None + + @model_validator(mode="after") + def validate_pins(self) -> "typing.Self": + """Validate that exactly one of `so_pin` and `user_pin` is set.""" + if self.so_pin is None and self.user_pin is None: + raise ValueError("Provide one of so_pin or user_pin.") + if self.so_pin is not None and self.user_pin is not None: + raise ValueError("Provide either so_pin or user_pin.") + return self + + +class CreatePrivateKeyOptions(PinModelMixin, CreatePrivateKeyOptionsBaseModel): + """Options for initializing private keys.""" + + # NOTE: we set frozen here to prevent accidental coding mistakes. Models should be immutable. + model_config = ConfigDict(arbitrary_types_allowed=True) + + key_label: str + key_type: SupportedKeyType # overwrites field from the base model + elliptic_curve: Optional[EllipticCurves] + + +class StorePrivateKeyOptions(PinModelMixin, BaseModel): + """Options for storing a private key.""" + + # NOTE: we set frozen here to prevent accidental coding mistakes. Models should be immutable. + model_config = ConfigDict(frozen=True) + + key_label: str + + +class UsePrivateKeyOptions(PinModelMixin, BaseModel): + """Options for using the private key.""" + + model_config = ConfigDict(frozen=True) diff --git a/ca/django_ca/key_backends/hsm/session.py b/ca/django_ca/key_backends/hsm/session.py new file mode 100644 index 000000000..17b170662 --- /dev/null +++ b/ca/django_ca/key_backends/hsm/session.py @@ -0,0 +1,118 @@ +# This file is part of django-ca (https://github.com/mathiasertl/django-ca). +# +# django-ca is free software: you can redistribute it and/or modify it under the terms of the GNU General +# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your +# option) any later version. +# +# django-ca is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the +# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License +# for more details. +# +# You should have received a copy of the GNU General Public License along with django-ca. If not, see +# . + +"""Code for handling sessions for hardware security modules.""" + +import threading +from types import TracebackType +from typing import Final, Optional + +import pkcs11 +from pkcs11 import Session +from pkcs11._pkcs11 import lib as pkcs11_lib + +PoolKeyType = tuple[str, str, Optional[str], Optional[str]] + + +class SessionPool: + """Thread-safe session pool for PKCS11 sessions.""" + + _lib_lock: Final[threading.Lock] = threading.Lock() + _lib_pool: Final[dict[str, pkcs11_lib]] = {} + + _session_lock: Final[threading.Lock] = threading.Lock() + _session_pool: Final[dict[PoolKeyType, Session]] = {} + _session_refcount: Final[dict[PoolKeyType, int]] = {} + + path: Final[str] + token_label: Final[str] + so_pin: Final[Optional[str]] + user_pin: Final[Optional[str]] + rw: Final[bool] + + def __init__( + self, path: str, token_label: str, so_pin: Optional[str], user_pin: Optional[str], rw: bool = False + ) -> None: + if so_pin is None and user_pin is None: + raise ValueError("so_pin and user_pin cannot both be None.") + if so_pin is not None and user_pin is not None: + raise ValueError("Either so_pin and user_pin must be set.") + + self.path = path + self.token_label = token_label + self.so_pin = so_pin + self.user_pin = user_pin + self.rw = rw + + @classmethod + def acquire( + cls, + path: str, + token_label: str, + so_pin: Optional[str] = None, + user_pin: Optional[str] = None, + rw: bool = False, + ) -> Session: + """Open a new session with the given parameters.""" + with cls._lib_lock: + if path not in cls._lib_pool: + cls._lib_pool[path] = pkcs11.lib(path) + + with cls._session_lock: + pool_key = (path, token_label, so_pin, user_pin) + if pool_key not in cls._session_pool: + token = cls._lib_pool[path].get_token(token_label=token_label) + cls._session_pool[pool_key] = token.open(rw=rw, so_pin=so_pin, user_pin=user_pin) + cls._session_refcount[pool_key] = 1 + else: + # Request a read/write session, but a read-only session is already present. According to the + # PKCS11 documentation, some libraries don't allow multiple sessions for the same token per + # process: + # + # https://python-pkcs11.readthedocs.io/en/latest/concurrency.html + # + # Note that this does not happen in practice. A read/write session is only requested when + # generating or importing keys, and this always runs on the command-line, where no other + # session is ever present. + if rw is True and cls._session_pool[pool_key].rw is False: + raise ValueError("Requested R/W session, but R/O session is already initialized.") + + # Session is already open, just increase the refcount + cls._session_refcount[pool_key] += 1 + + return cls._session_pool[pool_key] + + @classmethod + def release(cls, path: str, token_label: str, so_pin: Optional[str], user_pin: Optional[str]) -> None: + """Close session if no reference is known.""" + with cls._session_lock: + pool_key = (path, token_label, so_pin, user_pin) + cls._session_refcount[pool_key] -= 1 + + if cls._session_refcount[pool_key] == 0: + cls._session_pool[pool_key].close() + del cls._session_pool[pool_key] + del cls._session_refcount[pool_key] + + def __enter__(self) -> Session: + return self.acquire( + self.path, self.token_label, so_pin=self.so_pin, user_pin=self.user_pin, rw=self.rw + ) + + def __exit__( + self, + exc_type: Optional[type[BaseException]], + value: Optional[BaseException], + traceback: Optional[TracebackType], + ) -> None: + self.release(self.path, self.token_label, so_pin=self.so_pin, user_pin=self.user_pin) diff --git a/ca/django_ca/key_backends/hsm/typehints.py b/ca/django_ca/key_backends/hsm/typehints.py new file mode 100644 index 000000000..efd4d066d --- /dev/null +++ b/ca/django_ca/key_backends/hsm/typehints.py @@ -0,0 +1,19 @@ +# This file is part of django-ca (https://github.com/mathiasertl/django-ca). +# +# django-ca is free software: you can redistribute it and/or modify it under the terms of the GNU General +# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your +# option) any later version. +# +# django-ca is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the +# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License +# for more details. +# +# You should have received a copy of the GNU General Public License along with django-ca. If not, see +# . + +"""Type aliases used in HSM code.""" + +from typing import Literal + +SupportedKeyType = Literal["RSA", "EC", "Ed25519", "Ed448"] +EllipticCurves = Literal["secp256r1", "secp384r1", "secp521r1"] diff --git a/ca/django_ca/key_backends/storages.py b/ca/django_ca/key_backends/storages.py index f962f438f..902946002 100644 --- a/ca/django_ca/key_backends/storages.py +++ b/ca/django_ca/key_backends/storages.py @@ -225,7 +225,7 @@ def get_store_private_key_options(self, options: dict[str, Any]) -> StorePrivate ) def get_use_private_key_options( - self, ca: Optional["CertificateAuthority"], options: dict[str, Any] + self, ca: "CertificateAuthority", options: dict[str, Any] ) -> UsePrivateKeyOptions: return UsePrivateKeyOptions.model_validate( {"password": options.get(f"{self.options_prefix}password")}, context={"ca": ca}, strict=True diff --git a/ca/django_ca/management/commands/init_ca.py b/ca/django_ca/management/commands/init_ca.py index 1c838244c..492ac0f25 100644 --- a/ca/django_ca/management/commands/init_ca.py +++ b/ca/django_ca/management/commands/init_ca.py @@ -16,6 +16,7 @@ .. seealso:: https://docs.djangoproject.com/en/dev/howto/custom-management-commands/ """ +import logging from collections.abc import Iterable from datetime import datetime, timedelta, timezone as tz from typing import Any, Optional @@ -359,14 +360,12 @@ def handle( # pylint: disable=too-many-locals # noqa: PLR0912,PLR0913,PLR0915 key_backend_options = key_backend.get_create_private_key_options( key_type, key_size, elliptic_curve=elliptic_curve, options=options ) - load_key_backend_options = key_backend.get_use_private_key_options(None, options) # If there is a parent CA, test if we can use it (here) to sign certificates. The most common case # where this happens is if the key is stored on the filesystem, but only accessible to the Celery # worker and the current process is on the webserver side. - if parent is None: - signer_key_backend_options = load_key_backend_options - else: + signer_key_backend_options = None + if parent is not None: signer_key_backend_options = parent.key_backend.get_use_parent_private_key_options( parent, options ) @@ -374,10 +373,13 @@ def handle( # pylint: disable=too-many-locals # noqa: PLR0912,PLR0913,PLR0915 # Check if the parent key is usable parent.check_usable(signer_key_backend_options) except ValidationError as ex: + logging.exception(ex) self.validation_error_to_command_error(ex) - except CommandError: # pragma: no cover + except CommandError as ex: # pragma: no cover + logging.exception(ex) raise except Exception as ex: # pragma: no cover + logging.exception(ex) raise CommandError(*ex.args) from ex # Get/validate signature hash algorithm @@ -569,7 +571,14 @@ def handle( # pylint: disable=too-many-locals # noqa: PLR0912,PLR0913,PLR0915 ocsp_responder_key_validity=ocsp_responder_key_validity, **kwargs, ) + + load_key_backend_options = key_backend.get_use_private_key_options(ca, options) + except ValidationError as ex: + self.validation_error_to_command_error(ex) + except CommandError: # pragma: no cover + raise except Exception as ex: # pragma: no cover + logging.exception(ex) raise CommandError(ex) from ex # Generate OCSP keys and cache CRLs diff --git a/ca/django_ca/tests/base/utils.py b/ca/django_ca/tests/base/utils.py index d52ea2ca7..0cfd8a4f8 100644 --- a/ca/django_ca/tests/base/utils.py +++ b/ca/django_ca/tests/base/utils.py @@ -98,9 +98,7 @@ def create_private_key( ) -> tuple[CertificateIssuerPublicKeyTypes, DummyModel]: return None, DummyModel() # type: ignore[return-value] - def get_use_private_key_options( - self, ca: Optional[CertificateAuthority], options: dict[str, Any] - ) -> DummyModel: + def get_use_private_key_options(self, ca: CertificateAuthority, options: dict[str, Any]) -> DummyModel: return DummyModel() def is_usable( diff --git a/ca/django_ca/tests/key_backends/hsm/__init__.py b/ca/django_ca/tests/key_backends/hsm/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/ca/django_ca/tests/key_backends/hsm/conftest.py b/ca/django_ca/tests/key_backends/hsm/conftest.py new file mode 100644 index 000000000..ac784b06e --- /dev/null +++ b/ca/django_ca/tests/key_backends/hsm/conftest.py @@ -0,0 +1,74 @@ +# This file is part of django-ca (https://github.com/mathiasertl/django-ca). +# +# django-ca is free software: you can redistribute it and/or modify it under the terms of the GNU General +# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your +# option) any later version. +# +# django-ca is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the +# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License +# for more details. +# +# You should have received a copy of the GNU General Public License along with django-ca. If not, see +# . + +"""Fixtures for HSM testing.""" + +import subprocess +from collections.abc import Iterator +from datetime import datetime, timezone + +from pkcs11._pkcs11 import Session + +from django.conf import settings +from django.utils.crypto import get_random_string + +import pytest + +from django_ca.key_backends.hsm.session import SessionPool + +# pylint: disable=redefined-outer-name # usefixtures does not work on fixtures. + + +@pytest.fixture(scope="session") +def softhsm_token() -> Iterator[str]: + """Fixture to create a new token in SoftHSM.""" + timestamp = datetime.now(tz=timezone.utc).strftime("%Y%m%d%H%M%S") + label = f"pytest.{timestamp}.{get_random_string(8)}" + so_pin = settings.PKCS11_SO_PIN + pin = settings.PKCS11_USER_PIN + subprocess.run( + ["softhsm2-util", "--init-token", "--free", "--label", label, "--so-pin", so_pin, "--pin", pin], + check=True, + ) + + yield label + + subprocess.run(["softhsm2-util", "--delete-token", "--token", label], check=True) + + +@pytest.fixture +def session_pool() -> Iterator[None]: + """Get a clean session pool.""" + # Reinitialize the library, so that any token created before is also visible (SoftHSM only sees token + # present at initialization time). + # pylint: disable=use-implicit-booleaness-not-comparison + # pylint: disable=protected-access # deliberate access in this entire function + for lib in SessionPool._lib_pool.values(): + lib.reinitialize() + + # Assert that the session pool *is* clean. Any test leaving sessions intact would show up here. + assert SessionPool._session_pool == {} + assert SessionPool._session_refcount == {} + + yield + + # Make sure that we leave no open sessions. + assert SessionPool._session_pool == {} + assert SessionPool._session_refcount == {} + + +@pytest.fixture +def session(softhsm_token: str, session_pool: None) -> Session: # pylint: disable=unused-argument + """Fixture providing a fresh (read-only) session.""" + with SessionPool(settings.PKCS11_PATH, softhsm_token, None, settings.PKCS11_USER_PIN) as session: + yield session diff --git a/ca/django_ca/tests/key_backends/hsm/test_session.py b/ca/django_ca/tests/key_backends/hsm/test_session.py new file mode 100644 index 000000000..f8c4e79bd --- /dev/null +++ b/ca/django_ca/tests/key_backends/hsm/test_session.py @@ -0,0 +1,129 @@ +# This file is part of django-ca (https://github.com/mathiasertl/django-ca). +# +# django-ca is free software: you can redistribute it and/or modify it under the terms of the GNU General +# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your +# option) any later version. +# +# django-ca is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the +# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License +# for more details. +# +# You should have received a copy of the GNU General Public License along with django-ca. If not, see +# . + + +"""Test HSM related code.""" + +import subprocess +from collections.abc import Iterator +from typing import Optional + +from pkcs11._pkcs11 import Session + +from django.conf import settings +from django.utils.crypto import get_random_string + +import pytest + +from django_ca.key_backends.hsm.session import SessionPool + +PoolKeyType = tuple[str, str, Optional[str], Optional[str]] + +# pylint: disable=redefined-outer-name # several fixtures are defined here +# pylint: disable=protected-access # we test class internals throughout this module + + +@pytest.fixture +def pool_key(softhsm_token: str) -> Iterator[PoolKeyType]: + """Minor fixture to return the pool key for the default settings.""" + yield settings.PKCS11_PATH, softhsm_token, None, settings.PKCS11_USER_PIN + + +@pytest.fixture +def second_softhsm_token() -> Iterator[str]: + """Fixture to create a second softhsm token.""" + label = f"pytest.{get_random_string(8)}.dual" + + so_pin = settings.PKCS11_SO_PIN + pin = settings.PKCS11_USER_PIN + + subprocess.run( + ["softhsm2-util", "--init-token", "--free", "--label", label, "--so-pin", so_pin, "--pin", pin], + check=True, + ) + + yield label + + subprocess.run(["softhsm2-util", "--delete-token", "--token", label], check=True) + + +def test_duplicate_session(softhsm_token: str, session: Session, pool_key: PoolKeyType) -> None: + """Test that a second session request does not open a new session.""" + assert isinstance(session, Session) + assert SessionPool._session_pool == {pool_key: session} + assert SessionPool._session_refcount == {pool_key: 1} + + # Get another session, even though it's in the same thread, refcount still goes up + with SessionPool(settings.PKCS11_PATH, softhsm_token, None, settings.PKCS11_USER_PIN) as second_session: + assert session is second_session + assert isinstance(second_session, Session) + assert SessionPool._session_pool == {pool_key: session} + assert SessionPool._session_refcount == {pool_key: 2} + + # Test that second session was cleaned up + assert SessionPool._session_pool == {pool_key: session} + assert SessionPool._session_refcount == {pool_key: 1} + + +def test_duplicate_rw_session(softhsm_token: str, session: Session, pool_key: PoolKeyType) -> None: + """Test that requesting a read/write session when it is already open read-only is an error.""" + assert SessionPool._session_pool == {pool_key: session} + assert SessionPool._session_refcount == {pool_key: 1} + with pytest.raises( + ValueError, match=r"^Requested R/W session, but R/O session is already initialized\.$" + ): + with SessionPool(settings.PKCS11_PATH, softhsm_token, None, settings.PKCS11_USER_PIN, rw=True): + pass + + # Test that the ref count has not increased + assert SessionPool._session_pool == {pool_key: session} + assert SessionPool._session_refcount == {pool_key: 1} + + +def test_two_sessions(second_softhsm_token: str, session: Session, pool_key: PoolKeyType) -> None: + """Test creating a second token with dual sessions.""" + # assert that we have one session in the beginning + assert SessionPool._session_pool == {pool_key: session} + assert SessionPool._session_refcount == {pool_key: 1} + + pin = settings.PKCS11_USER_PIN + second_pool_key = (settings.PKCS11_PATH, second_softhsm_token, None, pin) + + # Create a second session and observe dual session pool + with SessionPool(settings.PKCS11_PATH, second_softhsm_token, None, pin) as second_session: + assert SessionPool._session_pool == {pool_key: session, second_pool_key: second_session} + assert SessionPool._session_refcount == {pool_key: 1, second_pool_key: 1} + + # Try to get a nested session... + with SessionPool(settings.PKCS11_PATH, second_softhsm_token, None, pin) as third_session: + assert second_session is third_session + assert SessionPool._session_pool == {pool_key: session, second_pool_key: second_session} + assert SessionPool._session_refcount == {pool_key: 1, second_pool_key: 2} + + # session was cleared from pool + assert SessionPool._session_pool == {pool_key: session} + assert SessionPool._session_refcount == {pool_key: 1} + + +def test_both_pins_empty() -> None: + """Test error when both so_pin and user_pin are not set.""" + with pytest.raises(ValueError, match=r"^so_pin and user_pin cannot both be None\.$"): + with SessionPool(settings.PKCS11_PATH, "any", None, None): + pass + + +def test_both_pins_set() -> None: + """Test error when both so_pin and user_pin are not set.""" + with pytest.raises(ValueError, match=r"^Either so_pin and user_pin must be set\.$"): + with SessionPool(settings.PKCS11_PATH, "any", "foo", "bar"): + pass diff --git a/pyproject.toml b/pyproject.toml index 1b55bda7c..ae969c5db 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -193,6 +193,8 @@ module = [ "docker.*", "enchant.tokenize", "httpcore.*", + "pkcs11.*", + "pkcs11.util.*", # psycopg and psycopg_c are not installed in isolated mypy envs (tox, ...) "psycopg", "psycopg_c",