Skip to content

Commit

Permalink
feat: Updated syntax of variable names, and added progress bars, and …
Browse files Browse the repository at this point in the history
…enabled all_my_enis to find public IPs (#86)

* Updated with enhancements due to case-sensitivity in the API

* Updated to remove region being separate from the ocredentials object. Now it's a field within the object.

* Updated exception logic to find elastic load balancers with no security groups attached

* Brought files in line with both Inventory_Script repos

* Updated with changes from customer work...

* Updated all_my_enis.py with some additional cosmetic updates.

* Updated with some additional cosmetic updates.

* Updated all_my_functions.py with proper error type.

* Updates on progress bars, and using the right variables.
  • Loading branch information
paulbayer authored Jan 21, 2025
1 parent d84c850 commit 71bd039
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 31 deletions.
9 changes: 5 additions & 4 deletions inventory-scripts/Inventory_Modules.py
Original file line number Diff line number Diff line change
Expand Up @@ -1846,13 +1846,16 @@ def find_account_subnets2(ocredentials, fipaddresses=None):

def find_account_enis2(ocredentials, fRegion=None, fipaddresses=None):
"""
ocredentials is an object with the following structure:
@param: ocredentials is an object with the following structure:
- ['AccessKeyId'] holds the AWS_ACCESS_KEY
- ['SecretAccessKey'] holds the AWS_SECRET_ACCESS_KEY
- ['SessionToken'] holds the AWS_SESSION_TOKEN
- ['Region'] holds the region
- ['AccountNumber'] holds the account number
- ['Profile'] can hold the profile, instead of the session credentials
@param: fRegion is a string specifying which region we're looking at
@param: fipaddresses is a list of IP addresses to search for
"""
import boto3
from botocore.exceptions import ClientError
Expand All @@ -1866,7 +1869,7 @@ def find_account_enis2(ocredentials, fRegion=None, fipaddresses=None):
session_ec2 = boto3.Session(aws_access_key_id=ocredentials['AccessKeyId'],
aws_secret_access_key=ocredentials['SecretAccessKey'],
aws_session_token=ocredentials['SessionToken'],
region_name=fRegion)
region_name=ocredentials['Region'])
eni_info = session_ec2.client('ec2')
ENIs = {'NextToken': None}
AllENIs = []
Expand All @@ -1878,8 +1881,6 @@ def find_account_enis2(ocredentials, fRegion=None, fipaddresses=None):
try:
logging.info(f"Looking for ENIs that match any of {fipaddresses} in account #{ocredentials['AccountNumber']} in region {fRegion}")
ENIs = eni_info.describe_network_interfaces()
# Run through each of the subnets, and determine if the passed in IP address fits within any of them
# If it does - then include that data within the array, otherwise next...
for interface in ENIs['NetworkInterfaces']:
if fipaddresses is not None:
return_this_result = False
Expand Down
73 changes: 51 additions & 22 deletions inventory-scripts/all_my_enis.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,27 @@
#!/usr/bin/env python3
import sys
import os

import Inventory_Modules
from Inventory_Modules import display_results, get_all_credentials
from Inventory_Modules import display_results, get_all_credentials, find_account_enis2
from ArgumentsClass import CommonArguments
from datetime import datetime
# from datetime import datetime
from colorama import init, Fore
from botocore.exceptions import ClientError
from queue import Queue
from threading import Thread
from tqdm.auto import tqdm
from time import time

import logging

init()

__version__ = '2024.05.07'
__version__ = '2024.10.24'


##################
# Functions
##################


