Skip to content

Commit

Permalink
feat/remove global variables (#187)
Browse files Browse the repository at this point in the history
* refactor: remove reliance on  global `(issuing|root)_crl_days` and `(issuing|root)_crl_seconds` variables

* chore: align lambda requirement.txt's with rest of repository

* refactor: remove reliance on  global library references to PROJECT and ENVIRONMENT_NAME env variables

* refactor: remove reliance on global internal_s3_bucket_name and external_s3_bucket_name variables

* refactor: remove reliance on global max_cert_lifetime variable

* refactor: remove reliance on global domain variable

* refactor: remove reliance on global root_ca_info, issuing_ca_info and public_crl global variables

* chore: remove redundant methods and imports

* test: dump step function response to output on failure

* test: output execution details not just status
  • Loading branch information
patdowney authored Jul 17, 2024
1 parent 55927f1 commit 11c20be
Show file tree
Hide file tree
Showing 16 changed files with 328 additions and 265 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import base64
import json
import os
from utils.certs.kms import kms_get_kms_key_id, kms_describe_key
from utils.certs.crypto import (
crypto_kms_ca_cert_signing_request,
Expand All @@ -14,18 +16,31 @@
lifetime = 3650


def lambda_handler(event, context): # pylint:disable=unused-argument
root_ca_name = ca_name("root")
ca_slug = ca_name("issuing")
def lambda_handler(event, context): # pylint:disable=unused-argument,too-many-locals
project = os.environ["PROJECT"]
env_name = os.environ["ENVIRONMENT_NAME"]
external_s3_bucket_name = os.environ["EXTERNAL_S3_BUCKET"]
internal_s3_bucket_name = os.environ["INTERNAL_S3_BUCKET"]
domain = os.environ.get("DOMAIN")

public_crl = os.environ.get("PUBLIC_CRL")
enable_public_crl = False
if public_crl == "enabled":
enable_public_crl = True

issuing_ca_info = json.loads(os.environ["ISSUING_CA_INFO"])

root_ca_name = ca_name(project, env_name, "root")
ca_slug = ca_name(project, env_name, "issuing")

# check Root CA exists
if not db_list_certificates(root_ca_name):
if not db_list_certificates(project, env_name, root_ca_name):
print(f"CA {root_ca_name} not found")

return

# check if Issuing CA already exists
if db_list_certificates(ca_slug):
if db_list_certificates(project, env_name, ca_slug):
print(f"CA {ca_slug} already exists. To recreate, first delete item in DynamoDB")

return
Expand All @@ -45,14 +60,22 @@ def lambda_handler(event, context): # pylint:disable=unused-argument
)

# get Root CA cert in PEM format
root_ca_cert_pem = base64.b64decode(db_list_certificates(root_ca_name)[0]["Certificate"]["B"])
root_ca_cert_pem = base64.b64decode(db_list_certificates(project, env_name, root_ca_name)[0]["Certificate"]["B"])

# deserialize Root CA cert
root_ca_cert = load_pem_x509_certificate(root_ca_cert_pem)

# sign certificate
pem_certificate = ca_kms_sign_ca_certificate_request(
csr, root_ca_cert, root_ca_kms_key_id, kms_describe_key(root_ca_kms_key_id)["SigningAlgorithms"][0]
project,
env_name,
domain,
csr,
root_ca_cert,
root_ca_kms_key_id,
enable_public_crl,
issuing_ca_info,
kms_describe_key(root_ca_kms_key_id)["SigningAlgorithms"][0],
)
base64_certificate = base64.b64encode(pem_certificate)

Expand All @@ -61,13 +84,15 @@ def lambda_handler(event, context): # pylint:disable=unused-argument
info = crypto_cert_info(cert, ca_slug)

# create entry in DynamoDB
db_ca_cert_issued(info, base64_certificate)
db_ca_cert_issued(project, env_name, info, base64_certificate)

# create CA bundle
cert_bundle_pem = crypto_create_ca_bundle([root_ca_cert_pem, pem_certificate])

# upload certificate and CA bundle to S3
s3_upload(pem_certificate, f"{ca_slug}.crt")
s3_upload(cert_bundle_pem, f"{ca_bundle_name()}.pem")
s3_upload(external_s3_bucket_name, internal_s3_bucket_name, pem_certificate, f"{ca_slug}.crt")
s3_upload(
external_s3_bucket_name, internal_s3_bucket_name, cert_bundle_pem, f"{ca_bundle_name(project, env_name)}.pem"
)

return
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
cryptography == 42.0.4
validators == 0.22.0
cryptography == 42.0.8
validators == 0.33.0
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import base64
import json
import os
from utils.certs.kms import kms_get_kms_key_id, kms_get_public_key, kms_describe_key
from utils.certs.crypto import crypto_cert_info
from utils.certs.ca import ca_name, ca_create_kms_root_ca
Expand All @@ -12,10 +14,16 @@


def lambda_handler(event, context): # pylint:disable=unused-argument
ca_slug = ca_name("root")
project = os.environ["PROJECT"]
env_name = os.environ["ENVIRONMENT_NAME"]
external_s3_bucket_name = os.environ["EXTERNAL_S3_BUCKET"]
internal_s3_bucket_name = os.environ["INTERNAL_S3_BUCKET"]
root_ca_info = json.loads(os.environ["ROOT_CA_INFO"])

ca_slug = ca_name(project, env_name, "root")

# check if CA already exists
if db_list_certificates(ca_slug):
if db_list_certificates(project, env_name, ca_slug):
print(f"CA {ca_slug} already exists. To recreate, first delete item in DynamoDB")

return
Expand All @@ -28,7 +36,7 @@ def lambda_handler(event, context): # pylint:disable=unused-argument
print(f"using {cipher} key pair in KMS for {ca_slug}")

pem_certificate = ca_create_kms_root_ca(
public_key, kms_key_id, kms_describe_key(kms_key_id)["SigningAlgorithms"][0]
public_key, kms_key_id, root_ca_info, kms_describe_key(kms_key_id)["SigningAlgorithms"][0]
)
base64_certificate = base64.b64encode(pem_certificate)

Expand All @@ -37,9 +45,9 @@ def lambda_handler(event, context): # pylint:disable=unused-argument
info = crypto_cert_info(cert, ca_slug)

# create entry in DynamoDB
db_ca_cert_issued(info, base64_certificate)
db_ca_cert_issued(project, env_name, info, base64_certificate)

# upload CRL to S3
s3_upload(pem_certificate, f"{ca_slug}.crt")
s3_upload(external_s3_bucket_name, internal_s3_bucket_name, pem_certificate, f"{ca_slug}.crt")

return
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
cryptography == 42.0.4
validators == 0.22.0
cryptography == 42.0.8
validators == 0.33.0
Original file line number Diff line number Diff line change
@@ -1,47 +1,56 @@
from cryptography.hazmat.primitives import serialization
from utils.certs.kms import kms_get_kms_key_id, kms_get_public_key, kms_describe_key
from utils.certs.crypto import crypto_ca_key_info, crypto_revoked_certificate
from utils.certs.ca import ca_name, ca_kms_publish_crl
from utils.certs.db import db_list_certificates, db_update_crl_number, db_revocation_date
from utils.certs.ca import ca_name, ca_kms_publish_crl, ca_get_ca_info
from utils.certs.db import (
db_list_certificates,
db_update_crl_number,
db_revocation_date,
)
from utils.certs.s3 import s3_download, s3_upload
from cryptography.hazmat.primitives.serialization import load_der_public_key
import datetime
import json
import os

issuing_crl_days = int(os.environ["ISSUING_CRL_DAYS"])
issuing_crl_seconds = int(os.environ["ISSUING_CRL_SECONDS"])


def build_list_of_revoked_certs():
def build_list_of_revoked_certs(project, env_name, external_s3_bucket_name, internal_s3_bucket_name):
"""Build list of revoked certificates for CRL"""
# handle certificate revocation not enabled
if not s3_download("revoked.json"):
if not s3_download(external_s3_bucket_name, internal_s3_bucket_name, "revoked.json"):
print("revoked.json not found")
return []

# get list of certificates to be revoked
revocation_file = s3_download("revoked.json")["Body"]
revocation_file = s3_download(external_s3_bucket_name, internal_s3_bucket_name, "revoked.json")["Body"]

revocation_details = json.load(revocation_file)

revoked_certs = []
for revocation_detail in revocation_details:
common_name = revocation_detail["common_name"]
serial_number = revocation_detail["serial_number"]
revocation_date = db_revocation_date(common_name, serial_number)
revocation_date = db_revocation_date(project, env_name, common_name, serial_number)
revoked_cert = crypto_revoked_certificate(serial_number, revocation_date)
revoked_certs.append(revoked_cert)

print(f"CA {ca_name('issuing')} has {len(revoked_certs)} revoked certificates")
print(f"CA {ca_name(project, env_name, 'issuing')} has {len(revoked_certs)} revoked certificates")
return revoked_certs


def lambda_handler(event, context): # pylint:disable=unused-argument
ca_slug = ca_name("issuing")
def lambda_handler(event, context): # pylint:disable=unused-argument,too-many-locals
project = os.environ["PROJECT"]
env_name = os.environ["ENVIRONMENT_NAME"]
external_s3_bucket_name = os.environ["EXTERNAL_S3_BUCKET"]
internal_s3_bucket_name = os.environ["INTERNAL_S3_BUCKET"]

issuing_ca_info = json.loads(os.environ["ISSUING_CA_INFO"])
root_ca_info = json.loads(os.environ["ROOT_CA_INFO"])

ca_slug = ca_name(project, env_name, "issuing")

# check CA exists
if not db_list_certificates(ca_slug):
if not db_list_certificates(project, env_name, ca_slug):
print(f"CA {ca_slug} not found")

return
Expand All @@ -50,18 +59,26 @@ def lambda_handler(event, context): # pylint:disable=unused-argument
kms_key_id = kms_get_kms_key_id(ca_slug)
public_key = load_der_public_key(kms_get_public_key(kms_key_id))

issuing_crl_days = int(os.environ["ISSUING_CRL_DAYS"])
issuing_crl_seconds = int(os.environ["ISSUING_CRL_SECONDS"])

# issue CRL valid for one day 10 minutes
timedelta = datetime.timedelta(issuing_crl_days, issuing_crl_seconds, 0)
ca_key_info = crypto_ca_key_info(public_key, kms_key_id, ca_slug)
ca_info = ca_get_ca_info(issuing_ca_info, root_ca_info)

crl = ca_kms_publish_crl(
ca_info,
ca_key_info,
timedelta,
build_list_of_revoked_certs(),
db_update_crl_number(ca_slug, db_list_certificates(ca_slug)[0]["SerialNumber"]["S"]),
build_list_of_revoked_certs(project, env_name, external_s3_bucket_name, internal_s3_bucket_name),
db_update_crl_number(
project, env_name, ca_slug, db_list_certificates(project, env_name, ca_slug)[0]["SerialNumber"]["S"]
),
kms_describe_key(kms_key_id)["SigningAlgorithms"][0],
).public_bytes(encoding=serialization.Encoding.DER)

# upload CRL to S3
s3_upload(crl, f"{ca_slug}.crl")
s3_upload(external_s3_bucket_name, internal_s3_bucket_name, crl, f"{ca_slug}.crl")

return
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
cryptography == 42.0.4
validators == 0.22.0
cryptography == 42.0.8
validators == 0.33.0
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
cryptography == 42.0.4
validators == 0.22.0
cryptography == 42.0.8
validators == 0.33.0
Original file line number Diff line number Diff line change
Expand Up @@ -9,39 +9,42 @@
import json
import os

root_crl_days = int(os.environ["ROOT_CRL_DAYS"])
root_crl_seconds = int(os.environ["ROOT_CRL_SECONDS"])


def build_list_of_revoked_certs():
def build_list_of_revoked_certs(project, env_name, external_s3_bucket_name, internal_s3_bucket_name):
"""Build list of revoked certificates for CRL"""
# handle certificate revocation not enabled
if not s3_download("revoked-root-ca.json"):
if not s3_download(external_s3_bucket_name, internal_s3_bucket_name, "revoked-root-ca.json"):
print("revoked-root-ca.json not found")
return []

# get list of certificates to be revoked
revocation_file = s3_download("revoked-root-ca.json")["Body"]
revocation_file = s3_download(external_s3_bucket_name, internal_s3_bucket_name, "revoked-root-ca.json")["Body"]

revocation_details = json.load(revocation_file)

revoked_certs = []
for revocation_detail in revocation_details:
common_name = revocation_detail["common_name"]
serial_number = revocation_detail["serial_number"]
revocation_date = db_revocation_date(common_name, serial_number)
revocation_date = db_revocation_date(project, env_name, common_name, serial_number)
revoked_cert = crypto_revoked_certificate(serial_number, revocation_date)
revoked_certs.append(revoked_cert)

print(f"CA {ca_name('root')} has {len(revoked_certs)} revoked certificates")
print(f"CA {ca_name(project, env_name, 'root')} has {len(revoked_certs)} revoked certificates")
return revoked_certs


def lambda_handler(event, context): # pylint:disable=unused-argument
ca_slug = ca_name("root")
project = os.environ["PROJECT"]
env_name = os.environ["ENVIRONMENT_NAME"]
external_s3_bucket_name = os.environ["EXTERNAL_S3_BUCKET"]
internal_s3_bucket_name = os.environ["INTERNAL_S3_BUCKET"]
root_ca_info = json.loads(os.environ["ROOT_CA_INFO"])

ca_slug = ca_name(project, env_name, "root")

# check CA exists
if not db_list_certificates(ca_slug):
if not db_list_certificates(project, env_name, ca_slug):
print(f"CA {ca_slug} not found")

return
Expand All @@ -50,18 +53,25 @@ def lambda_handler(event, context): # pylint:disable=unused-argument
kms_key_id = kms_get_kms_key_id(ca_slug)
public_key = load_der_public_key(kms_get_public_key(kms_key_id))

root_crl_days = int(os.environ["ROOT_CRL_DAYS"])
root_crl_seconds = int(os.environ["ROOT_CRL_SECONDS"])

# issue CRL valid for one day 10 minutes
timedelta = datetime.timedelta(root_crl_days, root_crl_seconds, 0)
ca_key_info = crypto_ca_key_info(public_key, kms_key_id, ca_slug)

crl = ca_kms_publish_crl(
root_ca_info,
ca_key_info,
timedelta,
build_list_of_revoked_certs(),
db_update_crl_number(ca_slug, db_list_certificates(ca_slug)[0]["SerialNumber"]["S"]),
build_list_of_revoked_certs(project, env_name, external_s3_bucket_name, internal_s3_bucket_name),
db_update_crl_number(
project, env_name, ca_slug, db_list_certificates(project, env_name, ca_slug)[0]["SerialNumber"]["S"]
),
kms_describe_key(kms_key_id)["SigningAlgorithms"][0],
).public_bytes(encoding=serialization.Encoding.DER)

# upload CRL to S3
s3_upload(crl, f"{ca_slug}.crl")
s3_upload(external_s3_bucket_name, internal_s3_bucket_name, crl, f"{ca_slug}.crl")

return
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
cryptography == 42.0.4
validators == 0.22.0
cryptography == 42.0.8
validators == 0.33.0
Loading

0 comments on commit 11c20be

Please sign in to comment.