Skip to content

Commit

Permalink
Merge pull request #63 from pablopagani/patch-7
Browse files Browse the repository at this point in the history
add export-security-hub option
  • Loading branch information
dorukozturk authored Nov 18, 2024
2 parents cc74124 + 6475146 commit ad3b0c0
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 0 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ hardeneks [OPTIONS]
* `--export-csv TEXT`: Export the report in csv format
* `--export-html TEXT`: Export the report in html format
* `--export-json TEXT`: Export the report in json format
* `--export-security-hub`: Export failed checks to AWS Security Hub
* `--insecure-skip-tls-verify`: Skip TLS verification
* `--width`: Width of the output (defaults to terminal size)
* `--height`: Height of the output (defaults to terminal size)
Expand Down
113 changes: 113 additions & 0 deletions hardeneks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
from .harden import harden
from hardeneks import helpers

import datetime
import hashlib

app = typer.Typer()
console = Console(record=True)

Expand Down Expand Up @@ -124,6 +127,108 @@ def _export_csv(rules: list, csv_path=str):
writer.writeheader()
writer.writerows(csv_data)

def _export_security_hub(rules: list,region,context):
"""
Export failed checks to AWS Security Hub as custom findings
"""
try:
security_hub = boto3.client('securityhub', region_name=region)
account_id = boto3.client('sts').get_caller_identity()['Account']

findings = []
current_time = datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%SZ')

for rule in rules:
if not rule.result.status: # Only process failed checks
# Process each failed resource as a separate finding
resources = rule.result.resources if rule.result.resources else ['NoSpecificResource']

for resource in resources:
finding = {
'SchemaVersion': '2018-10-08',
'Id': f"hardeneks/{rule.pillar}/{rule.section}/{hashlib.md5(resource.encode()).hexdigest()}",
'ProductArn': f"arn:aws:securityhub:{region}:{account_id}:product/{account_id}/default",
'GeneratorId': f"hardeneks/{rule.pillar}/{rule.section}",
'AwsAccountId': account_id,
'Types': [
'Software and Configuration Checks/AWS Security Best Practices'
],
'CreatedAt': current_time,
'UpdatedAt': current_time,
'Severity': {
'Label': 'HIGH'
},
'Title': rule.message,
'Description': f"HardenEKS check failed: {rule.message}",
'Resources': [{
'Type': f'EKS {rule.result.resource_type}',
'Id': context,
'Partition': 'aws',
'Region': region
}],
'Compliance': {
'Status': 'FAILED'
},
'RecordState': 'ACTIVE',
'Workflow': {
'Status': 'NEW'
},
'ProductFields': {
'Provider': 'HardenEKS',
'Pillar': rule.pillar,
'Section': rule.section
}
}

# Add namespace information if available
if rule.result.namespace:
finding['ProductFields']['Namespace'] = rule.result.namespace

# Add remediation URL if available
if rule.url:
finding['Remediation'] = {
'Recommendation': {
'Text': 'For remediation steps, see the Amazon EKS Best Practices documentation',
'Url': rule.url
}
}

findings.append(finding)

# Security Hub has a batch limit of 100 findings
if len(findings) >= 100:
try:
response = security_hub.batch_import_findings(Findings=findings)
_process_security_hub_response(response)
findings = []
except Exception as e:
console.print(f"[red]Error sending batch to Security Hub: {str(e)}[/red]")

# Send any remaining findings
if findings:
try:
response = security_hub.batch_import_findings(Findings=findings)
_process_security_hub_response(response)
except Exception as e:
console.print(f"[red]Error sending final batch to Security Hub: {str(e)}[/red]")

console.print("[green]Successfully exported failed checks to Security Hub[/green]")

except Exception as e:
console.print(f"[red]Error connecting to Security Hub: {str(e)}[/red]")

def _process_security_hub_response(response):
"""
Process the response from Security Hub batch import
"""
if response['FailedCount'] > 0:
for failure in response['FailedFindings']:
console.print(
f"[yellow]Warning: Failed to import finding: {failure['Id']}, "
f"Error: {failure['ErrorCode']} - {failure['ErrorMessage']}[/yellow]"
)


def print_consolidated_results(rules: list):

pillars = set([i.pillar for i in rules])
Expand Down Expand Up @@ -192,6 +297,11 @@ def run_hardeneks(
export_json: str = typer.Option(
default=None, help="Export the report in json format"
),
export_security_hub: bool = typer.Option(
False,
"--export-security-hub",
help="Export failed checks to AWS Security Hub (Security Hub must be enabled and have securityhub:GetFindings, securityhub:BatchImportFindings IAM permission)",
),
insecure_skip_tls_verify: bool = typer.Option(
False,
"--insecure-skip-tls-verify",
Expand All @@ -217,6 +327,7 @@ def run_hardeneks(
export-csv (str): Export the report in csv format
export-html (str): Export the report in html format
export-json (str): Export the report in json format
export-security-hub (str): Export the report to AWS Security Hub
insecure-skip-tls-verify (str): Skip tls verification
width (int): Output width
height (int): Output height
Expand Down Expand Up @@ -287,3 +398,5 @@ def run_hardeneks(
console.save_html(export_html)
if export_json:
_export_json(results, export_json)
if export_security_hub:
_export_security_hub(results,region,context)

0 comments on commit ad3b0c0

Please sign in to comment.