From b12348b6059ecdaae7a03498f7e2bba954b1fc84 Mon Sep 17 00:00:00 2001 From: chebuya Date: Fri, 26 Jul 2024 13:58:11 -0600 Subject: [PATCH 1/6] Add ds__enum --- pacu/modules/ds__enum/__init__.py | 0 pacu/modules/ds__enum/main.py | 274 ++++++++++++++++++++++++++++++ 2 files changed, 274 insertions(+) create mode 100755 pacu/modules/ds__enum/__init__.py create mode 100755 pacu/modules/ds__enum/main.py diff --git a/pacu/modules/ds__enum/__init__.py b/pacu/modules/ds__enum/__init__.py new file mode 100755 index 00000000..e69de29b diff --git a/pacu/modules/ds__enum/main.py b/pacu/modules/ds__enum/main.py new file mode 100755 index 00000000..3fdf9c3c --- /dev/null +++ b/pacu/modules/ds__enum/main.py @@ -0,0 +1,274 @@ +#!/usr/bin/env python3 +import argparse +from copy import deepcopy +from random import choice + +from pacu.core.lib import save +from botocore.exceptions import ClientError +from pacu.core.secretfinder.utils import regex_checker, Color + + +module_info = { + # Name of the module (should be the same as the filename) + 'name': 'ds__enum', + + # Name and any other notes about the author + 'author': '@_chebuya of Rhino Security Labs', + + # Category of the module. Make sure the name matches an existing category. + 'category': 'ENUM', + + # One liner description of the module functionality. This shows up when a user searches for modules. + 'one_liner': 'Enumerates a AWS Directory Service offerings.', + + # Description about what the module does and how it works + 'description': 'The module is used to enumerate AWS Directory Service offerings including AWS Managed Microsoft AD, AD Connector, and Simple AD', + + # A list of AWS services that the module utilizes during its execution + 'services': ['DS'], + + # For prerequisite modules, try and see if any existing modules return the data that is required for your module before writing that code yourself, that way, session data can stay separated and modular. + 'prerequisite_modules': [], + + # External resources that the module depends on. Valid options are either a GitHub URL (must end in .git) or single file URL. + 'external_dependencies': [], + + # Module arguments to autocomplete when the user hits tab + 'arguments_to_autocomplete': [ + '--regions', + '--managed-ad', + '--directories', + '--domain-controllers', + '--trusts', + '--settings', + '--shared-directories' + ], +} + +parser = argparse.ArgumentParser(add_help=False, description=module_info['description']) + +parser.add_argument('--regions', required=False, default=None, help='One or more (comma separated) AWS regions in the format "us-east-1". Defaults to all session regions') +parser.add_argument('--managed-ad', required=False, default=False, action='store_true', help='Enumerate AWS Managed Microsoft AD') +parser.add_argument('--directories', required=False, default=False, action='store_true', help='Enumerate Directories') +parser.add_argument('--domain-controllers', required=False, default=False, action='store_true', help='Enumerate Domain Controllers') +parser.add_argument('--trusts', required=False, default=False, action='store_true', help='Enumerate Domain Trusts') +parser.add_argument('--settings', required=False, default=False, action='store_true', help='Enumerate Directory Settings') +parser.add_argument('--shared-directories', required=False, default=False, action='store_true', help='Enumerate Shared Directories') + +ARG_FIELD_MAPPER = { + 'managed_ad': 'ManagedAD', + 'directories': 'Directories', + 'domain_controllers': 'DomainControllers', + 'trusts': 'Trusts', + 'settings': 'Settings', + 'shared_directories': 'SharedDirectories' +} + +def get_directories(client, do_print=True): + directories = {} + + paginator = client.get_paginator("describe_directories") + for resource_records in paginator.paginate(): + directories.update({directory["DirectoryId"]:directory for directory in resource_records['DirectoryDescriptions']}) + + if not do_print: + return directories + + for directory_id,directory in directories.items(): + # add created at datetime + print(f"DirectoryId: {directory_id}") + print(f" DNSName: {directory['Name']}") + print(f" NetBIOSName: {directory['ShortName']}") + print(f" DirectoryType: {directory['Type']}") + print(f" Edition: {directory['Edition']}") + print(f" Size: {directory['Size']}") + print(f" Description: {directory['Description']}") + + return directories + + +def get_domain_controllers(client, directories): + directory_domain_controllers = {} + + if directories == None: + directories = get_directories(client, do_print=False) + directory_ids = list(directories) + + for directory_id in directory_ids: + + paginator = client.get_paginator("describe_domain_controllers") + for resource_records in paginator.paginate(DirectoryId=directory_id): + + directory_domain_controllers[directory_id] = [domain_controller for domain_controller in resource_records['DomainControllers']] + + + for directory_id,domain_controllers in directory_domain_controllers.items(): + + print(f"Domain controllers for {directory_id}/{directories[directory_id]['Name']}") + + for domain_controller in domain_controllers: + + domain_controller_id = domain_controller['DomainControllerId'] + print(f" DomainControllerId: {domain_controller_id}") + print(f" Status: {domain_controller['Status']}") + print(f" OsVersion: {directories[directory_id]['OsVersion']}") + print(f" IpAddress: {domain_controller['DnsIpAddr']}") + print(f" VpcId: {domain_controller['VpcId']}") + print(f" SecurityGroupId: {directories[directory_id]['VpcSettings']['SecurityGroupId']}") + print(f" SubnetId: {domain_controller['SubnetId']}") + print(f" AvailabilityZone: {domain_controller['AvailabilityZone']}") + + + return domain_controllers + +def get_trusts(client, directories): + trusts = {} + + if directories == None: + directories = get_directories(client, do_print=False) + + paginator = client.get_paginator("describe_trusts") + for resource_records in paginator.paginate(): + for trust in resource_records["Trusts"]: + directory_id = trust["DirectoryId"] + if directory_id not in trusts: + trusts[directory_id] = {} + + trusts[directory_id].update({trust["TrustId"]: trust}) + + + for directory_id,directory_trusts in trusts.items(): + domain_name = directories[directory_id]['Name'] + print(f"Trusts for {directory_id}/{domain_name}") + for trust_id,trust in directory_trusts.items(): + remote_domain_name = trust['RemoteDomainName'] + trust_direction = trust['TrustDirection'] + # remote_domain_name trusts directory_name + if trust_direction == 'One-Way: Outgoing': + print(f"{remote_domain_name} trusts {domain_name}") + elif trust_direction == 'One-Way: Incoming': + print(f"{remote_domain_name} trusts {domain_name}") + elif trust_direction == 'Two-Way': + print(f"{remote_domain_name} and {domain_name} trust each other") + + + print(f" TrustId: {trust_id}") + print(f" TrustState: {trust['TrustState']}") + print(f" TrustType: {trust['TrustType']}") + # add datetime and TrustStateReason + + return trusts + +def get_settings(client, directories): + settings = {} + + if directories == None: + directories = get_directories(client, do_print=False) + + for directory_id,directory in directories.items(): + print(f"Settings for {directory_id}/{directory['Name']}") + + directory_settings = client.describe_settings(DirectoryId=directory_id)["SettingEntries"] + settings[directory_id] = directory_settings + for settings in directory_settings: + print(f" {settings['Name']}: {settings['AppliedValue'].replace('able', 'abled')}") + + return settings + + +def get_shared_directories(client, directories): + shared_directories = {} + + if directories == None: + directories = get_directories(client, do_print=False) + directory_ids = list(directories) + + for directory_id in directory_ids: + paginator = client.get_paginator("describe_shared_directories") + for resource_records in paginator.paginate(OwnerDirectoryId=directory_id): + shared_directories[directory_id] = resource_records['SharedDirectories'] + + for directory_id,share_settings in shared_directories.items(): + print(f"Share settings for {directory_id}/{directories[directory_id]['Name']}") + for setting in share_settings: + print(f" {setting['OwnerDirectoryId']} is shared with {setting['SharedAccountId']}") + print(f" ShareMethod: {setting['ShareMethod']}") + print(f" ShareNotes: {setting['ShareNotes']}") + + + return shared_directories + + + + +def main(args, pacu_main): + # do we need this here? + #session = pacu_main.get_active_session() + + args = parser.parse_args(args) + print = pacu_main.print + get_regions = pacu_main.get_regions + + # for each new cli flag, add here + if args.directories is False and args.domain_controllers is False and args.trusts is False and args.settings is False and args.shared_directories is False: + args.directories = args.domain_controllers = args.trusts = args.settings = args.shared_directories = True + + if args.regions is None: + regions = get_regions('ds') + if regions is None or regions == [] or regions == '' or regions == {}: + print('This module is not supported in any regions specified in the current sessions region set. Exiting...') + return + else: + regions = args.regions.split(',') + + client = pacu_main.get_boto3_client('ds', choice(regions)) + + # do we actually need these for anything if we are printing + all_directories = [] + all_domain_controllers = [] + all_trusts = [] + all_settings = [] + all_shared_directories = [] + for region in regions: + if any([args.directories]): + print('Starting region {}...'.format(region)) + + client = pacu_main.get_boto3_client('ds', region) + + # There is edge case where like --domain-controllers and --trusts is called but not --directories, we make 2x calls to DescribeDirectories. should return as value + directories = None + if args.directories: + try: + directories = get_directories(client) + all_directories.append(directories) + except ClientError as error: + print(f"Failed to list directories: {error}") + + if args.domain_controllers: + try: + domain_controllers = get_domain_controllers(client, directories) + all_domain_controllers.append(domain_controllers) + except ClientError as error: + print(f"Failed to list domain controllers: {error}") + + if args.trusts: + try: + trusts = get_trusts(client, directories) + all_trusts.append(trusts) + except ClientError as error: + print(f"Failed to list trusts: {error}") + + if args.settings: + try: + settings = get_settings(client, directories) + all_settings.append(settings) + except ClientError as error: + print(f"Failed to list settings: {error}") + + if args.shared_directories: + try: + shared_directories = get_shared_directories(client, directories) + all_shared_directories.append(shared_directories) + except ClientError as error: + print(f"Failed to list settings: {error}") + \ No newline at end of file From 491ae284e802252474e62dcdbb00684da04ff5f3 Mon Sep 17 00:00:00 2001 From: chebuya Date: Fri, 26 Jul 2024 15:26:18 -0600 Subject: [PATCH 2/6] KeyError fix --- pacu/modules/ds__enum/main.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/pacu/modules/ds__enum/main.py b/pacu/modules/ds__enum/main.py index 3fdf9c3c..edc1b922 100755 --- a/pacu/modules/ds__enum/main.py +++ b/pacu/modules/ds__enum/main.py @@ -7,6 +7,7 @@ from botocore.exceptions import ClientError from pacu.core.secretfinder.utils import regex_checker, Color +import pdb module_info = { # Name of the module (should be the same as the filename) @@ -48,7 +49,6 @@ parser = argparse.ArgumentParser(add_help=False, description=module_info['description']) parser.add_argument('--regions', required=False, default=None, help='One or more (comma separated) AWS regions in the format "us-east-1". Defaults to all session regions') -parser.add_argument('--managed-ad', required=False, default=False, action='store_true', help='Enumerate AWS Managed Microsoft AD') parser.add_argument('--directories', required=False, default=False, action='store_true', help='Enumerate Directories') parser.add_argument('--domain-controllers', required=False, default=False, action='store_true', help='Enumerate Domain Controllers') parser.add_argument('--trusts', required=False, default=False, action='store_true', help='Enumerate Domain Trusts') @@ -75,14 +75,18 @@ def get_directories(client, do_print=True): return directories for directory_id,directory in directories.items(): - # add created at datetime + directory_type = directory['Type'] + print(f"DirectoryId: {directory_id}") print(f" DNSName: {directory['Name']}") print(f" NetBIOSName: {directory['ShortName']}") - print(f" DirectoryType: {directory['Type']}") - print(f" Edition: {directory['Edition']}") + print(f" DirectoryType: {directory_type}") + if "Description" in directory: + print(f" Description: {directory['Description']}") + if "Edition" in directory: + print(f" Edition: {directory['Edition']}") print(f" Size: {directory['Size']}") - print(f" Description: {directory['Description']}") + print(f" CreatedAt: {str(directory['LaunchTime'])}") return directories @@ -202,14 +206,11 @@ def get_shared_directories(client, directories): def main(args, pacu_main): - # do we need this here? - #session = pacu_main.get_active_session() args = parser.parse_args(args) print = pacu_main.print get_regions = pacu_main.get_regions - # for each new cli flag, add here if args.directories is False and args.domain_controllers is False and args.trusts is False and args.settings is False and args.shared_directories is False: args.directories = args.domain_controllers = args.trusts = args.settings = args.shared_directories = True @@ -223,7 +224,6 @@ def main(args, pacu_main): client = pacu_main.get_boto3_client('ds', choice(regions)) - # do we actually need these for anything if we are printing all_directories = [] all_domain_controllers = [] all_trusts = [] From fb08e42c534a64064925900c80ac18e4e40a0a2f Mon Sep 17 00:00:00 2001 From: chebuya Date: Fri, 26 Jul 2024 15:33:23 -0600 Subject: [PATCH 3/6] KeyError fix --- pacu/modules/ds__enum/main.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/pacu/modules/ds__enum/main.py b/pacu/modules/ds__enum/main.py index edc1b922..3267651f 100755 --- a/pacu/modules/ds__enum/main.py +++ b/pacu/modules/ds__enum/main.py @@ -7,8 +7,6 @@ from botocore.exceptions import ClientError from pacu.core.secretfinder.utils import regex_checker, Color -import pdb - module_info = { # Name of the module (should be the same as the filename) 'name': 'ds__enum', From 269f8f009c53940bb1d6469e10d1221fd2d54dda Mon Sep 17 00:00:00 2001 From: chebuya Date: Thu, 1 Aug 2024 09:23:03 -0600 Subject: [PATCH 4/6] Final changes --- pacu/modules/ds__enum/main.py | 83 ++++++++++++++++++++++++++++------- 1 file changed, 68 insertions(+), 15 deletions(-) diff --git a/pacu/modules/ds__enum/main.py b/pacu/modules/ds__enum/main.py index 3267651f..900f4dc1 100755 --- a/pacu/modules/ds__enum/main.py +++ b/pacu/modules/ds__enum/main.py @@ -92,8 +92,7 @@ def get_directories(client, do_print=True): def get_domain_controllers(client, directories): directory_domain_controllers = {} - if directories == None: - directories = get_directories(client, do_print=False) + directory_ids = list(directories) for directory_id in directory_ids: @@ -126,9 +125,6 @@ def get_domain_controllers(client, directories): def get_trusts(client, directories): trusts = {} - if directories == None: - directories = get_directories(client, do_print=False) - paginator = client.get_paginator("describe_trusts") for resource_records in paginator.paginate(): for trust in resource_records["Trusts"]: @@ -145,7 +141,6 @@ def get_trusts(client, directories): for trust_id,trust in directory_trusts.items(): remote_domain_name = trust['RemoteDomainName'] trust_direction = trust['TrustDirection'] - # remote_domain_name trusts directory_name if trust_direction == 'One-Way: Outgoing': print(f"{remote_domain_name} trusts {domain_name}") elif trust_direction == 'One-Way: Incoming': @@ -157,16 +152,12 @@ def get_trusts(client, directories): print(f" TrustId: {trust_id}") print(f" TrustState: {trust['TrustState']}") print(f" TrustType: {trust['TrustType']}") - # add datetime and TrustStateReason return trusts def get_settings(client, directories): settings = {} - if directories == None: - directories = get_directories(client, do_print=False) - for directory_id,directory in directories.items(): print(f"Settings for {directory_id}/{directory['Name']}") @@ -200,10 +191,9 @@ def get_shared_directories(client, directories): return shared_directories - - def main(args, pacu_main): + session = pacu_main.get_active_session() args = parser.parse_args(args) print = pacu_main.print @@ -228,12 +218,11 @@ def main(args, pacu_main): all_settings = [] all_shared_directories = [] for region in regions: + if any([args.directories]): print('Starting region {}...'.format(region)) - client = pacu_main.get_boto3_client('ds', region) - # There is edge case where like --domain-controllers and --trusts is called but not --directories, we make 2x calls to DescribeDirectories. should return as value directories = None if args.directories: try: @@ -244,6 +233,8 @@ def main(args, pacu_main): if args.domain_controllers: try: + if directories == None: + directories = get_directories(client, do_print=False) domain_controllers = get_domain_controllers(client, directories) all_domain_controllers.append(domain_controllers) except ClientError as error: @@ -251,6 +242,8 @@ def main(args, pacu_main): if args.trusts: try: + if directories == None: + directories = get_directories(client, do_print=False) trusts = get_trusts(client, directories) all_trusts.append(trusts) except ClientError as error: @@ -258,6 +251,8 @@ def main(args, pacu_main): if args.settings: try: + if directories == None: + directories = get_directories(client, do_print=False) settings = get_settings(client, directories) all_settings.append(settings) except ClientError as error: @@ -265,8 +260,66 @@ def main(args, pacu_main): if args.shared_directories: try: + if directories == None: + directories = get_directories(client, do_print=False) shared_directories = get_shared_directories(client, directories) all_shared_directories.append(shared_directories) except ClientError as error: print(f"Failed to list settings: {error}") - \ No newline at end of file + + + gathered_data = { + 'Directories': all_directories, + 'DomainControllers': all_domain_controllers, + 'Trusts': all_trusts, + 'Settings': all_settings, + 'SharedDirectories': all_shared_directories + } + + for var in vars(args): + if var == 'regions': + continue + if not getattr(args, var): + del gathered_data[ARG_FIELD_MAPPER[var]] + + ds_data = deepcopy(session.DS) + for key, value in gathered_data.items(): + ds_data[key] = value + session.update(pacu_main.database, DS=ds_data) + + gathered_data['regions'] = regions + + if any([args.directories, args.domain_controllers, args.trusts, args.settings, args.shared_directories]): + return gathered_data + else: + print('No data successfully enumerated.\n') + return None + + + +def summary(data, pacu_main): + results = [] + + results.append(' Regions:') + for region in data['regions']: + results.append(' {}'.format(region)) + + results.append('') + + if 'Directories' in data: + results.append(' {} total directorie(s) found.'.format(len(data['Directories']))) + + if 'DomainControllers' in data: + results.append(' {} total domain controller(s) found.'.format(len(data['DomainControllers'][0]))) + + if 'Trusts' in data: + results.append(' {} total trust(s) found.'.format(len(data['Trusts'][0]))) + + if 'Settings' in data: + results.append(' {} total setting(s) found.'.format(len(data['Settings'][0]))) + + if 'SharedDirectories' in data: + directory_id = list(data['SharedDirectories'][0])[0] + results.append(' {} total shared directorie(s) found.'.format(len(data['SharedDirectories'][0][directory_id]))) + + return '\n'.join(results) From 49cd6b35ed05abad9059767602923d9881d8dfba Mon Sep 17 00:00:00 2001 From: chebuya Date: Thu, 1 Aug 2024 09:24:38 -0600 Subject: [PATCH 5/6] Add DS to model --- pacu/core/models.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pacu/core/models.py b/pacu/core/models.py index c0ee1a3f..a43aad30 100644 --- a/pacu/core/models.py +++ b/pacu/core/models.py @@ -77,6 +77,7 @@ class PacuSession(Base, ModelUpdateMixin): 'Config', 'DataPipeline', 'DynamoDB', + 'DS', 'EC2', 'ECS', 'EKS', @@ -124,6 +125,7 @@ class PacuSession(Base, ModelUpdateMixin): Config = Column(JSONType, nullable=False, default=dict) DataPipeline = Column(JSONType, nullable=False, default=dict) DynamoDB = Column(JSONType, nullable=False, default=dict) + DS = Column(JSONType, nullable=False, default=dict) EC2 = Column(JSONType, nullable=False, default=dict) ECS = Column(JSONType, nullable=False, default=dict) EKS = Column(JSONType, nullable=False, default=dict) From f799bbb36a4ea926e8484f52ddda469d19f69d54 Mon Sep 17 00:00:00 2001 From: chebuya Date: Mon, 5 Aug 2024 12:46:59 -0600 Subject: [PATCH 6/6] Fix all regions issue and minor edits --- pacu/modules/ds__enum/main.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/pacu/modules/ds__enum/main.py b/pacu/modules/ds__enum/main.py index 900f4dc1..5efeae39 100755 --- a/pacu/modules/ds__enum/main.py +++ b/pacu/modules/ds__enum/main.py @@ -83,7 +83,8 @@ def get_directories(client, do_print=True): print(f" Description: {directory['Description']}") if "Edition" in directory: print(f" Edition: {directory['Edition']}") - print(f" Size: {directory['Size']}") + if "Size" in directory: + print(f" Size: {directory['Size']}") print(f" CreatedAt: {str(directory['LaunchTime'])}") return directories @@ -119,8 +120,8 @@ def get_domain_controllers(client, directories): print(f" SubnetId: {domain_controller['SubnetId']}") print(f" AvailabilityZone: {domain_controller['AvailabilityZone']}") + return directory_domain_controllers - return domain_controllers def get_trusts(client, directories): trusts = {} @@ -148,7 +149,6 @@ def get_trusts(client, directories): elif trust_direction == 'Two-Way': print(f"{remote_domain_name} and {domain_name} trust each other") - print(f" TrustId: {trust_id}") print(f" TrustState: {trust['TrustState']}") print(f" TrustType: {trust['TrustType']}") @@ -188,7 +188,6 @@ def get_shared_directories(client, directories): print(f" ShareMethod: {setting['ShareMethod']}") print(f" ShareNotes: {setting['ShareNotes']}") - return shared_directories @@ -319,7 +318,8 @@ def summary(data, pacu_main): results.append(' {} total setting(s) found.'.format(len(data['Settings'][0]))) if 'SharedDirectories' in data: - directory_id = list(data['SharedDirectories'][0])[0] - results.append(' {} total shared directorie(s) found.'.format(len(data['SharedDirectories'][0][directory_id]))) + if len(list(data['SharedDirectories'][0])) != 0: + directory_id = list(data['SharedDirectories'][0])[0] + results.append(' {} total shared directorie(s) found.'.format(len(data['SharedDirectories'][0][directory_id]))) - return '\n'.join(results) + return '\n'.join(results) \ No newline at end of file