def parse_args(f_args):
Expand All @@ -25,6 +31,7 @@ def parse_args(f_args):
@return: returns an object namespace that contains the individualized parameters passed in
"""
parser = CommonArguments()
script_path, script_name = os.path.split(sys.argv[0])
parser.multiprofile()
parser.multiregion()
parser.extendedargs()
Expand All @@ -33,19 +40,29 @@ def parse_args(f_args):
parser.save_to_file()
parser.verbosity()
parser.version(__version__)
parser.my_parser.add_argument(
local = parser.my_parser.add_argument_group(script_name, 'Parameters specific to this script')
local.add_argument(
"--ipaddress", "--ip",
dest="pipaddresses",
nargs="*",
metavar="IP address",
default=None,
help="IP address(es) you're looking for within your accounts")
local.add_argument(
"--public-only", "--po",
action="store_true",
dest="ppublic",
help="Whether you want to return only those results with a Public IP")
return parser.my_parser.parse_args(f_args)


def check_accounts_for_enis(fCredentialList, fip=None):
def check_accounts_for_enis(fCredentialList, fip=None, fPublicOnly: bool = False):
"""
Note that this function takes a list of Credentials and checks for ENIs in every account and region it has creds for
@Description: Note that this function takes a list of Credentials and checks for ENIs in every account and region it has creds for
@param fCredentialList: The list of credentials to check
@param fip: The IP address to look for
@param fPublicOnly: Whether to look for only public IPs
@return: LIst of ENIs from the list of credentials
"""

class FindENIs(Thread):
Expand All @@ -58,14 +75,19 @@ def run(self):
while True:
# Get the work from the queue and expand the tuple
c_account_credentials, c_region, c_fip, c_PlacesToLook, c_PlaceCount = self.queue.get()
pbar.update()
logging.info(f"De-queued info for account {c_account_credentials['AccountId']}")
try:
logging.info(f"Attempting to connect to {c_account_credentials['AccountId']}")
account_enis = Inventory_Modules.find_account_enis2(c_account_credentials, c_region, c_fip)
account_enis = find_account_enis2(c_account_credentials, c_region, c_fip)
logging.info(f"Successfully connected to account {c_account_credentials['AccountId']} in region {c_region}")
for eni in account_enis:
eni['MgmtAccount'] = c_account_credentials['MgmtAccount']
Results.extend(account_enis)
if fPublicOnly and eni['PublicIp'] == "No Public IP":
pass
else:
Results.append(eni)
# Results.extend(account_enis)
except KeyError as my_Error:
logging.error(f"Account Access failed - trying to access {c_account_credentials['AccountId']}")
logging.info(f"Actual Error: {my_Error}")
Expand All @@ -75,14 +97,19 @@ def run(self):
logging.warning(my_Error)
continue
finally:
print(f"{ERASE_LINE}Finished finding ENIs in account {c_account_credentials['AccountId']} in region {c_region} - {c_PlaceCount} / {c_PlacesToLook}", end='\r')
# print(f"{ERASE_LINE}Finished finding ENIs in account {c_account_credentials['AccountId']} in region {c_region} - {c_PlaceCount} / {c_PlacesToLook}", end='\r')
self.queue.task_done()

checkqueue = Queue()

pbar = tqdm(desc=f'Finding enis from {len(CredentialList)} accounts / regions',
total=len(fCredentialList), unit=' locations'
)

Results = []
PlaceCount = 0
PlacesToLook = WorkerThreads = min(len(fCredentialList), 50)
PlacesToLook = fCredentialList.__len__()
WorkerThreads = min(len(fCredentialList), 50)

for x in range(WorkerThreads):
worker = FindENIs(checkqueue)
Expand All @@ -105,10 +132,10 @@ def run(self):
return Results


def present_results(ENIsFound:list):
def present_results(f_ENIsFound: list):
"""
Description: Presents results at the end of the script
@param ENIsFound: The list of records to show...
@param f_ENIsFound: The list of records to show...
"""
display_dict = {'MgmtAccount' : {'DisplayOrder': 1, 'Heading': 'Mgmt Acct'},
'AccountId' : {'DisplayOrder': 2, 'Heading': 'Acct Number'},
Expand All @@ -119,26 +146,28 @@ def present_results(ENIsFound:list):
'ENIId' : {'DisplayOrder': 7, 'Heading': 'ENI Id'},
'PrivateIpAddress': {'DisplayOrder': 8, 'Heading': 'Assoc. IP'}}

sorted_ENIs_Found = sorted(ENIsFound, key=lambda d: (d['MgmtAccount'], d['AccountId'], d['Region'], d['VpcId']))
sorted_ENIs_Found = sorted(f_ENIsFound, key=lambda d: (d['MgmtAccount'], d['AccountId'], d['Region'], d['VpcId']))
display_results(sorted_ENIs_Found, display_dict, 'None', pFilename)

DetachedENIs = [x for x in sorted_ENIs_Found if x['Status'] in ['available', 'attaching', 'detaching']]
RegionList = list(set([x['Region'] for x in CredentialList]))
AccountList = list(set([x['AccountId'] for x in CredentialList]))
RegionList = list(set([x['Region'] for x in sorted_ENIs_Found]))
AccountList = list(set([x['AccountId'] for x in sorted_ENIs_Found]))

print()
print() if pSkipAccounts is not None or pSkipProfiles is not None else ""
print(f"These accounts were skipped - as requested: {pSkipAccounts}") if pSkipAccounts is not None else ""
print(f"These profiles were skipped - as requested: {pSkipProfiles}") if pSkipProfiles is not None else ""
print()
print(f"Your output will be saved to {Fore.GREEN}'{pFilename}-{datetime.now().strftime('%y-%m-%d--%H:%M:%S')}'{Fore.RESET}") if pFilename is not None else ""
print(f"Found {len(ENIsFound)} ENIs across {len(AccountList)} accounts across {len(RegionList)} regions")
print(f"{Fore.RED}Found {len(DetachedENIs)} ENIs that are not listed as 'in-use' and may therefore be costing you additional money while they're unused.{Fore.RESET}")
print(f"The output has also been written to a file beginning with '{pFilename}' + the date and time") if pFilename is not None else ""
print(f"Found {len(f_ENIsFound)} ENIs {'with public IPs' if pPublicOnly else ''} across {len(AccountList)} accounts across {len(RegionList)} regions")
print(f"{Fore.RED}Found {len(DetachedENIs)} ENIs that are not listed as 'in-use' and may therefore be costing you additional money while they're unused.{Fore.RESET}") if DetachedENIs else ""
print()
if verbose < 40:
for x in DetachedENIs:
print(x)


##################
# Main
##################

ERASE_LINE = '\x1b[2K'
Expand All @@ -152,6 +181,7 @@ def present_results(ENIsFound:list):
pAccounts = args.Accounts
pRootOnly = args.RootOnly
pIPaddressList = args.pipaddresses
pPublicOnly = args.ppublic
pFilename = args.Filename
pTiming = args.Time
verbose = args.loglevel
Expand All @@ -173,12 +203,11 @@ def present_results(ENIsFound:list):
CredentialList = get_all_credentials(pProfiles, pTiming, pSkipProfiles, pSkipAccounts, pRootOnly, pAccounts, pRegionList)

# Find ENIs across all children accounts
ENIsFound = check_accounts_for_enis(CredentialList, fip=pIPaddressList)
ENIsFound = check_accounts_for_enis(CredentialList, pIPaddressList, pPublicOnly)
# Display results
present_results(ENIsFound)

if pTiming:
print(ERASE_LINE)
print(f"{Fore.GREEN}This script took {time() - begin_time:.2f} seconds{Fore.RESET}")

print()
Expand Down
2 changes: 1 addition & 1 deletion inventory-scripts/all_my_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def run(self):
sleep(3)
logging.info(f"Sleeping to allow {c_function_name} to update to runtime {c_new_runtime}")
elif Status == 'Failed':
raise Exception(f'Runtime update for {c_function_name} to {c_new_runtime} failed')
raise RuntimeError(f'Runtime update for {c_function_name} to {c_new_runtime} failed')
except TypeError as my_Error:
logging.info(f"Error: {my_Error}")
continue
Expand Down
16 changes: 12 additions & 4 deletions inventory-scripts/all_my_subnets.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,19 @@
from Inventory_Modules import display_results, get_all_credentials

init()
__version__ = "2024.05.31"
__version__ = "2024.10.24"

# TODO: Add Elastic IPs to this script as well.


##################
# Functions
##################
def parse_args(args):

def parse_args(f_args):
"""
Description: Parses the arguments passed into the script
@param args: args represents the list of arguments passed in
@param f_args: args represents the list of arguments passed in
@return: returns an object namespace that contains the individualized parameters passed in
"""
script_path, script_name = os.path.split(sys.argv[0])
Expand All @@ -43,7 +48,7 @@ def parse_args(args):
metavar="IP address",
default=None,
help="IP address(es) you're looking for within your VPCs")
return parser.my_parser.parse_args(args)
return parser.my_parser.parse_args(f_args)


def check_accounts_for_subnets(CredentialList, fip=None):
Expand Down Expand Up @@ -172,6 +177,9 @@ def analyze_results(fSubnetsFound: list):


##################
# Main
##################


ERASE_LINE = '\x1b[2K'
begin_time = time()
Expand Down

0 comments on commit 71bd039

Please sign in to comment.