Skip to content

Commit

Permalink
store certificate revocation lists in database
Browse files Browse the repository at this point in the history
  • Loading branch information
mathiasertl committed Oct 12, 2024
1 parent 88b175b commit 6f2fccd
Show file tree
Hide file tree
Showing 8 changed files with 684 additions and 11 deletions.
2 changes: 1 addition & 1 deletion ca/django_ca/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -637,7 +637,7 @@ class ReasonFlags(enum.Enum):
remove_from_crl = "removeFromCRL"


#: Mapping of RFC 5280, section 5.3.1 reason codes too cryptography reason codes
#: Mapping of RFC 5280, section 5.3.1 reason codes to cryptography reason codes
REASON_CODES = {
0: ReasonFlags.unspecified,
1: ReasonFlags.key_compromise,
Expand Down
214 changes: 209 additions & 5 deletions ca/django_ca/managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,20 @@

import typing
from collections.abc import Iterable
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone as tz
from typing import Any, Generic, Optional, TypeVar, Union

from pydantic import BaseModel

from cryptography import x509
from cryptography.hazmat.primitives.serialization import Encoding
from cryptography.x509.oid import AuthorityInformationAccessOID, ExtensionOID

from django.db import models
from django.conf import settings
from django.db import models, transaction
from django.db.models.functions import Coalesce
from django.urls import reverse
from django.utils import timezone

from django_ca import constants
from django_ca.conf import model_settings
Expand Down Expand Up @@ -55,27 +59,32 @@
AcmeOrder,
Certificate,
CertificateAuthority,
CertificateRevocationList,
)
from django_ca.querysets import (
AcmeAccountQuerySet,
AcmeAuthorizationQuerySet,
CertificateAuthorityQuerySet,
CertificateQuerySet,
CertificateRevocationListQuerySet,
)

CertificateAuthorityManagerBase = models.Manager[CertificateAuthority]
CertificateManagerBase = models.Manager[Certificate]
CertificateRevocationListManagerBase = models.Manager[CertificateRevocationList]
AcmeAccountManagerBase = models.Manager[AcmeAccount]
AcmeAuthorizationManagerBase = models.Manager[AcmeAuthorization]
AcmeCertificateManagerBase = models.Manager[AcmeCertificate]
AcmeChallengeManagerBase = models.Manager[AcmeChallenge]
AcmeOrderManagerBase = models.Manager[AcmeOrder]
CertificateAuthorityManagerBase = models.Manager[CertificateAuthority]
CertificateManagerBase = models.Manager[Certificate]

QuerySetTypeVar = TypeVar("QuerySetTypeVar", CertificateAuthorityQuerySet, CertificateQuerySet)
else:
CertificateAuthorityManagerBase = CertificateManagerBase = models.Manager
CertificateRevocationListManagerBase = models.Manager
AcmeAccountManagerBase = AcmeAuthorizationManagerBase = AcmeCertificateManagerBase = (
AcmeChallengeManagerBase
) = AcmeOrderManagerBase = CertificateAuthorityManagerBase = CertificateManagerBase = models.Manager
) = AcmeOrderManagerBase = models.Manager
QuerySetTypeVar = TypeVar("QuerySetTypeVar")


Expand All @@ -101,6 +110,13 @@ def exclude(self, *args: Any, **kwargs: Any) -> QuerySetTypeVar: ...

def order_by(self, *fields: str) -> QuerySetTypeVar: ...

def for_certificate_revocation_list(
self,
reasons: Optional[Iterable[x509.ReasonFlags]] = None,
now: Optional[datetime] = None,
grace_timedelta: timedelta = timedelta(minutes=10),
) -> "CertificateQuerySet": ...

def get_by_serial_or_cn(self, identifier: str) -> X509CertMixinTypeVar: ...

