diff --git a/ExtractAPIDatatoGCP.py b/ExtractAPIDatatoGCP.py index c814c5e..4bd1b7f 100644 --- a/ExtractAPIDatatoGCP.py +++ b/ExtractAPIDatatoGCP.py @@ -9,13 +9,13 @@ mountain_time_zone = pytz.timezone('US/Mountain') -def extract_data_from_api(limit=50000, order='animal_id'): +def extract_api_data(limit=50000, order='animal_id'): """ Function to extract data from data.austintexas.gov API. """ api_url = 'https://data.austintexas.gov/resource/9t4d-g238.json' - api_key = '58778d3tul9ykaurce5wf29ek' + api_key = 'cx1ax1xme7m1r8bler55ab7jk' headers = { 'accept': "application/json", @@ -46,7 +46,7 @@ def extract_data_from_api(limit=50000, order='animal_id'): return data -def create_dataframe(data): +def create_data(data): columns = [ 'animal_id', 'name', 'datetime', 'monthyear', 'date_of_birth', 'outcome_type', 'animal_type', 'sex_upon_outcome', 'age_upon_outcome', @@ -62,7 +62,7 @@ def create_dataframe(data): return df -def upload_to_gcs(dataframe, bucket_name, file_path): +def gcs_upload(dataframe, bucket_name, file_path): """ Upload a DataFrame to a Google Cloud Storage bucket using service account credentials. """ @@ -95,11 +95,11 @@ def upload_to_gcs(dataframe, bucket_name, file_path): def main(): - data_extracted = extract_data_from_api(limit=50000, order='animal_id') - shelter_data = create_dataframe(data_extracted) + data_extracted = extract_api_data(limit=50000, order='animal_id') + shelter_data = create_data(data_extracted) gcs_bucket_name = 'data_center_lab3' gcs_file_path = 'data/{}/outcomes_{}.csv' - upload_to_gcs(shelter_data, gcs_bucket_name, gcs_file_path) + gcs_upload(shelter_data, gcs_bucket_name, gcs_file_path) \ No newline at end of file diff --git a/LoadData.py b/LoadData.py index 143f3b0..aa8bfcc 100644 --- a/LoadData.py +++ b/LoadData.py @@ -26,7 +26,7 @@ def getcredentials(self): } return credentials - def connect_to_gcp_and_get_data(self, file_name): + def connect_and_get_data(self, file_name): gcp_file_path = f'transformed_data/{file_name}' credentials_info = self.getcredentials() @@ -41,7 +41,7 @@ def connect_to_gcp_and_get_data(self, file_name): return df def get_data(self, file_name): - df = self.connect_to_gcp_and_get_data(file_name) + df = self.connect_and_get_data(file_name) return df @@ -123,7 +123,7 @@ def create_table(self, connection, table_query): cursor.close() print("Finished creating tables...") - def load_data_into_postgres(self, connection, gcp_data, table_name): + def load_postgres_data(self, connection, gcp_data, table_name): cursor = connection.cursor() print(f"Dropping Table {table_name}") truncate_table = f"DROP TABLE {table_name};" @@ -142,7 +142,7 @@ def load_data_into_postgres(self, connection, gcp_data, table_name): print(f"Number of rows inserted for table {table_name}: {len(gcp_data)}") -def load_data_to_postgres(file_name, table_name): +def load_postgres_data(file_name, table_name): gcp_loader = GCPDataLoader() table_data_df = gcp_loader.get_data(file_name) @@ -151,4 +151,4 @@ def load_data_to_postgres(file_name, table_name): postgres_connection = postgres_dataloader.connect_to_postgres() postgres_dataloader.create_table(postgres_connection, table_query) - postgres_dataloader.load_data_into_postgres(postgres_connection, table_data_df, table_name) + postgres_dataloader.load_postgres_data(postgres_connection, table_data_df, table_name) diff --git a/shelteroutcomes_dag.py b/shelteroutcomes_dag.py index f5b6f3d..f85a84b 100644 --- a/shelteroutcomes_dag.py +++ b/shelteroutcomes_dag.py @@ -8,18 +8,12 @@ from airflow.operators.python_operator import PythonOperator -# code_path = "/root/demo/lab03/etl_scripts" -# sys.path.insert(0, code_path) + from ExtractTransformLoad_Steps.TransformData import transform_data from ExtractTransformLoad_Steps.ExtractAPItoGCP import main from ExtractTransformLoad_Steps.LoadData import load_data_to_postgres -# AIRFLOW_HOME = os.environ.get('AIRFLOW_HOME', '/opt/airflow') -# CREDS_TARGET_DIR = AIRFLOW_HOME + '/warm-physics-405522-a07e9b7bfc0d.json' - -# with open(CREDS_TARGET_DIR, 'r') as f: -# credentials_content = f.read() default_args = { @@ -42,30 +36,28 @@ # copy_creds = BashOperator(task_id = "COPY_CREDS", bash_command = "echo start") - extract_data_from_api_to_gcp = PythonOperator(task_id = "EXTRACT_DATA_FROM_API_TO_GCP", + extract_api_to_gcp = PythonOperator(task_id = "EXTRACT_DATA_FROM_API_TO_GCP", python_callable = main,) - transform_data_from_gcp_step = PythonOperator(task_id="TRANSFORM_DATA_FROM_GCP", + transform_gcp_step = PythonOperator(task_id="TRANSFORM_DATA_FROM_GCP", python_callable=transform_data,) - load_dim_animals_tab = PythonOperator(task_id="LOAD_DIM_ANIMALS", + load_dim_animals = PythonOperator(task_id="LOAD_DIM_ANIMALS", python_callable=load_data_to_postgres, op_kwargs={"file_name": 'dim_animal.csv', "table_name": 'animaldimension'},) - load_dim_outcome_types_tab = PythonOperator(task_id="LOAD_DIM_OUTCOME_TYPES", + load_dim_outcome = PythonOperator(task_id="LOAD_DIM_OUTCOME_TYPES", python_callable=load_data_to_postgres, op_kwargs={"file_name": 'dim_outcome_types.csv', "table_name": 'outcomedimension'},) - load_dim_dates_tab = PythonOperator(task_id="LOAD_DIM_DATES", + load_dim_dates = PythonOperator(task_id="LOAD_DIM_DATES", python_callable=load_data_to_postgres, op_kwargs={"file_name": 'dim_dates.csv', "table_name": 'datedimension'},) - load_fct_outcomes_tab = PythonOperator(task_id="LOAD_FCT_OUTCOMES", + load_fct_outcomes = PythonOperator(task_id="LOAD_FCT_OUTCOMES", python_callable=load_data_to_postgres, op_kwargs={"file_name": 'fct_outcomes.csv', "table_name": 'outcomesfact'},) end = BashOperator(task_id = "END", bash_command = "echo end") - # start >> extract_data_from_api_to_gcp >> transform_data_step >> - # [load_dim_animals, load_dim_outcome_types, load_dim_dates, load_fct_outcomes] >> end - start >> extract_data_from_api_to_gcp >> transform_data_from_gcp_step >> [load_dim_animals_tab, load_dim_outcome_types_tab, load_dim_dates_tab] >> load_fct_outcomes_tab >> end \ No newline at end of file + start >> extract_api_to_gcp >> transform_gcp_step >> [load_dim_animals, load_dim_outcome, load_dim_dates] >> load_fct_outcomes >> end \ No newline at end of file