Skip to content

Commit d5404da

Browse files
committed
add data migration to assure a proper default state
1 parent 7fbf1d9 commit d5404da

File tree

8 files changed

+319
-16
lines changed

8 files changed

+319
-16
lines changed

ca/django_ca/management/commands/edit_ca.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ def add_arguments(self, parser: CommandParser) -> None:
5555
"--disable", action="store_false", dest="enabled", help="Disable the certificate authority."
5656
)
5757

58-
def handle(
58+
def handle( # noqa: PLR0913
5959
self,
6060
ca: CertificateAuthority,
6161
enabled: Optional[bool],
Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,33 @@
11
# Generated by Django 5.1.3 on 2024-12-01 19:37
22

3-
import django_ca.models
43
from django.db import migrations, models
54

5+
import django_ca.models
6+
67

78
class Migration(migrations.Migration):
8-
99
dependencies = [
10-
('django_ca', '0049_remove_certificateauthority_crl_number'),
10+
("django_ca", "0049_remove_certificateauthority_crl_number"),
1111
]
1212

1313
operations = [
1414
migrations.AddField(
15-
model_name='certificateauthority',
16-
name='ocsp_key_backend_alias',
17-
field=models.CharField(default='default', help_text='Backend to handle private keys for OCSP responder certificates.', max_length=256),
15+
model_name="certificateauthority",
16+
name="ocsp_key_backend_alias",
17+
field=models.CharField(
18+
default="default",
19+
help_text="Backend to handle private keys for OCSP responder certificates.",
20+
max_length=256,
21+
),
1822
preserve_default=False,
1923
),
2024
migrations.AddField(
21-
model_name='certificateauthority',
22-
name='ocsp_key_backend_options',
23-
field=models.JSONField(blank=True, default=django_ca.models.ocsp_key_backend_options_default, help_text='Key backend options for using OCSP responder private keys.'),
25+
model_name="certificateauthority",
26+
name="ocsp_key_backend_options",
27+
field=models.JSONField(
28+
blank=True,
29+
default=django_ca.models.ocsp_key_backend_options_default,
30+
help_text="Key backend options for using OCSP responder private keys.",
31+
),
2432
),
2533
]
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
# This file is part of django-ca (https://github.com/mathiasertl/django-ca).
2+
#
3+
# django-ca is free software: you can redistribute it and/or modify it under the terms of the GNU General
4+
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your
5+
# option) any later version.
6+
#
7+
# django-ca is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
8+
# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
9+
# for more details.
10+
#
11+
# You should have received a copy of the GNU General Public License along with django-ca. If not, see
12+
# <http://www.gnu.org/licenses/>.
13+
#
14+
# Generated by Django 5.1.3 on 2024-12-01 19:38
15+
16+
import logging
17+
18+
from celery.exceptions import ImproperlyConfigured
19+
20+
from cryptography import x509
21+
from cryptography.hazmat.primitives.serialization import Encoding
22+
23+
from django.core.files.storage import InvalidStorageError, storages
24+
from django.db import migrations
25+
from django.utils import timezone
26+
27+
from django_ca.key_backends import ocsp_key_backends
28+
from django_ca.key_backends.storages import StoragesOCSPBackend
29+
30+
log = logging.getLogger(__name__)
31+
32+
33+
def configure_ocsp_responder_key(apps, schema_editor):
34+
"""Migration function to set ocsp responder key if it exists."""
35+
# We can't import the Person model directly as it may be a newer
36+
# version than this migration expects. We use the historical version.
37+
CertificateAuthority = apps.get_model("django_ca", "CertificateAuthority")
38+
39+
now = timezone.now()
40+
for ca in CertificateAuthority.objects.filter(enabled=True).filter(not_after__gt=now, not_before__lt=now):
41+
try:
42+
ocsp_key_backend = ocsp_key_backends[ca.ocsp_key_backend_alias]
43+
except ValueError: # pragma: no cover
44+
log.exception("%s: Cannot load OCSP key storage backend.")
45+
continue
46+
except ImproperlyConfigured: # pragma: no cover # not possible to reproduce with SettingsWrapper
47+
log.warning("%s: Key backend is not configured.", ca.ocsp_key_backend_alias)
48+
continue
49+
50+
# COVERAGE NOTE: No other implementations exist at the time of writing
51+
if not isinstance(ocsp_key_backend, StoragesOCSPBackend): # pragma: no cover
52+
continue
53+
54+
try:
55+
storage = storages[ocsp_key_backend.storage_alias]
56+
57+
# COVERAGE NOTE: This exception should never happen, as this misconfiguration is already raised when
58+
# the ocsp_key_backend is accessed.
59+
except InvalidStorageError: # pragma: no cover
60+
log.error(
61+
"%s: OCSP key backend storage alias does not refer to a valid storage backend.",
62+
ocsp_key_backend.storage_alias,
63+
)
64+
continue
65+
66+
private_key_path = f"ocsp/{ca.serial}.key"
67+
public_key_path = f"ocsp/{ca.serial}.pem"
68+
69+
if storage.exists(private_key_path) and storage.exists(public_key_path):
70+
ca.ocsp_key_backend_options["private_key"]["path"] = private_key_path
71+
72+
# Read public key
73+
stream = storage.open(public_key_path)
74+
75+
try:
76+
public_key_data: bytes = stream.read() # pragma: no branch
77+
finally:
78+
stream.close()
79+
80+
if not public_key_data.startswith(b"-----BEGIN CERTIFICATE-----"):
81+
try:
82+
public_key = x509.load_der_x509_certificate(public_key_data)
83+
public_key_data = public_key.public_bytes(Encoding.PEM)
84+
except ValueError:
85+
log.warning(
86+
"%s: Cannot encode certificate. Regenerate OCSP keys manually.", public_key_path
87+
)
88+
continue
89+
90+
ca.ocsp_key_backend_options["certificate"]["pem"] = public_key_data.decode()
91+
ca.save()
92+
else:
93+
log.warning("Private or public key not found. Regenerate OCSP keys manually.")
94+
95+
96+
class Migration(migrations.Migration):
97+
dependencies = [
98+
("django_ca", "0050_certificateauthority_ocsp_key_backend_alias_and_more"),
99+
]
100+
101+
operations = [migrations.RunPython(configure_ocsp_responder_key, migrations.RunPython.noop)]

ca/django_ca/tests/commands/test_edit_ca.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -176,10 +176,22 @@ def test_rest_api_arguments_mutually_exclusive(root: CertificateAuthority) -> No
176176
assert root.api_enabled is False # state unchanged
177177

178178

179-
def test_ocsp_responder_arguments(root: CertificateAuthority) -> None:
179+
def test_ocsp_responder_arguments(root: CertificateAuthority, settings: SettingsWrapper) -> None:
180180
"""Test ACME arguments."""
181-
edit_ca(root, ocsp_responder_key_validity=10, ocsp_response_validity=3600)
182-
181+
settings.CA_OCSP_KEY_BACKENDS = {
182+
"default": {
183+
"BACKEND": "django_ca.key_backends.storages.StoragesOCSPBackend",
184+
"OPTIONS": {"storage_alias": "django-ca"},
185+
},
186+
"other": {
187+
"BACKEND": "django_ca.key_backends.storages.StoragesOCSPBackend",
188+
"OPTIONS": {"storage_alias": "django-ca"},
189+
},
190+
}
191+
192+
edit_ca(root, ocsp_key_backend="other", ocsp_responder_key_validity=10, ocsp_response_validity=3600)
193+
194+
assert root.ocsp_key_backend_alias == "other"
183195
assert root.ocsp_responder_key_validity == 10
184196
assert root.ocsp_response_validity == 3600
185197

Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
# This file is part of django-ca (https://github.com/mathiasertl/django-ca).
2+
#
3+
# django-ca is free software: you can redistribute it and/or modify it under the terms of the GNU General
4+
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your
5+
# option) any later version.
6+
#
7+
# django-ca is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
8+
# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
9+
# for more details.
10+
#
11+
# You should have received a copy of the GNU General Public License along with django-ca. If not, see
12+
# <http://www.gnu.org/licenses/>.
13+
14+
# pylint: disable=redefined-outer-name # because of fixtures
15+
# pylint: disable=invalid-name # for model loading
16+
17+
"""Test 0051 database migration."""
18+
19+
import logging
20+
import shutil
21+
from datetime import timedelta
22+
from pathlib import Path
23+
from typing import Any, Callable, Optional
24+
25+
from django_test_migrations.migrator import Migrator
26+
27+
from cryptography import x509
28+
from cryptography.hazmat.primitives.serialization import Encoding
29+
30+
from django.db.migrations.state import ProjectState
31+
from django.utils import timezone
32+
33+
import pytest
34+
from _pytest.logging import LogCaptureFixture
35+
from pytest_django.fixtures import SettingsWrapper
36+
37+
from django_ca.tests.base.constants import CERT_DATA, FIXTURES_DIR
38+
39+
40+
@pytest.fixture(scope="session")
41+
def ocsp_pem() -> str:
42+
"""Fixture for a PEM certificate."""
43+
with open(FIXTURES_DIR / "profile-ocsp.pub", "rb") as stream:
44+
public_key_data = stream.read()
45+
public_key = x509.load_der_x509_certificate(public_key_data)
46+
public_key_data = public_key.public_bytes(Encoding.PEM)
47+
return public_key_data.decode()
48+
49+
50+
def setup(migrator: Migrator, setup: Optional[Callable[[ProjectState], None]] = None) -> ProjectState:
51+
"""Set up a CA with a CRL Number for the given scope."""
52+
old_state = migrator.apply_initial_migration(("django_ca", "0049_remove_certificateauthority_crl_number"))
53+
now = timezone.now()
54+
55+
cert: x509.Certificate = CERT_DATA["root"]["pub"]["parsed"]
56+
CertificateAuthority = old_state.apps.get_model("django_ca", "CertificateAuthority")
57+
ca = CertificateAuthority.objects.create(
58+
pub=cert,
59+
cn="",
60+
serial="123",
61+
not_before=now - timedelta(days=1),
62+
not_after=now + timedelta(days=1),
63+
)
64+
65+
if setup is not None:
66+
setup(ca)
67+
68+
return migrator.apply_tested_migration(("django_ca", "0051_auto_20241201_2038"))
69+
70+
71+
@pytest.mark.usefixtures("tmpcadir")
72+
def test_normal_migration(
73+
caplog: LogCaptureFixture, migrator: Migrator, tmpcadir: Path, ocsp_pem: str
74+
) -> None:
75+
"""Test running the migration with an empty cache."""
76+
ocsp_dest = tmpcadir / "ocsp"
77+
ocsp_dest.mkdir(exist_ok=True, parents=True)
78+
79+
def setup_ocsp_keys(ca: Any) -> None:
80+
shutil.copy(FIXTURES_DIR / "profile-ocsp.key", ocsp_dest / f"{ca.serial}.key")
81+
shutil.copy(FIXTURES_DIR / "profile-ocsp.pub", ocsp_dest / f"{ca.serial}.pem")
82+
83+
state = setup(migrator, setup=setup_ocsp_keys)
84+
85+
CertificateAuthority = state.apps.get_model("django_ca", "CertificateAuthority")
86+
ca = CertificateAuthority.objects.get(serial="123")
87+
88+
assert ca.ocsp_key_backend_options == {
89+
"certificate": {"pem": ocsp_pem},
90+
"private_key": {"path": "ocsp/123.key"},
91+
}
92+
assert caplog.record_tuples == []
93+
94+
95+
@pytest.mark.usefixtures("tmpcadir")
96+
def test_with_pem_public_key(
97+
caplog: LogCaptureFixture, migrator: Migrator, tmpcadir: Path, ocsp_pem: str
98+
) -> None:
99+
"""Test running the migration with a PEM public key."""
100+
ocsp_dest = tmpcadir / "ocsp"
101+
ocsp_dest.mkdir(exist_ok=True, parents=True)
102+
103+
def setup_ocsp_keys(ca: Any) -> None:
104+
shutil.copy(FIXTURES_DIR / "profile-ocsp.key", ocsp_dest / f"{ca.serial}.key")
105+
with open(ocsp_dest / f"{ca.serial}.pem", "w", encoding="ascii") as stream:
106+
stream.write(ocsp_pem)
107+
108+
state = setup(migrator, setup=setup_ocsp_keys)
109+
110+
CertificateAuthority = state.apps.get_model("django_ca", "CertificateAuthority")
111+
ca = CertificateAuthority.objects.get(serial="123")
112+
assert caplog.record_tuples == []
113+
assert ca.ocsp_key_backend_options == {
114+
"certificate": {"pem": ocsp_pem},
115+
"private_key": {"path": "ocsp/123.key"},
116+
}
117+
118+
119+
@pytest.mark.usefixtures("tmpcadir")
120+
def test_ocsp_keys_dont_exist(caplog: LogCaptureFixture, migrator: Migrator) -> None:
121+
"""Test running the migration where no OCSP key was generated."""
122+
state = setup(migrator)
123+
CertificateAuthority = state.apps.get_model("django_ca", "CertificateAuthority")
124+
ca = CertificateAuthority.objects.get(serial="123")
125+
assert ca.ocsp_key_backend_options == {"certificate": {}, "private_key": {}}
126+
assert caplog.record_tuples == [
127+
(
128+
"django_ca.migrations.0051_auto_20241201_2038",
129+
logging.WARNING,
130+
"Private or public key not found. Regenerate OCSP keys manually.",
131+
)
132+
]
133+
134+
135+
@pytest.mark.usefixtures("tmpcadir")
136+
def test_with_bogus_public_key(caplog: LogCaptureFixture, migrator: Migrator, tmpcadir: Path) -> None:
137+
"""Test running the migration with a bogus public key."""
138+
ocsp_dest = tmpcadir / "ocsp"
139+
ocsp_dest.mkdir(exist_ok=True, parents=True)
140+
141+
def setup_ocsp_keys(ca: Any) -> None:
142+
shutil.copy(FIXTURES_DIR / "profile-ocsp.key", ocsp_dest / f"{ca.serial}.key")
143+
with open(ocsp_dest / f"{ca.serial}.pem", "w", encoding="ascii") as stream:
144+
stream.write("foobar")
145+
146+
state = setup(migrator, setup=setup_ocsp_keys)
147+
148+
CertificateAuthority = state.apps.get_model("django_ca", "CertificateAuthority")
149+
ca = CertificateAuthority.objects.get(serial="123")
150+
assert caplog.record_tuples == [
151+
(
152+
"django_ca.migrations.0051_auto_20241201_2038",
153+
logging.WARNING,
154+
"ocsp/123.pem: Cannot encode certificate. Regenerate OCSP keys manually.",
155+
)
156+
]
157+
assert ca.ocsp_key_backend_options == {"certificate": {}, "private_key": {}}
158+
159+
160+
@pytest.mark.usefixtures("tmpcadir")
161+
def test_with_invalid_storage_alias(
162+
caplog: LogCaptureFixture, migrator: Migrator, tmpcadir: Path, settings: SettingsWrapper
163+
) -> None:
164+
"""Test running the migration with an improperly configured ocsp alias."""
165+
ocsp_dest = tmpcadir / "ocsp"
166+
ocsp_dest.mkdir(exist_ok=True, parents=True)
167+
168+
settings.CA_OCSP_KEY_BACKENDS = {
169+
"default": {
170+
"BACKEND": "django_ca.key_backends.storages.StoragesOCSPBackend",
171+
"OPTIONS": {"storage_alias": "foobar"},
172+
}
173+
}
174+
175+
state = setup(migrator)
176+
177+
CertificateAuthority = state.apps.get_model("django_ca", "CertificateAuthority")
178+
ca = CertificateAuthority.objects.get(serial="123")
179+
assert "Cannot load OCSP key storage backend." in caplog.text
180+
assert ca.ocsp_key_backend_options == {"certificate": {}, "private_key": {}}

ca/django_ca/tests/tasks/test_generate_ocsp_keys.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,5 +60,5 @@ def test_with_invalid_password(usable_pwd: CertificateAuthority) -> None:
6060
storage = storages[model_settings.CA_DEFAULT_STORAGE_ALIAS]
6161
generate_ocsp_keys([usable_pwd.serial], {usable_pwd.serial: {"password": password}})
6262
usable_pwd.refresh_from_db() # models from fixture have old data
63-
assert usable_pwd.ocsp_key_backend_options == {}
63+
assert usable_pwd.ocsp_key_backend_options == {"private_key": {}, "certificate": {}}
6464
assert storage.exists(f"ocsp/{usable_pwd.serial}.pem") is False

ca/django_ca/typehints.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,9 @@
4848

4949
CRYPTOGRAPHY_VERSION = packaging.version.parse(cryptography.__version__).release
5050

51-
# pylint: disable-next=invalid-name
52-
JSON = Union[dict[str, "JSON"], list["JSON"], str, int, float, bool, None]
51+
52+
#: JSON serializable data.
53+
JSON = Union[dict[str, "JSON"], list["JSON"], str, int, float, bool, None] # pylint: disable=invalid-name
5354

5455

5556
class OCSPKeyBackendDict(TypedDict):

docs/source/conf.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -519,6 +519,7 @@
519519
# Pydantic root model signature does not currently work
520520
("py:class", "RootModelRootType"),
521521
("py:class", "pathlib._local.Path"),
522+
("py:class", "JSON"),
522523
]
523524

524525

0 commit comments

Comments
 (0)