Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Identity Manager for DNSSEC Awesomeness #513

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions api/desecapi/migrations/0018_tlsaidentity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# Generated by Django 3.2.7 on 2021-09-25 15:30

from django.conf import settings
from django.db import migrations, models
import django.db.models.deletion
import uuid


class Migration(migrations.Migration):

dependencies = [
('desecapi', '0017_alter_user_limit_domains'),
]

operations = [
migrations.CreateModel(
name='TLSIdentity',
fields=[
('id', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)),
('name', models.CharField(default='', max_length=24)),
('created', models.DateTimeField(auto_now_add=True)),
('default_ttl', models.PositiveIntegerField(default=300)),
('scheduled_removal', models.DateTimeField(null=True)),
('certificate', models.TextField()),
('tlsa_selector', models.IntegerField(choices=[(0, 'Full Certificate'), (1, 'Subject Public Key Info')], default=1)),
('tlsa_matching_type', models.IntegerField(choices=[(0, 'No Hash Used'), (1, 'Sha256'), (2, 'Sha512')], default=1)),
('tlsa_certificate_usage', models.IntegerField(choices=[(0, 'Ca Constraint'), (1, 'Service Certificate Constraint'), (2, 'Trust Anchor Assertion'), (3, 'Domain Issued Certificate')], default=3)),
('port', models.IntegerField(default=443)),
('protocol', models.TextField(choices=[('tcp', 'Tcp'), ('udp', 'Udp'), ('sctp', 'Sctp')], default='tcp')),
('owner', models.ForeignKey(on_delete=django.db.models.deletion.PROTECT, related_name='identities', to=settings.AUTH_USER_MODEL)),
('rrs', models.ManyToManyField(related_name='identities', to='desecapi.RR')),
],
options={
'abstract': False,
},
),
]
203 changes: 203 additions & 0 deletions api/desecapi/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@
from datetime import timedelta
from functools import cached_property
from hashlib import sha256
from typing import Set, List, Tuple

import dns
import psl_dns
import rest_framework.authtoken.models
from cryptography import x509, hazmat
from django.conf import settings
from django.contrib.auth.hashers import make_password
from django.contrib.auth.models import AbstractBaseUser, AnonymousUser, BaseUserManager
Expand Down Expand Up @@ -217,6 +219,14 @@ def filter_qname(self, qname: str, **kwargs) -> models.query.QuerySet:
name_length=Length('name'),
).filter(dotted_qname__endswith=F('dotted_name'), **kwargs)

def most_specific_zone(self, fqdn: str) -> Tuple[Domain, str]:
try:
domain = self.filter_qname(fqdn).order_by('-name_length')[0]
except IndexError:
raise Domain.DoesNotExist
subname = fqdn[:-len(domain.name)].removesuffix('.')
return domain, subname


class Domain(ExportModelOperationsMixin('Domain'), models.Model):
@staticmethod
Expand Down Expand Up @@ -982,3 +992,196 @@ def verify(self, solution: str):
and
age <= settings.CAPTCHA_VALIDITY_PERIOD # not expired
)


class Identity(models.Model):
rr_type = None

id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
name = models.CharField(max_length=24, default="")
created = models.DateTimeField(auto_now_add=True)
owner = models.ForeignKey(User, on_delete=models.PROTECT, related_name='identities')
default_ttl = models.PositiveIntegerField(default=300)
rrs = models.ManyToManyField(to=RR, related_name='identities') # TODO OneToMany?
scheduled_removal = models.DateTimeField(null=True)

class Meta:
abstract = True

def get_rrs(self) -> List[RR]:
raise NotImplementedError

@property
def covered_names(self) -> List[str]:
raise NotImplementedError

def domains(self) -> List[Domain]:
# TODO improve query
return list({
d.name: d for d in [rr.rrset.domain for rr in self.rrs.all()]
}.values())

def save(self, *args, **kwargs):
ret = super().save(*args, **kwargs)
for rr in self.get_rrs():
rr.rrset.save()
rr.save()
self.rrs.add(rr)
return ret

def delete(self, using=None, keep_parents=False):
for rr in self.rrs.all(): # TODO use one query
if len(rr.identities.all()) == 1:
if (len(rr.rrset.records.all())) == 1:
rr.rrset.delete()
else:
rr.delete()
return super().delete(using, keep_parents)

# TODO move to RRset / RRset manager?
def get_or_create_rr_set(self, domain: Domain, subname: str) -> RRset:
try:
return RRset.objects.get(domain=domain, subname=subname, type=self.rr_type)
except RRset.DoesNotExist:
return RRset(domain=domain, subname=subname, type=self.rr_type, ttl=self.default_ttl)

# TODO move to RR / RR manager?
def get_or_create_rr(self, fqdn: str, content: str) -> RR:
domain, subname = self.owner.domains.most_specific_zone(fqdn)
rrset = self.get_or_create_rr_set(domain, subname)
try:
return RR.objects.get(rrset=rrset, content=content)
except RR.DoesNotExist:
return RR(rrset=rrset, content=content)


class TLSIdentity(Identity):
rr_type = 'TLSA'

class CertificateUsage(models.IntegerChoices):
CA_CONSTRAINT = 0
SERVICE_CERTIFICATE_CONSTRAINT = 1
TRUST_ANCHOR_ASSERTION = 2
DOMAIN_ISSUED_CERTIFICATE = 3

