Skip to content

Commit

Permalink
implement database key backend (fixes #146)
Browse files Browse the repository at this point in the history
  • Loading branch information
mathiasertl committed Dec 8, 2024
1 parent a8d8678 commit 73800d9
Show file tree
Hide file tree
Showing 23 changed files with 589 additions and 14 deletions.
2 changes: 2 additions & 0 deletions ca/ca/test_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@
"user_pin": PKCS11_USER_PIN,
},
},
"db": {"BACKEND": "django_ca.key_backends.db.DBBackend"},
}

CA_OCSP_KEY_BACKENDS = {
Expand All @@ -235,6 +236,7 @@
"user_pin": PKCS11_USER_PIN,
},
},
"db": {"BACKEND": "django_ca.key_backends.db.DBOCSPBackend"},
}

# Custom settings
Expand Down
9 changes: 6 additions & 3 deletions ca/django_ca/key_backends/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,26 +177,29 @@ def add_use_private_key_group(self, parser: CommandParser) -> Optional[ArgumentG
f"Arguments for using private keys stored with the {self.alias} backend.",
)

@abc.abstractmethod
# pylint: disable-next=unused-argument # default implementation does nothing.
def add_create_private_key_arguments(self, group: ArgumentGroup) -> None:
"""Add arguments for private key generation with this backend.
Add arguments that can be used for generating private keys with your backend to `group`. The arguments
you add here are expected to be loaded (and validated) using
:py:func:`~django_ca.key_backends.KeyBackend.get_create_private_key_options`.
"""
return None

@abc.abstractmethod
# pylint: disable-next=unused-argument # default implementation does nothing.
def add_use_parent_private_key_arguments(self, group: ArgumentGroup) -> None:
"""Add arguments for loading the private key of a parent certificate authority.
The arguments you add here are expected to be loaded (and validated) using
:py:func:`~django_ca.key_backends.KeyBackend.get_use_parent_private_key_options`.
"""
return None

@abc.abstractmethod
# pylint: disable-next=unused-argument # default implementation does nothing.
def add_store_private_key_arguments(self, group: ArgumentGroup) -> None:
"""Add arguments for storing private keys (when importing an existing CA)."""
return None

# pylint: disable=unused-argument # Method may not be overwritten, just providing default here
def add_use_private_key_arguments(self, group: ArgumentGroup) -> None:
Expand Down
19 changes: 19 additions & 0 deletions ca/django_ca/key_backends/db/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# This file is part of django-ca (https://github.com/mathiasertl/django-ca).
#
# django-ca is free software: you can redistribute it and/or modify it under the terms of the GNU General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
#
# django-ca is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
# for more details.
#
# You should have received a copy of the GNU General Public License along with django-ca. If not, see
# <http://www.gnu.org/licenses/>.

"""DB storage backend module."""

from django_ca.key_backends.db.backend import DBBackend
from django_ca.key_backends.db.ocsp_backend import DBOCSPBackend

__all__ = ("DBBackend", "DBOCSPBackend")
190 changes: 190 additions & 0 deletions ca/django_ca/key_backends/db/backend.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
# This file is part of django-ca (https://github.com/mathiasertl/django-ca).
#
# django-ca is free software: you can redistribute it and/or modify it under the terms of the GNU General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
#
# django-ca is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
# for more details.
#
# You should have received a copy of the GNU General Public License along with django-ca. If not, see
# <http://www.gnu.org/licenses/>.

"""Key backend using the Django Storages system."""

import typing
from collections.abc import Sequence
from datetime import datetime
from typing import Any, Optional

from cryptography import x509
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives._serialization import Encoding, PrivateFormat
from cryptography.hazmat.primitives.asymmetric.types import (
CertificateIssuerPrivateKeyTypes,
CertificateIssuerPublicKeyTypes,
)
from cryptography.hazmat.primitives.serialization import load_pem_private_key

from django.core.management import CommandParser

from django_ca import constants
from django_ca.key_backends import KeyBackend
from django_ca.key_backends.db.models import (
DBCreatePrivateKeyOptions,
DBStorePrivateKeyOptions,
DBUsePrivateKeyOptions,
)
from django_ca.models import CertificateAuthority
from django_ca.typehints import (
AllowedHashTypes,
ArgumentGroup,
CertificateExtension,
EllipticCurves,
ParsableKeyType,
)
from django_ca.utils import generate_private_key, get_cert_builder


class DBBackend(KeyBackend[DBCreatePrivateKeyOptions, DBStorePrivateKeyOptions, DBUsePrivateKeyOptions]):
"""The default storage backend that uses Django's file storage API."""

name = "storages"
title = "Store private keys using the Django file storage API"
description = (
"It is most commonly used to store private keys on the file system. Custom file storage backends can "
"be used to store keys on other systems (e.g. a cloud storage system)."
)
use_model = DBUsePrivateKeyOptions

supported_key_types: tuple[ParsableKeyType, ...] = constants.PARSABLE_KEY_TYPES
supported_elliptic_curves: tuple[EllipticCurves, ...] = tuple(constants.ELLIPTIC_CURVE_TYPES)

