diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
index 98524eb0b..da9bb4fa0 100644
--- a/.github/workflows/tests.yml
+++ b/.github/workflows/tests.yml
@@ -24,7 +24,7 @@ jobs:
name: Python ${{ matrix.python-version }}, Django ${{ matrix.django-version }}, cryptography ${{ matrix.cryptography-version }}, pydantic ${{ matrix.pydantic-version }}
steps:
- name: Install APT dependencies
- run: sudo apt-get install -y firefox
+ run: sudo apt-get install -y firefox softhsm2
- name: Acquire sources
uses: actions/checkout@v4.1.1
diff --git a/ca/ca/test_settings.py b/ca/ca/test_settings.py
index 6aac801ae..0ebf4ebe2 100644
--- a/ca/ca/test_settings.py
+++ b/ca/ca/test_settings.py
@@ -14,8 +14,12 @@
"""Test settings for the django-ca project."""
import json
+import os
+from datetime import datetime, timezone
from pathlib import Path
+from django.utils.crypto import get_random_string
+
# Base paths in this project
BASE_DIR = Path(__file__).resolve().parent.parent # ca/
@@ -186,6 +190,14 @@
_fixture_data = json.load(stream)
+# PKCS11 settings
+_timestamp = datetime.now(tz=timezone.utc).strftime("%Y%m%d%H%M%S")
+PKCS11_PATH = os.environ.get("PKCS11_LIBRARY", "/usr/lib/softhsm/libsofthsm2.so")
+PKCS11_TOKEN_LABEL = f"pytest.{_timestamp}.{get_random_string(8)}"
+PKCS11_SO_PIN = "so-pin-1234"
+PKCS11_USER_PIN = "user-pin-1234"
+
+
CA_KEY_BACKENDS = {
"default": {
"BACKEND": "django_ca.key_backends.storages.StoragesBackend",
@@ -195,6 +207,14 @@
"BACKEND": "django_ca.key_backends.storages.StoragesBackend",
"OPTIONS": {"storage_alias": "secondary"},
},
+ "hsm": {
+ "BACKEND": "django_ca.key_backends.hsm.HSMBackend",
+ "OPTIONS": {
+ "module": PKCS11_PATH,
+ "token": PKCS11_TOKEN_LABEL,
+ "user_pin": PKCS11_USER_PIN,
+ },
+ },
}
# Custom settings
diff --git a/ca/django_ca/constants.py b/ca/django_ca/constants.py
index 4385a8cc2..224498785 100644
--- a/ca/django_ca/constants.py
+++ b/ca/django_ca/constants.py
@@ -574,6 +574,17 @@ class ExtendedKeyUsageOID(_ExtendedKeyUsageOID):
ed448.Ed448PublicKey,
rsa.RSAPublicKey,
)
+PUBLIC_KEY_TYPE_MAPPING: MappingProxyType[ParsableKeyType, type[CertificateIssuerPublicKeyTypes]] = (
+ MappingProxyType(
+ {
+ "DSA": dsa.DSAPublicKey,
+ "EC": ec.EllipticCurvePublicKey,
+ "Ed25519": ed25519.Ed25519PublicKey,
+ "Ed448": ed448.Ed448PublicKey,
+ "RSA": rsa.RSAPublicKey,
+ }
+ )
+)
#: Tuple of supported private key types.
PRIVATE_KEY_TYPES: tuple[type[CertificateIssuerPrivateKeyTypes], ...] = (
diff --git a/ca/django_ca/key_backends/base.py b/ca/django_ca/key_backends/base.py
index 8fc7aae82..3fad71330 100644
--- a/ca/django_ca/key_backends/base.py
+++ b/ca/django_ca/key_backends/base.py
@@ -213,13 +213,12 @@ def get_use_parent_private_key_options(
@abc.abstractmethod
def get_use_private_key_options(
- self, ca: Optional["CertificateAuthority"], options: dict[str, Any]
+ self, ca: "CertificateAuthority", options: dict[str, Any]
) -> UsePrivateKeyOptionsTypeVar:
"""Get options to use the private key of a certificate authority.
The returned model will be used for the certificate authority `ca`. You can pass it as extra context
- to influence model validation. If `ca` is ``None``, it indicates that the CA is currently being
- created via :command:`manage.py init_ca`.
+ to influence model validation.
`options` is the dictionary of arguments to :command:`manage.py init_ca` (including default values).
The key backend is expected to be able to sign certificates and CRLs using the options provided here.
diff --git a/ca/django_ca/key_backends/hsm/__init__.py b/ca/django_ca/key_backends/hsm/__init__.py
new file mode 100644
index 000000000..1025ae0e3
--- /dev/null
+++ b/ca/django_ca/key_backends/hsm/__init__.py
@@ -0,0 +1,18 @@
+# This file is part of django-ca (https://github.com/mathiasertl/django-ca).
+#
+# django-ca is free software: you can redistribute it and/or modify it under the terms of the GNU General
+# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your
+# option) any later version.
+#
+# django-ca is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
+# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+# for more details.
+#
+# You should have received a copy of the GNU General Public License along with django-ca. If not, see
+# .
+
+"""HSM backend module."""
+
+from django_ca.key_backends.hsm.backend import HSMBackend
+
+__all__ = ("HSMBackend",)
diff --git a/ca/django_ca/key_backends/hsm/backend.py b/ca/django_ca/key_backends/hsm/backend.py
new file mode 100644
index 000000000..8139afcfe
--- /dev/null
+++ b/ca/django_ca/key_backends/hsm/backend.py
@@ -0,0 +1,434 @@
+# This file is part of django-ca (https://github.com/mathiasertl/django-ca).
+#
+# django-ca is free software: you can redistribute it and/or modify it under the terms of the GNU General
+# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your
+# option) any later version.
+#
+# django-ca is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
+# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+# for more details.
+#
+# You should have received a copy of the GNU General Public License along with django-ca. If not, see
+# .
+
+"""Key storage backend for hardware security modules (HSMs)."""
+
+from collections.abc import Iterator, Sequence
+from contextlib import contextmanager
+from datetime import datetime
+from typing import TYPE_CHECKING, Any, Final, Optional
+
+import pkcs11
+from pkcs11 import KeyType, ObjectClass, Session
+from pkcs11.util.ec import encode_named_curve_parameters
+from pkcs11.util.rsa import decode_rsa_private_key
+
+from asn1crypto.algos import SignedDigestAlgorithmId
+from cryptography import x509
+from cryptography.hazmat.primitives import serialization
+from cryptography.hazmat.primitives.asymmetric import rsa
+from cryptography.hazmat.primitives.asymmetric.types import (
+ CertificateIssuerPrivateKeyTypes,
+ CertificateIssuerPublicKeyTypes,
+)
+
+from django.core.management import CommandError
+
+from django_ca import constants
+from django_ca.conf import model_settings
+from django_ca.key_backends import KeyBackend
+from django_ca.key_backends.hsm.keys import (
+ PKCS11Ed448PrivateKey,
+ PKCS11Ed25519PrivateKey,
+ PKCS11EllipticCurvePrivateKey,
+ PKCS11PrivateKeyTypes,
+ PKCS11RSAPrivateKey,
+)
+from django_ca.key_backends.hsm.models import (
+ CreatePrivateKeyOptions,
+ StorePrivateKeyOptions,
+ UsePrivateKeyOptions,
+)
+from django_ca.key_backends.hsm.session import SessionPool
+from django_ca.key_backends.hsm.typehints import SupportedKeyType
+from django_ca.typehints import (
+ AllowedHashTypes,
+ ArgumentGroup,
+ CertificateExtension,
+ EllipticCurves,
+ ParsableKeyType,
+)
+from django_ca.utils import get_cert_builder, int_to_hex
+
+if TYPE_CHECKING:
+ from django_ca.models import CertificateAuthority
+
+
+class HSMBackend(KeyBackend[CreatePrivateKeyOptions, StorePrivateKeyOptions, UsePrivateKeyOptions]):
+ """Key backend to create and use private keys in a hardware security module (HSM)."""
+
+ name = "hsm"
+ title = "Store private keys using a hardware security module (HSM)"
+ description = (
+ "The private key will be stored on the hardware security module (HSM). The HSM makes sure that the"
+ "private key can never be recovered and thus compromised."
+ )
+ use_model = UsePrivateKeyOptions
+
+ supported_key_types: tuple[SupportedKeyType, ...] = ("RSA", "EC", "Ed25519", "Ed448")
+ supported_elliptic_curves: tuple[EllipticCurves, ...] = tuple(constants.ELLIPTIC_CURVE_TYPES)
+
+ module: str
+ token: str
+ so_pin: Optional[str]
+ user_pin: Optional[str]
+ _required_key_backend_options: Final[tuple[str, str, str]] = ("key_label", "key_id", "key_type")
+
+ def __init__(
+ self,
+ alias: str,
+ module: str,
+ token: str,
+ so_pin: Optional[str] = None,
+ user_pin: Optional[str] = None,
+ ):
+ if so_pin is not None and user_pin is not None:
+ raise ValueError(f"{alias}: Set either so_pin or user_pin.")
+
+ super().__init__(alias, module=module, token=token, so_pin=so_pin, user_pin=user_pin)
+
+ @contextmanager
+ def session(self, so_pin: Optional[str], user_pin: Optional[str], rw: bool = False) -> Iterator[Session]:
+ """Shortcut to get a session from the pool."""
+ with SessionPool(self.module, self.token, so_pin, user_pin, rw=rw) as session:
+ yield session
+
+ def _add_key_label_argument(self, group: ArgumentGroup, prefix: str = "") -> None:
+ group.add_argument(
+ f"--{self.argparse_prefix}{prefix}key-label",
+ type=str,
+ metavar="LABEL",
+ help="%(metavar)s to use for the private key in the HSM.",
+ )
+
+ def _add_pin_arguments(self, group: ArgumentGroup, prefix: str = "") -> None:
+ group.add_argument(
+ f"--{self.argparse_prefix}{prefix}so-pin",
+ type=str,
+ metavar="PIN",
+ help="Security officer %(metavar)s to access the HSM.",
+ )
+ group.add_argument(
+ f"--{self.argparse_prefix}{prefix}user-pin",
+ type=str,
+ metavar="PIN",
+ help="User %(metavar)s to access the HSM.",
+ )
+
+ def _get_pins(self, options: dict[str, Any], prefix: str = "") -> tuple[Optional[str], Optional[str]]:
+ options_prefix = f"{self.options_prefix}{prefix.replace('-', '_')}"
+ argparse_prefix = f"{self.argparse_prefix}{prefix}"
+
+ so_pin: Optional[str] = options[f"{options_prefix}so_pin"]
+ if so_pin is None:
+ so_pin = self.so_pin
+ elif so_pin == "":
+ so_pin = None
+
+ user_pin: Optional[str] = options[f"{options_prefix}user_pin"]
+ if user_pin is None:
+ user_pin = self.user_pin
+ elif user_pin == "":
+ user_pin = None
+
+ if so_pin is None and user_pin is None:
+ raise CommandError(
+ f"No SO or user pin configured. Use --{argparse_prefix}so-pin or --{argparse_prefix}user-pin "
+ f"to specify a pin."
+ )
+ if so_pin is not None and user_pin is not None:
+ raise CommandError(
+ "Both SO pin and user pin configured. To override a pin from settings, pass "
+ f'--{argparse_prefix}so-pin="" or --{argparse_prefix}user-pin="".'
+ )
+
+ return so_pin, user_pin
+
+ def _get_private_key(
+ self, ca: "CertificateAuthority", session: Session
+ ) -> CertificateIssuerPrivateKeyTypes:
+ key_id: str = ca.key_backend_options["key_id"]
+ key_label: str = ca.key_backend_options["key_label"]
+ key_type: SupportedKeyType = ca.key_backend_options["key_type"]
+
+ if key_type == "RSA":
+ return PKCS11RSAPrivateKey(session, key_id, key_label)
+ if key_type == "Ed448":
+ return PKCS11Ed448PrivateKey(session, key_id, key_label)
+ if key_type == "Ed25519":
+ return PKCS11Ed25519PrivateKey(session, key_id, key_label)
+ if key_type == "EC":
+ return PKCS11EllipticCurvePrivateKey(session, key_id, key_label)
+
+ raise ValueError(f"{key_type}: Unsupported key type.")
+
+ def add_create_private_key_arguments(self, group: ArgumentGroup) -> None:
+ self._add_key_label_argument(group)
+ self._add_pin_arguments(group)
+
+ def add_use_parent_private_key_arguments(self, group: ArgumentGroup) -> None:
+ self._add_pin_arguments(group, "parent-")
+
+ def add_store_private_key_arguments(self, group: ArgumentGroup) -> None:
+ self._add_key_label_argument(group)
+ self._add_pin_arguments(group)
+
+ def add_use_private_key_arguments(self, group: ArgumentGroup) -> None:
+ self._add_pin_arguments(group)
+
+ def get_create_private_key_options(
+ self,
+ key_type: ParsableKeyType,
+ key_size: Optional[int],
+ elliptic_curve: Optional[str],
+ options: dict[str, Any],
+ ) -> CreatePrivateKeyOptions:
+ key_label = options[f"{self.options_prefix}key_label"]
+ if not key_label:
+ raise CommandError(
+ f"--{self.argparse_prefix}key-label is a required option for this key backend."
+ )
+
+ so_pin, user_pin = self._get_pins(options)
+
+ if key_type == "EC" and elliptic_curve is None:
+ # NOTE: Currently all curves supported by cryptography are also supported by this backend.
+ # If this changes, a check should be added here (if the default is not supported by the
+ # backend).
+ elliptic_curve = model_settings.CA_DEFAULT_ELLIPTIC_CURVE.name
+
+ return CreatePrivateKeyOptions(
+ key_label=key_label,
+ key_type=key_type,
+ key_size=key_size,
+ elliptic_curve=elliptic_curve,
+ so_pin=so_pin,
+ user_pin=user_pin,
+ )
+
+ def get_use_parent_private_key_options(
+ self, ca: "CertificateAuthority", options: dict[str, Any]
+ ) -> UsePrivateKeyOptions:
+ so_pin, user_pin = self._get_pins(options, "parent-")
+ return UsePrivateKeyOptions.model_validate(
+ {"so_pin": so_pin, "user_pin": user_pin}, context={"ca": ca}, strict=True
+ )
+
+ def get_use_private_key_options(
+ self, ca: "CertificateAuthority", options: dict[str, Any]
+ ) -> UsePrivateKeyOptions:
+ so_pin, user_pin = self._get_pins(options)
+ return UsePrivateKeyOptions.model_validate(
+ {"so_pin": so_pin, "user_pin": user_pin}, context={"ca": ca}, strict=True
+ )
+
+ def get_store_private_key_options(self, options: dict[str, Any]) -> StorePrivateKeyOptions:
+ key_label = options[f"{self.options_prefix}key_label"]
+ so_pin, user_pin = self._get_pins(options)
+ return StorePrivateKeyOptions.model_validate(
+ {"key_label": key_label, "so_pin": so_pin, "user_pin": user_pin}, strict=True
+ )
+
+ def is_usable(
+ self, ca: "CertificateAuthority", use_private_key_options: Optional[UsePrivateKeyOptions] = None
+ ) -> bool:
+ if not ca.key_backend_options or not isinstance(ca.key_backend_options, dict):
+ return False
+ for option in self._required_key_backend_options:
+ if not ca.key_backend_options.get(option):
+ return False
+ if use_private_key_options is None:
+ return True
+
+ try:
+ with self.session(
+ so_pin=use_private_key_options.so_pin, user_pin=use_private_key_options.user_pin
+ ) as session:
+ self._get_private_key(ca, session)
+ return True
+ except Exception: # pylint: disable=broad-exception-caught # want to always return bool
+ return False
+
+ def check_usable(self, ca: "CertificateAuthority", use_private_key_options: UsePrivateKeyOptions) -> None:
+ if not ca.key_backend_options or not isinstance(ca.key_backend_options, dict):
+ raise ValueError("key backend options are not defined.")
+ for option in self._required_key_backend_options:
+ if not ca.key_backend_options.get(option):
+ raise ValueError(f"{option}: Required key option is not defined.")
+
+ with self.session(
+ so_pin=use_private_key_options.so_pin, user_pin=use_private_key_options.user_pin
+ ) as session:
+ self._get_private_key(ca, session)
+
+ def create_private_key(
+ self, ca: "CertificateAuthority", key_type: ParsableKeyType, options: CreatePrivateKeyOptions
+ ) -> tuple[CertificateIssuerPublicKeyTypes, UsePrivateKeyOptions]:
+ key_id = int_to_hex(x509.random_serial_number())
+ key_label = options.key_label
+
+ # Test that no private key with the given label exists. Some libraries (e.g. SoftHSM) don't treat the
+ # label as unique and will silently create a second key with the same label.
+ with self.session(options.so_pin, options.user_pin) as session:
+ try:
+ session.get_key(object_class=ObjectClass.PUBLIC_KEY, label=key_label)
+ except pkcs11.NoSuchKey:
+ pass # this is what we hope for
+ else:
+ raise ValueError(f"{key_label}: Private key with this label already exists.")
+
+ if key_type == "RSA":
+ with self.session(so_pin=options.so_pin, user_pin=options.user_pin, rw=True) as session:
+ pkcs11_public_key, pkcs11_private_key = session.generate_keypair(
+ pkcs11.KeyType.RSA, options.key_size, id=key_id.encode(), label=key_label, store=True
+ )
+
+ private_key: PKCS11PrivateKeyTypes = PKCS11RSAPrivateKey(
+ session=session,
+ key_id=key_id,
+ key_label=key_label,
+ pkcs11_private_key=pkcs11_private_key,
+ pkcs11_public_key=pkcs11_public_key,
+ )
+ public_key = private_key.public_key()
+
+ elif key_type in ("Ed25519", "Ed448"):
+ named_curve_parameters = encode_named_curve_parameters(
+ SignedDigestAlgorithmId(key_type.lower()).dotted
+ )
+
+ with self.session(so_pin=options.so_pin, user_pin=options.user_pin, rw=True) as session:
+ parameters = session.create_domain_parameters(
+ KeyType.EC_EDWARDS, {pkcs11.Attribute.EC_PARAMS: named_curve_parameters}, local=True
+ )
+
+ pkcs11_public_key, pkcs11_private_key = parameters.generate_keypair(
+ mechanism=pkcs11.Mechanism.EC_EDWARDS_KEY_PAIR_GEN,
+ store=True,
+ id=key_id.encode(),
+ label=key_label,
+ )
+
+ if key_type == "Ed25519":
+ private_key = PKCS11Ed25519PrivateKey(
+ session=session,
+ key_id=key_id,
+ key_label=key_label,
+ pkcs11_private_key=pkcs11_private_key,
+ pkcs11_public_key=pkcs11_public_key,
+ )
+ else:
+ private_key = PKCS11Ed448PrivateKey(
+ session=session,
+ key_id=key_id,
+ key_label=key_label,
+ pkcs11_private_key=pkcs11_private_key,
+ pkcs11_public_key=pkcs11_public_key,
+ )
+ public_key = private_key.public_key()
+
+ elif key_type == "EC":
+ with self.session(so_pin=options.so_pin, user_pin=options.user_pin, rw=True) as session:
+ # TYPEHINT NOTE: elliptic curve is always set if key_type is EC.
+ elliptic_curve_name = options.elliptic_curve.lower() # type: ignore[union-attr]
+ parameters = session.create_domain_parameters(
+ KeyType.EC,
+ {pkcs11.Attribute.EC_PARAMS: encode_named_curve_parameters(elliptic_curve_name)},
+ local=True,
+ )
+
+ pkcs11_public_key, pkcs11_private_key = parameters.generate_keypair(
+ store=True, id=key_id.encode(), label=key_label
+ )
+
+ private_key = PKCS11EllipticCurvePrivateKey(
+ session=session,
+ key_id=key_id,
+ key_label=key_label,
+ pkcs11_private_key=pkcs11_private_key,
+ pkcs11_public_key=pkcs11_public_key,
+ )
+ public_key = private_key.public_key()
+ else:
+ raise ValueError(f"{key_type}: unknown key type")
+
+ ca.key_backend_options = {"key_id": key_id, "key_label": key_label, "key_type": key_type}
+ use_private_key_options = UsePrivateKeyOptions.model_validate(
+ {"so_pin": options.so_pin, "user_pin": options.user_pin}, context={"ca": ca}
+ )
+
+ return public_key, use_private_key_options
+
+ def store_private_key(
+ self,
+ ca: "CertificateAuthority",
+ key: CertificateIssuerPrivateKeyTypes,
+ options: StorePrivateKeyOptions,
+ ) -> None:
+ key_id = int_to_hex(x509.random_serial_number())
+ if isinstance(key, rsa.RSAPrivateKey):
+ key_type = "RSA"
+ key_der = key.private_bytes(
+ encoding=serialization.Encoding.DER,
+ format=serialization.PrivateFormat.TraditionalOpenSSL,
+ encryption_algorithm=serialization.NoEncryption(),
+ )
+ attrs = decode_rsa_private_key(key_der)
+ else:
+ raise ValueError(f"{key}: Importing key of this type is not supported.")
+
+ attrs[pkcs11.Attribute.ID] = key_id.encode()
+ attrs[pkcs11.Attribute.LABEL] = options.key_label
+
+ with self.session(so_pin=options.so_pin, user_pin=options.user_pin, rw=True) as session:
+ session.create_object(attrs)
+
+ ca.key_backend_options = {"key_id": key_id, "key_label": options.key_label, "key_type": key_type}
+
+ def sign_certificate(
+ self,
+ ca: "CertificateAuthority",
+ use_private_key_options: UsePrivateKeyOptions,
+ public_key: CertificateIssuerPublicKeyTypes,
+ serial: int,
+ algorithm: Optional[AllowedHashTypes],
+ issuer: x509.Name,
+ subject: x509.Name,
+ expires: datetime,
+ extensions: Sequence[CertificateExtension],
+ ) -> x509.Certificate:
+ builder = get_cert_builder(expires, serial=serial)
+ builder = builder.public_key(public_key)
+ builder = builder.issuer_name(issuer)
+ builder = builder.subject_name(subject)
+ for extension in extensions:
+ builder = builder.add_extension(extension.value, critical=extension.critical)
+
+ with self.session(
+ so_pin=use_private_key_options.so_pin, user_pin=use_private_key_options.user_pin
+ ) as session:
+ private_key = self._get_private_key(ca, session)
+ return builder.sign(private_key=private_key, algorithm=algorithm)
+
+ def sign_certificate_revocation_list(
+ self,
+ ca: "CertificateAuthority",
+ use_private_key_options: UsePrivateKeyOptions,
+ builder: x509.CertificateRevocationListBuilder,
+ algorithm: Optional[AllowedHashTypes],
+ ) -> x509.CertificateRevocationList:
+ with self.session(
+ so_pin=use_private_key_options.so_pin, user_pin=use_private_key_options.user_pin
+ ) as session:
+ private_key = self._get_private_key(ca, session)
+ return builder.sign(private_key=private_key, algorithm=algorithm)
diff --git a/ca/django_ca/key_backends/hsm/keys.py b/ca/django_ca/key_backends/hsm/keys.py
new file mode 100644
index 000000000..8029c8908
--- /dev/null
+++ b/ca/django_ca/key_backends/hsm/keys.py
@@ -0,0 +1,231 @@
+# This file is part of django-ca (https://github.com/mathiasertl/django-ca).
+#
+# django-ca is free software: you can redistribute it and/or modify it under the terms of the GNU General
+# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your
+# option) any later version.
+#
+# django-ca is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
+# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+# for more details.
+#
+# You should have received a copy of the GNU General Public License along with django-ca. If not, see
+# .
+
+"""Private key implementations that use an HSM in the background."""
+
+from hashlib import sha256, sha384, sha512
+from typing import ClassVar, Generic, NoReturn, Optional, TypeVar, Union, cast
+
+import pkcs11
+from pkcs11 import Session
+from pkcs11.constants import Attribute
+from pkcs11.util.ec import encode_ec_public_key
+from pkcs11.util.rsa import encode_rsa_public_key
+
+from asn1crypto.core import OctetString
+from asn1crypto.keys import PublicKeyInfo
+from cryptography.hazmat.primitives import hashes, serialization
+from cryptography.hazmat.primitives.asymmetric import ec, ed448, ed25519, rsa, utils as asym_utils
+from cryptography.hazmat.primitives.asymmetric.padding import PSS, AsymmetricPadding, PKCS1v15
+from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicKey
+from cryptography.hazmat.primitives.serialization import load_der_public_key
+
+EdwardsPublicKeyTypeVar = TypeVar("EdwardsPublicKeyTypeVar", ed448.Ed448PublicKey, ed25519.Ed25519PublicKey)
+
+
+class PKCS11PrivateKeyMixin:
+ """Mixin providing common functionality to PKCS11 key implementations."""
+
+ # pylint: disable=missing-function-docstring # implements standard functions of the base class.
+
+ key_type: pkcs11.KeyType
+
+ def __init__(
+ self,
+ session: Session,
+ key_id: str,
+ key_label: str,
+ pkcs11_private_key: Optional[pkcs11.PrivateKey] = None,
+ pkcs11_public_key: Optional[pkcs11.PublicKey] = None,
+ ) -> None:
+ self._session = session
+ self._key_id = key_id.encode()
+ self._key_label = key_label
+ self._pkcs11_private_key = pkcs11_private_key
+ self._pkcs11_public_key = pkcs11_public_key
+
+ super().__init__()
+
+ def decrypt(self, ciphertext: bytes, padding: AsymmetricPadding) -> bytes:
+ raise NotImplementedError(
+ "Decryption is not implemented for keys stored in a hardware security module (HSM)."
+ )
+
+ def private_bytes(
+ self,
+ encoding: serialization.Encoding,
+ format: serialization.PrivateFormat, # pylint: disable=redefined-builtin # given by cryptography
+ encryption_algorithm: serialization.KeySerializationEncryption,
+ ) -> bytes:
+ raise NotImplementedError(
+ "Private bytes cannot be retrieved for keys stored in a hardware security module (HSM)."
+ )
+
+ def private_numbers(self) -> NoReturn:
+ raise NotImplementedError(
+ "Private numbers cannot be retrieved for keys stored in a hardware security module (HSM)."
+ )
+
+ @property
+ def pkcs11_private_key(self) -> pkcs11.PrivateKey:
+ if self._pkcs11_private_key is None:
+ self._pkcs11_private_key = self._session.get_key(
+ key_type=self.key_type,
+ object_class=pkcs11.ObjectClass.PRIVATE_KEY,
+ id=self._key_id,
+ label=self._key_label,
+ )
+ return self._pkcs11_private_key
+
+ @property
+ def pkcs11_public_key(self) -> pkcs11.PublicKey:
+ if self._pkcs11_public_key is None:
+ self._pkcs11_public_key = self._session.get_key(
+ key_type=self.key_type,
+ object_class=pkcs11.ObjectClass.PUBLIC_KEY,
+ id=self._key_id,
+ label=self._key_label,
+ )
+ return self._pkcs11_public_key
+
+
+class PKCS11EdwardsPrivateKeyMixin(PKCS11PrivateKeyMixin, Generic[EdwardsPublicKeyTypeVar]):
+ """Specialized mixin for Ed448 and Ed5519 keys (which handle identical)."""
+
+ # pylint: disable=missing-function-docstring # implements standard functions of the base class.
+
+ public_key_algorithm: ClassVar[str]
+ key_type = pkcs11.KeyType.EC_EDWARDS
+ cryptograph_key_type: EdwardsPublicKeyTypeVar
+
+ def private_bytes_raw(self) -> bytes:
+ raise NotImplementedError(
+ "Private bytes cannot be retrieved for keys stored in a hardware security module (HSM)."
+ )
+
+ def public_key(self) -> EdwardsPublicKeyTypeVar:
+ ec_point = bytes(OctetString.load(self.pkcs11_public_key[Attribute.EC_POINT]))
+ value = {"algorithm": {"algorithm": self.public_key_algorithm}, "public_key": ec_point}
+ public_key: bytes = PublicKeyInfo(value).dump()
+
+ return cast(EdwardsPublicKeyTypeVar, load_der_public_key(public_key))
+
+ def sign(self, data: bytes) -> bytes:
+ return self.pkcs11_private_key.sign(data, mechanism=pkcs11.Mechanism.EDDSA) # type: ignore[no-any-return]
+
+
+# pylint: disable-next=abstract-method # private key functions are deliberately not implemented in base.
+class PKCS11RSAPrivateKey(PKCS11PrivateKeyMixin, rsa.RSAPrivateKey):
+ """Private key implementation for RSA keys stored in a HSM."""
+
+ key_type = pkcs11.KeyType.RSA
+
+ def public_key(self) -> RSAPublicKey:
+ der_public_key = encode_rsa_public_key(self.pkcs11_public_key)
+ return cast(RSAPublicKey, load_der_public_key(der_public_key))
+
+ @property
+ def key_size(self) -> int:
+ return self.public_key().key_size
+
+ def sign(
+ self,
+ data: bytes,
+ padding: AsymmetricPadding,
+ algorithm: Union[asym_utils.Prehashed, hashes.HashAlgorithm],
+ ) -> bytes:
+ if isinstance(algorithm, asym_utils.Prehashed):
+ raise ValueError("Prehashed operations are not supported.")
+
+ if isinstance(algorithm, hashes.SHA224) and isinstance(padding, PSS):
+ mechanism: hashes.HashAlgorithm = pkcs11.Mechanism.SHA224_RSA_PKCS_PSS
+ elif isinstance(algorithm, hashes.SHA224) and isinstance(padding, PKCS1v15):
+ mechanism = pkcs11.Mechanism.SHA224_RSA_PKCS
+ elif isinstance(algorithm, hashes.SHA256) and isinstance(padding, PSS):
+ mechanism = pkcs11.Mechanism.SHA256_RSA_PKCS_PSS
+ elif isinstance(algorithm, hashes.SHA256) and isinstance(padding, PKCS1v15):
+ mechanism = pkcs11.Mechanism.SHA256_RSA_PKCS
+ elif isinstance(algorithm, hashes.SHA384) and isinstance(padding, PSS):
+ mechanism = pkcs11.Mechanism.SHA384_RSA_PKCS_PSS
+ elif isinstance(algorithm, hashes.SHA384) and isinstance(padding, PKCS1v15):
+ mechanism = pkcs11.Mechanism.SHA384_RSA_PKCS
+ elif isinstance(algorithm, hashes.SHA512) and isinstance(padding, PSS):
+ mechanism = pkcs11.Mechanism.SHA512_RSA_PKCS_PSS
+ elif isinstance(algorithm, hashes.SHA512) and isinstance(padding, PKCS1v15):
+ mechanism = pkcs11.Mechanism.SHA512_RSA_PKCS
+ elif isinstance(algorithm, (hashes.SHA3_224, hashes.SHA3_384, hashes.SHA3_256, hashes.SHA3_512)):
+ raise ValueError("SHA3 is not support by the HSM backend.")
+ else:
+ raise ValueError(
+ f"{algorithm.name} with {padding.name} padding: Unknown signing algorithm and padding"
+ )
+
+ # TYPEHINT NOTE: library is not type-hinted.
+ return self.pkcs11_private_key.sign(data, mechanism=mechanism) # type: ignore[no-any-return]
+
+
+class PKCS11EllipticCurvePrivateKey(PKCS11PrivateKeyMixin, ec.EllipticCurvePrivateKey):
+ """Private key implementation for EC keys stored in a HSM."""
+
+ key_type = pkcs11.KeyType.EC
+
+ @property
+ def curve(self) -> ec.EllipticCurve:
+ return self.public_key().curve
+
+ def exchange(self, algorithm: ec.ECDH, peer_public_key: ec.EllipticCurvePublicKey) -> bytes:
+ raise NotImplementedError("Operation not implemented for a key stored in an HSM.")
+
+ @property
+ def key_size(self) -> int:
+ return self.public_key().key_size
+
+ def public_key(self) -> ec.EllipticCurvePublicKey:
+ public_key = encode_ec_public_key(self.pkcs11_public_key)
+ return cast(ec.EllipticCurvePublicKey, load_der_public_key(public_key))
+
+ def sign(self, data: bytes, signature_algorithm: ec.EllipticCurveSignatureAlgorithm) -> bytes:
+ if isinstance(signature_algorithm.algorithm, hashes.SHA256):
+ hasher = sha256()
+ elif isinstance(signature_algorithm.algorithm, hashes.SHA384):
+ hasher = sha384()
+ elif isinstance(signature_algorithm.algorithm, hashes.SHA512):
+ hasher = sha512()
+ else:
+ raise ValueError(f"{signature_algorithm.algorithm}: Signature algorithm is not supported.")
+
+ hasher.update(data)
+ data = hasher.digest()
+
+ return self.pkcs11_private_key.sign(data, mechanism=pkcs11.Mechanism.ECDSA) # type: ignore[no-any-return]
+
+
+# pylint: disable-next=abstract-method # private key functions are deliberately not implemented in base.
+class PKCS11Ed25519PrivateKey(
+ PKCS11EdwardsPrivateKeyMixin[ed25519.Ed25519PublicKey], ed25519.Ed25519PrivateKey
+):
+ """Private key implementation for Ed25519 keys stored in a HSM."""
+
+ public_key_algorithm = "ed25519"
+
+
+# pylint: disable-next=abstract-method # private key functions are deliberately not implemented in base.
+class PKCS11Ed448PrivateKey(PKCS11EdwardsPrivateKeyMixin[ed448.Ed448PublicKey], ed448.Ed448PrivateKey):
+ """Private key implementation for Ed448 keys stored in a HSM."""
+
+ public_key_algorithm = "ed448"
+
+
+PKCS11PrivateKeyTypes = Union[
+ PKCS11RSAPrivateKey, PKCS11Ed25519PrivateKey, PKCS11Ed448PrivateKey, PKCS11EllipticCurvePrivateKey
+]
diff --git a/ca/django_ca/key_backends/hsm/models.py b/ca/django_ca/key_backends/hsm/models.py
new file mode 100644
index 000000000..2e393b4a5
--- /dev/null
+++ b/ca/django_ca/key_backends/hsm/models.py
@@ -0,0 +1,65 @@
+# This file is part of django-ca (https://github.com/mathiasertl/django-ca).
+#
+# django-ca is free software: you can redistribute it and/or modify it under the terms of the GNU General
+# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your
+# option) any later version.
+#
+# django-ca is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
+# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+# for more details.
+#
+# You should have received a copy of the GNU General Public License along with django-ca. If not, see
+# .
+
+"""Models used by the HSM backend."""
+
+import typing
+from typing import Optional
+
+from pydantic import BaseModel, ConfigDict, model_validator
+
+from django_ca.key_backends.base import CreatePrivateKeyOptionsBaseModel
+from django_ca.key_backends.hsm.typehints import SupportedKeyType
+from django_ca.typehints import EllipticCurves
+
+
+class PinModelMixin:
+ """Mixin providing so/user pin and validation."""
+
+ so_pin: Optional[str] = None
+ user_pin: Optional[str] = None
+
+ @model_validator(mode="after")
+ def validate_pins(self) -> "typing.Self":
+ """Validate that exactly one of `so_pin` and `user_pin` is set."""
+ if self.so_pin is None and self.user_pin is None:
+ raise ValueError("Provide one of so_pin or user_pin.")
+ if self.so_pin is not None and self.user_pin is not None:
+ raise ValueError("Provide either so_pin or user_pin.")
+ return self
+
+
+class CreatePrivateKeyOptions(PinModelMixin, CreatePrivateKeyOptionsBaseModel):
+ """Options for initializing private keys."""
+
+ # NOTE: we set frozen here to prevent accidental coding mistakes. Models should be immutable.
+ model_config = ConfigDict(arbitrary_types_allowed=True)
+
+ key_label: str
+ key_type: SupportedKeyType # overwrites field from the base model
+ elliptic_curve: Optional[EllipticCurves]
+
+
+class StorePrivateKeyOptions(PinModelMixin, BaseModel):
+ """Options for storing a private key."""
+
+ # NOTE: we set frozen here to prevent accidental coding mistakes. Models should be immutable.
+ model_config = ConfigDict(frozen=True)
+
+ key_label: str
+
+
+class UsePrivateKeyOptions(PinModelMixin, BaseModel):
+ """Options for using the private key."""
+
+ model_config = ConfigDict(frozen=True)
diff --git a/ca/django_ca/key_backends/hsm/session.py b/ca/django_ca/key_backends/hsm/session.py
new file mode 100644
index 000000000..17b170662
--- /dev/null
+++ b/ca/django_ca/key_backends/hsm/session.py
@@ -0,0 +1,118 @@
+# This file is part of django-ca (https://github.com/mathiasertl/django-ca).
+#
+# django-ca is free software: you can redistribute it and/or modify it under the terms of the GNU General
+# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your
+# option) any later version.
+#
+# django-ca is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
+# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+# for more details.
+#
+# You should have received a copy of the GNU General Public License along with django-ca. If not, see
+# .
+
+"""Code for handling sessions for hardware security modules."""
+
+import threading
+from types import TracebackType
+from typing import Final, Optional
+
+import pkcs11
+from pkcs11 import Session
+from pkcs11._pkcs11 import lib as pkcs11_lib
+
+PoolKeyType = tuple[str, str, Optional[str], Optional[str]]
+
+
+class SessionPool:
+ """Thread-safe session pool for PKCS11 sessions."""
+
+ _lib_lock: Final[threading.Lock] = threading.Lock()
+ _lib_pool: Final[dict[str, pkcs11_lib]] = {}
+
+ _session_lock: Final[threading.Lock] = threading.Lock()
+ _session_pool: Final[dict[PoolKeyType, Session]] = {}
+ _session_refcount: Final[dict[PoolKeyType, int]] = {}
+
+ path: Final[str]
+ token_label: Final[str]
+ so_pin: Final[Optional[str]]
+ user_pin: Final[Optional[str]]
+ rw: Final[bool]
+
+ def __init__(
+ self, path: str, token_label: str, so_pin: Optional[str], user_pin: Optional[str], rw: bool = False
+ ) -> None:
+ if so_pin is None and user_pin is None:
+ raise ValueError("so_pin and user_pin cannot both be None.")
+ if so_pin is not None and user_pin is not None:
+ raise ValueError("Either so_pin and user_pin must be set.")
+
+ self.path = path
+ self.token_label = token_label
+ self.so_pin = so_pin
+ self.user_pin = user_pin
+ self.rw = rw
+
+ @classmethod
+ def acquire(
+ cls,
+ path: str,
+ token_label: str,
+ so_pin: Optional[str] = None,
+ user_pin: Optional[str] = None,
+ rw: bool = False,
+ ) -> Session:
+ """Open a new session with the given parameters."""
+ with cls._lib_lock:
+ if path not in cls._lib_pool:
+ cls._lib_pool[path] = pkcs11.lib(path)
+
+ with cls._session_lock:
+ pool_key = (path, token_label, so_pin, user_pin)
+ if pool_key not in cls._session_pool:
+ token = cls._lib_pool[path].get_token(token_label=token_label)
+ cls._session_pool[pool_key] = token.open(rw=rw, so_pin=so_pin, user_pin=user_pin)
+ cls._session_refcount[pool_key] = 1
+ else:
+ # Request a read/write session, but a read-only session is already present. According to the
+ # PKCS11 documentation, some libraries don't allow multiple sessions for the same token per
+ # process:
+ #
+ # https://python-pkcs11.readthedocs.io/en/latest/concurrency.html
+ #
+ # Note that this does not happen in practice. A read/write session is only requested when
+ # generating or importing keys, and this always runs on the command-line, where no other
+ # session is ever present.
+ if rw is True and cls._session_pool[pool_key].rw is False:
+ raise ValueError("Requested R/W session, but R/O session is already initialized.")
+
+ # Session is already open, just increase the refcount
+ cls._session_refcount[pool_key] += 1
+
+ return cls._session_pool[pool_key]
+
+ @classmethod
+ def release(cls, path: str, token_label: str, so_pin: Optional[str], user_pin: Optional[str]) -> None:
+ """Close session if no reference is known."""
+ with cls._session_lock:
+ pool_key = (path, token_label, so_pin, user_pin)
+ cls._session_refcount[pool_key] -= 1
+
+ if cls._session_refcount[pool_key] == 0:
+ cls._session_pool[pool_key].close()
+ del cls._session_pool[pool_key]
+ del cls._session_refcount[pool_key]
+
+ def __enter__(self) -> Session:
+ return self.acquire(
+ self.path, self.token_label, so_pin=self.so_pin, user_pin=self.user_pin, rw=self.rw
+ )
+
+ def __exit__(
+ self,
+ exc_type: Optional[type[BaseException]],
+ value: Optional[BaseException],
+ traceback: Optional[TracebackType],
+ ) -> None:
+ self.release(self.path, self.token_label, so_pin=self.so_pin, user_pin=self.user_pin)
diff --git a/ca/django_ca/key_backends/hsm/typehints.py b/ca/django_ca/key_backends/hsm/typehints.py
new file mode 100644
index 000000000..208d6ce0d
--- /dev/null
+++ b/ca/django_ca/key_backends/hsm/typehints.py
@@ -0,0 +1,18 @@
+# This file is part of django-ca (https://github.com/mathiasertl/django-ca).
+#
+# django-ca is free software: you can redistribute it and/or modify it under the terms of the GNU General
+# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your
+# option) any later version.
+#
+# django-ca is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
+# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+# for more details.
+#
+# You should have received a copy of the GNU General Public License along with django-ca. If not, see
+# .
+
+"""Type aliases used in HSM code."""
+
+from typing import Literal
+
+SupportedKeyType = Literal["RSA", "EC", "Ed25519", "Ed448"]
diff --git a/ca/django_ca/key_backends/storages.py b/ca/django_ca/key_backends/storages.py
index f962f438f..2a1ed45e3 100644
--- a/ca/django_ca/key_backends/storages.py
+++ b/ca/django_ca/key_backends/storages.py
@@ -102,7 +102,7 @@ def load_default_password(cls, password: Optional[bytes], info: ValidationInfo)
"""Validator to load the password from CA_PASSWORDS if not given."""
if info.context and password is None:
ca: CertificateAuthority = info.context.get("ca")
- if ca is not None:
+ if ca is not None: # pragma: no branch # ca is always set, this is just a precaution.
if settings_password := model_settings.CA_PASSWORDS.get(ca.serial):
return settings_password
@@ -225,7 +225,7 @@ def get_store_private_key_options(self, options: dict[str, Any]) -> StorePrivate
)
def get_use_private_key_options(
- self, ca: Optional["CertificateAuthority"], options: dict[str, Any]
+ self, ca: "CertificateAuthority", options: dict[str, Any]
) -> UsePrivateKeyOptions:
return UsePrivateKeyOptions.model_validate(
{"password": options.get(f"{self.options_prefix}password")}, context={"ca": ca}, strict=True
diff --git a/ca/django_ca/management/commands/init_ca.py b/ca/django_ca/management/commands/init_ca.py
index 1c838244c..492ac0f25 100644
--- a/ca/django_ca/management/commands/init_ca.py
+++ b/ca/django_ca/management/commands/init_ca.py
@@ -16,6 +16,7 @@
.. seealso:: https://docs.djangoproject.com/en/dev/howto/custom-management-commands/
"""
+import logging
from collections.abc import Iterable
from datetime import datetime, timedelta, timezone as tz
from typing import Any, Optional
@@ -359,14 +360,12 @@ def handle( # pylint: disable=too-many-locals # noqa: PLR0912,PLR0913,PLR0915
key_backend_options = key_backend.get_create_private_key_options(
key_type, key_size, elliptic_curve=elliptic_curve, options=options
)
- load_key_backend_options = key_backend.get_use_private_key_options(None, options)
# If there is a parent CA, test if we can use it (here) to sign certificates. The most common case
# where this happens is if the key is stored on the filesystem, but only accessible to the Celery
# worker and the current process is on the webserver side.
- if parent is None:
- signer_key_backend_options = load_key_backend_options
- else:
+ signer_key_backend_options = None
+ if parent is not None:
signer_key_backend_options = parent.key_backend.get_use_parent_private_key_options(
parent, options
)
@@ -374,10 +373,13 @@ def handle( # pylint: disable=too-many-locals # noqa: PLR0912,PLR0913,PLR0915
# Check if the parent key is usable
parent.check_usable(signer_key_backend_options)
except ValidationError as ex:
+ logging.exception(ex)
self.validation_error_to_command_error(ex)
- except CommandError: # pragma: no cover
+ except CommandError as ex: # pragma: no cover
+ logging.exception(ex)
raise
except Exception as ex: # pragma: no cover
+ logging.exception(ex)
raise CommandError(*ex.args) from ex
# Get/validate signature hash algorithm
@@ -569,7 +571,14 @@ def handle( # pylint: disable=too-many-locals # noqa: PLR0912,PLR0913,PLR0915
ocsp_responder_key_validity=ocsp_responder_key_validity,
**kwargs,
)
+
+ load_key_backend_options = key_backend.get_use_private_key_options(ca, options)
+ except ValidationError as ex:
+ self.validation_error_to_command_error(ex)
+ except CommandError: # pragma: no cover
+ raise
except Exception as ex: # pragma: no cover
+ logging.exception(ex)
raise CommandError(ex) from ex
# Generate OCSP keys and cache CRLs
diff --git a/ca/django_ca/tests/base/fixtures.py b/ca/django_ca/tests/base/fixtures.py
index acb9fea6e..3ea1653d2 100644
--- a/ca/django_ca/tests/base/fixtures.py
+++ b/ca/django_ca/tests/base/fixtures.py
@@ -17,12 +17,15 @@
import copy
import os
+import subprocess
+import time
from collections.abc import Iterator
from pathlib import Path
from cryptography import x509
from cryptography.x509.oid import CertificatePoliciesOID, ExtensionOID, NameOID
+from django.conf import settings
from django.core.cache import cache
from django.core.files.storage import storages
@@ -250,6 +253,24 @@ def signed_certificate_timestamps_pub(
yield request.getfixturevalue(f"{name}_pub")
+@pytest.fixture(scope="session")
+def softhsm_token() -> Iterator[str]:
+ """Fixture to create a new token in SoftHSM."""
+ token = settings.PKCS11_TOKEN_LABEL
+ so_pin = settings.PKCS11_SO_PIN
+ pin = settings.PKCS11_USER_PIN
+ subprocess.run(
+ ["softhsm2-util", "--init-token", "--free", "--label", token, "--so-pin", so_pin, "--pin", pin],
+ check=True,
+ )
+ # There might be a synchronization issue with the softhsm token.
+ time.sleep(0.1)
+
+ yield token
+
+ subprocess.run(["softhsm2-util", "--delete-token", "--token", token], check=True)
+
+
@pytest.fixture()
def subject(hostname: str) -> Iterator[x509.Name]:
"""Fixture for a :py:class:`~cg:cryptography.x509.Name` to use for a subject.
diff --git a/ca/django_ca/tests/base/utils.py b/ca/django_ca/tests/base/utils.py
index d52ea2ca7..0cfd8a4f8 100644
--- a/ca/django_ca/tests/base/utils.py
+++ b/ca/django_ca/tests/base/utils.py
@@ -98,9 +98,7 @@ def create_private_key(
) -> tuple[CertificateIssuerPublicKeyTypes, DummyModel]:
return None, DummyModel() # type: ignore[return-value]
- def get_use_private_key_options(
- self, ca: Optional[CertificateAuthority], options: dict[str, Any]
- ) -> DummyModel:
+ def get_use_private_key_options(self, ca: CertificateAuthority, options: dict[str, Any]) -> DummyModel:
return DummyModel()
def is_usable(
diff --git a/ca/django_ca/tests/commands/test_init_ca.py b/ca/django_ca/tests/commands/test_init_ca.py
index e3e61e843..71a025b93 100644
--- a/ca/django_ca/tests/commands/test_init_ca.py
+++ b/ca/django_ca/tests/commands/test_init_ca.py
@@ -22,10 +22,12 @@
from cryptography import x509
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import dsa, ec
-from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey
+from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePublicKey
+from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey, RSAPublicKey
from cryptography.hazmat.primitives.serialization import Encoding
from cryptography.x509.oid import AuthorityInformationAccessOID, ExtensionOID, NameOID
+from django.conf import settings
from django.core.cache import cache
from django.urls import reverse
from django.utils import timezone
@@ -33,9 +35,12 @@
import pytest
from pytest_django.fixtures import SettingsWrapper
+from django_ca import constants
from django_ca.conf import model_settings
from django_ca.constants import ExtendedKeyUsageOID
from django_ca.key_backends import key_backends
+from django_ca.key_backends.hsm import HSMBackend
+from django_ca.key_backends.hsm.models import UsePrivateKeyOptions as HSMUsePrivateKeyOptions
from django_ca.key_backends.storages import StoragesBackend, UsePrivateKeyOptions
from django_ca.models import Certificate, CertificateAuthority
from django_ca.signals import post_create_ca
@@ -72,10 +77,13 @@
subject_alternative_name,
uri,
)
+from django_ca.typehints import EllipticCurves, ParsableKeyType
from django_ca.utils import get_crl_cache_key
use_options = UsePrivateKeyOptions(password=None)
+# pylint: disable=redefined-outer-name # disabled because of fixtures
+
def assert_post_create_ca(post: mock.Mock, ca: CertificateAuthority) -> None:
"""Assert that the post_create_ca signal was called."""
@@ -155,8 +163,8 @@ def init_ca_e2e(
with assert_create_ca_signals() as (_pre, post):
out, err = cmd_e2e(["init_ca", name, subject, *args])
- assert out == ""
- assert err == ""
+ assert out == ""
+ assert err == ""
ca = CertificateAuthority.objects.get(name=name)
assert_post_create_ca(post, ca)
@@ -1058,6 +1066,179 @@ def test_non_default_key_backend_with_ec_key(
assert isinstance(key.curve, ec.SECT571R1)
+@pytest.mark.django_db
+@pytest.mark.usefixtures("tmpcadir")
+@pytest.mark.usefixtures("softhsm_token")
+@pytest.mark.parametrize("key_type", HSMBackend.supported_key_types)
+def test_hsm_backend(ca_name: str, rfc4514_subject: str, key_type: ParsableKeyType) -> None:
+ """Basic test for creating a key in the HSM."""
+ ca = init_ca_e2e(
+ ca_name,
+ rfc4514_subject,
+ "--subject-format=rfc4514",
+ f"--key-type={key_type}",
+ "--key-backend=hsm",
+ f"--hsm-key-label={ca_name}",
+ )
+
+ assert ca.key_backend_alias == "hsm"
+ assert ca.key_backend.is_usable(ca, HSMUsePrivateKeyOptions(user_pin=settings.PKCS11_USER_PIN))
+ assert ca.key_type == key_type
+ assert ca.key_backend_options["key_label"] == ca_name
+ assert ca.key_backend_options["key_type"] == key_type
+ assert isinstance(ca.pub.loaded, x509.Certificate)
+ assert isinstance(ca.pub.loaded.public_key(), constants.PUBLIC_KEY_TYPE_MAPPING[key_type])
+
+
+@pytest.mark.django_db
+@pytest.mark.usefixtures("tmpcadir")
+@pytest.mark.usefixtures("softhsm_token")
+def test_hsm_backend_with_rsa_options(ca_name: str, rfc4514_subject: str) -> None:
+ """Basic test for creating a key in the HSM."""
+ assert settings.CA_MIN_KEY_SIZE == 1024 # assert initial state
+ ca = init_ca_e2e(
+ ca_name,
+ rfc4514_subject,
+ "--subject-format=rfc4514",
+ "--key-type=RSA",
+ "--key-backend=hsm",
+ f"--hsm-key-label={ca_name}",
+ "--key-size=2048", # RSA specific option
+ )
+
+ assert ca.key_backend_alias == "hsm"
+ assert ca.key_backend.is_usable(ca, HSMUsePrivateKeyOptions(user_pin=settings.PKCS11_USER_PIN))
+ assert ca.key_type == "RSA"
+ assert ca.key_backend_options["key_label"] == ca_name
+ assert ca.key_backend_options["key_type"] == "RSA"
+ assert isinstance(ca.pub.loaded, x509.Certificate)
+ public_key = ca.pub.loaded.public_key()
+ assert isinstance(public_key, RSAPublicKey)
+ assert public_key.key_size == 2048
+
+
+@pytest.mark.django_db
+@pytest.mark.usefixtures("tmpcadir")
+@pytest.mark.usefixtures("softhsm_token")
+@pytest.mark.parametrize("ec_curve", HSMBackend.supported_elliptic_curves)
+def test_hsm_backend_with_ec_options(ca_name: str, ec_curve: EllipticCurves) -> None:
+ """Basic test for creating a key in the HSM."""
+ ca = init_ca(
+ ca_name,
+ key_type="EC",
+ key_backend=key_backends["hsm"],
+ hsm_key_label=ca_name,
+ elliptic_curve=ec_curve,
+ )
+
+ assert ca.key_backend_alias == "hsm"
+ assert ca.key_backend.is_usable(ca, HSMUsePrivateKeyOptions(user_pin=settings.PKCS11_USER_PIN))
+ assert ca.key_type == "EC"
+ assert ca.key_backend_options["key_label"] == ca_name
+ assert ca.key_backend_options["key_type"] == "EC"
+ assert isinstance(ca.pub.loaded, x509.Certificate)
+ public_key = ca.pub.loaded.public_key()
+ assert isinstance(public_key, EllipticCurvePublicKey)
+
+ # TYPEHINT NOTE: seems to be a false positive
+ assert public_key.key_size == constants.ELLIPTIC_CURVE_TYPES[ec_curve].key_size # type: ignore[comparison-overlap]
+ assert public_key.curve.name == ec_curve
+
+
+@pytest.mark.django_db
+@pytest.mark.usefixtures("tmpcadir")
+@pytest.mark.usefixtures("softhsm_token")
+def test_hsm_backend_with_child_ca(ca_name: str) -> None:
+ """Basic test for creating a key in the HSM."""
+ parent_name = f"{ca_name}_parent"
+ parent = init_ca(parent_name, key_backend=key_backends["hsm"], hsm_key_label=parent_name, path_length=1)
+
+ assert parent.key_backend_alias == "hsm"
+ assert parent.key_backend.is_usable(parent, HSMUsePrivateKeyOptions(user_pin=settings.PKCS11_USER_PIN))
+ assert parent.key_type == "RSA"
+ assert parent.key_backend_options["key_label"] == parent_name
+ assert parent.key_backend_options["key_type"] == "RSA"
+ assert isinstance(parent.pub.loaded, x509.Certificate)
+ assert isinstance(parent.pub.loaded.public_key(), RSAPublicKey)
+
+ # Create a child CA in the HSM
+ child_name = f"{ca_name}_child"
+ child = init_ca(child_name, parent=parent, key_backend=key_backends["hsm"], hsm_key_label=child_name)
+ assert child.key_backend_alias == "hsm"
+ assert child.key_backend.is_usable(child, HSMUsePrivateKeyOptions(user_pin=settings.PKCS11_USER_PIN))
+ assert child.key_type == "RSA"
+ assert child.key_backend_options["key_label"] == child_name
+ assert child.key_backend_options["key_type"] == "RSA"
+ assert isinstance(child.pub.loaded, x509.Certificate)
+ assert isinstance(child.pub.loaded.public_key(), RSAPublicKey)
+ assert child.parent == parent
+
+
+def test_hsm_backend_without_key_label(ca_name: str) -> None:
+ """Test error when --key-label option is missing."""
+ with assert_command_error(r"^--hsm-key-label is a required option for this key backend\.$"):
+ init_ca(name=ca_name, key_backend=key_backends["hsm"])
+
+
+def test_hsm_backend_with_both_pins(ca_name: str) -> None:
+ """Test error when --key-label option is missing."""
+ with assert_command_error(
+ r'^Both SO pin and user pin configured\. To override a pin from settings, pass --hsm-so-pin="" or '
+ r'--hsm-user-pin=""\.$'
+ ):
+ init_ca(
+ name=ca_name,
+ key_backend=key_backends["hsm"],
+ hsm_key_label=ca_name,
+ hsm_so_pin="so-pin",
+ hsm_user_pin="user-pin",
+ )
+
+
+@pytest.mark.django_db
+@pytest.mark.usefixtures("tmpcadir")
+@pytest.mark.usefixtures("softhsm_token")
+def test_hsm_backend_with_both_parent_pins(ca_name: str) -> None:
+ """Test error when --key-label option is missing."""
+ parent_name = f"{ca_name}_parent"
+ parent = init_ca(parent_name, key_backend=key_backends["hsm"], hsm_key_label=parent_name, path_length=1)
+
+ with assert_command_error(
+ r"^Both SO pin and user pin configured\. To override a pin from settings, pass "
+ r'--hsm-parent-so-pin="" or --hsm-parent-user-pin=""\.$'
+ ):
+ init_ca(
+ name=ca_name,
+ parent=parent,
+ key_backend=key_backends["hsm"],
+ hsm_key_label=ca_name,
+ hsm_parent_so_pin="so-pin",
+ hsm_parent_user_pin="user-pin",
+ )
+
+
+@pytest.mark.django_db
+@pytest.mark.usefixtures("tmpcadir")
+@pytest.mark.usefixtures("softhsm_token")
+def test_hsm_backend_with_no_parent_pins(ca_name: str) -> None:
+ """Test error when --key-label option is missing."""
+ parent_name = f"{ca_name}_parent"
+ parent = init_ca(parent_name, key_backend=key_backends["hsm"], hsm_key_label=parent_name, path_length=1)
+
+ with assert_command_error(
+ r"^No SO or user pin configured\. Use --hsm-parent-so-pin or --hsm-parent-user-pin to specify "
+ r"a pin\.$"
+ ):
+ init_ca(
+ name=ca_name,
+ parent=parent,
+ key_backend=key_backends["hsm"],
+ hsm_key_label=ca_name,
+ hsm_parent_so_pin="",
+ hsm_parent_user_pin="",
+ )
+
+
def test_invalid_public_key_parameters(ca_name: str) -> None:
"""Test passing invalid public key parameters."""
msg = r"^Ed25519 keys do not allow an algorithm for signing\.$"
diff --git a/ca/django_ca/tests/key_backends/hsm/__init__.py b/ca/django_ca/tests/key_backends/hsm/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/ca/django_ca/tests/key_backends/hsm/conftest.py b/ca/django_ca/tests/key_backends/hsm/conftest.py
new file mode 100644
index 000000000..f574b0710
--- /dev/null
+++ b/ca/django_ca/tests/key_backends/hsm/conftest.py
@@ -0,0 +1,61 @@
+# This file is part of django-ca (https://github.com/mathiasertl/django-ca).
+#
+# django-ca is free software: you can redistribute it and/or modify it under the terms of the GNU General
+# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your
+# option) any later version.
+#
+# django-ca is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
+# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+# for more details.
+#
+# You should have received a copy of the GNU General Public License along with django-ca. If not, see
+# .
+
+"""Fixtures for HSM testing."""
+
+from collections.abc import Iterator
+
+from pkcs11._pkcs11 import Session
+
+from django.conf import settings
+
+import pytest
+
+from django_ca.key_backends.hsm import HSMBackend
+from django_ca.key_backends.hsm.session import SessionPool
+
+# pylint: disable=redefined-outer-name # usefixtures does not work on fixtures.
+
+
+@pytest.fixture
+def session_pool() -> Iterator[None]:
+ """Get a clean session pool."""
+ # Reinitialize the library, so that any token created before is also visible (SoftHSM only sees token
+ # present at initialization time).
+ # pylint: disable=use-implicit-booleaness-not-comparison
+ # pylint: disable=protected-access # deliberate access in this entire function
+ for lib in SessionPool._lib_pool.values():
+ lib.reinitialize()
+
+ # Assert that the session pool *is* clean. Any test leaving sessions intact would show up here.
+ assert SessionPool._session_pool == {}
+ assert SessionPool._session_refcount == {}
+
+ yield
+
+ # Make sure that we leave no open sessions.
+ assert SessionPool._session_pool == {}
+ assert SessionPool._session_refcount == {}
+
+
+@pytest.fixture
+def session(softhsm_token: str, session_pool: None) -> Session: # pylint: disable=unused-argument
+ """Fixture providing a fresh (read-only) session."""
+ with SessionPool(settings.PKCS11_PATH, softhsm_token, None, settings.PKCS11_USER_PIN) as session:
+ yield session
+
+
+@pytest.fixture
+def hsm_backend(softhsm_token: str) -> Iterator[HSMBackend]:
+ """Fixture to retrievea HSM backend instance."""
+ yield HSMBackend("hsm", settings.PKCS11_PATH, softhsm_token, user_pin=settings.PKCS11_USER_PIN)
diff --git a/ca/django_ca/tests/key_backends/hsm/test_backend.py b/ca/django_ca/tests/key_backends/hsm/test_backend.py
new file mode 100644
index 000000000..9d1ab9282
--- /dev/null
+++ b/ca/django_ca/tests/key_backends/hsm/test_backend.py
@@ -0,0 +1,131 @@
+# This file is part of django-ca (https://github.com/mathiasertl/django-ca).
+#
+# django-ca is free software: you can redistribute it and/or modify it under the terms of the GNU General
+# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your
+# option) any later version.
+#
+# django-ca is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
+# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+# for more details.
+#
+# You should have received a copy of the GNU General Public License along with django-ca. If not, see
+# .
+
+"""Main tests for the HSM backend."""
+
+from django.conf import settings
+
+import pytest
+
+from django_ca.key_backends import key_backends
+from django_ca.key_backends.hsm import HSMBackend
+from django_ca.key_backends.hsm.models import CreatePrivateKeyOptions, UsePrivateKeyOptions
+from django_ca.models import CertificateAuthority
+
+use_options = UsePrivateKeyOptions(user_pin=settings.PKCS11_USER_PIN)
+
+
+def test_invalid_pin_configuration() -> None:
+ """Test validation of so_pin/user_pin."""
+ with pytest.raises(ValueError, match=r"^test: Set either so_pin or user_pin\.$"):
+ HSMBackend("test", "/path", "token", so_pin="so-pin", user_pin="user-pin")
+
+
+@pytest.mark.usefixtures("softhsm_token")
+def test_invalid_private_key_type(root: CertificateAuthority) -> None:
+ """Test an invalid private key."""
+ root.key_backend_alias = "hsm"
+ root.key_backend_options = {"key_id": "123", "key_label": "label", "key_type": "WRONG"}
+ root.save()
+ with pytest.raises(ValueError, match=r"^WRONG: Unsupported key type\.$"):
+ root.check_usable(use_options)
+
+
+def test_is_usable_with_no_options(root: CertificateAuthority) -> None:
+ """Test that is_usable() returns True if no options are passed."""
+ root.key_backend_alias = "hsm"
+ root.key_backend_options = {"key_id": "123", "key_label": "label", "key_type": "RSA"}
+ root.save()
+ assert root.is_usable() is True
+ assert root.is_usable(None) is True
+
+
+@pytest.mark.usefixtures("softhsm_token")
+def test_is_usable_with_wrong_user_pin(root: CertificateAuthority) -> None:
+ """Test that is_usable() returns False if the wrong pin is passed."""
+ root.key_backend_alias = "hsm"
+ root.key_backend_options = {"key_id": "123", "key_label": "label", "key_type": "RSA"}
+ root.save()
+ assert root.is_usable(UsePrivateKeyOptions(user_pin="wrong")) is False
+
+
+def test_no_private_key_options(root: CertificateAuthority) -> None:
+ """Test ...usable() with empty private key options."""
+ root.key_backend_alias = "hsm"
+ root.key_backend_options = {}
+ root.save()
+ assert root.is_usable(use_options) is False
+ with pytest.raises(ValueError, match=r"^key backend options are not defined\.$"):
+ root.check_usable(use_options)
+
+
+def test_private_key_options_not_a_dict(root: CertificateAuthority) -> None:
+ """Test ...usable() with private key options that are not a dict."""
+ root.key_backend_alias = "hsm"
+ root.key_backend_options = []
+ root.save()
+ assert root.is_usable(use_options) is False
+ with pytest.raises(ValueError, match=r"^key backend options are not defined\.$"):
+ root.check_usable(use_options)
+
+
+# pylint: disable-next=protected-access # okay in test cases
+@pytest.mark.parametrize("parameter", HSMBackend._required_key_backend_options)
+def test_private_key_options_missing_parameter(root: CertificateAuthority, parameter: str) -> None:
+ """Test ...usable() with private key options that are missing a value."""
+ root.key_backend_alias = "hsm"
+ root.key_backend_options = {"key_id": "123", "key_label": "label", "key_type": "RSA"}
+ del root.key_backend_options[parameter]
+ root.save()
+ assert root.is_usable(use_options) is False
+
+ with pytest.raises(ValueError, match=rf"^{parameter}: Required key option is not defined\.$"):
+ root.check_usable(use_options)
+
+
+@pytest.mark.usefixtures("softhsm_token")
+@pytest.mark.parametrize("key_type", HSMBackend.supported_key_types)
+def test_create_private_key_with_read_only_session(
+ root: CertificateAuthority, ca_name: str, key_type: str
+) -> None:
+ """Test creating a private key if a read-only session is already open."""
+ root.key_backend_alias = "hsm"
+ root.key_backend_options = {"key_id": "123", "key_label": "label", "key_type": "RSA"}
+ root.save()
+
+ options = CreatePrivateKeyOptions(
+ key_type=key_type, key_label=ca_name, elliptic_curve=None, user_pin=settings.PKCS11_USER_PIN
+ )
+
+ backend: HSMBackend = key_backends["hsm"] # type: ignore[assignment]
+ with pytest.raises(
+ ValueError, match=r"^Requested R/W session, but R/O session is already initialized\.$"
+ ):
+ with backend.session(so_pin=None, user_pin=settings.PKCS11_USER_PIN):
+ backend.create_private_key(root, options.key_type, options)
+
+
+@pytest.mark.usefixtures("softhsm_token")
+def test_create_private_key_with_unknown_key_type(root: CertificateAuthority, ca_name: str) -> None:
+ """Test creating a private key if a read-only session is already open."""
+ root.key_backend_alias = "hsm"
+ root.key_backend_options = {"key_id": "123", "key_label": "label", "key_type": "WRONG"}
+ root.save()
+
+ options = CreatePrivateKeyOptions(
+ key_type="RSA", key_label=ca_name, elliptic_curve=None, user_pin=settings.PKCS11_USER_PIN
+ )
+
+ backend: HSMBackend = key_backends["hsm"] # type: ignore[assignment]
+ with pytest.raises(ValueError, match=r"^WRONG: unknown key type$"):
+ backend.create_private_key(root, "WRONG", options) # type: ignore[arg-type] # what we test
diff --git a/ca/django_ca/tests/key_backends/hsm/test_keys.py b/ca/django_ca/tests/key_backends/hsm/test_keys.py
new file mode 100644
index 000000000..d711f2cd3
--- /dev/null
+++ b/ca/django_ca/tests/key_backends/hsm/test_keys.py
@@ -0,0 +1,68 @@
+# This file is part of django-ca (https://github.com/mathiasertl/django-ca).
+#
+# django-ca is free software: you can redistribute it and/or modify it under the terms of the GNU General
+# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your
+# option) any later version.
+#
+# django-ca is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
+# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+# for more details.
+#
+# You should have received a copy of the GNU General Public License along with django-ca. If not, see
+# .
+
+"""Tests for models used in the HSM key backend."""
+
+from typing import Union
+
+from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15
+from cryptography.hazmat.primitives.serialization import Encoding, NoEncryption, PrivateFormat
+
+import pytest
+
+from django_ca.key_backends.hsm.keys import (
+ PKCS11Ed448PrivateKey,
+ PKCS11Ed25519PrivateKey,
+ PKCS11EllipticCurvePrivateKey,
+ PKCS11PrivateKeyTypes,
+ PKCS11RSAPrivateKey,
+)
+
+
+@pytest.mark.parametrize(
+ "key_class",
+ (PKCS11RSAPrivateKey, PKCS11EllipticCurvePrivateKey, PKCS11Ed25519PrivateKey, PKCS11Ed448PrivateKey),
+)
+def test_not_implemented_error(key_class: type[PKCS11PrivateKeyTypes]) -> None:
+ """Test methods that are not implemented."""
+ key: PKCS11PrivateKeyTypes = key_class(None, "key_id", "key_label")
+
+ error = r"^Private numbers cannot be retrieved for keys stored in a hardware security module \(HSM\)\.$"
+ with pytest.raises(NotImplementedError, match=error):
+ key.private_numbers()
+
+ with pytest.raises(
+ NotImplementedError,
+ match=r"^Private bytes cannot be retrieved for keys stored in a hardware security module \(HSM\)\.$",
+ ):
+ key.private_bytes(Encoding.PEM, PrivateFormat.TraditionalOpenSSL, NoEncryption())
+
+ with pytest.raises(
+ NotImplementedError,
+ match=r"^Decryption is not implemented for keys stored in a hardware security module \(HSM\)\.$",
+ ):
+ key.decrypt(b"foo", PKCS1v15())
+
+
+@pytest.mark.parametrize("key_class", (PKCS11Ed25519PrivateKey, PKCS11Ed448PrivateKey))
+def test_ed_key_not_implemented_error(
+ key_class: type[Union[PKCS11Ed25519PrivateKey, PKCS11Ed448PrivateKey]],
+) -> None:
+ """Test methods that are not implemented."""
+ key: Union[PKCS11Ed25519PrivateKey, PKCS11Ed448PrivateKey] = key_class(None, "key_id", "key_label")
+
+ with pytest.raises(
+ NotImplementedError,
+ match=r"^Private bytes cannot be retrieved for keys stored in a hardware security module \(HSM\)\.$",
+ ):
+ key.private_bytes_raw()
diff --git a/ca/django_ca/tests/key_backends/hsm/test_models.py b/ca/django_ca/tests/key_backends/hsm/test_models.py
new file mode 100644
index 000000000..b226f7a34
--- /dev/null
+++ b/ca/django_ca/tests/key_backends/hsm/test_models.py
@@ -0,0 +1,41 @@
+# This file is part of django-ca (https://github.com/mathiasertl/django-ca).
+#
+# django-ca is free software: you can redistribute it and/or modify it under the terms of the GNU General
+# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your
+# option) any later version.
+#
+# django-ca is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
+# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+# for more details.
+#
+# You should have received a copy of the GNU General Public License along with django-ca. If not, see
+# .
+
+"""Tests for models used in the HSM key backend."""
+
+from typing import Optional
+
+import pytest
+
+from django_ca.key_backends.hsm.models import UsePrivateKeyOptions
+
+
+@pytest.mark.parametrize("so_pin,user_pin", (("so-pin-value", None), (None, "user-pin-value")))
+def test_pins(so_pin: Optional[str], user_pin: Optional[str]) -> None:
+ """Test valid pin configurations."""
+ model = UsePrivateKeyOptions(so_pin=so_pin, user_pin=user_pin)
+ assert model.so_pin == so_pin
+ assert model.user_pin == user_pin
+
+
+@pytest.mark.parametrize(
+ "so_pin,user_pin,error",
+ (
+ (None, None, r"Provide one of so_pin or user_pin\."),
+ ("so-pin-value", "user-pin-value", r"Provide either so_pin or user_pin\."),
+ ),
+)
+def test_invalid_pins(so_pin: Optional[str], user_pin: Optional[str], error: str) -> None:
+ """Test invalid pin configurations."""
+ with pytest.raises(ValueError, match=error):
+ UsePrivateKeyOptions(so_pin=so_pin, user_pin=user_pin)
diff --git a/ca/django_ca/tests/key_backends/hsm/test_session.py b/ca/django_ca/tests/key_backends/hsm/test_session.py
new file mode 100644
index 000000000..f8c4e79bd
--- /dev/null
+++ b/ca/django_ca/tests/key_backends/hsm/test_session.py
@@ -0,0 +1,129 @@
+# This file is part of django-ca (https://github.com/mathiasertl/django-ca).
+#
+# django-ca is free software: you can redistribute it and/or modify it under the terms of the GNU General
+# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your
+# option) any later version.
+#
+# django-ca is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
+# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+# for more details.
+#
+# You should have received a copy of the GNU General Public License along with django-ca. If not, see
+# .
+
+
+"""Test HSM related code."""
+
+import subprocess
+from collections.abc import Iterator
+from typing import Optional
+
+from pkcs11._pkcs11 import Session
+
+from django.conf import settings
+from django.utils.crypto import get_random_string
+
+import pytest
+
+from django_ca.key_backends.hsm.session import SessionPool
+
+PoolKeyType = tuple[str, str, Optional[str], Optional[str]]
+
+# pylint: disable=redefined-outer-name # several fixtures are defined here
+# pylint: disable=protected-access # we test class internals throughout this module
+
+
+@pytest.fixture
+def pool_key(softhsm_token: str) -> Iterator[PoolKeyType]:
+ """Minor fixture to return the pool key for the default settings."""
+ yield settings.PKCS11_PATH, softhsm_token, None, settings.PKCS11_USER_PIN
+
+
+@pytest.fixture
+def second_softhsm_token() -> Iterator[str]:
+ """Fixture to create a second softhsm token."""
+ label = f"pytest.{get_random_string(8)}.dual"
+
+ so_pin = settings.PKCS11_SO_PIN
+ pin = settings.PKCS11_USER_PIN
+
+ subprocess.run(
+ ["softhsm2-util", "--init-token", "--free", "--label", label, "--so-pin", so_pin, "--pin", pin],
+ check=True,
+ )
+
+ yield label
+
+ subprocess.run(["softhsm2-util", "--delete-token", "--token", label], check=True)
+
+
+def test_duplicate_session(softhsm_token: str, session: Session, pool_key: PoolKeyType) -> None:
+ """Test that a second session request does not open a new session."""
+ assert isinstance(session, Session)
+ assert SessionPool._session_pool == {pool_key: session}
+ assert SessionPool._session_refcount == {pool_key: 1}
+
+ # Get another session, even though it's in the same thread, refcount still goes up
+ with SessionPool(settings.PKCS11_PATH, softhsm_token, None, settings.PKCS11_USER_PIN) as second_session:
+ assert session is second_session
+ assert isinstance(second_session, Session)
+ assert SessionPool._session_pool == {pool_key: session}
+ assert SessionPool._session_refcount == {pool_key: 2}
+
+ # Test that second session was cleaned up
+ assert SessionPool._session_pool == {pool_key: session}
+ assert SessionPool._session_refcount == {pool_key: 1}
+
+
+def test_duplicate_rw_session(softhsm_token: str, session: Session, pool_key: PoolKeyType) -> None:
+ """Test that requesting a read/write session when it is already open read-only is an error."""
+ assert SessionPool._session_pool == {pool_key: session}
+ assert SessionPool._session_refcount == {pool_key: 1}
+ with pytest.raises(
+ ValueError, match=r"^Requested R/W session, but R/O session is already initialized\.$"
+ ):
+ with SessionPool(settings.PKCS11_PATH, softhsm_token, None, settings.PKCS11_USER_PIN, rw=True):
+ pass
+
+ # Test that the ref count has not increased
+ assert SessionPool._session_pool == {pool_key: session}
+ assert SessionPool._session_refcount == {pool_key: 1}
+
+
+def test_two_sessions(second_softhsm_token: str, session: Session, pool_key: PoolKeyType) -> None:
+ """Test creating a second token with dual sessions."""
+ # assert that we have one session in the beginning
+ assert SessionPool._session_pool == {pool_key: session}
+ assert SessionPool._session_refcount == {pool_key: 1}
+
+ pin = settings.PKCS11_USER_PIN
+ second_pool_key = (settings.PKCS11_PATH, second_softhsm_token, None, pin)
+
+ # Create a second session and observe dual session pool
+ with SessionPool(settings.PKCS11_PATH, second_softhsm_token, None, pin) as second_session:
+ assert SessionPool._session_pool == {pool_key: session, second_pool_key: second_session}
+ assert SessionPool._session_refcount == {pool_key: 1, second_pool_key: 1}
+
+ # Try to get a nested session...
+ with SessionPool(settings.PKCS11_PATH, second_softhsm_token, None, pin) as third_session:
+ assert second_session is third_session
+ assert SessionPool._session_pool == {pool_key: session, second_pool_key: second_session}
+ assert SessionPool._session_refcount == {pool_key: 1, second_pool_key: 2}
+
+ # session was cleared from pool
+ assert SessionPool._session_pool == {pool_key: session}
+ assert SessionPool._session_refcount == {pool_key: 1}
+
+
+def test_both_pins_empty() -> None:
+ """Test error when both so_pin and user_pin are not set."""
+ with pytest.raises(ValueError, match=r"^so_pin and user_pin cannot both be None\.$"):
+ with SessionPool(settings.PKCS11_PATH, "any", None, None):
+ pass
+
+
+def test_both_pins_set() -> None:
+ """Test error when both so_pin and user_pin are not set."""
+ with pytest.raises(ValueError, match=r"^Either so_pin and user_pin must be set\.$"):
+ with SessionPool(settings.PKCS11_PATH, "any", "foo", "bar"):
+ pass
diff --git a/ca/django_ca/tests/key_backends/test_base.py b/ca/django_ca/tests/key_backends/test_base.py
index 038821db0..fc767521e 100644
--- a/ca/django_ca/tests/key_backends/test_base.py
+++ b/ca/django_ca/tests/key_backends/test_base.py
@@ -56,6 +56,7 @@ def test_key_backends_iter(settings: SettingsWrapper) -> None:
assert list(key_backends) == [
key_backends[model_settings.CA_DEFAULT_KEY_BACKEND],
key_backends["secondary"],
+ key_backends["hsm"],
]
settings.CA_KEY_BACKENDS = {
diff --git a/pyproject.toml b/pyproject.toml
index 4f5569e8b..6bd4a7529 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -123,6 +123,7 @@ dependencies = [
[project.optional-dependencies]
api = ["django-ninja>=1.1"]
celery = ["celery>=5.4"]
+hsm = ["python-pkcs11>=0.7"]
mysql = ["mysqlclient"]
postgres = ["psycopg[c,pool]>=3.1"]
# remove psycopg3 in django-ca==2.0, also update docs/source/include/pip-extras.rst
@@ -193,6 +194,8 @@ module = [
"docker.*",
"enchant.tokenize",
"httpcore.*",
+ "pkcs11.*",
+ "pkcs11.util.*",
# psycopg and psycopg_c are not installed in isolated mypy envs (tox, ...)
"psycopg",
"psycopg_c",
@@ -391,6 +394,9 @@ filterwarnings = [
"error:::josepy",
"error:::pydantic",
+ # PyOpenSSL==24.2.1 deprecates CSR support, which is still used by josepy==1.14.0.
+ "ignore:CSR support in pyOpenSSL is deprecated. You should use the APIs in cryptography.",
+
# Disabled because cryptography calls the function directly and the warning is issued with stacklevel=2,
# so it's shown for our code. See: https://github.com/pyca/cryptography/issues/9580
"ignore:datetime.datetime.utcfromtimestamp\\(\\) is deprecated::django_ca", # pragma: only cg<42
diff --git a/requirements.txt b/requirements.txt
index b8746b1a1..12f87e382 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1 +1 @@
--e .[celery,redis,yaml,api,postgres]
+-e .[celery,hsm,redis,yaml,api,postgres]