Skip to content

Commit

Permalink
cleanup HSM backend
Browse files Browse the repository at this point in the history
  • Loading branch information
mathiasertl committed Apr 29, 2024
1 parent 492a77a commit dd1cbc1
Showing 1 changed file with 30 additions and 48 deletions.
78 changes: 30 additions & 48 deletions ca/django_ca/key_backends/hsm.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
"""HSM backend for keys."""

import asyncio
import typing
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from typing import Any, Optional

from pydantic import BaseModel, ConfigDict, field_validator, model_validator
from python_x509_pkcs11 import KEYTYPES, PKCS11Session, get_keytypes_enum
from python_x509_pkcs11.privatekeys import (
PKCS11ECPrivateKey,
PKCS11ED448PrivateKey,
PKCS11ED25519PrivateKey,
PKCS11RSAPrivateKey,
)

from cryptography import x509
from cryptography.hazmat.primitives import hashes
Expand All @@ -14,42 +21,30 @@
CertificateIssuerPrivateKeyTypes,
CertificateIssuerPublicKeyTypes,
)
from cryptography.hazmat.primitives.serialization import (
Encoding,
PrivateFormat,
load_der_private_key,
load_pem_private_key,
)
from python_x509_pkcs11.privatekeys import PKCS11RSAPrivateKey, PKCS11ECPrivateKey, PKCS11ED25519PrivateKey, PKCS11ED448PrivateKey
from python_x509_pkcs11 import KEYTYPES, get_keytypes_enum, PKCS11Session
import asyncio

from django.conf import settings

from django_ca import ca_settings, constants
from django_ca.key_backends.base import KeyBackend
from django_ca.management.actions import PasswordAction
from django_ca.management.base import add_elliptic_curve, add_key_size
from django_ca.pydantic.type_aliases import PrivateKeySize
from django_ca.typehints import AllowedHashTypes, ArgumentGroup, ParsableKeyType
from django_ca.utils import generate_private_key, get_cert_builder

from django_ca.utils import get_cert_builder

if typing.TYPE_CHECKING:
from django_ca.models import CertificateAuthority

ParsableHSMKeyType = typing.Literal["rsa_2048", "rsa_4096", "secp256r1", "ed25519", "ed448", "secp384r1", "secp521r1"]
ParsableHSMKeyType = typing.Literal[
"rsa_2048", "rsa_4096", "secp256r1", "ed25519", "ed448", "secp384r1", "secp521r1"
]


# TODO: This should go to the library itself.
async def _create_key_pair(key_label:str, hsm_key_type: str):
"Creates the new keypair in async way"
async def _create_key_pair(key_label: str, hsm_key_type: str):
"""Creates the new keypair in async way."""
k_type = get_keytypes_enum(hsm_key_type)
public_key, identifier = await PKCS11Session().create_keypair(key_label, key_type=k_type)
return public_key, identifier


# TODO: This should be part of the library itself.
def get_private_key(key_label:str, hsm_key_type: str):
"Returns a private key of the given type."
def get_private_key(key_label: str, hsm_key_type: str):
"""Returns a private key of the given type."""
if hsm_key_type in ["rsa_2048", "rsa_4096"]:
return PKCS11RSAPrivateKey(key_label, hsm_key_type)
elif hsm_key_type == "ed25519":
Expand All @@ -59,8 +54,9 @@ def get_private_key(key_label:str, hsm_key_type: str):
elif hsm_key_type in ["secp256r1", "secp384r1", "secp521r1"]:
return PKCS11ECPrivateKey(key_label, hsm_key_type)


def get_signing_algo(ca: "CertificateAuthority") -> Optional[AllowedHashTypes]:
"Get the right algorithm for signing a certificate."
"""Get the right algorithm for signing a certificate."""
# FIXME: We should deal with algorithm in a better way.
hsm_key_type = ca.key_backend_options["hsm_key_type"]
if hsm_key_type == "rsa_2048":
Expand Down Expand Up @@ -100,11 +96,9 @@ class UsePrivateKeyOptions(BaseModel):
key_label: str