class Selector(models.IntegerChoices):
FULL_CERTIFICATE = 0
SUBJECT_PUBLIC_KEY_INFO = 1

class MatchingType(models.IntegerChoices):
NO_HASH_USED = 0
SHA256 = 1
SHA512 = 2

class Protocol(models.TextChoices):
TCP = 'tcp'
UDP = 'udp'
SCTP = 'sctp'

certificate = models.TextField()

tlsa_selector = models.IntegerField(choices=Selector.choices, default=Selector.SUBJECT_PUBLIC_KEY_INFO)
tlsa_matching_type = models.IntegerField(choices=MatchingType.choices, default=MatchingType.SHA256)
tlsa_certificate_usage = models.IntegerField(choices=CertificateUsage.choices,
default=CertificateUsage.DOMAIN_ISSUED_CERTIFICATE)

port = models.IntegerField(default=443)
protocol = models.TextField(choices=Protocol.choices, default=Protocol.TCP)

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if 'not_valid_after' not in kwargs:
self.scheduled_removal = self.not_valid_after # TODO check timezone

def get_record_content(self) -> str:
# choose hash function
if self.tlsa_matching_type == self.MatchingType.SHA256:
hash_function = hazmat.primitives.hashes.SHA256()
elif self.tlsa_matching_type == self.MatchingType.SHA512:
hash_function = hazmat.primitives.hashes.SHA512()
else:
raise NotImplementedError

# choose data to hash
if self.tlsa_selector == self.Selector.SUBJECT_PUBLIC_KEY_INFO:
to_be_hashed = self._cert.public_key().public_bytes(
hazmat.primitives.serialization.Encoding.DER,
hazmat.primitives.serialization.PublicFormat.SubjectPublicKeyInfo
)
else:
raise NotImplementedError

# compute the hash
h = hazmat.primitives.hashes.Hash(hash_function)
h.update(to_be_hashed)
hash = h.finalize().hex()

# create TLSA record content
return f"{self.tlsa_certificate_usage} {self.tlsa_selector} {self.tlsa_matching_type} {hash}"

@property
def _cert(self) -> x509.Certificate:
return x509.load_pem_x509_certificate(self.certificate.encode())

@property
def fingerprint(self) -> str:
return self._cert.fingerprint(hazmat.primitives.hashes.SHA256()).hex()

@property
def subject_names(self) -> Set[str]:
subject_names = {
x.value for x in
self._cert.subject.get_attributes_for_oid(x509.oid.NameOID.COMMON_NAME)
}

try:
subject_alternative_names = {
x for x in
self._cert.extensions.get_extension_for_oid(
x509.oid.ExtensionOID.SUBJECT_ALTERNATIVE_NAME).value.get_values_for_type(x509.DNSName)
}
except x509.extensions.ExtensionNotFound:
subject_alternative_names = set()

return subject_names | subject_alternative_names

@property
def subject_names_clean(self) -> Set[str]:
clean = set()
for name in self.subject_names:
# cut off any wildcard prefix
name = name.lstrip('*').lstrip('.') # TODO publish wildcard TLSA record?

# filter names for valid domain names
try:
validate_domain_name[1](name)
except ValidationError:
continue

clean.add(name)
return clean

def get_rrs(self) -> List[RR]:
rrs = []
content = self.get_record_content()
for qname in self.subject_names_clean:
try:
rrs.append(self.get_or_create_rr(
fqdn=f"_{self.port:n}._{self.protocol}.{qname}",
content=content,
))
except Domain.DoesNotExist:
pass
return rrs

@property
def not_valid_before(self):
return self._cert.not_valid_before

@property
def not_valid_after(self):
return self._cert.not_valid_after

@property
def covered_names(self) -> Set[str]:
return {rr.rrset.name.split('.', 2)[-1].removesuffix('.') for rr in self.rrs.all()}
37 changes: 37 additions & 0 deletions api/desecapi/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -840,3 +840,40 @@ class AuthenticatedRenewDomainBasicUserActionSerializer(AuthenticatedDomainBasic

class Meta(AuthenticatedDomainBasicUserActionSerializer.Meta):
model = models.AuthenticatedRenewDomainBasicUserAction


class TLSIdentitySerializer(serializers.ModelSerializer):
published_at = serializers.SerializerMethodField(read_only=True)
domains = serializers.SlugRelatedField(
many=True,
read_only=True,
slug_field='name'
)

def get_published_at(self, tls_identity: models.TLSIdentity):
return [
f"{rrset.type}/{rrset.name}"
for rrset in {rr.rrset for rr in tls_identity.rrs.all()} # TODO improve query
]

class Meta:
model = models.TLSIdentity
fields = (
# Identity fields
'id', 'name', 'created',

'default_ttl',
'scheduled_removal',
'published_at',
'domains',
'covered_names',

# TLSAIdentity fields
'certificate',
'tlsa_selector', 'tlsa_matching_type', 'tlsa_certificate_usage',

'port', 'protocol',

'fingerprint', 'not_valid_before', 'not_valid_after', 'subject_names',
)
read_only_fields = list(filter(lambda f: f not in ('name', 'certificate'), fields)) # TODO add tlsa_*, port, proto
Loading