def __eq__(self, other: Any) -> bool:
return isinstance(other, DBBackend)

def add_create_private_key_group(self, parser: CommandParser) -> Optional[ArgumentGroup]:
return None

def add_store_private_key_group(self, parser: CommandParser) -> Optional[ArgumentGroup]:
return None

def add_use_private_key_group(self, parser: CommandParser) -> Optional[ArgumentGroup]:
return None

def get_create_private_key_options(
self,
key_type: ParsableKeyType,
key_size: Optional[int],
elliptic_curve: Optional[EllipticCurves], # type: ignore[override]
options: dict[str, Any],
) -> DBCreatePrivateKeyOptions:
return DBCreatePrivateKeyOptions(key_type=key_type, key_size=key_size, elliptic_curve=elliptic_curve)

def get_store_private_key_options(self, options: dict[str, Any]) -> DBStorePrivateKeyOptions:
return DBStorePrivateKeyOptions()

def get_use_private_key_options(
self, ca: "CertificateAuthority", options: dict[str, Any]
) -> DBUsePrivateKeyOptions:
return DBUsePrivateKeyOptions.model_validate({}, context={"ca": ca, "backend": self}, strict=True)

def get_use_parent_private_key_options(
self, ca: "CertificateAuthority", options: dict[str, Any]
) -> DBUsePrivateKeyOptions:
return DBUsePrivateKeyOptions.model_validate({}, context={"ca": ca, "backend": self}, strict=True)

def create_private_key(
self,
ca: "CertificateAuthority",
key_type: ParsableKeyType,
options: DBCreatePrivateKeyOptions,
) -> tuple[CertificateIssuerPublicKeyTypes, DBUsePrivateKeyOptions]:
encryption = serialization.NoEncryption()
key = generate_private_key(options.key_size, key_type, options.elliptic_curve)
pem = key.private_bytes(
encoding=Encoding.PEM, format=PrivateFormat.PKCS8, encryption_algorithm=encryption
)
ca.key_backend_options = {"private_key": {"pem": pem.decode()}}
use_private_key_options = DBUsePrivateKeyOptions.model_validate(
{}, context={"ca": ca, "backend": self}
)
return key.public_key(), use_private_key_options

def store_private_key(
self,
ca: "CertificateAuthority",
key: CertificateIssuerPrivateKeyTypes,
certificate: x509.Certificate,
options: DBStorePrivateKeyOptions,
) -> None:
encryption = serialization.NoEncryption()
pem = key.private_bytes(
encoding=Encoding.PEM, format=PrivateFormat.PKCS8, encryption_algorithm=encryption
)
ca.key_backend_options = {"private_key": {"pem": pem.decode()}}

def get_key(
self,
ca: "CertificateAuthority",
# pylint: disable-next=unused-argument # interface requires option
use_private_key_options: DBUsePrivateKeyOptions,
) -> CertificateIssuerPrivateKeyTypes:
"""The CAs private key as private key."""
pem = ca.key_backend_options["private_key"]["pem"].encode()
return typing.cast( # type validated below
CertificateIssuerPrivateKeyTypes, load_pem_private_key(pem, None)
)

def is_usable(
self,
ca: "CertificateAuthority",
use_private_key_options: Optional[DBUsePrivateKeyOptions] = None,
) -> bool:
# If key_backend_options is not set or path is not set, it is certainly unusable.
if not ca.key_backend_options or not ca.key_backend_options.get("private_key"):
return False
return True

def check_usable(
self, ca: "CertificateAuthority", use_private_key_options: DBUsePrivateKeyOptions
) -> None:
"""Check if the given CA is usable, raise ValueError if not.
The `options` are the options returned by
:py:func:`~django_ca.key_backends.base.KeyBackend.get_use_private_key_options`. It may be ``None`` in
cases where key options cannot (yet) be loaded. If ``None``, the backend should return ``False`` if it
knows for sure that it will not be usable, and ``True`` if usability cannot be determined.
"""
if not ca.key_backend_options or not ca.key_backend_options.get("private_key"):
raise ValueError(f"{ca.key_backend_options}: Private key not stored in database.")

def sign_certificate(
self,
ca: "CertificateAuthority",
use_private_key_options: DBUsePrivateKeyOptions,
public_key: CertificateIssuerPublicKeyTypes,
serial: int,
algorithm: Optional[AllowedHashTypes],
issuer: x509.Name,
subject: x509.Name,
not_after: datetime,
extensions: Sequence[CertificateExtension],
) -> x509.Certificate:
builder = get_cert_builder(not_after, serial=serial)
builder = builder.public_key(public_key)
builder = builder.issuer_name(issuer)
builder = builder.subject_name(subject)
for extension in extensions:
builder = builder.add_extension(extension.value, critical=extension.critical)
return builder.sign(private_key=self.get_key(ca, use_private_key_options), algorithm=algorithm)