class HSMBackend(KeyBackend[CreatePrivateKeyOptions, StorePrivateKeyOptions, UsePrivateKeyOptions]):
"""The HSM backend that uses PKCS111.
.. tab:: Python
.. literalinclude:: /include/config/settings_default_ca_key_backends.py
Expand All @@ -123,21 +117,11 @@ class HSMBackend(KeyBackend[CreatePrivateKeyOptions, StorePrivateKeyOptions, Use

name = "hsm"
title = "Store private keys using HSM"
description = (
"Use a HSM for private key storage."
)
description = "Use a HSM for private key storage."
use_model = UsePrivateKeyOptions

# Backend options
storage_alias: str

def __init__(self, alias: str, storage_alias: str) -> None:
if storage_alias not in settings.STORAGES:
raise ValueError(f"{alias}: {storage_alias}: Storage alias is not configured.")
super().__init__(alias, storage_alias=storage_alias)

def __eq__(self, other: Any) -> bool:
return isinstance(other, HSMBackend) and self.storage_alias == other.storage_alias
return isinstance(other, HSMBackend)

def _add_key_label_argument(self, group: ArgumentGroup) -> None:
group.add_argument(
Expand Down Expand Up @@ -173,43 +157,41 @@ def add_use_private_key_arguments(self, group: ArgumentGroup) -> None:
self._add_key_label_argument(group)

def get_create_private_key_options(
self,key_type: ParsableKeyType, options: Dict[str, Any]
self, key_type: ParsableKeyType, options: dict[str, Any]
) -> CreatePrivateKeyOptions:
return CreatePrivateKeyOptions(
hsm_key_type=options[f"{self.options_prefix}hsm_key_type"],
key_label=options[f"{self.options_prefix}key_label"],
)

def get_store_private_key_options(self, options: Dict[str, Any]) -> StorePrivateKeyOptions:
def get_store_private_key_options(self, options: dict[str, Any]) -> StorePrivateKeyOptions:
return StorePrivateKeyOptions(
hsm_key_type=options[f"{self.options_prefix}hsm_key_type"],
key_label=options[f"{self.options_prefix}key_label"],
)

def get_use_private_key_options(
self, ca: Optional["CertificateAuthority"], options: Dict[str, Any]
self, ca: Optional["CertificateAuthority"], options: dict[str, Any]
) -> UsePrivateKeyOptions:
return UsePrivateKeyOptions.model_validate(
{"key_label": options.get(f"{self.options_prefix}key_label")}, context={"ca": ca}
)

def get_use_parent_private_key_options(
self, ca: "CertificateAuthority", options: Dict[str, Any]
self, ca: "CertificateAuthority", options: dict[str, Any]
) -> UsePrivateKeyOptions:
return UsePrivateKeyOptions.model_validate(
{"key_label": options[f"{self.options_prefix}parent_key_label"]}, context={"ca": ca}
)


def create_private_key(
self, ca: "CertificateAuthority", key_type: ParsableKeyType, options: CreatePrivateKeyOptions
) -> Tuple[CertificateIssuerPublicKeyTypes, UsePrivateKeyOptions]:

) -> tuple[CertificateIssuerPublicKeyTypes, UsePrivateKeyOptions]:
hsm_key_type = options.hsm_key_type
try:
asyncio.run(_create_key_pair(key_label=options.key_label, hsm_key_type=hsm_key_type))
except Exception as e:
raise e
raise e

# Update model instance
ca.key_backend_options = {"key_label": options.key_label, "hsm_key_type": hsm_key_type}
Expand Down Expand Up @@ -278,7 +260,7 @@ def sign_certificate(
issuer: x509.Name,
subject: x509.Name,
expires: datetime,
extensions: List[x509.Extension[x509.ExtensionType]],
extensions: list[x509.Extension[x509.ExtensionType]],
) -> x509.Certificate:
builder = get_cert_builder(expires, serial=serial)
builder = builder.public_key(public_key)
Expand Down

0 comments on commit dd1cbc1

Please sign in to comment.