Skip to content

Commit

Permalink
Merge pull request #32 from EcoDataFlow/fix/dag-minor-fix
Browse files Browse the repository at this point in the history
Fix/dag fix
  • Loading branch information
srlee056 authored Jan 11, 2024
2 parents 84dcb4a + a92b598 commit 8e64336
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 6 deletions.
6 changes: 3 additions & 3 deletions dags/air/air_quality_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,12 @@ def fetch_data_and_save_csv(**context):
for col_name in ["so2", "co", "o3", "no2", "pm10", "pm25"]:
df_selected.loc[df_selected[col_name] == "-", col_name] = ""
# convert to csv
df_selected.to_csv("dags/air/output.csv", index=False)
df_selected.to_csv(os.path.abspath("air_output.csv"), index=False)


# Function to delete the CSV file
def delete_csv_file():
file_path = "dags/air/output.csv"
file_path = os.path.abspath("air_output.csv")
if os.path.exists(file_path):
os.remove(file_path)
else:
Expand Down Expand Up @@ -129,7 +129,7 @@ def delete_csv_file():

upload_operator = LocalFilesystemToGCSOperator(
task_id="upload_csv_to_gcs_task",
src="dags/air/output.csv",
src=os.path.abspath("air_output.csv"),
dst="{{ var.value.aqi_gcs_file_path }}",
bucket="data-lake-storage",
gcp_conn_id="google_cloud_conn_id", # The Conn Id from the Airflow connection setup
Expand Down
6 changes: 3 additions & 3 deletions dags/water/hourly_water_pollutants_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,12 @@ def fetch_data_and_save_csv(**context):
formatted_path = f"water/hourly/{current_datetime.strftime('%Y-%m-%d_%H')}.csv"
Variable.set("gcs_file_path", formatted_path)

df_selected.to_csv("dags/water/output.csv", index=False)
df_selected.to_csv(os.path.abspath("water_output.csv"), index=False)


# Function to delete the CSV file
def delete_csv_file():
file_path = "dags/water/output.csv"
file_path = os.path.abspath("water_output.csv")
if os.path.exists(file_path):
os.remove(file_path)
else:
Expand Down Expand Up @@ -128,7 +128,7 @@ def delete_csv_file():

upload_operator = LocalFilesystemToGCSOperator(
task_id="upload_csv_to_gcs_task",
src="dags/water/output.csv",
src=os.path.abspath("water_output.csv"),
dst="{{ var.value.gcs_file_path }}",
bucket="data-lake-storage",
gcp_conn_id="google_cloud_conn_id", # The Conn Id from the Airflow connection setup
Expand Down

0 comments on commit 8e64336

Please sign in to comment.