14 changes: 7 additions & 7 deletions ExtractAPIDatatoGCP.py
@@ -9,13 +9,13 @@
mountain_time_zone = pytz.timezone('US/Mountain')


def extract_data_from_api(limit=50000, order='animal_id'):
def extract_api_data(limit=50000, order='animal_id'):
"""
Function to extract data from data.austintexas.gov API.
"""
api_url = 'https://data.austintexas.gov/resource/9t4d-g238.json'

api_key = '58778d3tul9ykaurce5wf29ek'
api_key = 'cx1ax1xme7m1r8bler55ab7jk'

headers = {
'accept': "application/json",
@@ -46,7 +46,7 @@ def extract_data_from_api(limit=50000, order='animal_id'):
    return data


def create_dataframe(data):
def create_data(data):
    columns = [
        'animal_id', 'name', 'datetime', 'monthyear', 'date_of_birth',
        'outcome_type', 'animal_type', 'sex_upon_outcome', 'age_upon_outcome',
@@ -62,7 +62,7 @@ def create_dataframe(data):
    return df


def upload_to_gcs(dataframe, bucket_name, file_path):
def gcs_upload(dataframe, bucket_name, file_path):
"""
Upload a DataFrame to a Google Cloud Storage bucket using service account credentials.
"""
@@ -95,11 +95,11 @@ def upload_to_gcs(dataframe, bucket_name, file_path):


def main():
    data_extracted = extract_data_from_api(limit=50000, order='animal_id')
    shelter_data = create_dataframe(data_extracted)
    data_extracted = extract_api_data(limit=50000, order='animal_id')
    shelter_data = create_data(data_extracted)

    gcs_bucket_name = 'data_center_lab3'
    gcs_file_path = 'data/{}/outcomes_{}.csv'

    upload_to_gcs(shelter_data, gcs_bucket_name, gcs_file_path)
    gcs_upload(shelter_data, gcs_bucket_name, gcs_file_path)

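For context on the renames in this file: main() now wires together extract_api_data, create_data, and gcs_upload. Below is a minimal sketch of what the extraction call can look like against this Socrata endpoint; the $limit/$order query parameters and the X-App-Token header follow general SODA conventions and are assumptions here, since the diff only shows the signature, endpoint URL, and key.

import requests

def extract_api_data(limit=50000, order='animal_id', api_key=None):
    """Sketch: pull shelter outcome records from the Socrata endpoint."""
    api_url = 'https://data.austintexas.gov/resource/9t4d-g238.json'
    headers = {'accept': 'application/json'}
    if api_key:
        headers['X-App-Token'] = api_key  # assumed header; Socrata also accepts an app token as a query parameter
    params = {'$limit': limit, '$order': order}  # standard SODA paging/ordering parameters
    response = requests.get(api_url, headers=headers, params=params, timeout=60)
    response.raise_for_status()  # fail fast on HTTP errors instead of returning partial data
    return response.json()  # list of dicts, one per outcome record

create_data then selects the columns listed above into a DataFrame before gcs_upload writes it out.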
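The body of gcs_upload is collapsed in this view; its docstring says it uses service account credentials. A hypothetical sketch with google-cloud-storage follows. The credential plumbing, the blob naming, and the way the '{}' placeholders in gcs_file_path get filled are assumptions, not the file's confirmed implementation.

from google.cloud import storage
from google.oauth2 import service_account

def gcs_upload(dataframe, bucket_name, file_path, credentials_info=None):
    """Hypothetical sketch: write a DataFrame to GCS as CSV."""
    credentials = (service_account.Credentials.from_service_account_info(credentials_info)
                   if credentials_info else None)  # None falls back to application default credentials
    client = storage.Client(credentials=credentials)
    # main() passes 'data/{}/outcomes_{}.csv'; the original presumably fills the
    # placeholders with a run date before (or inside) the upload.
    blob = client.bucket(bucket_name).blob(file_path)
    blob.upload_from_string(dataframe.to_csv(index=False), content_type='text/csv')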
10 changes: 5 additions & 5 deletions LoadData.py
@@ -26,7 +26,7 @@ def getcredentials(self):
        }
        return credentials

    def connect_to_gcp_and_get_data(self, file_name):
    def connect_and_get_data(self, file_name):
        gcp_file_path = f'transformed_data/{file_name}'

        credentials_info = self.getcredentials()
@@ -41,7 +41,7 @@ def connect_to_gcp_and_get_data(self, file_name):
        return df

    def get_data(self, file_name):
        df = self.connect_to_gcp_and_get_data(file_name)
        df = self.connect_and_get_data(file_name)
        return df


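The renamed connect_and_get_data reads a CSV from the transformed_data/ prefix in GCS and returns it as a DataFrame. The read itself is collapsed here; the sketch below shows one way it could work with google-cloud-storage. The bucket name, the credential handling, and download_as_bytes are assumptions rather than the file's confirmed code.

import io

import pandas as pd
from google.cloud import storage
from google.oauth2 import service_account

def connect_and_get_data(credentials_info, bucket_name, file_name):
    """Hypothetical sketch of the GCS read behind GCPDataLoader.get_data."""
    credentials = service_account.Credentials.from_service_account_info(credentials_info)
    client = storage.Client(credentials=credentials, project=credentials_info.get('project_id'))
    blob = client.bucket(bucket_name).blob(f'transformed_data/{file_name}')
    return pd.read_csv(io.BytesIO(blob.download_as_bytes()))  # parse the downloaded CSV into a DataFrame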
@@ -123,7 +123,7 @@ def create_table(self, connection, table_query):
        cursor.close()
        print("Finished creating tables...")

    def load_data_into_postgres(self, connection, gcp_data, table_name):
    def load_postgres_data(self, connection, gcp_data, table_name):
        cursor = connection.cursor()
        print(f"Dropping Table {table_name}")
        truncate_table = f"DROP TABLE {table_name};"
@@ -142,7 +142,7 @@ def load_data_into_postgres(self, connection, gcp_data, table_name):

print(f"Number of rows inserted for table {table_name}: {len(gcp_data)}")

def load_data_to_postgres(file_name, table_name):
def load_postgres_data(file_name, table_name):
    gcp_loader = GCPDataLoader()
    table_data_df = gcp_loader.get_data(file_name)

@@ -151,4 +151,4 @@ def load_data_to_postgres(file_name, table_name):
    postgres_connection = postgres_dataloader.connect_to_postgres()

    postgres_dataloader.create_table(postgres_connection, table_query)
    postgres_dataloader.load_data_into_postgres(postgres_connection, table_data_df, table_name)
    postgres_dataloader.load_postgres_data(postgres_connection, table_data_df, table_name)
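The insert inside the renamed load_postgres_data method is also collapsed; only the DROP TABLE step and the final row-count print are visible. The sketch below shows one common way to bulk-insert a DataFrame with psycopg2. execute_values and the column handling are assumptions, not the project's confirmed approach, and the sketch assumes create_table has already recreated the target table.

import pandas as pd
from psycopg2.extras import execute_values

def insert_dataframe(connection, df: pd.DataFrame, table_name: str) -> None:
    """Hypothetical helper: batched insert of a DataFrame into an existing table."""
    cursor = connection.cursor()
    columns = ', '.join(df.columns)
    insert_sql = f"INSERT INTO {table_name} ({columns}) VALUES %s"  # execute_values expands %s into row tuples
    execute_values(cursor, insert_sql, df.values.tolist())
    connection.commit()
    cursor.close()
    print(f"Number of rows inserted for table {table_name}: {len(df)}")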
24 changes: 8 additions & 16 deletions shelteroutcomes_dag.py
@@ -8,18 +8,12 @@
from airflow.operators.python_operator import PythonOperator


# code_path = "/root/demo/lab03/etl_scripts"
# sys.path.insert(0, code_path)


from ExtractTransformLoad_Steps.TransformData import transform_data
from ExtractTransformLoad_Steps.ExtractAPItoGCP import main
from ExtractTransformLoad_Steps.LoadData import load_data_to_postgres

# AIRFLOW_HOME = os.environ.get('AIRFLOW_HOME', '/opt/airflow')
# CREDS_TARGET_DIR = AIRFLOW_HOME + '/warm-physics-405522-a07e9b7bfc0d.json'

# with open(CREDS_TARGET_DIR, 'r') as f:
# credentials_content = f.read()


default_args = {
@@ -42,30 +36,28 @@

# copy_creds = BashOperator(task_id = "COPY_CREDS", bash_command = "echo start")

extract_data_from_api_to_gcp = PythonOperator(task_id = "EXTRACT_DATA_FROM_API_TO_GCP",
extract_api_to_gcp = PythonOperator(task_id = "EXTRACT_DATA_FROM_API_TO_GCP",
python_callable = main,)

transform_data_from_gcp_step = PythonOperator(task_id="TRANSFORM_DATA_FROM_GCP",
transform_gcp_step = PythonOperator(task_id="TRANSFORM_DATA_FROM_GCP",
python_callable=transform_data,)

load_dim_animals_tab = PythonOperator(task_id="LOAD_DIM_ANIMALS",
load_dim_animals = PythonOperator(task_id="LOAD_DIM_ANIMALS",
python_callable=load_data_to_postgres,
op_kwargs={"file_name": 'dim_animal.csv', "table_name": 'animaldimension'},)

load_dim_outcome_types_tab = PythonOperator(task_id="LOAD_DIM_OUTCOME_TYPES",
load_dim_outcome = PythonOperator(task_id="LOAD_DIM_OUTCOME_TYPES",
python_callable=load_data_to_postgres,
op_kwargs={"file_name": 'dim_outcome_types.csv', "table_name": 'outcomedimension'},)

load_dim_dates_tab = PythonOperator(task_id="LOAD_DIM_DATES",
load_dim_dates = PythonOperator(task_id="LOAD_DIM_DATES",
python_callable=load_data_to_postgres,
op_kwargs={"file_name": 'dim_dates.csv', "table_name": 'datedimension'},)

load_fct_outcomes_tab = PythonOperator(task_id="LOAD_FCT_OUTCOMES",
load_fct_outcomes = PythonOperator(task_id="LOAD_FCT_OUTCOMES",
python_callable=load_data_to_postgres,
op_kwargs={"file_name": 'fct_outcomes.csv', "table_name": 'outcomesfact'},)

end = BashOperator(task_id = "END", bash_command = "echo end")

# start >> extract_data_from_api_to_gcp >> transform_data_step >>
# [load_dim_animals, load_dim_outcome_types, load_dim_dates, load_fct_outcomes] >> end
start >> extract_data_from_api_to_gcp >> transform_data_from_gcp_step >> [load_dim_animals_tab, load_dim_outcome_types_tab, load_dim_dates_tab] >> load_fct_outcomes_tab >> end
start >> extract_api_to_gcp >> transform_gcp_step >> [load_dim_animals, load_dim_outcome, load_dim_dates] >> load_fct_outcomes >> end
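One follow-up worth flagging: LoadData.py renames the module-level entry point load_data_to_postgres to load_postgres_data (and gives the class method the same name), yet this DAG still imports load_data_to_postgres and passes it as python_callable for the four load tasks. If the rename lands as shown, the import and operator references would presumably need to follow. A sketch of one load task under that assumption, placed inside the existing DAG context, is below.

# Sketch only: assumes the rename in LoadData.py is intentional and the DAG follows it.
from airflow.operators.python_operator import PythonOperator
from ExtractTransformLoad_Steps.LoadData import load_postgres_data  # renamed entry point

load_dim_animals = PythonOperator(
    task_id="LOAD_DIM_ANIMALS",
    python_callable=load_postgres_data,
    op_kwargs={"file_name": "dim_animal.csv", "table_name": "animaldimension"},
)

The dependency chain itself is unchanged in spirit: fanning the three dimension loads out in a list and chaining load_fct_outcomes after it means the fact table is only loaded once all three dimensions are in place.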