Skip to content

Commit

Permalink
auth ALIAS: add DNSSEC tests
Browse files Browse the repository at this point in the history
  • Loading branch information
zeha committed Sep 8, 2024
1 parent 312119e commit 6ae28d7
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 12 deletions.
80 changes: 80 additions & 0 deletions regression-tests.auth-py/authtests.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,68 @@ def generateAuthConfig(cls, confdir):
except subprocess.CalledProcessError as e:
raise AssertionError('%s failed (%d): %s' % (pdnsutilCmd, e.returncode, e.output))

@classmethod
def listKeys(cls, confdir: str, zonename: str):
zone = '.' if zonename == 'ROOT' else zonename
pdnsutilCmd = [os.environ['PDNSUTIL'],
'--config-dir=%s' % confdir,
'list-keys',
zone]

print(' '.join(pdnsutilCmd))
try:
lines = subprocess.check_output(pdnsutilCmd, stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as e:
raise AssertionError('%s failed (%d): %s' % (pdnsutilCmd, e.returncode, e.output))

keys = []
found_header = False
for line in lines.splitlines():
if not found_header:
if line.startswith(b"----"):
found_header = True
continue
line = line.decode().split()
if not (len(line) >= 8 and line[1] in ("CSK", "KSK", "ZSK")):
continue
print(line)
keys.append(
{
"zone": line[0],
"type": line[1],
"act": line[2],
"pub": line[3],
"size": int(line[4]),
"algorithm": line[5],
"id": int(line[6]),
"location": line[7],
"keytag": int(line[8]),
})
return keys

@classmethod
def exportZoneDnsKey(cls, confdir: str, zonename: str, keyid: int):
zone = '.' if zonename == 'ROOT' else zonename
pdnsutilCmd = [os.environ['PDNSUTIL'],
'--config-dir=%s' % confdir,
'export-zone-dnskey',
zone,
str(keyid)]

print(' '.join(pdnsutilCmd))
try:
lines = subprocess.check_output(pdnsutilCmd, stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as e:
raise AssertionError('%s failed (%d): %s' % (pdnsutilCmd, e.returncode, e.output))

for line in lines.splitlines():
line = line.strip().decode().split(maxsplit=3)
print(line)
if line[2] == "DNSKEY":
_, rdclass, rdtype, key = line
return dns.rrset.from_text(dns.name.from_text(zone), 3600, rdclass, rdtype, key)
return None

@classmethod
def secureZone(cls, confdir, zonename, key=None):
zone = '.' if zonename == 'ROOT' else zonename
Expand Down Expand Up @@ -168,13 +230,18 @@ def secureZone(cls, confdir, zonename, key=None):
def generateAllAuthConfig(cls, confdir):
cls.generateAuthConfig(confdir)
cls.generateAuthNamedConf(confdir, cls._zones.keys())
cls._zone_dnskeys = {}

for zonename, zonecontent in cls._zones.items():
cls.generateAuthZone(confdir,
zonename,
zonecontent)
if cls._zone_keys.get(zonename, None):
cls.secureZone(confdir, zonename, cls._zone_keys.get(zonename))
zone_keys = cls.listKeys(confdir, zonename)
cls._zone_dnskeys[dns.name.from_text(zonename)] = cls.exportZoneDnsKey(
confdir, zonename, zone_keys[-1]["id"]
)

@classmethod
def waitForTCPSocket(cls, ipaddress, port):
Expand Down Expand Up @@ -590,3 +657,16 @@ def assertAuthorityHasSOA(self, msg):

if not found:
raise AssertionError("No SOA record found in the authority section:\n%s" % msg.to_text())

def assertSigned(self, msg: dns.message.Message, name: str | dns.name.Name, rdatatype: str | int):
if not isinstance(msg, dns.message.Message):
raise TypeError("msg is not a dns.message.Message but a %s" % type(msg))

name = dns.name.from_text(name)

if not isinstance(rdatatype, int):
rdatatype = dns.rdatatype.from_text(rdatatype)

rrset = msg.find_rrset(msg.answer, name, dns.rdataclass.IN, rdatatype)
rrsig = msg.find_rrset(msg.answer, name, dns.rdataclass.IN, dns.rdatatype.RRSIG, covers=rdatatype)
dns.dnssec.validate(rrset, rrsig, self._zone_dnskeys)
1 change: 1 addition & 0 deletions regression-tests.auth-py/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
cryptography
dnspython==2.1.0
pytest
Twisted>0.15.0
Expand Down
34 changes: 22 additions & 12 deletions regression-tests.auth-py/test_ALIAS.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,32 +62,37 @@ def startResponders(cls):
cls._ALIASResponder.start()

def testNoError(self):
expected_a = [dns.rrset.from_text('noerror.example.org.',
name = 'noerror.example.org.'
expected_a = [dns.rrset.from_text(name,
0, dns.rdataclass.IN, 'A',
'192.0.2.1')]
expected_aaaa = [dns.rrset.from_text('noerror.example.org.',
expected_aaaa = [dns.rrset.from_text(name,
0, dns.rdataclass.IN, 'AAAA',
'2001:DB8::1')]

query = dns.message.make_query('noerror.example.org', 'A')
query = dns.message.make_query(name, 'A', want_dnssec=True)
res = self.sendUDPQuery(query)
self.assertRcodeEqual(res, dns.rcode.NOERROR)
self.assertAnyRRsetInAnswer(res, expected_a)
self.assertEqual(len(res.options), 0) # this checks that we don't invent ECS on non-ECS queries
self.assertSigned(res, name, dns.rdatatype.A)

query = dns.message.make_query('noerror.example.org', 'AAAA')
query = dns.message.make_query(name, 'AAAA', want_dnssec=True)
res = self.sendUDPQuery(query)
self.assertRcodeEqual(res, dns.rcode.NOERROR)
self.assertAnyRRsetInAnswer(res, expected_aaaa)
self.assertSigned(res, name, dns.rdatatype.AAAA)

query = dns.message.make_query('noerror.example.org', 'ANY')
query = dns.message.make_query(name, 'ANY', want_dnssec=True)
res = self.sendUDPQuery(query)
self.assertRcodeEqual(res, dns.rcode.NOERROR)
self.assertAnyRRsetInAnswer(res, expected_a)
self.assertAnyRRsetInAnswer(res, expected_aaaa)
self.assertSigned(res, name, dns.rdatatype.A)
self.assertSigned(res, name, dns.rdatatype.AAAA)

# NODATA
query = dns.message.make_query('noerror.example.org', 'MX')
query = dns.message.make_query(name, 'MX')
res = self.sendUDPQuery(query)
self.assertRcodeEqual(res, dns.rcode.NOERROR)
self.assertEqual(len(res.answer), 0)
Expand Down Expand Up @@ -121,31 +126,36 @@ def testServFail(self):
self.assertRcodeEqual(res, dns.rcode.SERVFAIL)

def testNoErrorTCP(self):
expected_a = [dns.rrset.from_text('noerror.example.org.',
name = 'noerror.example.org.'
expected_a = [dns.rrset.from_text(name,
0, dns.rdataclass.IN, 'A',
'192.0.2.1')]
expected_aaaa = [dns.rrset.from_text('noerror.example.org.',
expected_aaaa = [dns.rrset.from_text(name,
0, dns.rdataclass.IN, 'AAAA',
'2001:DB8::1')]

query = dns.message.make_query('noerror.example.org', 'A')
query = dns.message.make_query(name, 'A', want_dnssec=True)
res = self.sendTCPQuery(query)
self.assertRcodeEqual(res, dns.rcode.NOERROR)
self.assertAnyRRsetInAnswer(res, expected_a)
self.assertSigned(res, name, dns.rdatatype.A)

query = dns.message.make_query('noerror.example.org', 'AAAA')
query = dns.message.make_query(name, 'AAAA', want_dnssec=True)
res = self.sendTCPQuery(query)
self.assertRcodeEqual(res, dns.rcode.NOERROR)
self.assertAnyRRsetInAnswer(res, expected_aaaa)
self.assertSigned(res, name, dns.rdatatype.AAAA)

query = dns.message.make_query('noerror.example.org', 'ANY')
query = dns.message.make_query(name, 'ANY', want_dnssec=True)
res = self.sendTCPQuery(query)
self.assertRcodeEqual(res, dns.rcode.NOERROR)
self.assertAnyRRsetInAnswer(res, expected_a)
self.assertAnyRRsetInAnswer(res, expected_aaaa)
self.assertSigned(res, name, dns.rdatatype.A)
self.assertSigned(res, name, dns.rdatatype.AAAA)

# NODATA
query = dns.message.make_query('noerror.example.org', 'MX')
query = dns.message.make_query(name, 'MX', want_dnssec=True)
res = self.sendTCPQuery(query)
self.assertRcodeEqual(res, dns.rcode.NOERROR)
self.assertEqual(len(res.answer), 0)
Expand Down
3 changes: 3 additions & 0 deletions regression-tests.auth-py/test_GSSTSIG.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ class GSSTSIGBase(AuthTest):
'KRB5_KTNAME' : './kerberos-client/kt.keytab'
}

# zones will be created by our own setUpClass() code
_zones = {}

@classmethod
def setUpClass(cls):
super(GSSTSIGBase, cls).setUpClass()
Expand Down

0 comments on commit 6ae28d7

Please sign in to comment.