def sign_certificate_revocation_list(
self,
ca: "CertificateAuthority",
use_private_key_options: DBUsePrivateKeyOptions,
builder: x509.CertificateRevocationListBuilder,
algorithm: Optional[AllowedHashTypes],
) -> x509.CertificateRevocationList:
return builder.sign(private_key=self.get_key(ca, use_private_key_options), algorithm=algorithm)
53 changes: 53 additions & 0 deletions ca/django_ca/key_backends/db/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# This file is part of django-ca (https://github.com/mathiasertl/django-ca).
#
# django-ca is free software: you can redistribute it and/or modify it under the terms of the GNU General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
#
# django-ca is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
# for more details.
#
# You should have received a copy of the GNU General Public License along with django-ca. If not, see
# <http://www.gnu.org/licenses/>.

"""Models for the storages backend."""

from typing import Optional

from pydantic import BaseModel, ConfigDict, model_validator

from django_ca.conf import model_settings
from django_ca.key_backends.base import CreatePrivateKeyOptionsBaseModel
from django_ca.pydantic.type_aliases import EllipticCurveTypeAlias


class DBCreatePrivateKeyOptions(CreatePrivateKeyOptionsBaseModel):
"""Options for initializing private keys."""

model_config = ConfigDict(arbitrary_types_allowed=True)

elliptic_curve: Optional[EllipticCurveTypeAlias] = None

@model_validator(mode="after")
def validate_elliptic_curve(self) -> "DBCreatePrivateKeyOptions":
"""Validate that the elliptic curve is not set for invalid key types."""
if self.key_type == "EC" and self.elliptic_curve is None:
self.elliptic_curve = model_settings.CA_DEFAULT_ELLIPTIC_CURVE
elif self.key_type != "EC" and self.elliptic_curve is not None:
raise ValueError(f"Elliptic curves are not supported for {self.key_type} keys.")
return self


class DBStorePrivateKeyOptions(BaseModel):
"""Options for storing a private key."""

# NOTE: we set frozen here to prevent accidental coding mistakes. Models should be immutable.
model_config = ConfigDict(frozen=True)


class DBUsePrivateKeyOptions(BaseModel):
"""Options for using a private key."""

# NOTE: we set frozen here to prevent accidental coding mistakes. Models should be immutable.
model_config = ConfigDict(frozen=True)
66 changes: 66 additions & 0 deletions ca/django_ca/key_backends/db/ocsp_backend.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# This file is part of django-ca (https://github.com/mathiasertl/django-ca).
#
# django-ca is free software: you can redistribute it and/or modify it under the terms of the GNU General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
#
# django-ca is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
# for more details.
#
# You should have received a copy of the GNU General Public License along with django-ca. If not, see
# <http://www.gnu.org/licenses/>.

"""OCSP key backend storing private keys in the database."""

import typing
from typing import Optional

from cryptography import x509
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.asymmetric.types import CertificateIssuerPrivateKeyTypes
from cryptography.hazmat.primitives.serialization import Encoding, PrivateFormat
from cryptography.x509.ocsp import OCSPResponse, OCSPResponseBuilder

from django_ca.key_backends import OCSPKeyBackend
from django_ca.models import CertificateAuthority
from django_ca.typehints import AllowedHashTypes, ParsableKeyType
from django_ca.utils import generate_private_key


class DBOCSPBackend(OCSPKeyBackend):
"""OCSP key backend storing files in the database."""

def create_private_key(
self,
ca: "CertificateAuthority",
key_type: ParsableKeyType,
key_size: Optional[int],
elliptic_curve: Optional[ec.EllipticCurve],
) -> x509.CertificateSigningRequest:
# Generate the private key.
private_key = generate_private_key(key_size, key_type, elliptic_curve)

# Serialize and store the key
encryption = serialization.NoEncryption()
pem = private_key.private_bytes(
encoding=Encoding.PEM, format=PrivateFormat.PKCS8, encryption_algorithm=encryption
)

ca.ocsp_key_backend_options["private_key"]["pem"] = pem.decode()

# Generate the CSR to return to the caller.
csr_builder = x509.CertificateSigningRequestBuilder().subject_name(x509.Name([]))
csr_algorithm = self.get_csr_algorithm(key_type)
return csr_builder.sign(private_key, csr_algorithm)

def sign_ocsp_response(
self,
ca: "CertificateAuthority",
builder: OCSPResponseBuilder,
signature_hash_algorithm: Optional[AllowedHashTypes],
) -> OCSPResponse:
pem = ca.ocsp_key_backend_options["private_key"]["pem"].encode()
key = typing.cast(CertificateIssuerPrivateKeyTypes, serialization.load_pem_private_key(pem, None))
return builder.sign(key, signature_hash_algorithm)
1 change: 1 addition & 0 deletions ca/django_ca/tests/base/conftest_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ def load_cert(
usable_ca_names = [
name for name, conf in CERT_DATA.items() if conf["type"] == "ca" and conf.get("key_filename")
]
usable_ca_names_by_type = ["dsa", "root", "ed448", "ed25519", "ec"]
contrib_ca_names = [
name for name, conf in CERT_DATA.items() if conf["type"] == "ca" and conf["cat"] == "sphinx-contrib"
]
Expand Down
Loading

0 comments on commit 73800d9

Please sign in to comment.