Skip to content

Commit

Permalink
Option to overwrite subject distinguised name from CSR
Browse files Browse the repository at this point in the history
  • Loading branch information
paulschwarzenberger committed May 17, 2024
1 parent 5ef06aa commit 7cd4097
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 14 deletions.
13 changes: 5 additions & 8 deletions modules/terraform-aws-ca-lambda/lambda_code/tls_cert/tls_cert.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,7 @@ def sign_tls_certificate(csr, ca_name, csr_info_1, csr_info_2):
issuing_ca_kms_key_id = kms_get_kms_key_id(ca_name)

# collect Certificate Request info
common_name = csr_info_1["commonName"]
lifetime = csr_info_2["lifetime"]
purposes = csr_info_2["purposes"]
sans = csr_info_2["sans"]
cert_request_info = crypto_cert_request_info(csr, common_name, lifetime, purposes, sans)
cert_request_info = crypto_cert_request_info(csr, csr_info_1, csr_info_2)

# sign certificate
return ca_kms_sign_tls_certificate_request(
Expand Down Expand Up @@ -125,8 +121,8 @@ def create_csr_info_1(common_name, locality=None, organization=None, organizatio
}


def create_csr_info_2(lifetime, email_address=None, purposes=None, sans=None):
return {"lifetime": lifetime, "emailAddress": email_address, "sans": sans, "purposes": purposes}
def create_csr_info_2(lifetime, email_address=None, purposes=None, sans=None, state=None):
return {"lifetime": lifetime, "emailAddress": email_address, "sans": sans, "purposes": purposes, "state": state}


def get_csr_info(event):
Expand All @@ -139,9 +135,10 @@ def get_csr_info(event):
organizational_unit = event.get("organizational_unit") # string, organizational unit name
purposes = event.get("purposes") # list of strings, e.g. ["client_auth", "server_auth"]
sans = event.get("sans") # list of strings, DNS Subject Alternative Names
state = event.get("state") # string, state or province

return create_csr_info_1(common_name, locality, organization, organizational_unit, country), create_csr_info_2(
int(lifetime), email_address, purposes, sans
int(lifetime), email_address, purposes, sans, state
)


Expand Down
83 changes: 79 additions & 4 deletions modules/terraform-aws-ca-lambda/utils/certs/ca.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,79 @@ def ca_construct_subject_name(ca_info, ca_hierarchy_type="root"):
return x509.Name(attributes)


def tls_cert_construct_subject_name(csr_cert, cert_request_info): # pylint:disable=too-many-branches
"""Constructs subject name for end entity certificate"""
# subject values from CSR
common_name = csr_cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value

# get country attribute if present
if csr_cert.subject.get_attributes_for_oid(NameOID.COUNTRY_NAME):
country = csr_cert.subject.get_attributes_for_oid(NameOID.COUNTRY_NAME)[0].value
else:
country = None

# get email attribute if present
if csr_cert.subject.get_attributes_for_oid(NameOID.EMAIL_ADDRESS):
email_address = csr_cert.subject.get_attributes_for_oid(NameOID.EMAIL_ADDRESS)[0].value
else:
email_address = None

# get state attribute if present
if csr_cert.subject.get_attributes_for_oid(NameOID.STATE_OR_PROVINCE_NAME):
state = csr_cert.subject.get_attributes_for_oid(NameOID.STATE_OR_PROVINCE_NAME)[0].value
else:
state = None

# get locality attribute if present
if csr_cert.subject.get_attributes_for_oid(NameOID.LOCALITY_NAME):
locality = csr_cert.subject.get_attributes_for_oid(NameOID.LOCALITY_NAME)[0].value
else:
locality = None

# get organization attribute if present
if csr_cert.subject.get_attributes_for_oid(NameOID.ORGANIZATION_NAME):
organization = csr_cert.subject.get_attributes_for_oid(NameOID.ORGANIZATION_NAME)[0].value
else:
organization = None

# get organizational unit attribute if present
if csr_cert.subject.get_attributes_for_oid(NameOID.ORGANIZATIONAL_UNIT_NAME):
organizational_unit = csr_cert.subject.get_attributes_for_oid(NameOID.ORGANIZATIONAL_UNIT_NAME)[0].value
else:
organizational_unit = None

# overwrite subject values from CSR with cert_request_info values if present
common_name = cert_request_info.get("CommonName") or common_name
country = cert_request_info.get("Country") or country
email_address = cert_request_info.get("EmailAddress") or email_address
state = cert_request_info.get("State") or state
locality = cert_request_info.get("Locality") or locality
organization = cert_request_info.get("Organization") or organization
organizational_unit = cert_request_info.get("OrganizationalUnit") or organizational_unit

attributes = [x509.NameAttribute(NameOID.COMMON_NAME, common_name)]

if country:
attributes.append(x509.NameAttribute(NameOID.COUNTRY_NAME, country))

if email_address:
attributes.append(x509.NameAttribute(NameOID.EMAIL_ADDRESS, email_address))

if locality:
attributes.append(x509.NameAttribute(NameOID.LOCALITY_NAME, locality))

if organization:
attributes.append(x509.NameAttribute(NameOID.ORGANIZATION_NAME, organization))

if organizational_unit:
attributes.append(x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, organizational_unit))

