Skip to content

Commit

Permalink
add OCSP key backend options
Browse files Browse the repository at this point in the history
  • Loading branch information
mathiasertl committed Nov 26, 2024
1 parent 31ea45e commit 35bc4da
Show file tree
Hide file tree
Showing 27 changed files with 606 additions and 302 deletions.
17 changes: 17 additions & 0 deletions ca/django_ca/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,7 @@ class SettingsModel(BaseModel):
x509.NameOID.EMAIL_ADDRESS,
x509.NameOID.SERIAL_NUMBER,
)
CA_DEFAULT_OCSP_KEY_BACKEND: str = "default"
CA_DEFAULT_PRIVATE_KEY_TYPE: ParsableKeyType = "RSA"
CA_DEFAULT_PROFILE: str = "webserver"
CA_DEFAULT_SIGNATURE_HASH_ALGORITHM: HashAlgorithmTypeAlias = hashes.SHA512()
Expand All @@ -407,6 +408,7 @@ class SettingsModel(BaseModel):
CA_KEY_BACKENDS: dict[str, KeyBackendConfigurationModel] = Field(default_factory=dict)
CA_MIN_KEY_SIZE: Annotated[PowerOfTwoInt, Ge(1024)] = 2048
CA_NOTIFICATION_DAYS: tuple[int, ...] = (14, 7, 3, 1)
CA_OCSP_KEY_BACKENDS: dict[str, KeyBackendConfigurationModel] = Field(default_factory=dict)

# The minimum value comes from the fact that the renewal task only runs every hour by default.
CA_OCSP_RESPONDER_CERTIFICATE_RENEWAL: Annotated[timedelta, Ge(timedelta(hours=2))] = timedelta(days=1)
Expand Down Expand Up @@ -464,6 +466,21 @@ def check_ca_key_backends(self) -> "SettingsModel":
raise ValueError(f"{self.CA_DEFAULT_KEY_BACKEND}: The default key backend is not configured.")
return self

@model_validator(mode="after")
def check_ca_ocsp_key_backends(self) -> "SettingsModel":
"""Set the default OCSP key backend if not set, and validate that the default is configured."""
if not self.CA_OCSP_KEY_BACKENDS:
# pylint: disable-next=unsupported-assignment-operation # pylint this this is a Field()
self.CA_OCSP_KEY_BACKENDS[self.CA_DEFAULT_OCSP_KEY_BACKEND] = KeyBackendConfigurationModel(
BACKEND=constants.DEFAULT_OCSP_KEY_BACKEND,
OPTIONS={"storage_alias": self.CA_DEFAULT_STORAGE_ALIAS},
)

# pylint: disable-next=unsupported-membership-test # pylint this this is a Field()
elif self.CA_DEFAULT_OCSP_KEY_BACKEND not in self.CA_OCSP_KEY_BACKENDS:
raise ValueError(f"{self.CA_DEFAULT_KEY_BACKEND}: The default key backend is not configured.")
return self

@model_validator(mode="after")
def check_ca_default_profile(self) -> "SettingsModel":
"""Validate that the default profile is also configured."""
Expand Down
1 change: 1 addition & 0 deletions ca/django_ca/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
] = MappingProxyType({v: k for k, v in CERTIFICATE_REVOCATION_LIST_ENCODING_TYPES.items()})

DEFAULT_STORAGE_BACKEND = "django_ca.key_backends.storages.StoragesBackend"
DEFAULT_OCSP_KEY_BACKEND = "django_ca.key_backends.storages.StoragesOCSPBackend"

