diff --git a/modules/terraform-aws-ca-lambda/lambda_code/tls_cert/tls_cert.py b/modules/terraform-aws-ca-lambda/lambda_code/tls_cert/tls_cert.py index f715ba56..93b96984 100644 --- a/modules/terraform-aws-ca-lambda/lambda_code/tls_cert/tls_cert.py +++ b/modules/terraform-aws-ca-lambda/lambda_code/tls_cert/tls_cert.py @@ -27,11 +27,7 @@ def sign_tls_certificate(csr, ca_name, csr_info_1, csr_info_2): issuing_ca_kms_key_id = kms_get_kms_key_id(ca_name) # collect Certificate Request info - common_name = csr_info_1["commonName"] - lifetime = csr_info_2["lifetime"] - purposes = csr_info_2["purposes"] - sans = csr_info_2["sans"] - cert_request_info = crypto_cert_request_info(csr, common_name, lifetime, purposes, sans) + cert_request_info = crypto_cert_request_info(csr, csr_info_1, csr_info_2) # sign certificate return ca_kms_sign_tls_certificate_request( @@ -125,8 +121,8 @@ def create_csr_info_1(common_name, locality=None, organization=None, organizatio } -def create_csr_info_2(lifetime, email_address=None, purposes=None, sans=None): - return {"lifetime": lifetime, "emailAddress": email_address, "sans": sans, "purposes": purposes} +def create_csr_info_2(lifetime, email_address=None, purposes=None, sans=None, state=None): + return {"lifetime": lifetime, "emailAddress": email_address, "sans": sans, "purposes": purposes, "state": state} def get_csr_info(event): @@ -139,9 +135,10 @@ def get_csr_info(event): organizational_unit = event.get("organizational_unit") # string, organizational unit name purposes = event.get("purposes") # list of strings, e.g. ["client_auth", "server_auth"] sans = event.get("sans") # list of strings, DNS Subject Alternative Names + state = event.get("state") # string, state or province return create_csr_info_1(common_name, locality, organization, organizational_unit, country), create_csr_info_2( - int(lifetime), email_address, purposes, sans + int(lifetime), email_address, purposes, sans, state ) diff --git a/modules/terraform-aws-ca-lambda/utils/certs/ca.py b/modules/terraform-aws-ca-lambda/utils/certs/ca.py index c9ffa9ed..2f2ff757 100644 --- a/modules/terraform-aws-ca-lambda/utils/certs/ca.py +++ b/modules/terraform-aws-ca-lambda/utils/certs/ca.py @@ -65,6 +65,79 @@ def ca_construct_subject_name(ca_info, ca_hierarchy_type="root"): return x509.Name(attributes) +def tls_cert_construct_subject_name(csr_cert, cert_request_info): # pylint:disable=too-many-branches + """Constructs subject name for end entity certificate""" + # subject values from CSR + common_name = csr_cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value + + # get country attribute if present + if csr_cert.subject.get_attributes_for_oid(NameOID.COUNTRY_NAME): + country = csr_cert.subject.get_attributes_for_oid(NameOID.COUNTRY_NAME)[0].value + else: + country = None + + # get email attribute if present + if csr_cert.subject.get_attributes_for_oid(NameOID.EMAIL_ADDRESS): + email_address = csr_cert.subject.get_attributes_for_oid(NameOID.EMAIL_ADDRESS)[0].value + else: + email_address = None + + # get state attribute if present + if csr_cert.subject.get_attributes_for_oid(NameOID.STATE_OR_PROVINCE_NAME): + state = csr_cert.subject.get_attributes_for_oid(NameOID.STATE_OR_PROVINCE_NAME)[0].value + else: + state = None + + # get locality attribute if present + if csr_cert.subject.get_attributes_for_oid(NameOID.LOCALITY_NAME): + locality = csr_cert.subject.get_attributes_for_oid(NameOID.LOCALITY_NAME)[0].value + else: + locality = None + + # get organization attribute if present + if csr_cert.subject.get_attributes_for_oid(NameOID.ORGANIZATION_NAME): + organization = csr_cert.subject.get_attributes_for_oid(NameOID.ORGANIZATION_NAME)[0].value + else: + organization = None + + # get organizational unit attribute if present + if csr_cert.subject.get_attributes_for_oid(NameOID.ORGANIZATIONAL_UNIT_NAME): + organizational_unit = csr_cert.subject.get_attributes_for_oid(NameOID.ORGANIZATIONAL_UNIT_NAME)[0].value + else: + organizational_unit = None + + # overwrite subject values from CSR with cert_request_info values if present + common_name = cert_request_info.get("CommonName") or common_name + country = cert_request_info.get("Country") or country + email_address = cert_request_info.get("EmailAddress") or email_address + state = cert_request_info.get("State") or state + locality = cert_request_info.get("Locality") or locality + organization = cert_request_info.get("Organization") or organization + organizational_unit = cert_request_info.get("OrganizationalUnit") or organizational_unit + + attributes = [x509.NameAttribute(NameOID.COMMON_NAME, common_name)] + + if country: + attributes.append(x509.NameAttribute(NameOID.COUNTRY_NAME, country)) + + if email_address: + attributes.append(x509.NameAttribute(NameOID.EMAIL_ADDRESS, email_address)) + + if locality: + attributes.append(x509.NameAttribute(NameOID.LOCALITY_NAME, locality)) + + if organization: + attributes.append(x509.NameAttribute(NameOID.ORGANIZATION_NAME, organization)) + + if organizational_unit: + attributes.append(x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, organizational_unit)) + + if state: + attributes.append(x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, state)) + + return x509.Name(attributes) + + def ca_kms_sign_ca_certificate_request( csr_cert, ca_cert, kms_key_id, kms_signing_algorithm="RSASSA_PKCS1_V1_5_SHA_256" ): @@ -138,7 +211,10 @@ def ca_kms_sign_ca_certificate_request( return cert.public_bytes(serialization.Encoding.PEM) -def ca_build_cert(csr_cert, ca_cert, lifetime, delta, purposes): +def ca_build_cert(csr_cert, ca_cert, lifetime, delta, cert_request_info): + purposes = cert_request_info["Purposes"] + + x509_subject = tls_cert_construct_subject_name(csr_cert, cert_request_info) extended_key_usage_oids = [] for purpose in purposes: @@ -149,7 +225,7 @@ def ca_build_cert(csr_cert, ca_cert, lifetime, delta, purposes): return ( x509.CertificateBuilder() - .subject_name(csr_cert.subject) + .subject_name(x509_subject) .issuer_name(ca_cert.subject) .public_key(csr_cert.public_key()) .serial_number(x509.random_serial_number()) @@ -187,14 +263,13 @@ def ca_kms_sign_tls_certificate_request( csr_cert = cert_request_info["CsrCert"] x509_dns_names = cert_request_info["x509Sans"] lifetime = cert_request_info["Lifetime"] - purposes = cert_request_info["Purposes"] # reduce lifetime to maximum allowed if needed lifetime = min(lifetime, max_cert_lifetime) delta = timedelta(minutes=5) # time delta to avoid clock skew issues - cert = ca_build_cert(csr_cert, ca_cert, lifetime, delta, purposes) + cert = ca_build_cert(csr_cert, ca_cert, lifetime, delta, cert_request_info) if len(x509_dns_names) > 0: cert = cert.add_extension( diff --git a/modules/terraform-aws-ca-lambda/utils/certs/crypto.py b/modules/terraform-aws-ca-lambda/utils/certs/crypto.py index 693442e2..43b6d528 100644 --- a/modules/terraform-aws-ca-lambda/utils/certs/crypto.py +++ b/modules/terraform-aws-ca-lambda/utils/certs/crypto.py @@ -26,8 +26,16 @@ def crypto_ca_key_info(public_key, kms_key_id, common_name): } -def crypto_cert_request_info(csr_cert, common_name, lifetime, purposes, sans): +def crypto_cert_request_info(csr_cert, csr_info_1, csr_info_2): """Creates a dictionary with the information needed to sign a certificate""" + # get common name from csr_info_1 + common_name = csr_info_1["commonName"] + + # get values from csr_info_2 + purposes = csr_info_2.get("purposes") + lifetime = csr_info_2.get("lifetime") + sans = csr_info_2.get("sans") + # if no purposes are specified, default to both client auth if purposes is None: purposes = ["client_auth"] @@ -56,10 +64,17 @@ def crypto_cert_request_info(csr_cert, common_name, lifetime, purposes, sans): x509_sans.append(x509.DNSName(san)) return { + "CommonName": common_name, + "Country": csr_info_1["country"], "CsrCert": csr_cert, - "x509Sans": x509_sans, + "EmailAddress": csr_info_2["emailAddress"], "Lifetime": lifetime, + "Locality": csr_info_1["locality"], + "Organization": csr_info_1["organization"], + "OrganizationalUnit": csr_info_1["organizationalUnit"], "Purposes": purposes, + "State": csr_info_2["state"], + "x509Sans": x509_sans, }