Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Identity Manager for DNSSEC Awesomeness #513

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions api/desecapi/migrations/0018_tlsidentity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# Generated by Django 3.2.7 on 2021-09-19 19:53

from django.conf import settings
from django.db import migrations, models
import django.db.models.deletion
import uuid


class Migration(migrations.Migration):

dependencies = [
('desecapi', '0017_alter_user_limit_domains'),
]

operations = [
migrations.CreateModel(
name='TLSIdentity',
fields=[
('id', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)),
('name', models.CharField(default='', max_length=24)),
('created', models.DateTimeField(auto_now_add=True)),
('default_ttl', models.PositiveIntegerField(default=300)),
('scheduled_removal', models.DateTimeField(null=True)),
('certificate', models.TextField()),
('tlsa_selector', models.IntegerField(choices=[(0, 'Full Certificate'), (1, 'Subject Public Key Info')], default=1)),
('tlsa_matching_type', models.IntegerField(choices=[(0, 'No Hash Used'), (1, 'Sha256'), (2, 'Sha512')], default=1)),
('tlsa_certificate_usage', models.IntegerField(choices=[(0, 'Ca Constraint'), (1, 'Service Certificate Constraint'), (2, 'Trust Anchor Assertion'), (3, 'Domain Issued Certificate')], default=3)),
('port', models.IntegerField(default=443)),
('protocol', models.TextField(choices=[('tcp', 'Tcp'), ('udp', 'Udp'), ('sctp', 'Sctp')], default='tcp')),
('owner', models.ForeignKey(on_delete=django.db.models.deletion.PROTECT, related_name='identities', to=settings.AUTH_USER_MODEL)),
('rrs', models.ManyToManyField(to='desecapi.RR')),
],
options={
'abstract': False,
},
),
]
185 changes: 185 additions & 0 deletions api/desecapi/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@
from datetime import timedelta
from functools import cached_property
from hashlib import sha256
from typing import Set, List, Optional, Tuple, Dict

import dns
import psl_dns
import rest_framework.authtoken.models
from cryptography import x509, hazmat
from django.conf import settings
from django.contrib.auth.hashers import make_password
from django.contrib.auth.models import AbstractBaseUser, AnonymousUser, BaseUserManager
Expand Down Expand Up @@ -217,6 +219,14 @@ def filter_qname(self, qname: str, **kwargs) -> models.query.QuerySet:
name_length=Length('name'),
).filter(dotted_qname__endswith=F('dotted_name'), **kwargs)

def most_specific_zone(self, fqdn: str) -> Tuple[Domain, str]:
try:
domain = self.filter_qname(fqdn).order_by('-name_length')[0]
except IndexError:
raise Domain.DoesNotExist
subname = fqdn[:-len(domain.name)].rstrip('.')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.removesuffix()

return domain, subname


class Domain(ExportModelOperationsMixin('Domain'), models.Model):
@staticmethod
Expand Down Expand Up @@ -982,3 +992,178 @@ def verify(self, solution: str):
and
age <= settings.CAPTCHA_VALIDITY_PERIOD # not expired
)


class Identity(models.Model):
rr_type = None

id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
name = models.CharField(max_length=24, default="")
created = models.DateTimeField(auto_now_add=True)
owner = models.ForeignKey(User, on_delete=models.PROTECT, related_name='identities')
default_ttl = models.PositiveIntegerField(default=300)
rrs = models.ManyToManyField(to=RR, related_name='identities')
scheduled_removal = models.DateTimeField(null=True)

class Meta:
abstract = True

def get_rrs(self) -> List[RR]:
raise NotImplementedError

def save(self, *args, **kwargs):
for rr in self.get_rrs():
rr.rrset.save()
rr.save()
self.rrs.add(rr)
return super().save(*args, **kwargs)

def delete(self, using=None, keep_parents=False):
for rr in self.rrs.all(): # TODO use one query
if len(rr.identities.all()) == 1:
rr.delete()
return super().delete(using, keep_parents)

# TODO move to RRset / RRset manager?
def get_or_create_rr_set(self, domain: Domain, subname: str) -> RRset:
try:
return RRset.objects.get(domain=domain, subname=subname, type=self.rr_type)
except RRset.DoesNotExist:
return RRset(domain=domain, subname=subname, type=self.rr_type, ttl=self.default_ttl)

# TODO move to RR / RR manager?
def get_or_create_rr(self, fqdn: str, content: str) -> RR:
domain, subname = self.owner.domains.most_specific_zone(fqdn)
rrset = self.get_or_create_rr_set(domain, subname)
try:
return RR.objects.get(rrset=rrset, content=content)
except RR.DoesNotExist:
return RR(rrset=rrset, content=content)


class TLSIdentity(Identity):
rr_type = 'TLSA'

