Skip to content

Commit

Permalink
implement database storage module
Browse files Browse the repository at this point in the history
  • Loading branch information
mathiasertl committed Dec 8, 2024
1 parent 1992a9a commit 7c63849
Show file tree
Hide file tree
Showing 14 changed files with 418 additions and 10 deletions.
1 change: 1 addition & 0 deletions ca/ca/test_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@
"user_pin": PKCS11_USER_PIN,
},
},
"db": {"BACKEND": "django_ca.key_backends.db.DBBackend"},
}

CA_OCSP_KEY_BACKENDS = {
Expand Down
9 changes: 6 additions & 3 deletions ca/django_ca/key_backends/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,26 +177,29 @@ def add_use_private_key_group(self, parser: CommandParser) -> Optional[ArgumentG
f"Arguments for using private keys stored with the {self.alias} backend.",
)

@abc.abstractmethod
# pylint: disable-next=unused-argument # default implementation does nothing.
def add_create_private_key_arguments(self, group: ArgumentGroup) -> None:
"""Add arguments for private key generation with this backend.
Add arguments that can be used for generating private keys with your backend to `group`. The arguments
you add here are expected to be loaded (and validated) using
:py:func:`~django_ca.key_backends.KeyBackend.get_create_private_key_options`.
"""
return None

@abc.abstractmethod
# pylint: disable-next=unused-argument # default implementation does nothing.
def add_use_parent_private_key_arguments(self, group: ArgumentGroup) -> None:
"""Add arguments for loading the private key of a parent certificate authority.
The arguments you add here are expected to be loaded (and validated) using
:py:func:`~django_ca.key_backends.KeyBackend.get_use_parent_private_key_options`.
"""
return None

@abc.abstractmethod
# pylint: disable-next=unused-argument # default implementation does nothing.
def add_store_private_key_arguments(self, group: ArgumentGroup) -> None:
"""Add arguments for storing private keys (when importing an existing CA)."""
return None

# pylint: disable=unused-argument # Method may not be overwritten, just providing default here
def add_use_private_key_arguments(self, group: ArgumentGroup) -> None:
Expand Down
18 changes: 18 additions & 0 deletions ca/django_ca/key_backends/db/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# This file is part of django-ca (https://github.com/mathiasertl/django-ca).
#
# django-ca is free software: you can redistribute it and/or modify it under the terms of the GNU General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
#
# django-ca is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
# for more details.
#
# You should have received a copy of the GNU General Public License along with django-ca. If not, see
# <http://www.gnu.org/licenses/>.

"""DB storage backend module."""

from django_ca.key_backends.db.backend import DBBackend

__all__ = ("DBBackend",)
190 changes: 190 additions & 0 deletions ca/django_ca/key_backends/db/backend.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
# This file is part of django-ca (https://github.com/mathiasertl/django-ca).
#
# django-ca is free software: you can redistribute it and/or modify it under the terms of the GNU General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
#
# django-ca is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
# for more details.
#
# You should have received a copy of the GNU General Public License along with django-ca. If not, see
# <http://www.gnu.org/licenses/>.

"""Key backend using the Django Storages system."""

import typing
from collections.abc import Sequence
from datetime import datetime
from typing import Any, Optional

from cryptography import x509
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives._serialization import Encoding, PrivateFormat
from cryptography.hazmat.primitives.asymmetric.types import (
CertificateIssuerPrivateKeyTypes,
CertificateIssuerPublicKeyTypes,
)
from cryptography.hazmat.primitives.serialization import load_pem_private_key

from django.core.management import CommandParser

from django_ca import constants
from django_ca.key_backends import KeyBackend
from django_ca.key_backends.db.models import (
DBCreatePrivateKeyOptions,
DBStorePrivateKeyOptions,
DBUsePrivateKeyOptions,
)
from django_ca.models import CertificateAuthority
from django_ca.typehints import (
AllowedHashTypes,
ArgumentGroup,
CertificateExtension,
EllipticCurves,
ParsableKeyType,
)
from django_ca.utils import generate_private_key, get_cert_builder


class DBBackend(KeyBackend[DBCreatePrivateKeyOptions, DBStorePrivateKeyOptions, DBUsePrivateKeyOptions]):
"""The default storage backend that uses Django's file storage API."""

name = "storages"
title = "Store private keys using the Django file storage API"
description = (
"It is most commonly used to store private keys on the file system. Custom file storage backends can "
"be used to store keys on other systems (e.g. a cloud storage system)."
)
use_model = DBUsePrivateKeyOptions

supported_key_types: tuple[ParsableKeyType, ...] = constants.PARSABLE_KEY_TYPES
supported_elliptic_curves: tuple[EllipticCurves, ...] = tuple(constants.ELLIPTIC_CURVE_TYPES)

def __eq__(self, other: Any) -> bool:
return isinstance(other, DBBackend)

def add_create_private_key_group(self, parser: CommandParser) -> Optional[ArgumentGroup]:
return None

def add_store_private_key_group(self, parser: CommandParser) -> Optional[ArgumentGroup]:
return None

def add_use_private_key_group(self, parser: CommandParser) -> Optional[ArgumentGroup]:
return None

def get_create_private_key_options(
self,
key_type: ParsableKeyType,
key_size: Optional[int],
elliptic_curve: Optional[EllipticCurves], # type: ignore[override]
options: dict[str, Any],
) -> DBCreatePrivateKeyOptions:
return DBCreatePrivateKeyOptions(key_type=key_type, key_size=key_size, elliptic_curve=elliptic_curve)

def get_store_private_key_options(self, options: dict[str, Any]) -> DBStorePrivateKeyOptions:
return DBStorePrivateKeyOptions()

def get_use_private_key_options(
self, ca: "CertificateAuthority", options: dict[str, Any]
) -> DBUsePrivateKeyOptions:
return DBUsePrivateKeyOptions.model_validate({}, context={"ca": ca, "backend": self}, strict=True)

def get_use_parent_private_key_options(
self, ca: "CertificateAuthority", options: dict[str, Any]
) -> DBUsePrivateKeyOptions:
return DBUsePrivateKeyOptions.model_validate({}, context={"ca": ca, "backend": self}, strict=True)

def create_private_key(
self,
ca: "CertificateAuthority",
key_type: ParsableKeyType,
options: DBCreatePrivateKeyOptions,
) -> tuple[CertificateIssuerPublicKeyTypes, DBUsePrivateKeyOptions]:
encryption = serialization.NoEncryption()
key = generate_private_key(options.key_size, key_type, options.elliptic_curve)
pem = key.private_bytes(
encoding=Encoding.PEM, format=PrivateFormat.PKCS8, encryption_algorithm=encryption
)
ca.key_backend_options = {"private_key": {"pem": pem.decode()}}
use_private_key_options = DBUsePrivateKeyOptions.model_validate(
{}, context={"ca": ca, "backend": self}
)
return key.public_key(), use_private_key_options

def store_private_key(
self,
ca: "CertificateAuthority",
key: CertificateIssuerPrivateKeyTypes,
certificate: x509.Certificate,
options: DBStorePrivateKeyOptions,
) -> None:
encryption = serialization.NoEncryption()
pem = key.private_bytes(
encoding=Encoding.PEM, format=PrivateFormat.PKCS8, encryption_algorithm=encryption
)
ca.key_backend_options = {"private_key": {"pem": pem.decode()}}

def get_key(
self,
ca: "CertificateAuthority",
# pylint: disable-next=unused-argument # interface requires option
use_private_key_options: DBUsePrivateKeyOptions,
) -> CertificateIssuerPrivateKeyTypes:
"""The CAs private key as private key."""
pem = ca.key_backend_options["private_key"]["pem"].encode()
return typing.cast( # type validated below
CertificateIssuerPrivateKeyTypes, load_pem_private_key(pem, None)
)

def is_usable(
self,
ca: "CertificateAuthority",
use_private_key_options: Optional[DBUsePrivateKeyOptions] = None,
) -> bool:
# If key_backend_options is not set or path is not set, it is certainly unusable.
if not ca.key_backend_options or not ca.key_backend_options.get("private_key"):
return False
return True

def check_usable(
self, ca: "CertificateAuthority", use_private_key_options: DBUsePrivateKeyOptions
) -> None:
"""Check if the given CA is usable, raise ValueError if not.
The `options` are the options returned by
:py:func:`~django_ca.key_backends.base.KeyBackend.get_use_private_key_options`. It may be ``None`` in
cases where key options cannot (yet) be loaded. If ``None``, the backend should return ``False`` if it
knows for sure that it will not be usable, and ``True`` if usability cannot be determined.
"""
if not ca.key_backend_options or not ca.key_backend_options.get("private_key"):
raise ValueError(f"{ca.key_backend_options}: Private key not stored in database.")

def sign_certificate(
self,
ca: "CertificateAuthority",
use_private_key_options: DBUsePrivateKeyOptions,
public_key: CertificateIssuerPublicKeyTypes,
serial: int,
algorithm: Optional[AllowedHashTypes],
issuer: x509.Name,
subject: x509.Name,
not_after: datetime,
extensions: Sequence[CertificateExtension],
) -> x509.Certificate:
builder = get_cert_builder(not_after, serial=serial)
builder = builder.public_key(public_key)
builder = builder.issuer_name(issuer)
builder = builder.subject_name(subject)
for extension in extensions:
builder = builder.add_extension(extension.value, critical=extension.critical)
return builder.sign(private_key=self.get_key(ca, use_private_key_options), algorithm=algorithm)

def sign_certificate_revocation_list(
self,
ca: "CertificateAuthority",
use_private_key_options: DBUsePrivateKeyOptions,
builder: x509.CertificateRevocationListBuilder,
algorithm: Optional[AllowedHashTypes],
) -> x509.CertificateRevocationList:
return builder.sign(private_key=self.get_key(ca, use_private_key_options), algorithm=algorithm)
53 changes: 53 additions & 0 deletions ca/django_ca/key_backends/db/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# This file is part of django-ca (https://github.com/mathiasertl/django-ca).
#
# django-ca is free software: you can redistribute it and/or modify it under the terms of the GNU General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
#
# django-ca is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
# for more details.
#
# You should have received a copy of the GNU General Public License along with django-ca. If not, see
# <http://www.gnu.org/licenses/>.

"""Models for the storages backend."""

from typing import Optional

from pydantic import BaseModel, ConfigDict, model_validator

from django_ca.conf import model_settings
from django_ca.key_backends.base import CreatePrivateKeyOptionsBaseModel
from django_ca.pydantic.type_aliases import EllipticCurveTypeAlias


class DBCreatePrivateKeyOptions(CreatePrivateKeyOptionsBaseModel):
"""Options for initializing private keys."""

model_config = ConfigDict(arbitrary_types_allowed=True)

elliptic_curve: Optional[EllipticCurveTypeAlias] = None

@model_validator(mode="after")
def validate_elliptic_curve(self) -> "DBCreatePrivateKeyOptions":
"""Validate that the elliptic curve is not set for invalid key types."""
if self.key_type == "EC" and self.elliptic_curve is None:
self.elliptic_curve = model_settings.CA_DEFAULT_ELLIPTIC_CURVE
elif self.key_type != "EC" and self.elliptic_curve is not None:
raise ValueError(f"Elliptic curves are not supported for {self.key_type} keys.")
return self


class DBStorePrivateKeyOptions(BaseModel):
"""Options for storing a private key."""

# NOTE: we set frozen here to prevent accidental coding mistakes. Models should be immutable.
model_config = ConfigDict(frozen=True)


class DBUsePrivateKeyOptions(BaseModel):
"""Options for using a private key."""

# NOTE: we set frozen here to prevent accidental coding mistakes. Models should be immutable.
model_config = ConfigDict(frozen=True)
1 change: 1 addition & 0 deletions ca/django_ca/tests/base/conftest_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ def load_cert(
usable_ca_names = [
name for name, conf in CERT_DATA.items() if conf["type"] == "ca" and conf.get("key_filename")
]
usable_ca_names_by_type = ["dsa", "root", "ed448", "ed25519", "ec"]
contrib_ca_names = [
name for name, conf in CERT_DATA.items() if conf["type"] == "ca" and conf["cat"] == "sphinx-contrib"
]
Expand Down
14 changes: 14 additions & 0 deletions ca/django_ca/tests/base/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

from django_ca.conf import model_settings
from django_ca.key_backends import key_backends, ocsp_key_backends
from django_ca.key_backends.db.backend import DBBackend
from django_ca.key_backends.hsm import HSMBackend, HSMOCSPBackend
from django_ca.key_backends.hsm.models import HSMCreatePrivateKeyOptions
from django_ca.key_backends.hsm.session import SessionPool
Expand All @@ -53,6 +54,7 @@
signed_certificate_timestamp_cert_names,
signed_certificate_timestamps_cert_names,
usable_ca_names,
usable_ca_names_by_type,
usable_cert_names,
)
from django_ca.tests.base.constants import CERT_DATA, TIMESTAMPS
Expand Down Expand Up @@ -414,6 +416,12 @@ def hsm_ocsp_backend(request: "SubRequest") -> Iterator[HSMOCSPBackend]: # prag
ocsp_key_backends._reset() # pylint: disable=protected-access # in case we manipulated the object


@pytest.fixture
def db_backend() -> DBBackend:
"""Fixture providing a DB backend."""
return cast(DBBackend, key_backends["db"])


@pytest.fixture(params=HSMBackend.supported_key_types)
def usable_hsm_ca( # pragma: hsm
request: "SubRequest", ca_name: str, subject: x509.Name, hsm_backend: HSMBackend
Expand Down Expand Up @@ -501,6 +509,12 @@ def usable_ca_name(request: "SubRequest") -> CertificateAuthority:
return request.param # type: ignore[no-any-return]


@pytest.fixture(params=usable_ca_names_by_type)
def usable_ca_name_by_type(request: "SubRequest") -> CertificateAuthority:
"""Parametrized fixture for the name of a CA of every type (dsa, rsa, ...)."""
return request.param # type: ignore[no-any-return]


@pytest.fixture(params=usable_ca_names)
def usable_ca(request: "SubRequest") -> CertificateAuthority:
"""Parametrized fixture for every usable CA (with usable private key)."""
Expand Down
6 changes: 0 additions & 6 deletions ca/django_ca/tests/base/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,6 @@ class DummyBackend(KeyBackend[DummyModel, DummyModel, DummyModel]): # pragma: n
def __eq__(self, other: Any) -> bool:
return isinstance(other, DummyBackend)

def add_create_private_key_arguments(self, group: ArgumentGroup) -> None:
return None

def add_store_private_key_arguments(self, group: ArgumentGroup) -> None:
return None

def get_create_private_key_options(
self,
key_type: ParsableKeyType,
Expand Down
Loading

0 comments on commit 7c63849

Please sign in to comment.