if state:
attributes.append(x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, state))

return x509.Name(attributes)


def ca_kms_sign_ca_certificate_request(
csr_cert, ca_cert, kms_key_id, kms_signing_algorithm="RSASSA_PKCS1_V1_5_SHA_256"
):
Expand Down Expand Up @@ -138,7 +211,10 @@ def ca_kms_sign_ca_certificate_request(
return cert.public_bytes(serialization.Encoding.PEM)


def ca_build_cert(csr_cert, ca_cert, lifetime, delta, purposes):
def ca_build_cert(csr_cert, ca_cert, lifetime, delta, cert_request_info):
purposes = cert_request_info["Purposes"]

x509_subject = tls_cert_construct_subject_name(csr_cert, cert_request_info)

extended_key_usage_oids = []
for purpose in purposes:
Expand All @@ -149,7 +225,7 @@ def ca_build_cert(csr_cert, ca_cert, lifetime, delta, purposes):

return (
x509.CertificateBuilder()
.subject_name(csr_cert.subject)
.subject_name(x509_subject)
.issuer_name(ca_cert.subject)
.public_key(csr_cert.public_key())
.serial_number(x509.random_serial_number())
Expand Down Expand Up @@ -187,14 +263,13 @@ def ca_kms_sign_tls_certificate_request(
csr_cert = cert_request_info["CsrCert"]
x509_dns_names = cert_request_info["x509Sans"]
lifetime = cert_request_info["Lifetime"]
purposes = cert_request_info["Purposes"]

# reduce lifetime to maximum allowed if needed
lifetime = min(lifetime, max_cert_lifetime)

delta = timedelta(minutes=5) # time delta to avoid clock skew issues

cert = ca_build_cert(csr_cert, ca_cert, lifetime, delta, purposes)
cert = ca_build_cert(csr_cert, ca_cert, lifetime, delta, cert_request_info)

if len(x509_dns_names) > 0:
cert = cert.add_extension(
Expand Down
19 changes: 17 additions & 2 deletions modules/terraform-aws-ca-lambda/utils/certs/crypto.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,16 @@ def crypto_ca_key_info(public_key, kms_key_id, common_name):
}


def crypto_cert_request_info(csr_cert, common_name, lifetime, purposes, sans):
def crypto_cert_request_info(csr_cert, csr_info_1, csr_info_2):
"""Creates a dictionary with the information needed to sign a certificate"""
# get common name from csr_info_1
common_name = csr_info_1["commonName"]

# get values from csr_info_2
purposes = csr_info_2.get("purposes")
lifetime = csr_info_2.get("lifetime")
sans = csr_info_2.get("sans")

# if no purposes are specified, default to both client auth
if purposes is None:
purposes = ["client_auth"]
Expand Down Expand Up @@ -56,10 +64,17 @@ def crypto_cert_request_info(csr_cert, common_name, lifetime, purposes, sans):
x509_sans.append(x509.DNSName(san))

return {
"CommonName": common_name,
"Country": csr_info_1["country"],
"CsrCert": csr_cert,
"x509Sans": x509_sans,
"EmailAddress": csr_info_2["emailAddress"],
"Lifetime": lifetime,
"Locality": csr_info_1["locality"],
"Organization": csr_info_1["organization"],
"OrganizationalUnit": csr_info_1["organizationalUnit"],
"Purposes": purposes,
"State": csr_info_2["state"],
"x509Sans": x509_sans,
}


Expand Down

0 comments on commit 7cd4097

Please sign in to comment.