#: Mapping of elliptic curve names to the implementing classes
ELLIPTIC_CURVE_TYPES: MappingProxyType[EllipticCurves, type[ec.EllipticCurve]] = MappingProxyType(
Expand Down
4 changes: 2 additions & 2 deletions ca/django_ca/key_backends/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,6 @@

"""Public exports of the backend module."""

from django_ca.key_backends.base import KeyBackend, key_backends
from django_ca.key_backends.base import KeyBackend, OCSPKeyBackend, key_backends, ocsp_key_backends

__all__ = ["KeyBackend", "key_backends"]
__all__ = ["KeyBackend", "OCSPKeyBackend", "key_backends", "ocsp_key_backends"]
227 changes: 182 additions & 45 deletions ca/django_ca/key_backends/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,23 @@
"""Base classes for CA backends."""

import abc
import logging
import typing
from collections.abc import Iterator, Sequence
from datetime import datetime
from threading import local
from typing import Annotated, Any, Optional
from typing import Annotated, Any, ClassVar, Generic, Optional, TypeVar

from pydantic import BaseModel, Field, model_validator

from cryptography import x509
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import dsa, ec, rsa
from cryptography.hazmat.primitives.asymmetric.types import (
CertificateIssuerPrivateKeyTypes,
CertificateIssuerPublicKeyTypes,
)
from cryptography.x509.ocsp import OCSPResponse, OCSPResponseBuilder

from django.core.exceptions import ImproperlyConfigured
from django.core.management import CommandParser
Expand All @@ -48,11 +51,13 @@
from django_ca.models import CertificateAuthority


log = logging.getLogger(__name__)

# NOTE: Self only needed before Python3.11, replace with typing.Self then
Self = typing.TypeVar("Self", bound="KeyBackend[BaseModel,BaseModel,BaseModel]") # pragma: only py<3.11
CreatePrivateKeyOptionsTypeVar = typing.TypeVar("CreatePrivateKeyOptionsTypeVar", bound=BaseModel)
UsePrivateKeyOptionsTypeVar = typing.TypeVar("UsePrivateKeyOptionsTypeVar", bound=BaseModel)
StorePrivateKeyOptionsTypeVar = typing.TypeVar("StorePrivateKeyOptionsTypeVar", bound=BaseModel)
Self = TypeVar("Self", bound="KeyBackend[BaseModel,BaseModel,BaseModel]") # pragma: only py<3.11
CreatePrivateKeyOptionsTypeVar = TypeVar("CreatePrivateKeyOptionsTypeVar", bound=BaseModel)
UsePrivateKeyOptionsTypeVar = TypeVar("UsePrivateKeyOptionsTypeVar", bound=BaseModel)
StorePrivateKeyOptionsTypeVar = TypeVar("StorePrivateKeyOptionsTypeVar", bound=BaseModel)


class CreatePrivateKeyOptionsBaseModel(BaseModel):
Expand All @@ -71,16 +76,8 @@ def validate_key_size(self) -> "typing.Self":
return self


class KeyBackend(
typing.Generic[
CreatePrivateKeyOptionsTypeVar, StorePrivateKeyOptionsTypeVar, UsePrivateKeyOptionsTypeVar
],
metaclass=abc.ABCMeta,
):
"""Base class for all key storage backends.
All implementations of a key storage backend must implement this abstract base class.
"""
class KeyBackendBase:
"""Base class for backend classes to create private keys (CAs or OCSP delegate responder certificates)."""

#: Alias under which this backend is configured under settings.KEY_BACKENDS.
alias: str
Expand All @@ -102,6 +99,25 @@ class KeyBackend(
#: one of the named values if ``--key-type=EC`` is passed.
supported_elliptic_curves: tuple[str, ...]

def __init__(self, alias: str, **kwargs: Any) -> None:
self.alias = alias

for key, value in kwargs.items():
setattr(self, key, value)


class KeyBackend(
KeyBackendBase,
typing.Generic[
CreatePrivateKeyOptionsTypeVar, StorePrivateKeyOptionsTypeVar, UsePrivateKeyOptionsTypeVar
],
metaclass=abc.ABCMeta,
):
"""Base class for all key storage backends.
All implementations of a key storage backend must implement this abstract base class.
"""

#: Title used for the ArgumentGroup in :command:`manage.py init_ca`.
title: typing.ClassVar[str]

Expand All @@ -118,15 +134,12 @@ class KeyBackend(
options_prefix: str = ""

def __init__(self, alias: str, **kwargs: Any) -> None:
self.alias = alias
super().__init__(alias, **kwargs)

if self.alias != model_settings.CA_DEFAULT_KEY_BACKEND:
self.argparse_prefix = f"{alias.lower().replace('_', '-')}-"
self.options_prefix = f"{alias.lower().replace('-', '_')}_"

for key, value in kwargs.items():
setattr(self, key, value)

def add_create_private_key_group(self, parser: CommandParser) -> Optional[ArgumentGroup]:
"""Add an argument group for arguments for private key generation with this backend.
Expand Down Expand Up @@ -322,22 +335,6 @@ def sign_certificate_revocation_list(
) -> x509.CertificateRevocationList:
"""Sign a certificate revocation list request."""

def get_ocsp_key_size(
self,
ca: "CertificateAuthority", # pylint: disable=unused-argument
use_private_key_options: UsePrivateKeyOptionsTypeVar, # pylint: disable=unused-argument
) -> int:
"""Get the default key size for OCSP keys. This is only called for RSA or DSA keys."""
return model_settings.CA_DEFAULT_KEY_SIZE

def get_ocsp_key_elliptic_curve(
self,
ca: "CertificateAuthority", # pylint: disable=unused-argument
use_private_key_options: UsePrivateKeyOptionsTypeVar, # pylint: disable=unused-argument
) -> ec.EllipticCurve:
"""Get the default elliptic curve for OCSP keys. This is only called for elliptic curve keys."""
return model_settings.CA_DEFAULT_ELLIPTIC_CURVE

def validate_signature_hash_algorithm(
self,
key_type: ParsableKeyType,
Expand Down Expand Up @@ -376,15 +373,140 @@ def validate_signature_hash_algorithm(
return algorithm


class KeyBackends:
"""A key backend handler similar to Django's storages or caches handler."""
class OCSPKeyBackend(KeyBackendBase):
"""Base class for all OCSP key storage backends."""

@abc.abstractmethod
def create_private_key(
self,
ca: "CertificateAuthority",
key_type: ParsableKeyType,
key_size: Optional[int],
elliptic_curve: Optional[ec.EllipticCurve],
) -> x509.CertificateSigningRequest:
"""Create the private key.
This method is responsible for creating the private key, storing it, and saving info to
ca.ocsp_key_backend_options so it can be signed. You're not responsible for the public key at all.
"""

def get_csr_algorithm(self, key_type: ParsableKeyType) -> Optional[AllowedHashTypes]:
"""Helper function to get a usable signing algorithm for the given key type.
This function can be used to get a default signing algorithm when creating a CSR in
:func:`~django_ca.key_backends.OCSPKeyBackend.create_private_key`. You are not obliged to use this
function, it is here merely for convenience.
"""
if key_type in ("Ed25519", "Ed448"):
return None
if key_type == "DSA":
return model_settings.CA_DEFAULT_DSA_SIGNATURE_HASH_ALGORITHM
return model_settings.CA_DEFAULT_SIGNATURE_HASH_ALGORITHM

def get_default_elliptic_curve(self, ca: "CertificateAuthority") -> ec.EllipticCurve:
"""Get the default elliptic curve used when creating OCSP private keys.
**Note** that you are not usually required to implement this method.
This function is called when generating keys for EC-based CAs or if the user explicitly requested an
EC-based OCSP key but did *not* specify a curve.
The default implementation returns the curve of the CA for EC-based CAs and the
:ref:`CA_DEFAULT_ELLIPTIC_CURVE <settings-ca-default-elliptic-curve>` setting otherwise.
"""
public_key = ca.pub.loaded.public_key()
if isinstance(public_key, ec.EllipticCurvePublicKey):
return public_key.curve
return model_settings.CA_DEFAULT_ELLIPTIC_CURVE

def get_default_key_size(self, ca: "CertificateAuthority") -> int:
"""Get the default key size used when creating OCSP private keys.
**Note** that you are not usually required to implement this method.
This function is called when generating keys for DSA/RSA-based CAs or if the user explicitly requested
an RSA/DSA-based OCSP key but did *not* specify a key size.
The default implementation returns the key size of the CA for RSA/DSA-based CAs and the
:ref:`CA_DEFAULT_KEY_SIZE <settings-ca-default-key-size>` setting otherwise.
"""
public_key = ca.pub.loaded.public_key()
if isinstance(public_key, (rsa.RSAPublicKey, dsa.DSAPublicKey)):
return public_key.key_size
return model_settings.CA_DEFAULT_KEY_SIZE

@abc.abstractmethod
def sign_ocsp_response(
self,
ca: "CertificateAuthority",
builder: OCSPResponseBuilder,
signature_hash_algorithm: Optional[AllowedHashTypes],
) -> OCSPResponse:
"""Sign the given OCSP response."""


class CryptographyOCSPKeyBackend(OCSPKeyBackend):
"""Specialized base class for backends that can represent the private key as cryptography classes.
Subclasses are only required to implement
:func:`~django_ca.key_backends.CryptographyOCSPKeyBackend.load_private_key_data` to return the raw private
key in either PEM or DER representation (DER is faster and thus preferred). The returned private key
must be a suitable argument for :func:`~cg:cryptography.x509.ocsp.OCSPResponseBuilder.sign`.
"""

@abc.abstractmethod
def load_private_key_data(self, ca: "CertificateAuthority") -> bytes:
"""Load the raw private key as bytes. Both PEM and DER representations are supported."""

# COVERAGE NOTE: Function is implemented in all subclasses.`
def get_private_key_password(self, ca: "CertificateAuthority") -> Optional[bytes]: # pragma: no cover
"""Get the private key password. The default implementation never returns a password."""
return None

def load_private_key(self, ca: "CertificateAuthority") -> CertificateIssuerPrivateKeyTypes:
raw_private_key = self.load_private_key_data(ca)
password = self.get_private_key_password(ca)

try:
loaded_key = serialization.load_der_private_key(raw_private_key, password)
except ValueError:
try:
loaded_key = serialization.load_pem_private_key(raw_private_key, password)
except ValueError as ex:
raise ValueError("Could not decrypt private key.") from ex

# Check that the private key is of a supported type
if not isinstance(loaded_key, constants.PRIVATE_KEY_TYPES):
log.error("%s: Unsupported private key type.", type(loaded_key))
raise ValueError(f"{type(loaded_key)}: Unsupported private key type.")

return loaded_key # type: ignore[return-value] # mypy is not smart enough for the above isinstance()

def sign_ocsp_response(
self,
ca: "CertificateAuthority",
builder: OCSPResponseBuilder,
signature_hash_algorithm: Optional[AllowedHashTypes],
) -> OCSPResponse:
private_key = self.load_private_key(ca)
return builder.sign(private_key, signature_hash_algorithm)


BackendTypeVar = TypeVar("BackendTypeVar", bound=KeyBackendBase)


class KeyBackendsProxy(Generic[BackendTypeVar]):
"""Base for proxy classes."""

_setting: ClassVar[str]
_type: type[BackendTypeVar]

def __init__(self) -> None:
self._backends = local()

def __getitem__(self, name: str) -> KeyBackend[BaseModel, BaseModel, BaseModel]:
def __getitem__(self, name: str) -> BackendTypeVar:
try:
return typing.cast(KeyBackend[BaseModel, BaseModel, BaseModel], self._backends.backends[name])
return typing.cast(BackendTypeVar, self._backends.backends[name])
except AttributeError:
self._backends.backends = {} # first backend is loaded
except KeyError:
Expand All @@ -394,17 +516,17 @@ def __getitem__(self, name: str) -> KeyBackend[BaseModel, BaseModel, BaseModel]:
# TYPEHINT NOTE: _get_key_backend should not write anything into this variable
return self._backends.backends[name] # type: ignore[no-any-return]

def __iter__(self) -> Iterator[KeyBackend[BaseModel, BaseModel, BaseModel]]:
for name in model_settings.CA_KEY_BACKENDS:
def __iter__(self) -> Iterator[BackendTypeVar]:
for name in getattr(model_settings, self._setting):
yield self[name]

def _reset(self) -> None:
self._backends = local()

def _get_key_backend(self, alias: str) -> KeyBackend[BaseModel, BaseModel, BaseModel]:
def _get_key_backend(self, alias: str) -> BackendTypeVar:
"""Get the key backend with the given alias."""
try:
configuration: KeyBackendConfigurationModel = model_settings.CA_KEY_BACKENDS[alias]
configuration: KeyBackendConfigurationModel = getattr(model_settings, self._setting)[alias]
except KeyError as ex:
raise ValueError(f"{alias}: key backend is not configured.") from ex

Expand All @@ -415,11 +537,26 @@ def _get_key_backend(self, alias: str) -> KeyBackend[BaseModel, BaseModel, BaseM
except ImportError as ex:
raise ImproperlyConfigured(f"Could not find backend {backend!r}: {ex}") from ex

if not issubclass(backend_cls, KeyBackend):
if not issubclass(backend_cls, self._type):
raise ImproperlyConfigured(f"{backend}: Class does not refer to a key backend.")

# TYPEHINT NOTE: we check for the correct subclass above.
return backend_cls(alias, **options) # type: ignore[no-any-return]


class KeyBackends(KeyBackendsProxy[KeyBackend[BaseModel, BaseModel, BaseModel]]):
"""A key backend handler similar to Django's storages or caches handler."""

_setting = "CA_KEY_BACKENDS"
_type = KeyBackend


class OCSPKeyBackends(KeyBackendsProxy[OCSPKeyBackend]):
"""An OCSP key backend handler similar to Django's storages or caches handler."""

_setting = "CA_OCSP_KEY_BACKENDS"
_type = OCSPKeyBackend


key_backends = KeyBackends()
ocsp_key_backends = OCSPKeyBackends()
Loading

0 comments on commit 35bc4da

Please sign in to comment.