class CertificateUsage(models.IntegerChoices):
CA_CONSTRAINT = 0
SERVICE_CERTIFICATE_CONSTRAINT = 1
TRUST_ANCHOR_ASSERTION = 2
DOMAIN_ISSUED_CERTIFICATE = 3

class Selector(models.IntegerChoices):
FULL_CERTIFICATE = 0
SUBJECT_PUBLIC_KEY_INFO = 1

class MatchingType(models.IntegerChoices):
NO_HASH_USED = 0
SHA256 = 1
SHA512 = 2

class Protocol(models.TextChoices):
TCP = 'tcp'
UDP = 'udp'
SCTP = 'sctp'

certificate = models.TextField()

tlsa_selector = models.IntegerField(choices=Selector.choices, default=Selector.SUBJECT_PUBLIC_KEY_INFO)
tlsa_matching_type = models.IntegerField(choices=MatchingType.choices, default=MatchingType.SHA256)
tlsa_certificate_usage = models.IntegerField(choices=CertificateUsage.choices,
default=CertificateUsage.DOMAIN_ISSUED_CERTIFICATE)

port = models.IntegerField(default=443)
protocol = models.TextField(choices=Protocol.choices, default=Protocol.TCP)

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if 'not_valid_after' not in kwargs:
self.scheduled_removal = self.not_valid_after

def get_record_content(self) -> str:
# choose hash function
if self.tlsa_matching_type == self.MatchingType.SHA256:
hash_function = hazmat.primitives.hashes.SHA256()
elif self.tlsa_matching_type == self.MatchingType.SHA512:
hash_function = hazmat.primitives.hashes.SHA512()
else:
raise NotImplementedError

# choose data to hash
if self.tlsa_selector == self.Selector.SUBJECT_PUBLIC_KEY_INFO:
to_be_hashed = self._cert.public_key().public_bytes(
hazmat.primitives.serialization.Encoding.DER,
hazmat.primitives.serialization.PublicFormat.SubjectPublicKeyInfo
)
else:
raise NotImplementedError

# compute the hash
h = hazmat.primitives.hashes.Hash(hash_function)
h.update(to_be_hashed)
hash = h.finalize().hex()

# create TLSA record content
return f"{self.tlsa_certificate_usage} {self.tlsa_selector} {self.tlsa_matching_type} {hash}"

@property
def _cert(self) -> x509.Certificate:
return x509.load_pem_x509_certificate(self.certificate.encode())

@property
def fingerprint(self) -> str:
return self._cert.fingerprint(hazmat.primitives.hashes.SHA256()).hex()

@property
def subject_names(self) -> Set[str]:
subject_names = {
x.value for x in
self._cert.subject.get_attributes_for_oid(x509.oid.NameOID.COMMON_NAME)
}

try:
subject_alternative_names = {
x for x in
self._cert.extensions.get_extension_for_oid(
x509.oid.ExtensionOID.SUBJECT_ALTERNATIVE_NAME).value.get_values_for_type(x509.DNSName)
}
except x509.extensions.ExtensionNotFound:
subject_alternative_names = set()

return subject_names | subject_alternative_names

@property
def subject_names_clean(self) -> Set[str]:
clean = set()
for name in self.subject_names:
# cut off any wildcard prefix
name = name.lstrip('*').lstrip('.')

# filter names for valid domain names
try:
validate_domain_name[1](name)
except ValidationError:
continue

clean.add(name)
return clean

def get_rrs(self) -> List[RR]:
rrs = []
content = self.get_record_content()
for qname in self.subject_names_clean:
try:
rrs.append(self.get_or_create_rr(
fqdn=f"_{self.port:n}._{self.protocol}.{qname}",
content=content,
))
except Domain.DoesNotExist:
pass
return rrs

@property
def not_valid_before(self):
return self._cert.not_valid_before

@property
def not_valid_after(self):
return self._cert.not_valid_after
28 changes: 28 additions & 0 deletions api/desecapi/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -840,3 +840,31 @@ class AuthenticatedRenewDomainBasicUserActionSerializer(AuthenticatedDomainBasic

class Meta(AuthenticatedDomainBasicUserActionSerializer.Meta):
model = models.AuthenticatedRenewDomainBasicUserAction


class TLSIdentitySerializer(serializers.ModelSerializer):
published_at = serializers.SerializerMethodField(read_only=True)

def get_published_at(self, tls_identity: models.TLSIdentity):
return [
f"{rrset.type}/{rrset.name}"
for rrset in tls_identity.get_rrsets()
]

class Meta:
model = models.TLSIdentity
fields = (
'id', 'name', 'created',

'default_ttl',

'certificate',
'tlsa_selector', 'tlsa_matching_type', 'tlsa_certificate_usage',

'port', 'protocol',

'fingerprint', 'not_valid_before', 'not_valid_after', 'subject_names',

'published_at',
)
read_only_fields = list(filter(lambda f: f not in ('name', 'certificate'), fields))
Loading