-
Notifications
You must be signed in to change notification settings - Fork 5
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat gsn-10597: migrate to awsfindingsmanagerlib
- Loading branch information
1 parent
4a0a256
commit d05004e
Showing
10 changed files
with
161 additions
and
403 deletions.
There are no files selected for viewing
5 changes: 1 addition & 4 deletions
5
files/lambda-artifacts/securityhub-suppressor/requirements.txt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,2 @@ | ||
aws-lambda-powertools | ||
pylint-gitlab | ||
pyyaml | ||
jmespath | ||
yamllint | ||
awsfindingsmanagerlib |
243 changes: 33 additions & 210 deletions
243
files/lambda-artifacts/securityhub-suppressor/securityhub_events.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,218 +1,41 @@ | ||
import os | ||
from dataclasses import dataclass | ||
from datetime import datetime | ||
from re import search | ||
from typing import Any | ||
from typing import Dict | ||
from typing import Optional | ||
from typing import Tuple | ||
from typing import Union | ||
from os import environ | ||
|
||
import boto3 | ||
import jmespath | ||
import yaml | ||
from aws_lambda_powertools import Logger | ||
from aws_lambda_powertools.utilities.data_classes import EventBridgeEvent | ||
from aws_lambda_powertools.utilities.typing import LambdaContext | ||
from awsfindingsmanagerlib.awsfindingsmanagerlib import FindingsManager | ||
from awsfindingsmanagerlib.backends import Backend | ||
|
||
from yaml_parser import get_file_contents | ||
LOGGER = Logger() | ||
S3_BUCKET_NAME = environ.get("S3_BUCKET_NAME") | ||
S3_OBJECT_NAME = environ.get("S3_OBJECT_NAME") | ||
|
||
logger = Logger() | ||
VALID_STATUSES = ['FAILED', 'HIGH', 'WARNING'] | ||
DYNAMODB_TABLE_NAME = os.environ['DYNAMODB_TABLE_NAME'] | ||
YAML_CONFIGURATION_FILE = 'suppressor.yml' | ||
SUPPRESSED_FINDINGS = [] | ||
|
||
|
||
@dataclass | ||
class Finding: | ||
finding_id: str | ||
product_arn: str | ||
product_name: str | ||
|
||
|
||
@dataclass | ||
class SuppressionRule: | ||
action: str | ||
rules: [str] | ||
notes: str | ||
dry_run: Optional[bool] | ||
|
||
|
||
@dataclass | ||
class SuppressionEntry: | ||
control_id: str | ||
data: [SuppressionRule] | ||
|
||
|
||
class SuppressionList: | ||
def __init__(self, boto_client, hash_key) -> None: | ||
self._entries = [] | ||
self.hash_key = hash_key | ||
self.boto_client = boto_client | ||
self.table = self.data_source | ||
|
||
@property | ||
def data_source(self): | ||
dynamodb = self.boto_client.resource('dynamodb') | ||
return dynamodb.Table(name=DYNAMODB_TABLE_NAME) | ||
|
||
@property | ||
def entries(self) -> list: | ||
if not self.hash_key: | ||
logger.info(f'Invalid hash key: {self.hash_key}') | ||
return self._entries | ||
if not self._entries: | ||
logger.info(f'Fetching suppression list from dynamoDB {DYNAMODB_TABLE_NAME}, hash key: {self.hash_key}') | ||
rules = self.table.get_item(Key={"controlId": self.hash_key}) | ||
for rule in rules.get('Item', {}).get('data', {}): | ||
self._entries.append( | ||
SuppressionRule(action=rule.get('action'), | ||
rules=rule.get('rules'), | ||
notes=rule.get('notes'), | ||
dry_run=rule.get('dry_run', False)) | ||
) | ||
return self._entries | ||
|
||
|
||
class Suppressor: | ||
def __init__(self, boto_client, | ||
finding: Finding, | ||
resource_id: str, | ||
suppression_list: SuppressionList) -> None: | ||
self.boto_client = boto_client | ||
self._finding = finding | ||
self._security_hub = boto_client.client('securityhub') | ||
self.resource_id = resource_id | ||
self.suppression_list = suppression_list | ||
self._suppression_rule = None | ||
self.matched_rule = None | ||
SUPPRESSED_FINDINGS.clear() | ||
|
||
@property | ||
def finding(self) -> Finding: | ||
return self._finding | ||
|
||
@property | ||
def rule(self) -> SuppressionRule: | ||
if not self._suppression_rule: | ||
self._suppression_rule = self.evaluate_rule() | ||
return self._suppression_rule | ||
|
||
@staticmethod | ||
def validate(finding_event: Dict[str, Any]) -> Union[bool, Finding]: | ||
product_arn = finding_event.get('ProductArn', '') | ||
if not product_arn: | ||
raise ValueError('Error: no product_arn found') | ||
finding_id = finding_event.get('Id', '') | ||
if not finding_id: | ||
raise ValueError('Error: no finding_id found') | ||
product_details = finding_event.get('ProductFields', {}) | ||
if not product_details: | ||
raise ValueError('Error: no product fields found') | ||
product_name = product_details.get('aws/securityhub/ProductName', '') | ||
if not product_name: | ||
raise ValueError('Error: no product name found') | ||
return Finding(product_arn=product_arn, finding_id=finding_id, product_name=product_name) | ||
class S3(Backend): | ||
def __init__(self, bucket_name, file_name): | ||
self._file_contents = self._get_file_contents(bucket_name, file_name) | ||
|
||
@staticmethod | ||
def get_product_details(finding_event: Dict[str, Any], product_name: str) -> Tuple[None, None]: | ||
key, status = None, None | ||
yaml_config = get_file_contents(YAML_CONFIGURATION_FILE) | ||
if not yaml_config.get(product_name): | ||
logger.warning(f'No YAML configuration for product {product_name}') | ||
return key, status | ||
key = jmespath.search(yaml_config.get(product_name, {}).get('key'), finding_event) | ||
status = jmespath.search(yaml_config.get(product_name, {}).get('status'), finding_event) | ||
return key, status | ||
|
||
def evaluate_rule(self) -> Optional[SuppressionRule]: | ||
for entry in self.suppression_list.entries: | ||
match = next((rule for rule in entry.rules if search(rule, self.resource_id)), None) | ||
if match: | ||
self.matched_rule = match | ||
return entry | ||
return None | ||
|
||
def suppress_finding(self) -> bool: | ||
if not self.rule: | ||
logger.info(f'Skipping finding because {self.resource_id} is not in the suppression list') | ||
return False | ||
if not self.rule.notes: | ||
logger.error('Error: a valid notes must be added to the dynamoDB entry') | ||
return False | ||
if self.rule.dry_run: | ||
action_output = 'DRY RUN - Would' | ||
else: | ||
action_output = 'Will' | ||
|
||
logger.info(f'{action_output} perform Suppression on finding {self.finding.finding_id}, ' | ||
f'matched rule: {self.matched_rule}, ' | ||
f'action: {self.rule.action}') | ||
SUPPRESSED_FINDINGS.append(self.finding.finding_id) | ||
now = datetime.now() | ||
|
||
if self.rule.dry_run: | ||
return True | ||
|
||
return self._security_hub.batch_update_findings(FindingIdentifiers=[ | ||
{ | ||
'Id': self.finding.finding_id, | ||
'ProductArn': self.finding.product_arn | ||
}], | ||
Workflow={'Status': self.rule.action}, | ||
Note={'Text': f'{self.rule.notes} - ' | ||
f'Suppressed by the Security Hub Suppressor at {now.strftime("%Y-%m-%d %H:%M:%S")}', | ||
'UpdatedBy': 'landingzone'}) | ||
|
||
|
||
def validate_event(event: EventBridgeEvent): | ||
for event_entries in event.detail.get('findings', []): | ||
finding = Suppressor.validate(event_entries) | ||
hash_key, status = Suppressor.get_product_details(event_entries, finding.product_name) | ||
if status not in VALID_STATUSES: | ||
raise ValueError(f'Skipping execution because status is {status}. Valid statuses: {VALID_STATUSES}') | ||
if not hash_key: | ||
raise ValueError(f'Error: no hash_key found for product {finding.product_name}') | ||
workflow_status = event_entries.get('Workflow', {}).get('Status', {}) | ||
if workflow_status == "SUPPRESSED": | ||
raise ValueError(f'Skipping execution because workflow status is {workflow_status}') | ||
return True | ||
|
||
|
||
def _parse_fields(event): | ||
finding, resource_id, hash_key = None, None, None | ||
for event_entries in event.get('detail').get('findings', []): | ||
finding = Suppressor.validate(event_entries) | ||
hash_key, status = Suppressor.get_product_details(event_entries, finding.product_name) | ||
resource_id = [resource.get('Id') for resource in event_entries.get('Resources', [])].pop() | ||
return finding, resource_id, hash_key | ||
|
||
|
||
def suppress(event): | ||
finding, resource_id, hash_key = _parse_fields(event) | ||
suppression_list = get_suppression_list(hash_key) | ||
return Suppressor(boto_client=boto3, | ||
finding=finding, | ||
resource_id=resource_id, | ||
suppression_list=suppression_list).suppress_finding() | ||
|
||
|
||
def get_suppression_list(hash_key) -> SuppressionList: | ||
suppression_list = SuppressionList(hash_key=hash_key, boto_client=boto3) | ||
if not suppression_list.entries: | ||
logger.error(f'Could not find any rules for control {hash_key}') | ||
return suppression_list | ||
|
||
|
||
@logger.inject_lambda_context(log_event=True) | ||
def lambda_handler(event: Dict[str, Any], context: LambdaContext): | ||
event: EventBridgeEvent = EventBridgeEvent(event) | ||
validate_event(event) | ||
if suppress(event): | ||
logger.info(f'Total findings processed: {len(SUPPRESSED_FINDINGS)}') | ||
return { | ||
'finding_state': 'suppressed' | ||
} | ||
return { | ||
'finding_state': 'skipped' | ||
} | ||
def _get_file_contents(bucket_name, file_name): | ||
s3 = boto3.resource("s3") | ||
return s3.Object(bucket_name, file_name).get()["Body"].read() | ||
|
||
def _get_rules(self): | ||
data = yaml.safe_load(self._file_contents) | ||
return data.get("Rules") | ||
|
||
|
||
@LOGGER.inject_lambda_context(log_event=True) | ||
def lambda_handler(event, context): | ||
s3_backend = S3(S3_BUCKET_NAME, S3_OBJECT_NAME) | ||
rules = s3_backend.get_rules() | ||
LOGGER.info(rules) | ||
findings_manager = FindingsManager() | ||
findings_manager.register_rules(rules) | ||
if findings_manager.suppress_matching_findings(): | ||
LOGGER.info("Successfully applied all suppression rules.") | ||
return True | ||
else: | ||
raise RuntimeError( | ||
"No explicit error was raised, but not all suppression rules were applied successfully, please investigate." | ||
) |
67 changes: 0 additions & 67 deletions
67
files/lambda-artifacts/securityhub-suppressor/securityhub_streams.py
This file was deleted.
Oops, something went wrong.
41 changes: 41 additions & 0 deletions
41
files/lambda-artifacts/securityhub-suppressor/securityhub_trigger.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
from os import environ | ||
|
||
import boto3 | ||
import yaml | ||
from aws_lambda_powertools import Logger | ||
from awsfindingsmanagerlib.awsfindingsmanagerlib import FindingsManager | ||
from awsfindingsmanagerlib.backends import Backend | ||
|
||
LOGGER = Logger() | ||
S3_BUCKET_NAME = environ.get("S3_BUCKET_NAME") | ||
S3_OBJECT_NAME = environ.get("S3_OBJECT_NAME") | ||
|
||
|
||
class S3(Backend): | ||
def __init__(self, bucket_name, file_name): | ||
self._file_contents = self._get_file_contents(bucket_name, file_name) | ||
|
||
@staticmethod | ||
def _get_file_contents(bucket_name, file_name): | ||
s3 = boto3.resource("s3") | ||
return s3.Object(bucket_name, file_name).get()["Body"].read() | ||
|
||
def _get_rules(self): | ||
data = yaml.safe_load(self._file_contents) | ||
return data.get("Rules") | ||
|
||
|
||
@LOGGER.inject_lambda_context(log_event=True) | ||
def lambda_handler(event, context): | ||
s3_backend = S3(S3_BUCKET_NAME, S3_OBJECT_NAME) | ||
rules = s3_backend.get_rules() | ||
LOGGER.info(rules) | ||
findings_manager = FindingsManager() | ||
findings_manager.register_rules(rules) | ||
if findings_manager.suppress_matching_findings(): | ||
LOGGER.info("Successfully applied all suppression rules.") | ||
return True | ||
else: | ||
raise RuntimeError( | ||
"No explicit error was raised, but not all suppression rules were applied successfully, please investigate." | ||
) |
This file was deleted.
Oops, something went wrong.
Oops, something went wrong.