-
Notifications
You must be signed in to change notification settings - Fork 43
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
add data migration to assure a proper default state
- Loading branch information
1 parent
7fbf1d9
commit e1e4ea4
Showing
6 changed files
with
279 additions
and
12 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
26 changes: 17 additions & 9 deletions
26
ca/django_ca/migrations/0050_certificateauthority_ocsp_key_backend_alias_and_more.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,25 +1,33 @@ | ||
# Generated by Django 5.1.3 on 2024-12-01 19:37 | ||
|
||
import django_ca.models | ||
from django.db import migrations, models | ||
|
||
import django_ca.models | ||
|
||
|
||
class Migration(migrations.Migration): | ||
|
||
dependencies = [ | ||
('django_ca', '0049_remove_certificateauthority_crl_number'), | ||
("django_ca", "0049_remove_certificateauthority_crl_number"), | ||
] | ||
|
||
operations = [ | ||
migrations.AddField( | ||
model_name='certificateauthority', | ||
name='ocsp_key_backend_alias', | ||
field=models.CharField(default='default', help_text='Backend to handle private keys for OCSP responder certificates.', max_length=256), | ||
model_name="certificateauthority", | ||
name="ocsp_key_backend_alias", | ||
field=models.CharField( | ||
default="default", | ||
help_text="Backend to handle private keys for OCSP responder certificates.", | ||
max_length=256, | ||
), | ||
preserve_default=False, | ||
), | ||
migrations.AddField( | ||
model_name='certificateauthority', | ||
name='ocsp_key_backend_options', | ||
field=models.JSONField(blank=True, default=django_ca.models.ocsp_key_backend_options_default, help_text='Key backend options for using OCSP responder private keys.'), | ||
model_name="certificateauthority", | ||
name="ocsp_key_backend_options", | ||
field=models.JSONField( | ||
blank=True, | ||
default=django_ca.models.ocsp_key_backend_options_default, | ||
help_text="Key backend options for using OCSP responder private keys.", | ||
), | ||
), | ||
] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,101 @@ | ||
# This file is part of django-ca (https://github.com/mathiasertl/django-ca). | ||
# | ||
# django-ca is free software: you can redistribute it and/or modify it under the terms of the GNU General | ||
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your | ||
# option) any later version. | ||
# | ||
# django-ca is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the | ||
# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License | ||
# for more details. | ||
# | ||
# You should have received a copy of the GNU General Public License along with django-ca. If not, see | ||
# <http://www.gnu.org/licenses/>. | ||
# | ||
# Generated by Django 5.1.3 on 2024-12-01 19:38 | ||
|
||
import logging | ||
|
||
from celery.exceptions import ImproperlyConfigured | ||
|
||
from cryptography import x509 | ||
from cryptography.hazmat.primitives.serialization import Encoding | ||
|
||
from django.core.files.storage import InvalidStorageError, storages | ||
from django.db import migrations | ||
from django.utils import timezone | ||
|
||
from django_ca.key_backends import ocsp_key_backends | ||
from django_ca.key_backends.storages import StoragesOCSPBackend | ||
|
||
log = logging.getLogger(__name__) | ||
|
||
|
||
def configure_ocsp_responder_key(apps, schema_editor): | ||
"""Migration function to set ocsp responder key if it exists.""" | ||
# We can't import the Person model directly as it may be a newer | ||
# version than this migration expects. We use the historical version. | ||
CertificateAuthority = apps.get_model("django_ca", "CertificateAuthority") | ||
|
||
now = timezone.now() | ||
for ca in CertificateAuthority.objects.filter(enabled=True).filter(not_after__gt=now, not_before__lt=now): | ||
try: | ||
ocsp_key_backend = ocsp_key_backends[ca.ocsp_key_backend_alias] | ||
except ValueError: # pragma: no cover | ||
log.exception("%s: Cannot load OCSP key storage backend.") | ||
continue | ||
except ImproperlyConfigured: # pragma: no cover # not possible to reproduce with SettingsWrapper | ||
log.warning("%s: Key backend is not configured.", ca.ocsp_key_backend_alias) | ||
continue | ||
|
||
# COVERAGE NOTE: No other implementations exist at the time of writing | ||
if not isinstance(ocsp_key_backend, StoragesOCSPBackend): # pragma: no cover | ||
continue | ||
|
||
try: | ||
storage = storages[ocsp_key_backend.storage_alias] | ||
|
||
# COVERAGE NOTE: This exception should never happen, as this misconfiguration is already raised when | ||
# the ocsp_key_backend is accessed. | ||
except InvalidStorageError: # pragma: no cover | ||
log.error( | ||
"%s: OCSP key backend storage alias does not refer to a valid storage backend.", | ||
ocsp_key_backend.storage_alias, | ||
) | ||
continue | ||
|
||
private_key_path = f"ocsp/{ca.serial}.key" | ||
public_key_path = f"ocsp/{ca.serial}.pem" | ||
|
||
if storage.exists(private_key_path) and storage.exists(public_key_path): | ||
ca.ocsp_key_backend_options["private_key"]["path"] = private_key_path | ||
|
||
# Read public key | ||
stream = storage.open(public_key_path) | ||
|
||
try: | ||
public_key_data: bytes = stream.read() # pragma: no branch | ||
finally: | ||
stream.close() | ||
|
||
if not public_key_data.startswith(b"-----BEGIN CERTIFICATE-----"): | ||
try: | ||
public_key = x509.load_der_x509_certificate(public_key_data) | ||
public_key_data = public_key.public_bytes(Encoding.PEM) | ||
except ValueError: | ||
log.warning( | ||
"%s: Cannot encode certificate. Regenerate OCSP keys manually.", public_key_path | ||
) | ||
continue | ||
|
||
ca.ocsp_key_backend_options["certificate"]["pem"] = public_key_data.decode() | ||
ca.save() | ||
else: | ||
log.warning("Private or public key not found. Regenerate OCSP keys manually.") | ||
|
||
|
||
class Migration(migrations.Migration): | ||
dependencies = [ | ||
("django_ca", "0050_certificateauthority_ocsp_key_backend_alias_and_more"), | ||
] | ||
|
||
operations = [migrations.RunPython(configure_ocsp_responder_key, migrations.RunPython.noop)] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,156 @@ | ||
# This file is part of django-ca (https://github.com/mathiasertl/django-ca). | ||
# | ||
# django-ca is free software: you can redistribute it and/or modify it under the terms of the GNU General | ||
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your | ||
# option) any later version. | ||
# | ||
# django-ca is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the | ||
# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License | ||
# for more details. | ||
# | ||
# You should have received a copy of the GNU General Public License along with django-ca. If not, see | ||
# <http://www.gnu.org/licenses/>. | ||
|
||
# pylint: disable=redefined-outer-name # because of fixtures | ||
# pylint: disable=invalid-name # for model loading | ||
|
||
"""Test 0051 database migration.""" | ||
|
||
import logging | ||
import shutil | ||
from datetime import timedelta | ||
from pathlib import Path | ||
from typing import Any, Callable, Optional | ||
|
||
from django_test_migrations.migrator import Migrator | ||
|
||
from cryptography import x509 | ||
from cryptography.hazmat.primitives.serialization import Encoding | ||
|
||
from django.db.migrations.state import ProjectState | ||
from django.utils import timezone | ||
|
||
import pytest | ||
from _pytest.logging import LogCaptureFixture | ||
|
||
from django_ca.tests.base.constants import CERT_DATA, FIXTURES_DIR | ||
|
||
|
||
@pytest.fixture(scope="session") | ||
def ocsp_pem() -> str: | ||
"""Fixture for a PEM certificate.""" | ||
with open(FIXTURES_DIR / "profile-ocsp.pub", "rb") as stream: | ||
public_key_data = stream.read() | ||
public_key = x509.load_der_x509_certificate(public_key_data) | ||
public_key_data = public_key.public_bytes(Encoding.PEM) | ||
return public_key_data.decode() | ||
|
||
|
||
def setup(migrator: Migrator, setup: Optional[Callable[[ProjectState], None]] = None) -> ProjectState: | ||
"""Set up a CA with a CRL Number for the given scope.""" | ||
old_state = migrator.apply_initial_migration(("django_ca", "0049_remove_certificateauthority_crl_number")) | ||
now = timezone.now() | ||
|
||
cert: x509.Certificate = CERT_DATA["root"]["pub"]["parsed"] | ||
CertificateAuthority = old_state.apps.get_model("django_ca", "CertificateAuthority") | ||
ca = CertificateAuthority.objects.create( | ||
pub=cert, | ||
cn="", | ||
serial="123", | ||
not_before=now - timedelta(days=1), | ||
not_after=now + timedelta(days=1), | ||
) | ||
|
||
if setup is not None: | ||
setup(ca) | ||
|
||
return migrator.apply_tested_migration(("django_ca", "0051_auto_20241201_2038")) | ||
|
||
|
||
@pytest.mark.usefixtures("tmpcadir") | ||
def test_normal_migration( | ||
caplog: LogCaptureFixture, migrator: Migrator, tmpcadir: Path, ocsp_pem: str | ||
) -> None: | ||
"""Test running the migration with an empty cache.""" | ||
ocsp_dest = tmpcadir / "ocsp" | ||
ocsp_dest.mkdir(exist_ok=True, parents=True) | ||
|
||
def setup_ocsp_keys(ca: Any) -> None: | ||
shutil.copy(FIXTURES_DIR / "profile-ocsp.key", ocsp_dest / f"{ca.serial}.key") | ||
shutil.copy(FIXTURES_DIR / "profile-ocsp.pub", ocsp_dest / f"{ca.serial}.pem") | ||
|
||
state = setup(migrator, setup=setup_ocsp_keys) | ||
|
||
CertificateAuthority = state.apps.get_model("django_ca", "CertificateAuthority") | ||
ca = CertificateAuthority.objects.get(serial="123") | ||
|
||
assert ca.ocsp_key_backend_options == { | ||
"certificate": {"pem": ocsp_pem}, | ||
"private_key": {"path": "ocsp/123.key"}, | ||
} | ||
assert caplog.record_tuples == [] | ||
|
||
|
||
@pytest.mark.usefixtures("tmpcadir") | ||
def test_with_pem_public_key( | ||
caplog: LogCaptureFixture, migrator: Migrator, tmpcadir: Path, ocsp_pem: str | ||
) -> None: | ||
"""Test running the migration with a PEM public key.""" | ||
ocsp_dest = tmpcadir / "ocsp" | ||
ocsp_dest.mkdir(exist_ok=True, parents=True) | ||
|
||
def setup_ocsp_keys(ca: Any) -> None: | ||
shutil.copy(FIXTURES_DIR / "profile-ocsp.key", ocsp_dest / f"{ca.serial}.key") | ||
with open(ocsp_dest / f"{ca.serial}.pem", "w", encoding="ascii") as stream: | ||
stream.write(ocsp_pem) | ||
|
||
state = setup(migrator, setup=setup_ocsp_keys) | ||
|
||
CertificateAuthority = state.apps.get_model("django_ca", "CertificateAuthority") | ||
ca = CertificateAuthority.objects.get(serial="123") | ||
assert caplog.record_tuples == [] | ||
assert ca.ocsp_key_backend_options == { | ||
"certificate": {"pem": ocsp_pem}, | ||
"private_key": {"path": "ocsp/123.key"}, | ||
} | ||
|
||
|
||
@pytest.mark.usefixtures("tmpcadir") | ||
def test_ocsp_keys_dont_exist(caplog: LogCaptureFixture, migrator: Migrator) -> None: | ||
"""Test running the migration where no OCSP key was generated.""" | ||
state = setup(migrator) | ||
CertificateAuthority = state.apps.get_model("django_ca", "CertificateAuthority") | ||
ca = CertificateAuthority.objects.get(serial="123") | ||
assert ca.ocsp_key_backend_options == {"certificate": {}, "private_key": {}} | ||
assert caplog.record_tuples == [ | ||
( | ||
"django_ca.migrations.0051_auto_20241201_2038", | ||
logging.WARNING, | ||
"Private or public key not found. Regenerate OCSP keys manually.", | ||
) | ||
] | ||
|
||
|
||
@pytest.mark.usefixtures("tmpcadir") | ||
def test_with_bogus_public_key(caplog: LogCaptureFixture, migrator: Migrator, tmpcadir: Path) -> None: | ||
"""Test running the migration with a bogus public key.""" | ||
ocsp_dest = tmpcadir / "ocsp" | ||
ocsp_dest.mkdir(exist_ok=True, parents=True) | ||
|
||
def setup_ocsp_keys(ca: Any) -> None: | ||
shutil.copy(FIXTURES_DIR / "profile-ocsp.key", ocsp_dest / f"{ca.serial}.key") | ||
with open(ocsp_dest / f"{ca.serial}.pem", "w", encoding="ascii") as stream: | ||
stream.write("foobar") | ||
|
||
state = setup(migrator, setup=setup_ocsp_keys) | ||
|
||
CertificateAuthority = state.apps.get_model("django_ca", "CertificateAuthority") | ||
ca = CertificateAuthority.objects.get(serial="123") | ||
assert caplog.record_tuples == [ | ||
( | ||
"django_ca.migrations.0051_auto_20241201_2038", | ||
logging.WARNING, | ||
"ocsp/123.pem: Cannot encode certificate. Regenerate OCSP keys manually.", | ||
) | ||
] | ||
assert ca.ocsp_key_backend_options == {"certificate": {}, "private_key": {}} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters