Skip to content

Commit

Permalink
internetstandards#881: add TLSRPT parser + unittests, check TLSRPT po…
Browse files Browse the repository at this point in the history
…licy records with parser
  • Loading branch information
uwekamper committed Feb 25, 2024
1 parent 3e1258c commit 96f78fe
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 3 deletions.
14 changes: 11 additions & 3 deletions checks/tasks/mail.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from checks.tasks.dispatcher import check_registry, post_callback_hook
from checks.tasks.dmarc_parser import parse as dmarc_parse
from checks.tasks.spf_parser import parse as spf_parse
from checks.tasks import tlsrpt_parsing
from interface import batch, batch_shared_task, redis_id
from internetnl import log

Expand Down Expand Up @@ -237,8 +238,9 @@ def callback(results, addr, category):
mtauth.tlsrpt_available = tlsrpt_available
mtauth.tlsrpt_record = tlsrpt_record
mtauth.tlsrpt_score = tlsrpt_score
#if spf_available:
# subtests["tlsprt"].result_good(tlsrpt_record)
log.debug(f"subtests: {subtests.keys()}")
if spf_available:
subtests["tlsprt"].result_good(tlsrpt_record)


if skip_dkim_for_non_sending_domain(mtauth):
Expand Down Expand Up @@ -861,8 +863,14 @@ def tlsrpt_callback(data, status, r):
score = scoring.MAIL_AUTH_TLSRPT_FAIL
for d in r.data.data:
txt = as_txt(d)
log.debug(f"tlsrpt: found record '{txt.lower()}'")
if txt.lower().startswith("v=tlsrptv1"):
record.append(txt)
if tlsrpt_parsing.parse_silent(txt) is None:
# A parsing error has occured
available = False
score = scoring.MAIL_AUTH_TLSRPT_FAIL
break
if available:
# We see more than one TLSRPT record. Fail the test.
available = False
Expand Down Expand Up @@ -898,7 +906,7 @@ def do_tlsrpt(self, url, *args, **kwargs):
score = cb_data["score"]
record = cb_data["record"]

#if len(record) == 1:
if len(record) == 1:
# policy_status, policy_score, _ = spf_check_policy(url, record[0], self, policy_records=policy_records)

result = dict(
Expand Down
64 changes: 64 additions & 0 deletions checks/tasks/tlsrpt_parsing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# Copyright: 2022-2024, ECP, NLnet Labs, the Internet.nl contributors and SYS4 AG.
# SPDX-License-Identifier: Apache-2.0

'''
SMTP TLS Reporting policy parser as defined by:
RFC 8460, Section "3. Reporting Policy", see:
https://datatracker.ietf.org/doc/html/rfc8460#section-3
'''

from pyparsing import (
Literal,
CaselessLiteral,
Combine,
ParseException,
Regex,
White,
Word,
ZeroOrMore,
alphanums,
pyparsing_common,
delimitedList,
)


WSP = White(ws=' ', exact=1).suppress() # Whitespace

field_delim = ZeroOrMore(WSP) + Literal(';') + ZeroOrMore(WSP) # Fields are semicolon-delimited
ura_delim = ZeroOrMore(WSP) + Literal(',') + ZeroOrMore(WSP) # multiple RUAs are comma-delimited

tlsrpt_ext_name = Word(alphanums, alphanums+"_-.", max=32)
tlsrpt_ext_value = Word(alphanums, alphanums+"_-.")
tlsrpt_extension = ZeroOrMore(tlsrpt_ext_name + Literal('=') + tlsrpt_ext_value)

# RegEx for parsing email.
regex_tld = r"(?:[a-zA-Z]{2,63}|xn--[a-zA-Z0-9]+)"
regex_mailaddr = (
r"(?P<mailaddr>([a-zA-Z0-9]{0,61}@)?([a-zA-Z0-9]([a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?\.)+" r"" + regex_tld + ")"
)
mail_uri = Combine(CaselessLiteral("mailto:") + Regex(regex_mailaddr))
tlsrpt_rua = Literal("rua=") +\
delimitedList(mail_uri | pyparsing_common.url, delim=',').setResultsName('tlsrpt_uri')

tlsrpt_field = tlsrpt_rua + ZeroOrMore(field_delim + tlsrpt_extension)

# Literal will match the version string as required by the ABNF in the RFC:
# tlsrpt-version = %s"v=TLSRPTv1"
version = Literal("v=TLSRPTv1").setResultsName("tlsrpt_version")

record = version + field_delim + tlsrpt_field


def parse_silent(tlsrpt_record):
"""
Will return None if there was a parsing error and a ParseResult object otherwise.
"""
try:
parsed = record.parseString(tlsrpt_record)
except ParseException:
parsed = None
except Exception as e:
print(f"{e.__class__.__name__}: {e}")
parsed = None
return parsed
40 changes: 40 additions & 0 deletions checks/test/test_tlsrpt_parsing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from checks.tasks import tlsrpt_parsing


def test_record_parse_simple_mailto():
TXT_RECORD="v=TLSRPTv1; rua=mailto:[email protected]"
parsed = tlsrpt_parsing.record.parseString(TXT_RECORD)
assert parsed.tlsrpt_version == 'v=TLSRPTv1'
assert parsed.tlsrpt_uri[0] == 'mailto:[email protected]'


def test_record_parse_multiple_mailto():
TXT_RECORD="v=TLSRPTv1;rua=mailto:[email protected],mailto:[email protected]"
parsed = tlsrpt_parsing.record.parseString(TXT_RECORD)
assert parsed.tlsrpt_version == 'v=TLSRPTv1'
assert parsed.tlsrpt_uri[0] == 'mailto:[email protected]'
assert parsed.tlsrpt_uri[1] == 'mailto:[email protected]'


def test_record_parse_simple_https():
TXT_RECORD = "v=TLSRPTv1; rua=https://reporting.example.com/v1/tlsrpt"
parsed = tlsrpt_parsing.record.parseString(TXT_RECORD)
assert parsed.tlsrpt_version == 'v=TLSRPTv1'
assert parsed.tlsrpt_uri[0] == 'https://reporting.example.com/v1/tlsrpt'


def test_record_parse_with_extension():
TXT_RECORD = "v=TLSRPTv1; rua=https://reporting.example.com/v1/tlsrpt; ext=extvalue"
parsed = tlsrpt_parsing.record.parseString(TXT_RECORD)
assert parsed.tlsrpt_version == 'v=TLSRPTv1'
#assert parsed.tlsrpt_uri[0] == 'https://reporting.example.com/v1/tlsrpt'


def test_parse_silent():
"""
Check that parse_silent does not throw a ParseException but instead returns
None if the TLSRPT policy record is malformed.
"""
TXT_RECORD = "v=TLSRPTv1; rua=!!" # broken TLSRPT
parsed = tlsrpt_parsing.parse_silent(TXT_RECORD)
assert parsed is None

0 comments on commit 96f78fe

Please sign in to comment.