|
| 1 | +import datetime |
| 2 | +import json |
| 3 | +import os |
| 4 | + |
| 5 | +import requests |
| 6 | + |
| 7 | +# CONFIGURATION |
| 8 | + |
| 9 | +# Enter in the URL of the API that we're testing. |
| 10 | +API_URL = "" # ex - "https://api.prod.imap-mission.com" |
| 11 | + |
| 12 | +# When an authentication system is set up, these must be set to log in to the APIs |
| 13 | +AWS_REGION = "" # ex - "us-west-2" |
| 14 | +COGNITO_CLIENT_ID = "" # random string of letters assigned to the congito client |
| 15 | + |
| 16 | +# GLOBAL VARIABLES |
| 17 | + |
| 18 | +# These variables are set when a user is logged in. |
| 19 | +USER_TOKEN = None |
| 20 | +LOGIN_TIME = None |
| 21 | + |
| 22 | +# These variables are never changed |
| 23 | +EXPIRE_TIME = 3600 |
| 24 | +STATUS_OK = 200 |
| 25 | +STATUS_NOT_FOUND = 404 |
| 26 | +STATUS_BAD_REQUEST = 400 |
| 27 | + |
| 28 | + |
| 29 | +def _set_user_token(t): |
| 30 | + global LOGIN_TIME |
| 31 | + global USER_TOKEN |
| 32 | + |
| 33 | + LOGIN_TIME = datetime.datetime.now() |
| 34 | + USER_TOKEN = t |
| 35 | + |
| 36 | + |
| 37 | +def _get_user_token(): |
| 38 | + if LOGIN_TIME is None: |
| 39 | + print("New login needed. Login is valid for 60 minutes.") |
| 40 | + elif (datetime.datetime.now() - LOGIN_TIME).total_seconds() >= EXPIRE_TIME: |
| 41 | + print("Login expired. Please log in again.") |
| 42 | + else: |
| 43 | + return USER_TOKEN |
| 44 | + |
| 45 | + t = get_sdc_token() |
| 46 | + |
| 47 | + return t |
| 48 | + |
| 49 | + |
| 50 | +def get_sdc_token(user_name=None, password=None): |
| 51 | + """ |
| 52 | + This function authenticates the user. An access token is automatically stored in |
| 53 | + the USER_TOKEN variable in this file, and functions will attempt to find a valid |
| 54 | + user token in that variable. |
| 55 | +
|
| 56 | + :param user_name: User's SDC username |
| 57 | + :param password: User's SDC password |
| 58 | +
|
| 59 | + :return: A string that also gets stored in the USER_TOKEN variable in this file. |
| 60 | + You don't need this string unless you plan on making your own API calls, |
| 61 | + using functions outside of this file. |
| 62 | + """ |
| 63 | + |
| 64 | + if user_name is None: |
| 65 | + user_name = input("Username:") |
| 66 | + if password is None: |
| 67 | + import getpass |
| 68 | + |
| 69 | + password = getpass.getpass("Password for " + user_name + ":") |
| 70 | + |
| 71 | + authentication_url = f"https://cognito-idp.{AWS_REGION}.amazonaws.com/" |
| 72 | + authentication_headers = { |
| 73 | + "X-Amz-Target": "AWSCognitoIdentityProviderService.InitiateAuth", |
| 74 | + "Content-Type": "application/x-amz-json-1.1", |
| 75 | + } |
| 76 | + data = json.dumps( |
| 77 | + { |
| 78 | + "ClientId": COGNITO_CLIENT_ID, |
| 79 | + "AuthFlow": "USER_PASSWORD_AUTH", |
| 80 | + "AuthParameters": {"USERNAME": user_name, "PASSWORD": password}, |
| 81 | + } |
| 82 | + ) |
| 83 | + |
| 84 | + # Attempt to grab the SDC token. |
| 85 | + try: |
| 86 | + token_response = requests.post( |
| 87 | + authentication_url, data=data, headers=authentication_headers |
| 88 | + ) |
| 89 | + t = token_response.json()["AuthenticationResult"]["AccessToken"] |
| 90 | + except KeyError: |
| 91 | + print("Invalid username and/or password. Please try again. ") |
| 92 | + return |
| 93 | + |
| 94 | + _set_user_token(t) |
| 95 | + |
| 96 | + return t |
| 97 | + |
| 98 | + |
| 99 | +def _execute_api_get(endpoint, login, **kwargs): |
| 100 | + if login: |
| 101 | + token = _get_user_token() |
| 102 | + headers = {"Authorization": token} |
| 103 | + else: |
| 104 | + headers = {} |
| 105 | + query_parameters = [] |
| 106 | + for kw in kwargs: |
| 107 | + query_parameters.append(kw + "=" + str(kwargs[kw])) |
| 108 | + query_parameters = "&".join(query_parameters) |
| 109 | + url_with_parameters = API_URL + "/" + endpoint + "?" + query_parameters |
| 110 | + print(url_with_parameters) |
| 111 | + try: |
| 112 | + response = requests.get(url_with_parameters, headers=headers) |
| 113 | + except Exception as e: |
| 114 | + print(f"Could not finish query due to error {e!s}") |
| 115 | + return |
| 116 | + return response |
| 117 | + |
| 118 | + |
| 119 | +def download(filename, download_dir=".", login=False): |
| 120 | + """ |
| 121 | + This function is used to download files from the SDS. |
| 122 | +
|
| 123 | + :param filename: The full S3 URI to download |
| 124 | + :param download_dir: The directory on the local machine to download the file to. |
| 125 | +
|
| 126 | + :return: None, but downloads the file to the specified download directory |
| 127 | + """ |
| 128 | + endpoint = "download" |
| 129 | + download_url = _execute_api_get(endpoint, login, s3_uri=filename) |
| 130 | + |
| 131 | + if download_url.status_code == STATUS_BAD_REQUEST: |
| 132 | + print("Not a valid S3 URI. Example input: s3://bucket/path/file.ext") |
| 133 | + return |
| 134 | + elif download_url.status_code == STATUS_NOT_FOUND: |
| 135 | + print("No files were found matching the given URI.") |
| 136 | + return |
| 137 | + |
| 138 | + file_name_and_path = os.path.join(download_dir, filename[5:]) |
| 139 | + download_dir = os.path.dirname(file_name_and_path) |
| 140 | + if not os.path.exists(download_dir): |
| 141 | + os.makedirs(download_dir) |
| 142 | + |
| 143 | + with open(file_name_and_path, "wb") as file: |
| 144 | + print(f"Downloading {file_name_and_path}") |
| 145 | + file_location = requests.get(download_url.json()["download_url"]) |
| 146 | + file.write(file_location.content) |
| 147 | + |
| 148 | + return file_name_and_path |
| 149 | + |
| 150 | + |
| 151 | +def query(login=False, **kwargs): |
| 152 | + """ |
| 153 | + This function is used to query files from the SDS. |
| 154 | + There are no required arguments, the search strings will depend on the mission |
| 155 | +
|
| 156 | + :return: This returns JSON with all information about the files. |
| 157 | + """ |
| 158 | + endpoint = "query" |
| 159 | + response = _execute_api_get(endpoint, login, **kwargs) |
| 160 | + return response.json() |
| 161 | + |
| 162 | + |
| 163 | +def upload(local_file_location, remote_file_name, login=False, **kwargs): |
| 164 | + """ |
| 165 | + This function is used to upload files to the SDS. |
| 166 | +
|
| 167 | + :param local_file_location: The full filename and path to the file on the |
| 168 | + local machine to upload to the SDS. |
| 169 | + :param remote_file_name: The name of the file you'd like the uploaded file to be |
| 170 | + :param kwargs: Any additional key word arguments passed into this function |
| 171 | + are stored as tags on the SDS. |
| 172 | +
|
| 173 | + :return: This returns a requests response object. |
| 174 | + If the upload was successful, it'll be code 200. |
| 175 | + """ |
| 176 | + endpoint = "upload" |
| 177 | + response = _execute_api_get(endpoint, login, filename=remote_file_name, **kwargs) |
| 178 | + |
| 179 | + if response.status_code != STATUS_OK: |
| 180 | + print( |
| 181 | + "Could not generate an upload URL with the following error: " |
| 182 | + + response.text |
| 183 | + ) |
| 184 | + return |
| 185 | + |
| 186 | + with open(local_file_location, "rb") as object_file: |
| 187 | + object_text = object_file.read() |
| 188 | + |
| 189 | + response = requests.put(response.json(), data=object_text) |
| 190 | + return response |
0 commit comments