Skip to content

Commit

Permalink
add OCSP key backend using HSM
Browse files Browse the repository at this point in the history
  • Loading branch information
mathiasertl committed Dec 6, 2024
1 parent e5563f7 commit ae1008f
Show file tree
Hide file tree
Showing 9 changed files with 385 additions and 170 deletions.
15 changes: 15 additions & 0 deletions ca/ca/test_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,21 @@
},
}

CA_OCSP_KEY_BACKENDS = {
"default": {
"BACKEND": "django_ca.key_backends.storages.StoragesOCSPBackend",
"OPTIONS": {"storage_alias": "django-ca"},
},
"hsm": {
"BACKEND": "django_ca.key_backends.hsm.HSMOCSPBackend",
"OPTIONS": {
"library_path": PKCS11_PATH,
"token": PKCS11_TOKEN_LABEL,
"user_pin": PKCS11_USER_PIN,
},
},
}

# Custom settings
CA_DEFAULT_SUBJECT = (
{"oid": "C", "value": "AT"},
Expand Down
3 changes: 2 additions & 1 deletion ca/django_ca/key_backends/hsm/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,6 @@
"""HSM backend module."""

from django_ca.key_backends.hsm.backend import HSMBackend
from django_ca.key_backends.hsm.ocsp_backend import HSMOCSPBackend

__all__ = ("HSMBackend",)
__all__ = ("HSMBackend", "HSMOCSPBackend")
151 changes: 18 additions & 133 deletions ca/django_ca/key_backends/hsm/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,15 @@

"""Key storage backend for hardware security modules (HSMs)."""

from collections.abc import Iterator, Sequence
from contextlib import contextmanager
from collections.abc import Sequence
from datetime import datetime
from typing import TYPE_CHECKING, Any, Final, Optional

import pkcs11
from pkcs11 import KeyType, ObjectClass, Session
from pkcs11.util.ec import decode_ec_private_key, decode_ec_public_key, encode_named_curve_parameters
from pkcs11 import Session
from pkcs11.util.ec import decode_ec_private_key, decode_ec_public_key
from pkcs11.util.rsa import decode_rsa_private_key, decode_rsa_public_key

from asn1crypto.algos import SignedDigestAlgorithmId
from cryptography import x509
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ec, ed448, ed25519, rsa
Expand All @@ -44,12 +42,12 @@
PKCS11PrivateKeyTypes,
PKCS11RSAPrivateKey,
)
from django_ca.key_backends.hsm.mixins import HSMKeyBackendMixin
from django_ca.key_backends.hsm.models import (
HSMCreatePrivateKeyOptions,
HSMStorePrivateKeyOptions,
HSMUsePrivateKeyOptions,
)
from django_ca.key_backends.hsm.session import SessionPool
from django_ca.key_backends.hsm.typehints import SupportedKeyType
from django_ca.typehints import (
AllowedHashTypes,
Expand All @@ -65,7 +63,10 @@
from django_ca.models import CertificateAuthority


class HSMBackend(KeyBackend[HSMCreatePrivateKeyOptions, HSMStorePrivateKeyOptions, HSMUsePrivateKeyOptions]):
class HSMBackend(
HSMKeyBackendMixin,
KeyBackend[HSMCreatePrivateKeyOptions, HSMStorePrivateKeyOptions, HSMUsePrivateKeyOptions],
):
"""A key backend to create and use private keys in a hardware security module (HSM)."""

name = "hsm"
Expand All @@ -80,51 +81,8 @@ class HSMBackend(KeyBackend[HSMCreatePrivateKeyOptions, HSMStorePrivateKeyOption
supported_hash_algorithms: tuple[HashAlgorithms, ...] = ("SHA-224", "SHA-256", "SHA-384", "SHA-512")
supported_elliptic_curves: tuple[EllipticCurves, ...] = tuple(constants.ELLIPTIC_CURVE_TYPES)

library_path: str
token: str
so_pin: Optional[str]
user_pin: Optional[str]
_required_key_backend_options: Final[tuple[str, str, str]] = ("key_label", "key_id", "key_type")

def __init__(
self,
alias: str,
library_path: str,
token: str,
so_pin: Optional[str] = None,
user_pin: Optional[str] = None,
):
if so_pin is not None and user_pin is not None:
raise ValueError(f"{alias}: Set either so_pin or user_pin.")

super().__init__(alias, library_path=library_path, token=token, so_pin=so_pin, user_pin=user_pin)

@contextmanager
def session(self, so_pin: Optional[str], user_pin: Optional[str], rw: bool = False) -> Iterator[Session]:
"""Shortcut to get a session from the pool."""
try:
with SessionPool(self.library_path, self.token, so_pin, user_pin, rw=rw) as session:
yield session
# python-pkcs11 provides no useful exception strings, so we re-create exceptions with useful ones that
# can be sent to the user.
except pkcs11.UserNotLoggedIn as ex:
# NOTE: We always authenticate, but some operations are known to require a pin and the underlying
# pkcs11 library does not support it. The known case is creating a key with a SO pin.
raise pkcs11.UserNotLoggedIn(
"An operation required a login, but none was provided. This is most likely a bug in the "
"underlying library, not in django-ca."
) from ex
except pkcs11.PinIncorrect as ex:
raise pkcs11.PinIncorrect("Pin incorrect.") from ex # user supplied incorrect pin
except pkcs11.NoSuchToken as ex:
raise pkcs11.NoSuchToken(f"{self.token}: Token not found.") from ex
except pkcs11.SessionReadOnly as ex:
# E.g. trying to generate a key with a read-only session. Should not happen.
raise pkcs11.SessionReadOnly("Attempting to write to a read-only session.") from ex
except pkcs11.PKCS11Error as ex:
# Catch-all for any PKCS11 error. Should not happen, as all relevant errors are handled above.
raise pkcs11.PKCS11Error(f"Unknown pkcs11 error ({type(ex).__name__}).") from ex

def _add_key_label_argument(self, group: ArgumentGroup, prefix: str = "") -> None:
group.add_argument(
f"--{self.argparse_prefix}{prefix}key-label",
Expand Down Expand Up @@ -301,90 +259,17 @@ def create_private_key(
key_id = int_to_hex(x509.random_serial_number())
key_label = options.key_label

# Test that no private key with the given label exists. Some libraries (e.g. SoftHSM) don't treat the
# label as unique and will silently create a second key with the same label.
# NOTE: Using a rw session here, even though we don't need it. pkcs11 fails (at least with softhsm2)
# if an so_pin is used and a read-only session is requested. Also, we have to use rw when creating
# the key anyway.
with self.session(so_pin=options.so_pin, user_pin=options.user_pin, rw=True) as session:
try:
session.get_key(object_class=ObjectClass.PUBLIC_KEY, label=key_label)
except pkcs11.NoSuchKey:
pass # this is what we hope for
else:
raise ValueError(f"{key_label}: Private key with this label already exists.")

if key_type == "RSA":
pkcs11_public_key, pkcs11_private_key = session.generate_keypair(
pkcs11.KeyType.RSA, options.key_size, id=key_id.encode(), label=key_label, store=True
)

private_key: PKCS11PrivateKeyTypes = PKCS11RSAPrivateKey(
session=session,
key_id=key_id,
key_label=key_label,
pkcs11_private_key=pkcs11_private_key,
pkcs11_public_key=pkcs11_public_key,
)
public_key = private_key.public_key()

elif key_type in ("Ed25519", "Ed448"):
named_curve_parameters = encode_named_curve_parameters(
SignedDigestAlgorithmId(key_type.lower()).dotted
)

parameters = session.create_domain_parameters(
KeyType.EC_EDWARDS, {pkcs11.Attribute.EC_PARAMS: named_curve_parameters}, local=True
)

pkcs11_public_key, pkcs11_private_key = parameters.generate_keypair(
mechanism=pkcs11.Mechanism.EC_EDWARDS_KEY_PAIR_GEN,
store=True,
id=key_id.encode(),
label=key_label,
)

if key_type == "Ed25519":
private_key = PKCS11Ed25519PrivateKey(
session=session,
key_id=key_id,
key_label=key_label,
pkcs11_private_key=pkcs11_private_key,
pkcs11_public_key=pkcs11_public_key,
)
else:
private_key = PKCS11Ed448PrivateKey(
session=session,
key_id=key_id,
key_label=key_label,
pkcs11_private_key=pkcs11_private_key,
pkcs11_public_key=pkcs11_public_key,
)
public_key = private_key.public_key()

elif key_type == "EC":
# TYPEHINT NOTE: elliptic curve is always set if key_type is EC.
elliptic_curve_name = options.elliptic_curve.lower() # type: ignore[union-attr]
parameters = session.create_domain_parameters(
KeyType.EC,
{pkcs11.Attribute.EC_PARAMS: encode_named_curve_parameters(elliptic_curve_name)},
local=True,
)

pkcs11_public_key, pkcs11_private_key = parameters.generate_keypair(
store=True, id=key_id.encode(), label=key_label
)

private_key = PKCS11EllipticCurvePrivateKey(
session=session,
key_id=key_id,
key_label=key_label,
pkcs11_private_key=pkcs11_private_key,
pkcs11_public_key=pkcs11_public_key,
)
public_key = private_key.public_key()
else:
raise ValueError(f"{key_type}: unknown key type")
private_key = self._create_private_key(
session,
key_id,
key_label,
key_type,
key_size=options.key_size,
elliptic_curve=options.elliptic_curve,
)

public_key = private_key.public_key()

ca.key_backend_options = {"key_id": key_id, "key_label": key_label, "key_type": key_type}
use_private_key_options = HSMUsePrivateKeyOptions.model_validate(
Expand Down
Loading

0 comments on commit ae1008f

Please sign in to comment.