def valid(self) -> QuerySetTypeVar: ...
Expand Down Expand Up @@ -685,6 +701,194 @@ def create_cert( # noqa: PLR0913
return obj


class CertificateRevocationListManager(CertificateRevocationListManagerBase):
"""The model manager for :py:class:`~django_ca.models.CertificateRevocationList`.
.. versionadded:: 2.1.0
"""

if typing.TYPE_CHECKING:
# See CertificateManagerMixin for description on this branch
#
# pylint: disable=missing-function-docstring,unused-argument; just defining stubs here

def reasons(
self, only_some_reasons: Optional[frozenset[x509.CertificateRevocationList]]
) -> "CertificateRevocationListQuerySet": ...

def scope(
self,
ca: "CertificateAuthority",
only_contains_ca_certs: bool = False,
only_contains_user_certs: bool = False,
only_some_reasons: Optional[frozenset[x509.ReasonFlags]] = None,
) -> "CertificateRevocationListQuerySet": ...

def _add_issuing_distribution_point_extension(
self,
builder: x509.CertificateRevocationListBuilder,
*,
only_contains_ca_certs: bool,
only_contains_user_certs: bool,
only_some_reasons: Optional[frozenset[x509.ReasonFlags]],
) -> x509.CertificateRevocationListBuilder:
# We can only add the IDP extension if one of these properties is set, see RFC 5280, 5.2.5.
if only_contains_user_certs or only_contains_ca_certs or only_some_reasons:
return builder.add_extension(
x509.IssuingDistributionPoint(
full_name=None,
relative_name=None,
indirect_crl=False,
only_contains_attribute_certs=False,
only_contains_ca_certs=only_contains_ca_certs,
only_contains_user_certs=only_contains_user_certs,
only_some_reasons=only_some_reasons,
),
critical=True, # "is a critical CRL extension" - RFC 5280, section 5.2.5
)

return builder

def _add_revoked_certificates(
self,
builder: x509.CertificateRevocationListBuilder,
ca: "CertificateAuthority",
now: datetime,
*,
only_contains_ca_certs: bool,
only_contains_user_certs: bool,
only_some_reasons: Optional[frozenset[x509.ReasonFlags]],
) -> x509.CertificateRevocationListBuilder:
# Add certificate authorities if applicable
if only_contains_ca_certs is True or only_contains_user_certs is False:
for child_ca in ca.children.for_certificate_revocation_list(now=now, reasons=only_some_reasons):
builder = builder.add_revoked_certificate(child_ca.get_revocation())

# Add certificates if applicable
if only_contains_user_certs is True or only_contains_ca_certs is False:
certs = ca.certificate_set.for_certificate_revocation_list(now=now, reasons=only_some_reasons)
for cert in certs:
builder = builder.add_revoked_certificate(cert.get_revocation())

return builder

@transaction.atomic
def create_certificate_revocation_list(
self,
ca: "CertificateAuthority",
key_backend_options: BaseModel,
*,
next_update: Optional[datetime] = None,
only_contains_ca_certs: bool = False,
only_contains_user_certs: bool = False,
only_some_reasons: Optional[frozenset[x509.ReasonFlags]] = None,
) -> "CertificateRevocationList":
"""Create or update a certificate revocation list.
Apart from `ca` and `key_backend_options`, all arguments are optional and must be passed as keyword
arguments.
Parameters
----------
ca : :py:class:`~django_ca.models.CertificateAuthority`
The certificate authority to generate the CRL for.
key_backend_options : BaseModel
Key backend options for using the private key.
next_update : datetime, optional
When the CRL will be updated again, defaults to one day.
only_contains_ca_certs : bool, optional
Set to ``True`` to generate a CRL that contains only CA certificates.
only_contains_user_certs : bool, optional
Set to ``True`` to generate a CRL that contains only end-entity certificates.
only_some_reasons : frozenset[:py:class:`~cg:cryptography.x509.ReasonFlags`], optional
Pass a set of :py:class:`~cg:cryptography.x509.ReasonFlags` to limit the CRL to certificates that
have been revoked for that reason.
"""
# Parameter validation
if only_contains_ca_certs is True and only_contains_user_certs is True:
raise ValueError("`only_contains_ca_certs` and `only_contains_user_certs` cannot both be set.")

# Compute last_update/next_update timestamps
last_update = datetime.now(tz=tz.utc).replace(microsecond=0)
if next_update is None:
next_update = last_update + timedelta(days=1)
else:
next_update = next_update.replace(microsecond=0)

if settings.USE_TZ is False:
last_update = timezone.make_naive(last_update, timezone=tz.utc)

if timezone.is_aware(next_update):
next_update = timezone.make_naive(next_update, timezone=tz.utc)

# Initialize builder
builder = x509.CertificateRevocationListBuilder()
builder = builder.issuer_name(ca.pub.loaded.subject)
builder = builder.last_update(last_update)
builder = builder.next_update(next_update)

# Add AuthorityKeyIdentifier extension from the certificate authority
builder = builder.add_extension(ca.get_authority_key_identifier(), critical=False)

# Add the IssuingDistributionPoint extension
builder = self._add_issuing_distribution_point_extension(
builder,
only_contains_ca_certs=only_contains_ca_certs,
only_contains_user_certs=only_contains_user_certs,
only_some_reasons=only_some_reasons,
)

builder = self._add_revoked_certificates(
builder,
ca,
now=last_update,
only_contains_ca_certs=only_contains_ca_certs,
only_contains_user_certs=only_contains_user_certs,
only_some_reasons=only_some_reasons,
)

# Create subquery for the current CRL number with the given scope.
number_subquery = (
self.scope(
ca=ca,
only_contains_ca_certs=only_contains_ca_certs,
only_contains_user_certs=only_contains_user_certs,
only_some_reasons=only_some_reasons,
)
.order_by("-number")
.values(new_number=models.F("number") + 1)[:1]
)

# Create database object (as late as possible so any exception above would not hit the database)
obj = self.create(
ca=ca,
number=Coalesce(models.Subquery(number_subquery, default=1), 1),
only_contains_ca_certs=only_contains_ca_certs,
only_contains_user_certs=only_contains_user_certs,
only_some_reasons=only_some_reasons,
last_update=last_update,
next_update=next_update,
)

# Refresh the object from the database, since we need to access the number. See:
# https://docs.djangoproject.com/en/5.1/ref/models/expressions/#f-assignments-persist-after-model-save
obj.refresh_from_db()

# Add the CRL Number extension
builder = builder.add_extension(x509.CRLNumber(crl_number=int(obj.id)), critical=False)

# Create the signed CRL
crl = ca.key_backend.sign_certificate_revocation_list(
ca=ca, use_private_key_options=key_backend_options, builder=builder, algorithm=ca.algorithm
)

# Store CRL in the database
obj.data = crl.public_bytes(Encoding.DER)
obj.save()

return obj


class AcmeAccountManager(AcmeAccountManagerBase):
"""Model manager for :py:class:`~django_ca.models.AcmeAccount`."""

Expand Down
32 changes: 32 additions & 0 deletions ca/django_ca/migrations/0047_certificaterevocationlist.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Generated by Django 5.1.1 on 2024-10-12 08:17

import django.db.models.deletion
import django_ca.models
from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('django_ca', '0046_rename_expires_certificate_not_after_and_more'),
]

operations = [
migrations.CreateModel(
name='CertificateRevocationList',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('number', models.PositiveIntegerField(db_index=True, help_text='Monotonically increasing number for the CRLNumber extension.')),
('last_update', models.DateTimeField(help_text="The CRL's activation time.")),
('next_update', models.DateTimeField(help_text="The CRL's next update time.")),
('only_contains_user_certs', models.BooleanField(default=False)),
('only_contains_ca_certs', models.BooleanField(default=False)),
('only_some_reasons', models.JSONField(decoder=django_ca.models.ReasonDecoder, default=None, encoder=django_ca.models.ReasonEncoder, null=True)),
('data', models.BinaryField(null=True)),
('ca', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='django_ca.certificateauthority', verbose_name='Certificate Authority')),
],
options={
'indexes': [models.Index(fields=['ca', 'number', 'only_contains_user_certs', 'only_contains_ca_certs', 'only_some_reasons'], name='django_ca_c_ca_id_bcb21f_idx')],
},
),
]
Loading

0 comments on commit 6f2fccd

Please sign in to comment.