Skip to content

Commit d4faecd

Browse files
Update app.py (#126)
1 parent af21a0c commit d4faecd

File tree

1 file changed

+36
-60
lines changed
  • src/lambda/gc03_check_trusted_devices_admin_access

1 file changed

+36
-60
lines changed

src/lambda/gc03_check_trusted_devices_admin_access/app.py

Lines changed: 36 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,3 @@
1-
""" GC03 - Check trusted devices admin access
2-
Confirm that administrative access to cloud environments is from approved and trusted locations and devices
3-
"""
4-
51
import json
62
import logging
73
import ipaddress
@@ -21,7 +17,7 @@
2117

2218

2319
def ip_is_within_ranges(ip_addr: str, ip_cidr_ranges: list[str]) -> bool:
24-
"""Return true if the given IP Address is within the at least one of the given CIDR ranges, otherwise returns false"""
20+
"""Return true if the given IP Address is within at least one of the given CIDR ranges, otherwise false."""
2521
for ip_range in ip_cidr_ranges:
2622
ip_network = ipaddress.ip_network(ip_range)
2723
if ipaddress.ip_address(ip_addr) in ip_network:
@@ -31,13 +27,10 @@ def ip_is_within_ranges(ip_addr: str, ip_cidr_ranges: list[str]) -> bool:
3127

3228
def lambda_handler(event, context):
3329
"""
34-
This function is the main entry point for Lambda.
35-
36-
Keyword arguments:
37-
38-
event -- the event variable given in the lambda handler
30+
Main entry point for Lambda execution.
3931
40-
context -- the context variable given in the lambda handler
32+
event -- Event data passed to the Lambda function.
33+
context -- Runtime information for the Lambda execution.
4134
"""
4235
logger.info("Received Event: %s", json.dumps(event, indent=2))
4336

@@ -46,13 +39,13 @@ def lambda_handler(event, context):
4639
logger.error("Skipping assessments as this is not a scheduled invocation")
4740
return
4841

42+
# Extract rule parameters
4943
rule_parameters = check_required_parameters(
50-
json.loads(event.get("ruleParameters", "{}")), ["ExecutionRoleName", "s3ObjectPath"]
44+
json.loads(event.get("ruleParameters", "{}")), ["ExecutionRoleName", "BgUser1", "BgUser2"]
5145
)
5246
execution_role_name = rule_parameters.get("ExecutionRoleName")
5347
audit_account_id = rule_parameters.get("AuditAccountID", "")
5448
aws_account_id = event["accountId"]
55-
is_not_audit_account = aws_account_id != audit_account_id
5649

5750
evaluations = []
5851

@@ -69,28 +62,23 @@ def lambda_handler(event, context):
6962
aws_cloudtrail_client = get_client("cloudtrail", aws_account_id, execution_role_name)
7063
aws_iam_client = get_client("iam", aws_account_id, execution_role_name)
7164

72-
# Check cloud profile
73-
tags = get_account_tags(get_client("organizations", assume_role=False), aws_account_id)
74-
cloud_profile = get_cloud_profile_from_tags(tags)
75-
gr_requirement_type = check_guardrail_requirement_by_cloud_usage_profile(GuardrailType.Guardrail3, cloud_profile)
76-
77-
# If the guardrail is recommended
78-
if gr_requirement_type == GuardrailRequirementType.Recommended:
79-
return submit_evaluations(aws_config_client, event, [build_evaluation(
65+
# Check for federated users
66+
if account_has_federated_users(aws_iam_client):
67+
# Log identity providers
68+
logger.info("Federated users detected.")
69+
70+
annotation = "Federated users detected. Compliance is determined by Federated Identity Provider compliance."
71+
logger.info(annotation)
72+
evaluations.append(build_evaluation(
8073
aws_account_id,
8174
"COMPLIANT",
8275
event,
83-
gr_requirement_type=gr_requirement_type
84-
)])
85-
# If the guardrail is not required
86-
elif gr_requirement_type == GuardrailRequirementType.Not_Required:
87-
return submit_evaluations(aws_config_client, event, [build_evaluation(
88-
aws_account_id,
89-
"NOT_APPLICABLE",
90-
event,
91-
gr_requirement_type=gr_requirement_type
92-
)])
93-
76+
annotation=annotation
77+
))
78+
submit_evaluations(aws_config_client, event, evaluations)
79+
return
80+
81+
# Check for source IP ranges if no federated users are present
9482
file_param_name = "s3ObjectPath"
9583
vpn_ip_ranges_file_path = rule_parameters.get(file_param_name, "")
9684

@@ -105,59 +93,47 @@ def lambda_handler(event, context):
10593
logger.info("vpn_ip_ranges from the file in s3: %s", vpn_ip_ranges)
10694

10795
if not vpn_ip_ranges:
108-
annotation = "No ip ranges found in input file."
96+
annotation = "No IP ranges found in the input file."
10997
logger.info(annotation)
11098
evaluations.append(build_evaluation(aws_account_id, "NON_COMPLIANT", event, annotation=annotation))
11199
submit_evaluations(aws_config_client, event, evaluations)
112100
return
113101

102+
# Retrieve CloudTrail events and filter by background accounts
114103
bg_account_names = [rule_parameters["BgUser1"], rule_parameters["BgUser2"]]
115104
lookup_attributes = [{"AttributeKey": "EventName", "AttributeValue": "ConsoleLogin"}]
116105
console_login_cloud_trail_events = lookup_cloud_trail_events(aws_cloudtrail_client, lookup_attributes)
117-
print("INFO 1:", console_login_cloud_trail_events)
106+
logger.info("INFO 1: %s", console_login_cloud_trail_events)
107+
118108
cloud_trail_events = [e for e in console_login_cloud_trail_events if e.get("Username") not in bg_account_names]
119109
num_compliant_rules = 0
120110
logger.info("Number of events found: %s", len(cloud_trail_events))
121111

112+
# Evaluate events for compliance
122113
for lookup_event in cloud_trail_events:
123114
ct_event = json.loads(lookup_event.get("CloudTrailEvent", "{}"))
124115
ct_event_id = ct_event.get("eventID", "")
125-
ct_event_time = ct_event.get("eventTime", "")
126-
127-
# get event time and convert to ISO format
128-
ct_event_isotime = datetime.datetime.fromisoformat(ct_event_time.replace('Z', '+00:00'))
129-
now = datetime.datetime.now(datetime.timezone.utc)
130-
# find difference from the current/execution time
131-
time_diff = now - ct_event_isotime
132-
# this is one day in ISO format
133-
one_day = datetime.timedelta(days=1)
134-
135-
if time_diff > one_day:
136-
# skip if event entry is older than 1 day
137-
pass
138-
elif not ip_is_within_ranges(ct_event["sourceIPAddress"], vpn_ip_ranges):
116+
source_ip = ct_event.get("sourceIPAddress", "")
117+
118+
if not ip_is_within_ranges(source_ip, vpn_ip_ranges):
139119
compliance_type = "NON_COMPLIANT"
140-
annotation = f"Cloud Trail Event '{ct_event_id}' has a source IP address OUTSIDE of the allowed ranges, with event time '{ct_event_time}'."
120+
annotation = f"Event '{ct_event_id}' from IP '{source_ip}' is outside allowed ranges."
141121
logger.info(f"{compliance_type}: {annotation}")
142-
evaluations.append(build_evaluation(ct_event_id, compliance_type, event, "AWS::CloudTrail::Trail", annotation))
122+
evaluations.append(build_evaluation(ct_event_id, compliance_type, event, annotation=annotation))
143123
else:
144-
num_compliant_rules = num_compliant_rules + 1
145124
compliance_type = "COMPLIANT"
146-
annotation = f"Cloud Trail Event '{ct_event_id}' has a source IP address inside of the allowed ranges."
147-
if account_has_federated_users(aws_iam_client):
148-
annotation = f"{annotation} Dependent on the compliance of the Federated IdP."
125+
annotation = f"Event '{ct_event_id}' from IP '{source_ip}' is inside allowed ranges."
149126
logger.info(f"{compliance_type}: {annotation}")
150-
evaluations.append(build_evaluation(ct_event_id, compliance_type, event, "AWS::CloudTrail::Trail", annotation))
151-
127+
evaluations.append(build_evaluation(ct_event_id, compliance_type, event, annotation=annotation))
128+
num_compliant_rules += 1
152129

130+
# Final compliance summary
153131
if len(cloud_trail_events) == num_compliant_rules:
154132
compliance_type = "COMPLIANT"
155-
annotation = "All Cloud Trail Events are within the allowed source IP address ranges."
156-
if account_has_federated_users(aws_iam_client):
157-
annotation = f"All Cloud Trail Events are within the allowed source IP address ranges or are dependant on the federated identity provider."
133+
annotation = "All CloudTrail events are within the allowed source IP address ranges."
158134
else:
159135
compliance_type = "NON_COMPLIANT"
160-
annotation = "NOT all Cloud Trail Events are within the allowed source IP address ranges."
136+
annotation = "Not all CloudTrail events are within the allowed source IP address ranges."
161137

162138
logger.info(f"{compliance_type}: {annotation}")
163139
evaluations.append(build_evaluation(aws_account_id, compliance_type, event, annotation=annotation))

0 commit comments

Comments
 (0)